我们有一个Flink任务,它将两个流连接起来,两个流都使用来自Kafka的事件。下面是示例代码
val articleEventStream: DataStream[ArticleEvent] = env.addSource(articleEventSource)
.assignTimestampsAndWatermarks(new ArticleEventAssigner)
val feedbackEventStream: DataStream[FeedbackEvent] = env.addSource(feedbackEventSource)
.assignTimestampsAndWatermarks(new FeedbackEventAssigner)
articleEventStream
.keyBy(article => article.id)
.intervalJoin(feedbackEventStream.keyBy(feedback => feedback.article.id))
.between(Time.seconds(-5), Time.seconds(10))
.process(new ProcessJoinFunction[ArticleEvent, FeedbackEvent, String] {
override def processElement(left: ArticleEvent, right: FeedbackEvent, ctx: ProcessJoinFunction[ArticleEvent, FeedbackEvent, String]#Context, out: Collector[String]): Unit = {
out.collect(left.name + " got feedback: " + right.feedback);
}
});
});
class ArticleEventAssigner extends AssignerWithPunctuatedWatermarks[ArticleEvent] {
val bound: Long = 5 * 1000
override def checkAndGetNextWatermark(lastElement: ArticleEvent, extractedTimestamp: Long): Watermark = {
new Watermark(extractedTimestamp - bound)
}
override def extractTimestamp(element: ArticleEvent, previousElementTimestamp: Long): Long = {
element.occurredAt
}
}
class FeedbackEventAssigner extends AssignerWithPunctuatedWatermarks[FeedbackEvent] {
val bound: Long = 5 * 1000
override def checkAndGetNextWatermark(lastElement: FeedbackEvent, extractedTimestamp: Long): Watermark = {
new Watermark(extractedTimestamp - bound)
}
override def extractTimestamp(element: FeedbackEvent, previousElementTimestamp: Long): Long = {
element.occurredAt
}
}
但是,我们没有看到任何连接输出。我们检查了每个流是否连续发射带有时间戳和适当水印的元素。有人知道可能的原因吗?
在检查了不同的部分(时间戳/水印、触发器)之后,我注意到我犯了一个错误,即我使用的窗口大小
之间(Time.seconds(-5),Time.seconds(10))
只是太小了,无法从两个流中找到要连接的元素。这听起来可能很明显,但是因为我是Flink的新手,我不知道去哪里检查。所以,我的教训是,如果连接不输出,可能有必要检查窗口大小。感谢所有的评论!
来自Flink的官方文件: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/joining.html#interval-join 示例代码是: 从上面的代码中,我想知道如何指定执行此间隔连接的开始时间(例如,从今天开始)(开始时间之前的数据将不考虑在内)。 例如,我已经运行了3天的程
我在两个无界流之间有一个简单的间隔连接。这适用于较小的工作负载,但对于较大的(正式生产环境),它不再有效。通过观察输出,我可以看到FlinkSQL作业仅在扫描整个主题(并因此读入内存?)后才触发/发出记录,但我希望作业在找到ingle匹配后立即触发记录。因为在我的正式生产环境中,作业无法承受将整个表读入内存。 我正在做的间隔连接与这里提供的示例非常相似:https://github.com/ver
我试图通过在Local上连接两个数据流来运行Flink上的基本连接。源流的数据类型是相同的(Tuple4(String,String,Long,Long))。在多次运行下面提到的函数后,我随机收到了两个不同的输出(存储在下面的变量CollectTuple2Sink中,下面提到了相同的调试日志)。我尝试保持并行度1和最大并行度1,但问题仍然存在。 源函数和其他定义都来自本教程。还从Flink官方文件
在Apache Flink流处理中,连接操作与连接有何不同,因此CoProcessFunction和ProcessJoinFunction有何不同,这是CoProcessFunction提供的onTimer函数吗?您能否提供一个适用于以相互排斥的方式连接/连接的示例用例。
我使用的是和连接器jar版本为0.10.2,kafka版本为0.9.1,flink版本为1.0.0。 当我在IDE中作为独立的主程序运行Java消费者时,它工作得很好。但是当我从运行它时,我不会看到正在使用的消息,也不会看到中JobManager的stdout中的任何日志。请告诉我可能有什么问题。
我对flink/Java/Scala还比较陌生,所以这可能不是问题,但非常感谢您的帮助。我还没有找到一个将Flink Kafka连接器与Flink 1.13结合使用的示例(对我适用)。 我的项目在这里:https://github.com/sysarcher/flink-scala-tests 我想我无法使用我想试用的FlinkKafkaConsumer(链接)。 我正在使用IntelliJ Id