当前位置: 首页 > 知识库问答 >
问题:

如何在Flink中连接2个以上的流?

杜河
2023-03-14

我有3个不同类型的键控数据流。

DataStream<A> first;
DataStream<B> second;
DataStream<C> third;
first.connect(second).process(<CoProcessFunction>)

我不能使用联合(允许多个数据流),因为类型不同。我希望避免创建包装器,并将所有流转换为相同的类型。

共有1个答案

商皓
2023-03-14

包装器方法并不是太坏,真的。您可以创建一个eitherofthree 包装类,它类似于Flink现有的either ,然后在单个函数中处理这些记录的流。类似于:

    DataStream <EitherOfThree<A,B,C>> combo = first.map(r -> new EitherOfThree<A,B,C>(r, null, null))
        .union(second.map(r -> new EitherOfThree<A,B,C>(null, r, null)))
        .union(third.map(r -> new EitherOfThree<A,B,C>(null, null, r)));
    combo.process(new MyProcessFunction());

Flink的或者类有一个更优雅的实现,但对于您的用例来说,应该使用一些简单的实现。

 类似资料:
  • 我试图用SPARK SQL编写一个查询,执行三个表的联接。但是查询输出实际上是。它适用于单人餐桌。我的联接查询是正确的,因为我已经在oracle数据库中执行了它。我需要在这里附加什么更正?Spark版本是2.0.0。 我在一些网页上发现了下面的说明,但它仍然不起作用:

  • 例如,我想在单个中组合和的流,因此结果应该是:。换句话说:如果第一个源已耗尽-从第二个源获取元素。我最近的尝试是: 也对datetime进行了类似的尝试,但结果相同。

  • 问题#1:我正在研究一个案例场景,在这个场景中,我们需要融合来自多个传感器(例如8个传感器)的数据,并以树的形式将它们连接起来。例如,将[s1、s2、s3和s4]连接起来形成流A,然后将[s5、s6、s7和s8]连接起来形成流B,然后对流A和B执行CEP。如何实现这一点? 问题#2:是否可以对多个流执行CEP,即多个流?。flink 1.3.2 API中明确提到,模式将应用于一个流 如果模式不能应

  • 提前道谢!

  • 我有两条左右流。就在同一时间窗口 左侧流包含元素L1、L2(数字为键) 右流包含元素R1、R3 我想知道如何在Apache Flink中实现LEFT OUTER JOIN,以便处理此窗口时获得的结果如下: L1、R1通过键(1)匹配,L2、R3不匹配。L2包括在内,因为它位于左侧