当前位置: 首页 > 知识库问答 >
问题:

如何使用Apache Flink DataStream API输出事件对的流?

韩楷
2023-03-14

鞋子(事件)被定义为它的颜色并且是Left(如果鞋子是左腿的,那么是Left=true,否则是的)。

Tuple2<String, Boolean> leftBlueShoe  = Tuple2.of("blue", true);
Tuple2<String, Boolean> rightBlueShoe = Tuple2.of("blue", false);
// unbounded stream of shoes is as follows
DataStream<Tuple2<String, Boolean>> streamOfShoes = ... 
// somthing like - env.fromElements(leftBlueShoe, rightRedShoe, leftGreenShoe, rightBlueShoe, ...);

如何形成一双相同颜色的鞋,并期望匹配的鞋立即发出,不匹配的鞋等待其配对,直到窗口结束。

DataStream<Tuple5<String, Boolean, String, Boolean, String>> shoePairs = ...
// few events from shoePairs stream:
Tuple5<> shoePair   = Tuple5.of("blue", true, "blue", false, "pairFound");
Tuple5<> notShoePair= Tuple5.of("red", true, "red", false, "pairNotFound"); // Even if pair not found in window we tagged and kept in stream

尝试的方法(忽略此项以避免混淆):

>

  • 通过将流拆分为左和右,并在连接上加窗(会产生成本吗?)

    >

  • TumblingWindowJoin:窗口函数不能处理不匹配的对。窗户开过后,那只鞋就丢了。

    CoGroupFunction:窗口不会为最后一个事件触发。完整代码

    低级连接:即 CoProcessFunction()。。不确定它是否会有所帮助?

    使用 TumblingProcessingTimeWindowapply() 中的自定义连接逻辑在同一流上开窗。即使所有事件都已配对,此窗口也不会立即触发。

  • 共有1个答案

    贺正祥
    2023-03-14

    Flink训练中的一个练习是关于寻找事件对;精神上和你要求的差不多。参见乘车和票价练习,该练习使用< code > RichCoFlatMapFunction 进行配对。

    那里的解决方案假设完美配对总是可能的,所以它没有处理不匹配配对的情况。但是你可以在这里找到一个更进一步的变体。此示例使用< code >协处理函数中的计时器来检测不匹配的对。

    其他要点:

    将流分成左子流和右子流应该具有可以忽略的成本。

    我认为,cogroup函数应该可以工作。如果您尝试了此操作,但似乎不起作用,则可能是您使用了事件时间窗口,最终水印丢失,从而阻止了窗口关闭。

    更新:

    查看了您的代码后,我发现实现中存在一个问题。时间戳提取器使用的是系统时钟,而不是事件中的时间戳。这将为您提供类似于(但比)使用处理时间的东西。我之所以说“比处理时间还要糟糕”,是因为您允许事件无序,这会增加延迟,并且它会阻止窗口关闭,直到事件到达窗口的endpoint。这意味着最后一个窗口永远不会被触发。

    作为测试,请尝试将时间特征切换到处理时间,删除 assignTimestampsAndWatermarks,并查看 CoGroupFunction 是否正常工作。你也可以使用摄取时间,只要你删除你的水印并让 Flink 处理它(处理时间水印无关紧要;对于摄取时间,Flink 会为你做水印,除非你覆盖它)。

    如果您想在应用程序中使用事件时间,请在测试中使用有限的源。当有限的源(例如从文件或集合中读取)到达其输入的结尾时,它们通过作业发送一个非常大的水印,这将关闭所有打开的窗口。

     类似资料:
    • 我是泽西岛使用SSE的新手,有以下情况。 我有一个JAXB注释类,它表示Raspberry Pi的I/O并对其起作用(类GpioSymation)。 客户端类通过返回类的XML对象表示的方法getUpdate()访问I/O的状态。 使用getUpdate()的客户端是HomeResource类,方法getPiStatusStream()。这是一个JAX-RS注释方法,并提供远程客户端服务器发送的事

    • 在我看来,Flink以三种方式处理后期事件: 窗口过期时删除延迟事件(默认)。 通过使用“允许延迟”机制包含延迟事件来更新窗口。 使用“侧输出”机制将延迟事件重定向到另一个DataStream。 让我们假设我有一个事件时间作业,它使用来自Kafka的数据,并每5分钟处理一个窗口。现在,假设我将延迟事件重定向到另一个数据流中。 这个新的数据流是独立的吗 谢谢大家!

    • 输出。 我想写的输出文件与inputfile完全相似,但顺序不是问题。我怎样才能做到,请帮帮我。

    • 当用户在输入字段中键入内容时,我试图消除onChange事件的影响。 我引用这些线程: 在React.js中执行去盎司 使用取消公告的onChange处理程序设置输入值 我在下面的代码片段中尝试复制上述线程中提供的解决方案: 错误: 错误:处理程序不是一个函数 当用户在输入字段中键入时,如何对的进行去抖动?

    • 问题内容: 我在Go中有一个REPL应用,该应用应该对键盘按下事件(每个按下的按键的动作有所不同)做出反应,但是希望在阅读之前按下返回键: 我如何对Go中的按键事件做出反应? 问题答案: 游戏引擎通常实现这种功能。它们通常也与平台无关(通常至少是Windows,Linux,Mac OS X)。例如尝试Azul3D的键盘库。 逻辑就像我的头上一样 要获得当前当前按下的键的列表,只需遍历地图并列出va

    • 我正在尝试从事件中心读取数据,但结果它只返回空值。 我将一个数据帧转换为json以发送到eventhub 这是数据的模式 我正在尝试使用从eventhub读取数据 从 pyspark.sql.类型导入数组类型, 双类型, 结构类型, 结构字段, 字符串类型, 长类型, 布尔类型