当前位置: 首页 > 知识库问答 >
问题:

在Flink作业中通过两个流与主流进行操作

范兴文
2023-03-14

在Flink-Job中,我目前有两个流,一个是每分钟从Kafka主题更新的主数据流,另一个流(广播流)用于KeyedBroadcastProcessFunction的process元素函数中,用于对主流数据进行一些计算。

2)主数据可以有两个广播流吗?

3)由于流数据是完全不同的数据,广播,第三个数据流不经常变化,所以连接是不起作用的。它就像一个主数据,在计算中和主数据流一起使用,找不到任何解决方案,但请帮助。请分享一些我可以参考的链接。

共有1个答案

夏才
2023-03-14

Flink不提供任何具有三个输入的过程函数。

您可以将两个广播流合并在一起(在广播之前)。我很欣赏他们是非常不同的类型,但你总能找到一些方法让他们共存。如果没有更自然的方法来统一这两种类型,您可以使用任何一种。要将两个完全不同的类型联合到一个流中,可以执行如下操作:

java prettyprint-override">DataStream<String> strings = env.fromElements("one", "two", "three");
DataStream<Integer> ints = env.fromElements(1, 2, 3);

DataStream<Either<String, Integer>> stringsOnTheLeft = strings
        .map(new MapFunction<String, Either<String, Integer>>() {
            @Override
            public Either<String, Integer> map(String s) throws Exception {
                return Either.Left(s);
            }
        });

DataStream<Either<String, Integer>> intsOnTheRight = ints
        .map(new MapFunction<Integer, Either<String, Integer>>() {
            @Override
            public Either<String, Integer> map(Integer i) throws Exception {
                return Either.Right(i);
            }
        });

DataStream<Either<String, Integer>> stringsAndInts = stringsOnTheLeft.union(intsOnTheRight);

或者如果您可以将广播流以不同的阶段应用于主流,那么您可以有两个KeyedBroadcastProcessFunctions的序列,其中一个输出提供给另一个:

events
    .keyBy(x -> x.foo)
    .connect(broadcast1)
    .process(new process1())
    .keyBy(x -> x.foo)
    .connect(broadcast2)
    .process(new process2())

...[我]如何向[广播]流提供与这些相同的密钥?

钥匙不一定要是一样的,它们只要是相同的类型就可以了。

 类似资料:
  • 例如,我想在单个中组合和的流,因此结果应该是:。换句话说:如果第一个源已耗尽-从第二个源获取元素。我最近的尝试是: 也对datetime进行了类似的尝试,但结果相同。

  • 我开始使用flink,看看官方教程之一。 据我所知,这个练习的目标是在时间属性上加入两个流。 任务: 此练习的结果是一个Tuple2记录的数据流,每个记录对应一个不同的rideId。您应该忽略结束事件,只在每次骑乘开始时加入事件,并提供相应的票价数据。 生成的流应打印到标准输出。 问:EnrichmentFunction如何连接这两个流aka。它如何知道参加哪个集市和哪个骑行?我希望它能够缓冲多个

  • 我试图创建和运行一个豆荚使用气流kubernetes豆荚操作员。下面的命令被尝试并确认有效,我正试图在本地使用kubernetes pod操作符复制相同的命令 有没有办法将serviceaccount标志传递给airflow kubernetes操作员? 谢了!

  • 我正在亚马逊的EMR集群上同时运行3个Spark流进程。问题是这三个Spark流作业中的一个基于进行处理: 有没有办法在不更改代码的情况下解决这个问题?