当前位置: 首页 > 知识库问答 >
问题:

使用Flink使用DateStreamSource分配水印的正确方法

程峻
2023-03-14

我有一个为Kafka主题生成的持续JSONArray数据,我想处理具有EventTime特性的记录。为了达到这个目标,我必须为JSONArray中包含的每个记录分配水印。

我没有找到一种方便的方法来实现这个目标。我的解决方案是消耗来自DataStreamSource的数据

主要代码如下所示:

<代码>DataStreamSource

毫无疑问,代码似乎没有问题,运行时也没有错误。但ProcessWindowFunction从未触发。我跟踪了Flink源代码,发现EventTimeTrigger从不返回TriggerResult。火灾,由环境引发。getCurrentWatermark返回Long。始终保持最小值。

在事件发生时处理列表的正确方法是什么?任何建议都将不胜感激。

共有1个答案

晏望
2023-03-14

问题是您正在将keyBy和窗口操作应用于转换ToPojo流,而不是带有时间戳和水印的流(您没有将其分配给变量)。

如果您或多或少是这样编写代码的,那么它应该可以工作:

listDataStreamSource = KafkaSource ...
convertToPojo = listDataStreamSource.process ...
pojoPlusWatermarks = convertToPojo.assignTimestampsAndWatermarks ...
countStream = pojoPlusWatermarks.keyBy ...

在convertToPojo流上调用assignTimestampsAndWatermarks不会修改该流,而是创建一个包含时间戳和水印的新datastream对象。您需要将窗口应用于新的数据流。

 类似资料:
  • 我有两个流,流A和流B。两个流都包含具有ID和时间戳的相同类型的事件。现在,我希望闪烁作业所要做的就是在1分钟的窗口内加入具有相同ID的事件。水印是在事件上分配的。 在我的测试中,我尝试发送以下内容: 对于并行度=1,我可以看到从时间0开始的事件连接在一起。 但是,对于parallelism=2,打印不会显示任何正在加入的内容。为了解决这个问题,我尝试在每个流的keyBy之后打印事件,我可以看到它

  • 在Flink中,我发现了2种设置水印的方法, 第一个是 第二个是 我想知道哪个最终会生效。

  • 我看到关于为每个密钥添加水印支持的讨论很多。但是flink支持每个分区的水印吗? 当前-然后考虑所有水印(非空闲分区)的最小值。因此,窗口中最后挂起的记录也被卡住了。(使用periodicemit增加水印时) 任何关于这方面的信息都非常感谢!

  • 问题内容: 这个想法是使用更少的连接和更好的性能。连接是否随时终止? 对于另一个问题,是否打开新连接? 问题答案: 不,多路复用器不会过期。没有GetDatabase不会打开新连接。basics.md涵盖了所有内容 -特别是: 从GetDatabase返回的对象是便宜的直通对象,不需要存储。

  • 我有一个使用jquery mobile的应用程序,它由几个html页面组成,每个页面中都有几个jquery页面元素。在桌面浏览器上,一切正常,但当我把它加载到我的android设备(运行2.3)上时,第一个页面看起来很好,但只要你点击一个链接(比如从index.html)- 那么,是否有正确的方法在不同的html页面之间移动呢?我没有得到任何浏览器错误,所以一切似乎都工作正常,但没有jqm的样式或

  • 问题内容: 我正在尝试对表单中的某些字段使用get_or_create,但尝试这样做时却出现500错误。 其中一行如下所示: 对于以上代码,我得到的错误是: 问题答案: 从文档get_or_create中: 说明: 要评估相似性的字段必须在外部提及。其余字段必须包含在中。如果发生CREATE事件,则会考虑所有字段。 看起来你需要返回一个元组,而不是单个变量,请执行以下操作: