第一个示例:“源代码”https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html“”
我正在尝试重写KeyedProcessFunction类的processElement()。ProcessElement有3个参数,其中一个参数是上下文对象。当我试图从上下文对象检索时间戳时,它抛出空指针异常。
在第一个示例代码中引发空指针异常的一行是
现在的lastModified=ctx。时间戳();
第二个示例:“使用Apache Flink进行流处理”一书的示例6.5。
我在扩展KeyedProcessFunction类的类中声明了两个ValueState变量。当我尝试检索状态中更新的最后一个值时,它返回一个空值。
在第一个示例代码中引发空指针异常的一行是
双prevTemp=lastTemp。值();如果(prevTemp==0.0 | | r.温度
第一个示例代码
public class KeyedProcessFunctionExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment=
StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Tuple2<String, String>> stream =
environment.socketTextStream("localhost",9090)
.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String s) throws Exception {
String[] words= s.split(",");
return new Tuple2<>(words[0],words[1]);
}
});
DataStream<Tuple2<String, Long>> result = stream
.keyBy(0)
.process(new CountWithTimeoutFunction());
result.print();
environment.execute("Keyed Process Function Example");
}
public static class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple, Tuple2<String, String>, Tuple2<String, Long>> {
private ValueState<CountWithTimestamp> state;
@Override
public void open(Configuration parameters) throws Exception {
state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
}
@Override
public void processElement(
Tuple2<String, String> value,
Context ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
// retrieve the current count
CountWithTimestamp current = state.value();
if (current == null) {
current = new CountWithTimestamp();
current.key = value.f0;
}
// update the state's count
current.count++;
// set the state's timestamp to the record's assigned event time timestamp
current.lastModified = ctx.timestamp();
// write the state back
state.update(current);
// schedule the next timer 60 seconds from the current event time
ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
}
@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
// get the state for the key that scheduled the timer
CountWithTimestamp result = state.value();
// check if this is an outdated timer or the latest timer
if (timestamp == result.lastModified + 60000) {
// emit the state on timeout
out.collect(new Tuple2<String, Long>(result.key, result.count));
}
}
}
}
class CountWithTimestamp {
public String key;
public long count;
public long lastModified;
}
第二例
public class KeyedProcessFunctionTimerExample {
public static void main(String[] args) throws Exception{
// set up the streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// use event time for the application
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<String> sensorData=
env.addSource(new SensorSource())
.keyBy(r -> r.id)
.process(new TempIncreaseAlertFunction());
sensorData.print();
env.execute("Keyed Process Function execution");
}
public static class TempIncreaseAlertFunction extends KeyedProcessFunction<String, SensorReading, String> {
private ValueState<Double> lastTemp;
private ValueState<Long> currentTimer;
@Override
public void open(Configuration parameters) throws Exception {
lastTemp = getRuntimeContext().getState(new ValueStateDescriptor<Double>("lastTemp", Types.DOUBLE));
currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer", org.apache.flink.api.common.typeinfo.Types.LONG));
}
@Override
public void processElement(
SensorReading r,
Context ctx,
Collector<String> out) throws Exception {
// get previous Temp
Double prevTemp = lastTemp.value();
// update last temp
lastTemp.update(r.temperature);
Long curTimerTimestamp = currentTimer.value();
if(prevTemp==0.0 || r.temperature < prevTemp) {
ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp);
currentTimer.clear();
}
else if(r.temperature > prevTemp && curTimerTimestamp == 0) {
Long timerTs = ctx.timerService().currentProcessingTime() + 1000;
ctx.timerService().registerProcessingTimeTimer(timerTs);
currentTimer.update(timerTs);
}
}
@Override
public void onTimer(
long ts,
OnTimerContext ctx,
Collector<String> out) throws Exception {
out.collect("Temperature of sensor ' " + ctx.getCurrentKey() + " ' monotonically increased for 1 second.");
currentTimer.clear();
}
}
}
它不应引发空指针异常。我们将感谢您的帮助。谢谢
在Flink中使用事件时间时,必须安排事件具有时间戳,并且流具有水印。您可以通过实现一个时间戳提取器和水印生成器来实现这一点,如下所述。
另请参见教程。
我正在尝试通过Maven运行cucumber+testng+selenium。 当我在Eclipse中通过TestNG测试运行它时,它工作得很好,但当我使用mvn测试时,它会抛出空指针异常 我是不是漏掉了什么?
当我们尝试用Null值获取数据时 IN子句获得空指针异常。 也许是因为这个。 在数据库中,我们可以提供null in IN子句。 jooq中存在一个“无法修复”的问题https://github.com/jOOQ/jOOQ/issues/3867 有一些替代方案: 在输入前检查null(在我的情况下,这是一个非常大的select语句) 所以如果我想让这成为可能,还有其他的解决方法吗。 注:类似的情
我正在Kura/osgi网站上的“Hello World Example”上开发我的第一个osgi包。 当我想按所述导出项目(导出-可部署插件和片段)时,我会得到以下结果: “导出插件”期间发生内部错误。java.lang.NullPointerException 不幸的是,没有更多的信息。 这似乎与这里描述和“解决”(?)相同,但我真的不知道该怎么做才能解决问题。我正在使用:Win 7(64),
我正在研究leetcode问题编号876,其中表示: 给定一个非空的、带有头节点头的单链表,返回链表的一个中间节点。如果有两个中间节点,则返回第二个中间节点。 到目前为止,这就是我写的,但是它在时循环中抛出了一个空指针异常。我想既然时循环每次都在做任何事情之前检查node.next.next是否为空,它就不会抛出异常。我做错了什么?
让我们考虑一个<代码>父< /代码>类,它只包含一个<代码>整数< /代码>属性。我用一个空变量创建了6个父类对象。然后我将这些对象添加到列表中。 我想通过属性的值检索相应的对象。我使用了Java8流。 但是我得到了,所以我编辑了代码: 但是如果任何对象为null,我想抛出一个异常。如果列表中没有对象为null,那么我想从列表中检索相应的对象。 如何使用Java 8 Streams使用一条语句实现
问题内容: 有可能这可能是一个双重问题。我将String变量初始化为null。我可能会或可能不会使用一个值更新它。现在我想检查此变量是否不等于null以及我尝试执行的操作是否会得到null指针异常。空指针异常,因为它代价高昂。是否有任何有效的解决方法.TIA 问题答案: 如果您使用 你 不会 得到。 我怀疑你在做什么: 这是因为null 而引发,而不是因为null。 如果仍然无法解释,请发布您用于