当前位置: 首页 > 知识库问答 >
问题:

Apache Flink:如何处理三个流

冀冯浩
2023-03-14

我想在一个操作符中接收和处理三个流。例如,Storm中实现的代码如下:

<代码>生成器。setBolt(“C\u螺栓”,C\u螺栓(),parallelism\u提示)。字段分组(“A\u bolt”,“TRAINING”,新字段(“word”))。字段分组(“B\U螺栓”,“分析”,新字段(“word”))。所有分组(“A\U螺栓”、“总和”)

在Flink中,实现了SUM流(A_bolt的SideOutput)TRAINING流(A_bolt)的处理:

SingleOutputStreamOperator<Tuple3<String, Integer, Boolean>> A_bolt;
DataStream<Tuple2<Integer, Integer>> Sum = A_bolt.getSideOutput(outputTag).broadcast();
DataStream<Tuple3<String, String, Integer>> B_bolt;
DataStream<String> C_bolt= A_bolt
                        .keyBy(new KeySelector<Tuple3<String,Integer,Boolean>, String>() {
                                    @Override
                                    public String getKey(Tuple3<String,Integer,Boolean> in) throws Exception {
                                        return in.f0;
                                    }
                                })
                        .connect(Sum)
                        .flatMap(new Process())
                        .setParallelism(parallelism);

但我不知道如何添加分析流(B\u bolt)。谢谢你的帮助。

共有1个答案

楚昀
2023-03-14

Flink只支持一个输入和两个输入流操作符。您可以选择:

  1. 使用Union()创建一个合并流,其中包含来自所有三个流的所有元素(它们必须都是相同的类型,尽管您可以使用任何一种来协助执行此操作)。
  2. 使用coFlatMap合并两个流后,将该初步结果连接到第三个流,使用另一个coFlatMap(或coProcessFunction)完成处理。

或者在你的情况下,这两种技术的结合更可取。

 类似资料:
  • 我使用的是OpenGL,但我并不完全满意将每个三角形(或在我的例子中为四边形)的值传递给片段着色器的标准方法,即将它们分配给基本体的每个顶点,并将它们传递给顶点着色器,以推测不必要的插值(除非使用“flat”指令)在片段着色器中(也就是说,每个片段不变化)。 是否有某种方法可以存储每个三角形(或四边形)的值,这些三角形(或四边形)需要在片段着色器中访问,这样就不需要每个顶点的冗余副本?那么,这种方

  • 从主线程读取文件 将读取值存储到blockingqueue中,另一个线程将从中访问和处理。 有另一个线程要写入另一个文件。 但我不知道如何做到这一点。如果我声明了一个fixedthreadpool,我将无法控制哪个线程执行什么操作,但在这种方法中,这是否类似于顺序处理,因为线程属于不同的池? 如果有人能指导我如何执行此操作,这将对我有很大帮助。

  • 我试图将我的fetch查询的响应状态打印到控制台(以便以后处理那些边缘情况)。但是,唯一有效的console.log调用是“违规”函数中的调用。当帐户存在于HIBP数据库中时,我没有收到任何错误,但是当帐户不在数据库中时,我收到“请求失败:类型错误:响应. json不是json的函数”错误。我做错了什么?我从Google Web Dev文章中获得了错误处理代码。

  • 我是JPA的新手,有一个关于如何处理实体的问题。在我的例子中,我有3个实体:用户、组和事件。 一个事件总是属于一个组。这意味着有一个OneToMulti-Relation。一个用户可以订阅多个组,这意味着有一个ManyToMulti-Relation。现在我遇到麻烦的部分。一个用户也可以订阅多个事件,这意味着也有一个ManyToMulti-Relation。 现在我的问题是。我如何在我的组实体中列

  • 基于名字的虚拟主机 Nginx首先选定由哪一个虚拟主机来处理请求。让我们从一个简单的配置(其中全部3个虚拟主机都在端口*:80上监听)开始: server { listen 80; server_name example.org www.example.org; ... } server { listen 80; server_nam

  • 我是德国富特旺根大学的学生。 我在我的最后一个学期,我正在写我的论文。我对iBeacons及其背后的技术非常感兴趣。我目前的项目是将信标技术与其他技术进行比较,如GPS、无线定位、GSM和NFC。在我的论文中,我将创建不同的用例并比较结果。 在过去的几天里,我试图确定自己在房间里的位置。我使用三个信标的相对距离(精度),并给每个信标一个固定的位置。我得到三个圆,计算出6个交点。当弧度(精度)太低时