当前位置: 首页 > 知识库问答 >
问题:

KeyedProcessFunction实现引发空指针异常?

萧繁
2023-03-14

第一个示例:“源代码”https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html“”

我正在尝试重写KeyedProcessFunction类的processElement()。ProcessElement有3个参数,其中一个参数是上下文对象。当我试图从上下文对象检索时间戳时,它抛出空指针异常。

在第一个示例代码中引发空指针异常的一行是

现在的lastModified=ctx。时间戳();

第二个示例:“使用Apache Flink进行流处理”一书的示例6.5。

我在扩展KeyedProcessFunction类的类中声明了两个ValueState变量。当我尝试检索状态中更新的最后一个值时,它返回一个空值。

在第一个示例代码中引发空指针异常的一行是

双prevTemp=lastTemp。值();如果(prevTemp==0.0 | | r.温度

第一个示例代码

public class KeyedProcessFunctionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment=
                StreamExecutionEnvironment.getExecutionEnvironment();

        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Tuple2<String, String>> stream =
                environment.socketTextStream("localhost",9090)
                        .map(new MapFunction<String, Tuple2<String, String>>() {
                            @Override
                            public Tuple2<String, String> map(String s) throws Exception {
                                String[] words= s.split(",");

                                return new Tuple2<>(words[0],words[1]);
                            }
                        });

        DataStream<Tuple2<String, Long>> result = stream
                .keyBy(0)
                .process(new CountWithTimeoutFunction());

        result.print();

        environment.execute("Keyed Process Function Example");

    }
    public static class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple, Tuple2<String, String>, Tuple2<String, Long>> {
        private ValueState<CountWithTimestamp> state;

        @Override
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
        }

        @Override
        public void processElement(
                Tuple2<String, String> value,
                Context ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {

            // retrieve the current count
            CountWithTimestamp current = state.value();
            if (current == null) {
                current = new CountWithTimestamp();
                current.key = value.f0;
            }

            // update the state's count
            current.count++;

            // set the state's timestamp to the record's assigned event time timestamp
            current.lastModified = ctx.timestamp();

            // write the state back
            state.update(current);

            // schedule the next timer 60 seconds from the current event time
            ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
        }

        @Override
        public void onTimer(
                long timestamp,
                OnTimerContext ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {

            // get the state for the key that scheduled the timer
            CountWithTimestamp result = state.value();

            // check if this is an outdated timer or the latest timer
            if (timestamp == result.lastModified + 60000) {
                // emit the state on timeout
                out.collect(new Tuple2<String, Long>(result.key, result.count));
            }
        }
    }
}

class CountWithTimestamp {

    public String key;
    public long count;
    public long lastModified;
}

第二例

public class KeyedProcessFunctionTimerExample {
    public static void main(String[] args) throws Exception{
        // set up the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // use event time for the application
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        DataStream<String> sensorData=
                env.addSource(new SensorSource())
                .keyBy(r -> r.id)
                .process(new TempIncreaseAlertFunction());

        sensorData.print();
        env.execute("Keyed Process Function execution");
    }

    public static class TempIncreaseAlertFunction extends KeyedProcessFunction<String, SensorReading, String> {

        private ValueState<Double> lastTemp;
        private ValueState<Long> currentTimer;

        @Override
        public void open(Configuration parameters) throws Exception {
            lastTemp = getRuntimeContext().getState(new ValueStateDescriptor<Double>("lastTemp", Types.DOUBLE));
            currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer", org.apache.flink.api.common.typeinfo.Types.LONG));
        }

        @Override
        public void processElement(
                SensorReading r,
                Context ctx,
                Collector<String> out) throws Exception {

            // get previous Temp
            Double prevTemp = lastTemp.value();

            // update last temp
            lastTemp.update(r.temperature);

            Long curTimerTimestamp = currentTimer.value();

            if(prevTemp==0.0 || r.temperature < prevTemp) {
                ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp);
                currentTimer.clear();
            }
            else if(r.temperature > prevTemp && curTimerTimestamp == 0) {
                Long timerTs = ctx.timerService().currentProcessingTime() + 1000;
                ctx.timerService().registerProcessingTimeTimer(timerTs);
                currentTimer.update(timerTs);

            }
        }

        @Override
        public void onTimer(
                long ts,
                OnTimerContext ctx,
                Collector<String> out) throws Exception {

            out.collect("Temperature of sensor ' " + ctx.getCurrentKey() + " ' monotonically increased for 1 second.");
            currentTimer.clear();

        }
    }

}

它不应引发空指针异常。我们将感谢您的帮助。谢谢

共有1个答案

毋玺
2023-03-14

在Flink中使用事件时间时,必须安排事件具有时间戳,并且流具有水印。您可以通过实现一个时间戳提取器和水印生成器来实现这一点,如下所述。

另请参见教程。

 类似资料:
  • 我正在尝试通过Maven运行cucumber+testng+selenium。 当我在Eclipse中通过TestNG测试运行它时,它工作得很好,但当我使用mvn测试时,它会抛出空指针异常 我是不是漏掉了什么?

  • 当我们尝试用Null值获取数据时 IN子句获得空指针异常。 也许是因为这个。 在数据库中,我们可以提供null in IN子句。 jooq中存在一个“无法修复”的问题https://github.com/jOOQ/jOOQ/issues/3867 有一些替代方案: 在输入前检查null(在我的情况下,这是一个非常大的select语句) 所以如果我想让这成为可能,还有其他的解决方法吗。 注:类似的情

  • 我正在Kura/osgi网站上的“Hello World Example”上开发我的第一个osgi包。 当我想按所述导出项目(导出-可部署插件和片段)时,我会得到以下结果: “导出插件”期间发生内部错误。java.lang.NullPointerException 不幸的是,没有更多的信息。 这似乎与这里描述和“解决”(?)相同,但我真的不知道该怎么做才能解决问题。我正在使用:Win 7(64),

  • 我正在研究leetcode问题编号876,其中表示: 给定一个非空的、带有头节点头的单链表,返回链表的一个中间节点。如果有两个中间节点,则返回第二个中间节点。 到目前为止,这就是我写的,但是它在时循环中抛出了一个空指针异常。我想既然时循环每次都在做任何事情之前检查node.next.next是否为空,它就不会抛出异常。我做错了什么?

  • 让我们考虑一个<代码>父< /代码>类,它只包含一个<代码>整数< /代码>属性。我用一个空变量创建了6个父类对象。然后我将这些对象添加到列表中。 我想通过属性的值检索相应的对象。我使用了Java8流。 但是我得到了,所以我编辑了代码: 但是如果任何对象为null,我想抛出一个异常。如果列表中没有对象为null,那么我想从列表中检索相应的对象。 如何使用Java 8 Streams使用一条语句实现

  • 问题内容: 有可能这可能是一个双重问题。我将String变量初始化为null。我可能会或可能不会使用一个值更新它。现在我想检查此变量是否不等于null以及我尝试执行的操作是否会得到null指针异常。空指针异常,因为它代价高昂。是否有任何有效的解决方法.TIA 问题答案: 如果您使用 你 不会 得到。 我怀疑你在做什么: 这是因为null 而引发,而不是因为null。 如果仍然无法解释,请发布您用于