当前位置: 首页 > 知识库问答 >
问题:

Flink间隔连接不输出

岳承悦
2023-03-14

我们有一个Flink任务,它将两个流连接起来,两个流都使用来自Kafka的事件。下面是示例代码

val articleEventStream: DataStream[ArticleEvent] = env.addSource(articleEventSource)
  .assignTimestampsAndWatermarks(new ArticleEventAssigner) 
val feedbackEventStream: DataStream[FeedbackEvent] = env.addSource(feedbackEventSource)
  .assignTimestampsAndWatermarks(new FeedbackEventAssigner) 

articleEventStream
    .keyBy(article => article.id)
    .intervalJoin(feedbackEventStream.keyBy(feedback => feedback.article.id))
    .between(Time.seconds(-5), Time.seconds(10))
    .process(new ProcessJoinFunction[ArticleEvent, FeedbackEvent, String] {
        override def processElement(left: ArticleEvent, right: FeedbackEvent, ctx: ProcessJoinFunction[ArticleEvent, FeedbackEvent, String]#Context, out: Collector[String]): Unit = {
         out.collect(left.name + " got feedback: " + right.feedback); 
        }
      });
});

class ArticleEventAssigner extends AssignerWithPunctuatedWatermarks[ArticleEvent] {
  val bound: Long = 5 * 1000

  override def checkAndGetNextWatermark(lastElement: ArticleEvent, extractedTimestamp: Long): Watermark = {
    new Watermark(extractedTimestamp - bound)
  }

  override def extractTimestamp(element: ArticleEvent, previousElementTimestamp: Long): Long = {
    element.occurredAt
  }
}

class FeedbackEventAssigner extends AssignerWithPunctuatedWatermarks[FeedbackEvent] {
  val bound: Long = 5 * 1000

  override def checkAndGetNextWatermark(lastElement: FeedbackEvent, extractedTimestamp: Long): Watermark = {
    new Watermark(extractedTimestamp - bound)
  }

  override def extractTimestamp(element: FeedbackEvent, previousElementTimestamp: Long): Long = {
    element.occurredAt
  }
}

但是,我们没有看到任何连接输出。我们检查了每个流是否连续发射带有时间戳和适当水印的元素。有人知道可能的原因吗?

共有1个答案

陈开宇
2023-03-14

在检查了不同的部分(时间戳/水印、触发器)之后,我注意到我犯了一个错误,即我使用的窗口大小

之间(Time.seconds(-5),Time.seconds(10))

只是太小了,无法从两个流中找到要连接的元素。这听起来可能很明显,但是因为我是Flink的新手,我不知道去哪里检查。所以,我的教训是,如果连接不输出,可能有必要检查窗口大小。感谢所有的评论!

 类似资料:
  • 来自Flink的官方文件: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/joining.html#interval-join 示例代码是: 从上面的代码中,我想知道如何指定执行此间隔连接的开始时间(例如,从今天开始)(开始时间之前的数据将不考虑在内)。 例如,我已经运行了3天的程

  • 我在两个无界流之间有一个简单的间隔连接。这适用于较小的工作负载,但对于较大的(正式生产环境),它不再有效。通过观察输出,我可以看到FlinkSQL作业仅在扫描整个主题(并因此读入内存?)后才触发/发出记录,但我希望作业在找到ingle匹配后立即触发记录。因为在我的正式生产环境中,作业无法承受将整个表读入内存。 我正在做的间隔连接与这里提供的示例非常相似:https://github.com/ver

  • 我试图通过在Local上连接两个数据流来运行Flink上的基本连接。源流的数据类型是相同的(Tuple4(String,String,Long,Long))。在多次运行下面提到的函数后,我随机收到了两个不同的输出(存储在下面的变量CollectTuple2Sink中,下面提到了相同的调试日志)。我尝试保持并行度1和最大并行度1,但问题仍然存在。 源函数和其他定义都来自本教程。还从Flink官方文件

  • 在Apache Flink流处理中,连接操作与连接有何不同,因此CoProcessFunction和ProcessJoinFunction有何不同,这是CoProcessFunction提供的onTimer函数吗?您能否提供一个适用于以相互排斥的方式连接/连接的示例用例。

  • 我使用的是和连接器jar版本为0.10.2,kafka版本为0.9.1,flink版本为1.0.0。 当我在IDE中作为独立的主程序运行Java消费者时,它工作得很好。但是当我从运行它时,我不会看到正在使用的消息,也不会看到中JobManager的stdout中的任何日志。请告诉我可能有什么问题。

  • 我对flink/Java/Scala还比较陌生,所以这可能不是问题,但非常感谢您的帮助。我还没有找到一个将Flink Kafka连接器与Flink 1.13结合使用的示例(对我适用)。 我的项目在这里:https://github.com/sysarcher/flink-scala-tests 我想我无法使用我想试用的FlinkKafkaConsumer(链接)。 我正在使用IntelliJ Id