在Flink-Job中,我目前有两个流,一个是每分钟从Kafka主题更新的主数据流,另一个流(广播流)用于KeyedBroadcastProcessFunction的process元素函数中,用于对主流数据进行一些计算。
2)主数据可以有两个广播流吗?
3)由于流数据是完全不同的数据,广播,第三个数据流不经常变化,所以连接是不起作用的。它就像一个主数据,在计算中和主数据流一起使用,找不到任何解决方案,但请帮助。请分享一些我可以参考的链接。
Flink不提供任何具有三个输入的过程函数。
您可以将两个广播流合并在一起(在广播之前)。我很欣赏他们是非常不同的类型,但你总能找到一些方法让他们共存。如果没有更自然的方法来统一这两种类型,您可以使用任何一种。要将两个完全不同的类型联合到一个流中,可以执行如下操作:
java prettyprint-override">DataStream<String> strings = env.fromElements("one", "two", "three");
DataStream<Integer> ints = env.fromElements(1, 2, 3);
DataStream<Either<String, Integer>> stringsOnTheLeft = strings
.map(new MapFunction<String, Either<String, Integer>>() {
@Override
public Either<String, Integer> map(String s) throws Exception {
return Either.Left(s);
}
});
DataStream<Either<String, Integer>> intsOnTheRight = ints
.map(new MapFunction<Integer, Either<String, Integer>>() {
@Override
public Either<String, Integer> map(Integer i) throws Exception {
return Either.Right(i);
}
});
DataStream<Either<String, Integer>> stringsAndInts = stringsOnTheLeft.union(intsOnTheRight);
或者如果您可以将广播流以不同的阶段应用于主流,那么您可以有两个KeyedBroadcastProcessFunctions的序列,其中一个输出提供给另一个:
events
.keyBy(x -> x.foo)
.connect(broadcast1)
.process(new process1())
.keyBy(x -> x.foo)
.connect(broadcast2)
.process(new process2())
...[我]如何向[广播]流提供与这些相同的密钥?
钥匙不一定要是一样的,它们只要是相同的类型就可以了。
我有一个
Java8 API说:
例如,我想在单个中组合和的流,因此结果应该是:。换句话说:如果第一个源已耗尽-从第二个源获取元素。我最近的尝试是: 也对datetime进行了类似的尝试,但结果相同。
我开始使用flink,看看官方教程之一。 据我所知,这个练习的目标是在时间属性上加入两个流。 任务: 此练习的结果是一个Tuple2记录的数据流,每个记录对应一个不同的rideId。您应该忽略结束事件,只在每次骑乘开始时加入事件,并提供相应的票价数据。 生成的流应打印到标准输出。 问:EnrichmentFunction如何连接这两个流aka。它如何知道参加哪个集市和哪个骑行?我希望它能够缓冲多个
我试图创建和运行一个豆荚使用气流kubernetes豆荚操作员。下面的命令被尝试并确认有效,我正试图在本地使用kubernetes pod操作符复制相同的命令 有没有办法将serviceaccount标志传递给airflow kubernetes操作员? 谢了!
我正在亚马逊的EMR集群上同时运行3个Spark流进程。问题是这三个Spark流作业中的一个基于进行处理: 有没有办法在不更改代码的情况下解决这个问题?