当前位置: 首页 > 知识库问答 >
问题:

KeyedProcessFunction实现引发空指针异常?

萧繁
2023-03-14

第一个示例:“源代码”https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html“”

我正在尝试重写KeyedProcessFunction类的processElement()。ProcessElement有3个参数,其中一个参数是上下文对象。当我试图从上下文对象检索时间戳时,它抛出空指针异常。

在第一个示例代码中引发空指针异常的一行是

现在的lastModified=ctx。时间戳();

第二个示例:“使用Apache Flink进行流处理”一书的示例6.5。

我在扩展KeyedProcessFunction类的类中声明了两个ValueState变量。当我尝试检索状态中更新的最后一个值时,它返回一个空值。

在第一个示例代码中引发空指针异常的一行是

双prevTemp=lastTemp。值();如果(prevTemp==0.0 | | r.温度

第一个示例代码

public class KeyedProcessFunctionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment=
                StreamExecutionEnvironment.getExecutionEnvironment();

        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Tuple2<String, String>> stream =
                environment.socketTextStream("localhost",9090)
                        .map(new MapFunction<String, Tuple2<String, String>>() {
                            @Override
                            public Tuple2<String, String> map(String s) throws Exception {
                                String[] words= s.split(",");

                                return new Tuple2<>(words[0],words[1]);
                            }
                        });

        DataStream<Tuple2<String, Long>> result = stream
                .keyBy(0)
                .process(new CountWithTimeoutFunction());

        result.print();

        environment.execute("Keyed Process Function Example");

    }
    public static class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple, Tuple2<String, String>, Tuple2<String, Long>> {
        private ValueState<CountWithTimestamp> state;

        @Override
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
        }

        @Override
        public void processElement(
                Tuple2<String, String> value,
                Context ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {

            // retrieve the current count
            CountWithTimestamp current = state.value();
            if (current == null) {
                current = new CountWithTimestamp();
                current.key = value.f0;
            }

            // update the state's count
            current.count++;

            // set the state's timestamp to the record's assigned event time timestamp
            current.lastModified = ctx.timestamp();

            // write the state back
            state.update(current);

            // schedule the next timer 60 seconds from the current event time
            ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
        }

        @Override
        public void onTimer(
                long timestamp,
                OnTimerContext ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {

            // get the state for the key that scheduled the timer
            CountWithTimestamp result = state.value();

            // check if this is an outdated timer or the latest timer
            if (timestamp == result.lastModified + 60000) {
                // emit the state on timeout
                out.collect(new Tuple2<String, Long>(result.key, result.count));
            }
        }
    }
}

class CountWithTimestamp {

    public String key;
    public long count;
    public long lastModified;
}

第二例

public class KeyedProcessFunctionTimerExample {
    public static void main(String[] args) throws Exception{
        // set up the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // use event time for the application
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        DataStream<String> sensorData=
                env.addSource(new SensorSource())
                .keyBy(r -> r.id)
                .process(new TempIncreaseAlertFunction());

        sensorData.print();
        env.execute("Keyed Process Function execution");
    }

    public static class TempIncreaseAlertFunction extends KeyedProcessFunction<String, SensorReading, String> {

        private ValueState<Double> lastTemp;
        private ValueState<Long> currentTimer;

        @Override
        public void open(Configuration parameters) throws Exception {
            lastTemp = getRuntimeContext().getState(new ValueStateDescriptor<Double>("lastTemp", Types.DOUBLE));
            currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer", org.apache.flink.api.common.typeinfo.Types.LONG));
        }

        @Override
        public void processElement(
                SensorReading r,
                Context ctx,
                Collector<String> out) throws Exception {

            // get previous Temp
            Double prevTemp = lastTemp.value();

            // update last temp
            lastTemp.update(r.temperature);

            Long curTimerTimestamp = currentTimer.value();

            if(prevTemp==0.0 || r.temperature < prevTemp) {
                ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp);
                currentTimer.clear();
            }
            else if(r.temperature > prevTemp && curTimerTimestamp == 0) {
                Long timerTs = ctx.timerService().currentProcessingTime() + 1000;
                ctx.timerService().registerProcessingTimeTimer(timerTs);
                currentTimer.update(timerTs);

            }
        }

        @Override
        public void onTimer(
                long ts,
                OnTimerContext ctx,
                Collector<String> out) throws Exception {

            out.collect("Temperature of sensor ' " + ctx.getCurrentKey() + " ' monotonically increased for 1 second.");
            currentTimer.clear();

        }
    }

}

它不应引发空指针异常。我们将感谢您的帮助。谢谢

共有1个答案

毋玺
2023-03-14

在Flink中使用事件时间时,必须安排事件具有时间戳,并且流具有水印。您可以通过实现一个时间戳提取器和水印生成器来实现这一点,如下所述。

另请参见教程。

 类似资料:
  • 我正在尝试通过Maven运行cucumber+testng+selenium。 当我在Eclipse中通过TestNG测试运行它时,它工作得很好,但当我使用mvn测试时,它会抛出空指针异常 我是不是漏掉了什么?

  • 当我们尝试用Null值获取数据时 IN子句获得空指针异常。 也许是因为这个。 在数据库中,我们可以提供null in IN子句。 jooq中存在一个“无法修复”的问题https://github.com/jOOQ/jOOQ/issues/3867 有一些替代方案: 在输入前检查null(在我的情况下,这是一个非常大的select语句) 所以如果我想让这成为可能,还有其他的解决方法吗。 注:类似的情

  • 我正在Kura/osgi网站上的“Hello World Example”上开发我的第一个osgi包。 当我想按所述导出项目(导出-可部署插件和片段)时,我会得到以下结果: “导出插件”期间发生内部错误。java.lang.NullPointerException 不幸的是,没有更多的信息。 这似乎与这里描述和“解决”(?)相同,但我真的不知道该怎么做才能解决问题。我正在使用:Win 7(64),

  • 问题内容: 有可能这可能是一个双重问题。我将String变量初始化为null。我可能会或可能不会使用一个值更新它。现在我想检查此变量是否不等于null以及我尝试执行的操作是否会得到null指针异常。空指针异常,因为它代价高昂。是否有任何有效的解决方法.TIA 问题答案: 如果您使用 你 不会 得到。 我怀疑你在做什么: 这是因为null 而引发,而不是因为null。 如果仍然无法解释,请发布您用于

  • 我已经更新了我的项目中的一些依赖关系之后,我的Hibernate配置类显示Nullpointerx的。 我将SpringDataJPA存储库与hibernate一起使用,已经超过24小时了,仍然没有找到任何关于小问题的适当解决方案。 我已经尝试过的一些解决方案:- 使用@bean(name=“entityManagerFactory”)提供bean名称 我面临的问题 波姆。xml文件 配置类 db

  • 我正在研究leetcode问题编号876,其中表示: 给定一个非空的、带有头节点头的单链表,返回链表的一个中间节点。如果有两个中间节点,则返回第二个中间节点。 到目前为止,这就是我写的,但是它在时循环中抛出了一个空指针异常。我想既然时循环每次都在做任何事情之前检查node.next.next是否为空,它就不会抛出异常。我做错了什么?