当前位置: 首页 > 知识库问答 >
问题:

Apache Flink:如何为键控的CoFlatMapFunction划分事件?

薛博艺
2023-03-14

这是关于连接键控流的一个非常基本的问题。

如果我有两个具有共享相同逻辑键的相关事件的流,并且这些流正在连接(使用该键进行逻辑连接),并且所有这些流都以并行方式运行

这是一个关于医院患者流的虚构示例——温度流和心跳流。我们希望使用ConnectedStreamCoFlatMapFunction通过患者的id加入这两个流。

DataStream<PatientTemperature> temperatureStream = ..
DataStream<HeartbeatStream> heartbeatStream = ..

temperatureStream
   .keyBy(pt -> pt.getPatientId())
   .connect (heartBeatStream.keyBy(hbt -> hbt.getPatientId() )
   .flatMap (new RichCoFlatMapFunction() {

         ValueState<PatientTemperatureAndHeartBeat> state = ...

         public void flatMap1(PatientTemperature value, Collector<PatientTemperatureAndHeartBeat> out) {
                state.value().setTemperature(value);  
         }

      public void flatMap2(PatentHeartbeat value, Collector<PatientTemperatureAndHeartBeat> out) {

               PatientTemperatureAndHeartBeat temperatureAndHeartBeat = state.value()
               temperatureAndHeartBeat.setHeartBeat(value)
               out.collect(temperatureAndHeartBeat);

      }

      });

假设它以并行度=3运行,操作员任务A、B、C,并且它们都在不同的物理机器中运行。

Flink将保证患者“JohnDoe”的所有温度事件都将在同一个并行操作符实例中结束。假设它在操作符B中结束。

但是当Flink接收到“JohnDoe”的HeartBeat事件时,它如何知道将它们发送到操作员B,在那里发送患者的温度事件。除非温度HeartBeat事件都发送到同一个并行实例运算符,否则连接将不起作用。

两个流都使用相同的逻辑密钥(即患者的id)这一事实是特定于应用程序的,Flink不知道。这两个连接的流可能使用彼此无关的自己的密钥。

共有1个答案

呼延鹏云
2023-03-14

当然,键的选择是特定于应用程序的。但是,Flink知道如何访问这些键,因为您提供了键选择器功能(pt-

因此,两个流的相同值被传送到同一个操作符实例。

 类似资料:
  • 我想在Apache Flink中实现以下场景: 给定一个具有4个分区的Kafka主题,我想使用不同的逻辑在Flink中独立处理分区内数据,具体取决于事件的类型。 特别是,假设输入Kafka主题包含前面图像中描述的事件。每个事件具有不同的结构:分区1具有字段“a”作为关键字,分区2具有字段“b”作为关键字,等等。在Flink中,我希望根据事件应用不同的业务逻辑,所以我认为我应该以某种方式分割流。为了

  • 我的总体目标是能够使用TAB键在3个div之间导航,每个div都有#sectionA、#sectionB和#sectionC的CSS id。在每个div中,我有一个无序列表,我想使用左右箭头键来浏览列表。 我的超文本标记语言如下: 到目前为止,我能够得到以下jquery代码,但一旦我添加了第二个$(文档). keydown(函数()代码,就无法工作。

  • 如何在ApacheFlink中为会话窗口分配id? 最后,我希望在会话窗口打开时,使用会话窗口id逐个充实事件(我不希望等到窗口关闭后再发出充实事件)。 我尝试使用AggregateFunction来实现这一点,但是我认为merge()并没有像我所期望的那样工作。它似乎是用于合并窗口而不是窗格(触发触发)。在我的管道中似乎从未调用过它。因此,触发器之间似乎没有共享状态! 会话窗口ID将是落入窗口的

  • 假设我希望根据的对其进行分区。 通过覆盖方法对进行分区,并且只使用的hashcode是否正确? 但是,鉴于接受了许多分区参数,我不知道是否需要事先知道种类的数量,如果种类多于分区,会发生什么? 我的目标是打电话 并且在迭代器中只有具有相同的值。

  • 我正在实现一个KeyEventDispatcher。dispatchKeyEvent()将所有键盘操作传递给外部(Android)设备。将KeyEvents轻松转换为设备提供的协议的一个关键问题是将事件分为两种类型:Unicode事件和“动作事件”(杂项、编辑、导航键-F1、Del、箭头等)。 Java非常方便地告诉用户击键生成的Unicode字符:您只需检查是否,并且保证生成的字符为。无需处理修

  • 我的数据流来自一个定制的SourceFunction,它以确定性的顺序发出窗口大小的字符串序列。其目的是基于EventTime在keyedstream上创建滑动窗口,以便对累积的字符串进行处理。为了分配EventTime和水印,我将一个带有PeriodicWatermarks的赋值器附加到流。滑动窗口使用自定义ProcessWindowFunction进行处理。 My AssignerWithPe