当前位置: 首页 > 知识库问答 >
问题:

在Flink中使用joinFunction时增加水印的正确方法是什么?

贡正诚
2023-03-14

我有两个流,流A和流B。两个流都包含具有ID和时间戳的相同类型的事件。现在,我希望闪烁作业所要做的就是在1分钟的窗口内加入具有相同ID的事件。水印是在事件上分配的。

sourceA = initialSourceA.map(parseToEvent)
sourceB = initialSourceB.map(parseToEvent)

streamA = sourceA
                .assignTimestampsAndWatermarks(CustomWatermarkStrategy())
                .keyBy(Event.Key)

streamB = sourceB
                .assignTimestampsAndWatermarks(CustomWatermarkStrategy())
                .keyBy(Event.Key)


streamA
                .join(streamB)
                .where(Event.Key)
                .equalTo(Event.Key)
                .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.MINUTES)))
                .apply(giveMePairOfEvents)
                .print()

在我的测试中,我尝试发送以下内容:

sourceA.send(Event(ID_1, 0 seconds))
sourceB.send(Event(ID_1, 0 seconds))

//to increase the watermark
sourceA.send(Event(ID_1, 62 seconds))
sourceB.send(Event(ID_1, 62 seconds)) 

对于并行度=1,我可以看到从时间0开始的事件连接在一起。

但是,对于parallelism=2,打印不会显示任何正在加入的内容。为了解决这个问题,我尝试在每个流的keyBy之后打印事件,我可以看到它们都在同一个实例上运行。将打印放在水印之后,原因很明显,事件当前位于不同的实例上。

这让我相信,当涉及到水印时,我在某种程度上做得不对,因为对于高于1的并行度,它不会增加水印。下面是我问自己的几个问题:

  • 是否可能每个事件都有一个单独的水印生成器,并且我必须专门增加它们?
  • 我是否先运行keyBy然后再运行水印,以便我来自每个流的事件使用相同的水印生成器?

发送另一组事件,如下所示:

sourceA.send(Event(ID_1, 0 seconds))
sourceB.send(Event(ID_1, 0 seconds))

//to increase the watermark
sourceA.send(Event(ID_1, 62 seconds))
sourceB.send(Event(ID_1, 62 seconds)) 

sourceA.send(Event(ID_1, 122 seconds))
sourceB.send(Event(ID_1, 122 seconds))

最终发送了连接的第一个事件。进一步的检查表明,第三组事件使用了第二组没有使用的相同水印生成器。我不太清楚为什么会发生这种情况。在Flink中使用连接函数时,如何正确分配和增加水印?

编辑1:

自定义水印生成器:

class CustomWaterMarkGenerator(
        private val maxOutOfOrderness: Long,
        private var currentMaxTimeStamp: Long = 0,
)
    : WatermarkGenerator<EventType> {
    override fun onEvent(event: EventType, eventTimestamp: Long, output: WatermarkOutput) {
        val a = currentMaxTimeStamp.coerceAtLeast(eventTimestamp)
        currentMaxTimeStamp = a
        output.emitWatermark(Watermark(currentMaxTimeStamp - maxOutOfOrderness - 1));
    }

    override fun onPeriodicEmit(output: WatermarkOutput?) {
    }
}

水印策略:


class CustomWatermarkStrategy(
): WatermarkStrategy<Event> {
    override fun createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context?): WatermarkGenerator<Event> {
        return CustomWaterMarkGenerator(0)
    }

    override fun createTimestampAssigner(context: TimestampAssignerSupplier.Context?): TimestampAssigner<Event> {
        return TimestampAssigner{ event: Event, _: Long->
            event.timestamp
        }
    }

}

自定义来源:

sourceFunction当前是一个rsocket连接,它连接到mockstream,我可以通过mockstream发送事件。发送(事件)。我对事件所做的第一件事是使用映射函数(从字符串到我的事件类型)解析它们,然后分配水印等。

共有1个答案

林烨华
2023-03-14

>

  • 水印生成器的每个并行实例将单独基于其观察到的事件运行。在源文件之后立即进行水印处理是有意义的(尽管通常更好的做法是直接在源文件中进行水印处理)。

    具有多个输入通道(例如应用程序中的键控窗口连接)的运算符将其当前水印设置为它从其活动输入通道接收到的水印的最小值。这会导致任何空闲源实例都将导致水印在下游任务中停止——除非这些源明确将自己标记为空闲。(FLINK-18934意味着在Flink 1.14之前,空闲传播无法正确使用连接。)在您的情况下,空闲源可能是可疑的。

    调试此类问题的一种策略是调出Flink WebUI并观察所有任务中当前水印的行为。

    要获得更多帮助,请共享应用程序的其余部分,或者至少共享自定义源和水印策略。

  •  类似资料:
    • 我有一个为Kafka主题生成的持续JSONArray数据,我想处理具有EventTime特性的记录。为了达到这个目标,我必须为JSONArray中包含的每个记录分配水印。 我没有找到一种方便的方法来实现这个目标。我的解决方案是消耗来自DataStreamSource的数据 主要代码如下所示: <代码>DataStreamSource 毫无疑问,代码似乎没有问题,运行时也没有错误。但ProcessW

    • 问题内容: 我正在与我的CompSci教授交谈,他建议将所有String 方法编写为: 而不是: 这两行都可以编译,但是我想知道第一种方法的好处是什么?我一直都是后一种方式。错了吗 什么是普通/常规? 问题答案: 第一种方法确保执行比较时不会收到 NullPointerException 。当您尝试在不存在的对象上调用方法时,抛出(发生)此异常。 以下是一些相关的切线:仔细阅读风险自负 不过要注意

    • 问题内容: 我正在尝试使用一个使用大量jQuery的Layout / Template的项目。 我已经学会了将模板与ReactJS Project集成在一起,但是,我正在寻找一种可以完全替代jQuery的解决方案。 我的解决方案之一是在内部使用jQuery函数或React函数。 这种方法正确吗?这是正确的方法吗? 我在下面附上一个小例子: 这是我的职能。 问题答案: 这种方法正确吗?这是正确的方法

    • 这就是我目前拥有所有实体类的方式(此处仅显示一个以供参考) 来自lombok参考: @Data注释可能是项目Lombok工具集中最常用的注释。它结合了@ToString、@EqualsAndHashCode、@Getter和@Setter的功能。本质上,在类上使用@数据与使用默认的@ToString和@EqualsAndHashCode注释该类以及使用@Getter和@Setter注释每个字段是一

    • 如果有人用下面给出的Apache flink给出usecase示例来解释每个水印API之间的差异,这将是有帮助的 周期水印- 标点符号水印-

    • 问题内容: 即使在使用Java Swing一年以上之后,对我来说,它仍然像魔术一样。如何正确使用BufferStrategy,尤其是方法? 我想添加一个JFrame和一个Canvas,然后进行绘制。我还希望能够调整()画布的大小。每次我调整Canvas的大小时,似乎都会被浪费掉,或者变得毫无用处,因为在上使用并没有真正做任何事情。另外,它具有怪异的不确定性行为,我不知道如何正确同步它。 这就是我的