我想加入来自 kafka producer 的两个流,但该连接不起作用。我使用 AssignerWithPeriodicWatermark 来定义我的分配器,我尝试使用 3 分钟的窗口连接两个流。但我没有得到任何输出。我打印了这两个流,以确保它们的事件在时间上足够接近。
object Job {
class Assigner extends AssignerWithPeriodicWatermarks[String] {
// 1 s in ms
val bound: Long = 1000
// the maximum observed timestamp
var maxTs: Long = Long.MinValue
override def getCurrentWatermark: Watermark = {
new Watermark(maxTs - bound)
}
override def extractTimestamp(r: String, previousTS: Long): Long = {
maxTs = Math.max(maxTs,previousTS)
previousTS
}
}
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment//createLocalEnvironment()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9093")
properties.setProperty("group.id", "test")
val consumerId = new FlinkKafkaConsumer[String]("topic_id", new SimpleStringSchema(), properties)
val streamId = env.addSource(consumerId).assignTimestampsAndWatermarks(new Assigner)
val streamIdParsed=streamId.map{s =>s.parseJson}.map{ value => (value.asJsObject.getFields("id")(0).toString(),value.asJsObject.getFields("m","w")) }
val consumerV = new FlinkKafkaConsumer[String]("topic_invoice", new SimpleStringSchema(), properties)
val streamV = env.addSource(consumerV).assignTimestampsAndWatermarks(new Assigner)
val streamVParsed = streamV.map{s =>s.parseJson}.map{ value => (value.asJsObject.getFields("id")(0).toString(),value.asJsObject.getFields("products")(0).toString().parseJson.asJsObject.getFields("id2", "id3")) }
streamIdParsed.join(streamVParsed).where(_._1).equalTo(_._1).window(SlidingEventTimeWindows.of(Time.seconds(60),Time.seconds(1))).apply { (e1, e2) => (e1._1,"test") }.print()
} }
可能出错的事情(由你来检查,因为你提供的信息太少而无法缩小范围)
问题是您尚未设置autoWatermark Interval
并且您正在使用周期分配器
。您需要执行以下操作:
env.getConfig.setAutowatermarkInterval([someinterval])
这将解决未生成水印的问题。
我正在尝试使用Apache Flink流API加入两个流,但没有任何内容加入,并且在阅读文档后我不知道我做错了什么 关键功能是
我的应用程序中有三个片段,其中需要传递和接收数据。我应该如何进行他们之间的沟通。我试图参考许多网站,但没有解决方案。 请给我推荐一些好的链接。 提前感谢。
导入javax.swing.*; class Labels extensions JFrame{ JPanel pnl = new JPanel(); } 如果我想将其用作JApplet怎么办?必须做什么?很难更改吗? JFrame上运行的东西和JApplet上的东西是一样的吗?
问题内容: 我们正在使用BigQuery Python API进行一些分析。为此,我们创建了以下适配器: 哪里是构建对象。 其主要目的是将数据流式传输到给定的表。如果该表已经存在并且“如何”输入作为“ WRITE_TRUNCATE”传递,则该表首先被删除并再次创建。之后,继续进行数据流。 当不一次又一次删除表时,一切工作正常。 举例来说,这是结果,当我们没有模拟写截断选项(一运行该脚本循环不断给你
问题内容: 是否有一个跟踪用户某些事件的表。 总是有一个动作,之后可能会有一个动作。 现在,我想查询这两个动作之间的时间差,以获取用户和之间的time_diff 。 现在,您可以假定没有多个条目(例如,至少一个,最大另一个)。 我想要这样的结果: 问题答案: 您可以使用以下查询: 该子句过滤掉仅包含一个动作的组,例如OP中的with记录。 演示在这里
我正在尝试将FBO的深度纹理和颜色纹理链接到GLSL着色器(版本4.0) 问题是,同时只有一个链接,这很奇怪,因为其他纹理可以很好地链接在一起(例如:漫反射贴图、法线贴图和镜面反射贴图) 以下是我的绑定RT代码: 我真的不知道这里怎么了。。。