我不知道当您从Apache Kafka中摄取数据时,水印应该如何工作。
以下格式的事件:
hello,1641369936000
hello,1641369937000
hello,1641369938000
hello,1641369939000
...
Topickafka topics--引导服务器本地主机:9092--Topic testerino--分区1--复制因子1--创建
Kafka版本3.0.0,Flink 1.14.2
提前感谢
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
KafkaSource<String> stringKafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setGroupId("test-group")
.setStartingOffsets(OffsetsInitializer.latest())
.setTopics("testerino")
.setDeserializer(new KafkaRecordDeserializationSchema<String>() {
@Override
public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out) throws IOException {
System.out.println(record);
out.collect(new String(record.value()));
}
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
})
.build();
DataStreamSource<String> streamSource = env.fromSource(
stringKafkaSource,
WatermarkStrategy.<String>forMonotonousTimestamps().withTimestampAssigner((event, timestamp) -> {
return Long.parseLong(event.split(",")[1]);
}),
"source"
);
streamSource
.keyBy(k -> k.split(",")[0])
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.trigger(new Trigger<String, TimeWindow>() {
@Override
public TriggerResult onElement(String element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
System.out.printf("added elem: %s | timestamp: %s | window: %s| watermark: %d%n",
element, timestamp, window, ctx.getCurrentWatermark());
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
System.out.println("proccesing time trigger");
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
System.out.println("event time trigger");
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
System.out.println("clear");
}
})
.process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
@Override
public void process(String s, ProcessWindowFunction<String, String, String, TimeWindow>.Context context, Iterable<String> elements, Collector<String> out) throws Exception {
System.out.println("watermark: " + context.currentWatermark());
out.collect(s);
}
});
Flink从不自动提供水印,但KafkaSource确实从Kafka头中获取时间戳,并使用它来设置它生成的流记录的时间戳。这是传递给时间戳赋值器的时间戳。
我相信https://stackoverflow.com/a/70101290/2000823解释了为什么你没有得到任何结果。
我在连接数据源时遇到问题。 此错误显示API在API中运行此函数时: 2019-08-20 23:30:04.672错误20424---[nio-8080-exec-4]O.A.C.C.C.[.[.[/].[dispatcherServlet]:路径为[]的上下文中servlet[dispatcherServlet]的servlet.Service()引发异常[处理程序调度失败;嵌套异常为java
问题内容: 我想从数据库生成JPA (但我希望它是面向对象的)。例如 如果它还支持ManyToOne,OneToMany,Parent和ManyToMany,那将很酷。 PS我尝试了JBoss工具(hibernate工具),但没有为我工作。 问题答案: 使用JBoss工具(以前是hibernate工具)。 从他们的网站报价: 逆向工程:Hibernate Tools最强大的功能是数据库逆向工程工具
我正在寻找一种从数据库值生成类的方法,以便使用可用的最新值提供更新的生成器类。用例如下: 有一张像这样的桌子 我希望生成一个可以像这样使用的生成器类 目标是能够在将值添加到数据库表时(使用Liquibase)生成更新的生成器类。有人知道有哪些库可以帮助实现这一点,或者有什么从头开始实现这一点的策略吗?我们的项目大多是Java 11,带有Groovy和Spring Boot。
所以我必须检索存储在HDFS中的文件的内容,并对其进行某些分析。 问题是,我甚至无法读取文件并将其内容写入本地文件系统中的另一个文本文件。(我是Flink的新手,这只是一个测试,以确保我正确读取了文件) HDFS中的文件是纯文本文件。这是我的密码: 在我运行/tmp之后,它没有输出。 这是一个非常简单的代码,我不确定它是否有问题,或者我只是做了一些别的错误。正如我所说,我对Flink完全是新手 此
本文向大家介绍python从Oracle读取数据生成图表,包括了python从Oracle读取数据生成图表的使用技巧和注意事项,需要的朋友参考一下 初次学习python,连接Oracle数据库,导出数据到Excel,再从Excel里面读取数据进行绘图,生成png保存出来。 1、涉及到的python模块(模块安装就不进行解释了): 2、连接数据库 oracle客户端要根据自己python对应的版本进
问题内容: 如何使用Math.random生成随机整数? 我的代码是: 它显示的全部是0,我该如何解决? 问题答案: 将abc转换为整数。