当前位置: 首页 > 知识库问答 >
问题:

两个流之间的 Scala Flink Join 不起作用

郑承恩
2023-03-14

我想加入来自 kafka producer 的两个流,但该连接不起作用。我使用 AssignerWithPeriodicWatermark 来定义我的分配器,我尝试使用 3 分钟的窗口连接两个流。但我没有得到任何输出。我打印了这两个流,以确保它们的事件在时间上足够接近。

object Job {

class Assigner extends AssignerWithPeriodicWatermarks[String] {
  // 1 s in ms
  val bound: Long = 1000
  // the maximum observed timestamp
  var maxTs: Long = Long.MinValue

  override def getCurrentWatermark: Watermark = {
    new Watermark(maxTs - bound)
   }

  override def extractTimestamp(r: String, previousTS: Long): Long = {    
    maxTs = Math.max(maxTs,previousTS)    
    previousTS
   }
}

 def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment//createLocalEnvironment()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9093")

properties.setProperty("group.id", "test")
val consumerId = new FlinkKafkaConsumer[String]("topic_id", new SimpleStringSchema(), properties)

val streamId = env.addSource(consumerId).assignTimestampsAndWatermarks(new Assigner)

val streamIdParsed=streamId.map{s =>s.parseJson}.map{ value => (value.asJsObject.getFields("id")(0).toString(),value.asJsObject.getFields("m","w")) }


val consumerV = new FlinkKafkaConsumer[String]("topic_invoice", new SimpleStringSchema(), properties)


val streamV = env.addSource(consumerV).assignTimestampsAndWatermarks(new Assigner)

val streamVParsed = streamV.map{s =>s.parseJson}.map{ value => (value.asJsObject.getFields("id")(0).toString(),value.asJsObject.getFields("products")(0).toString().parseJson.asJsObject.getFields("id2", "id3")) }


    streamIdParsed.join(streamVParsed).where(_._1).equalTo(_._1).window(SlidingEventTimeWindows.of(Time.seconds(60),Time.seconds(1))).apply { (e1, e2) => (e1._1,"test") }.print()
} }

共有2个答案

步胜
2023-03-14

可能出错的事情(由你来检查,因为你提供的信息太少而无法缩小范围)

    < li >两个主题都没有关于Kafka的事件 < li >两个主题都没有水印进度 < li >您的数据按分钟计算,代码按秒计算
刘高峯
2023-03-14

问题是您尚未设置autoWatermark Interval并且您正在使用周期分配器。您需要执行以下操作:

env.getConfig.setAutowatermarkInterval([someinterval])

这将解决未生成水印的问题。

 类似资料:
  • 我正在尝试使用Apache Flink流API加入两个流,但没有任何内容加入,并且在阅读文档后我不知道我做错了什么 关键功能是

  • 我的应用程序中有三个片段,其中需要传递和接收数据。我应该如何进行他们之间的沟通。我试图参考许多网站,但没有解决方案。 请给我推荐一些好的链接。 提前感谢。

  • 导入javax.swing.*; class Labels extensions JFrame{ JPanel pnl = new JPanel(); } 如果我想将其用作JApplet怎么办?必须做什么?很难更改吗? JFrame上运行的东西和JApplet上的东西是一样的吗?

  • 问题内容: 我们正在使用BigQuery Python API进行一些分析。为此,我们创建了以下适配器: 哪里是构建对象。 其主要目的是将数据流式传输到给定的表。如果该表已经存在并且“如何”输入作为“ WRITE_TRUNCATE”传递,则该表首先被删除并再次创建。之后,继续进行数据流。 当不一次又一次删除表时,一切工作正常。 举例来说,这是结果,当我们没有模拟写截断选项(一运行该脚本循环不断给你

  • 问题内容: 是否有一个跟踪用户某些事件的表。 总是有一个动作,之后可能会有一个动作。 现在,我想查询这两个动作之间的时间差,以获取用户和之间的time_diff 。 现在,您可以假定没有多个条目(例如,至少一个,最大另一个)。 我想要这样的结果: 问题答案: 您可以使用以下查询: 该子句过滤掉仅包含一个动作的组,例如OP中的with记录。 演示在这里

  • 我正在尝试将FBO的深度纹理和颜色纹理链接到GLSL着色器(版本4.0) 问题是,同时只有一个链接,这很奇怪,因为其他纹理可以很好地链接在一起(例如:漫反射贴图、法线贴图和镜面反射贴图) 以下是我的绑定RT代码: 我真的不知道这里怎么了。。。