使用CoFlatMapFunction
时有点卡住了。如果我把它放在DataStream
before窗口上,它似乎可以正常工作,但如果放在窗口的“apply”函数之后,它就会失败。
我正在测试两个流,主功能在flatMap1
不断摄取数据和控制流模型在flatMap2
更改模型的要求。
我能够在flatMap2
中正确设置并查看b0/b1,但是flatMap1
始终可以看到b0和b1在初始化时设置为0。
我是不是漏了什么明显的东西?
public static class applyModel implements CoFlatMapFunction<Features, Model, EnrichedFeatures> {
private static final long serialVersionUID = 1L;
Double b0;
Double b1;
public applyModel(){
b0=0.0;
b1=0.0;
}
@Override
public void flatMap1(Features value, Collector<EnrichedFeatures> out) {
System.out.print("Main: " + this + "\n");
}
@Override
public void flatMap2(Model value, Collector<EnrichedFeatures> out) {
System.out.print("Old Model: " + this + "\n");
b0 = value.getB0();
b1 = value.getB1();
System.out.print("New Model: " + this + "\n");
}
@Override
public String toString(){
return "CoFlatMapFunction: {b0: " + b0 + ", b1: " + b1 + "}";
}
}
以下是邮件列表中的答案。。。
CoFlatMapFunction是否打算并行执行?
如果是,则需要某种方法来确定地将哪个记录分配给哪个并行实例。在某种程度上,CoFlatMapFunction在模型和会话窗口的结果之间进行并行(分区)连接,因此您需要某种形式的键来选择元素转到哪个分区。这有意义吗?
如果不是,请尝试显式地将其设置为parallelism 1。
你好斯蒂芬
所有人都可以只读访问的全局状态可以通过广播()实现。
可供所有人读取和更新的全局状态目前不可用。一致的操作将是相当昂贵的,需要某种形式的分布式通信/共识。
相反,我鼓励您采用以下方法:
1)如果可以对状态进行分区,请使用keyBy()。mapBackState()-这可以本地化状态操作,并使其非常快。
2)如果你的状态不是按键组织的,你的状态很可能很小,你可能可以使用非并行操作。
3)如果一些操作更新了状态,另一个操作访问了状态,您通常可以通过迭代和CoFlatMapFunction来实现(一边是原始输入,另一边是反馈输入)。
所有的方法最终都将状态访问和修改本地化,如果可能的话,这是一个很好的模式。
你好斯蒂芬
这个问题在这里已经被问到了,但是两年过去了,我想知道是否有什么改变。 我有一个用例,我希望在两个Flink操作符之间共享状态: > A流是主流,它连续流动 流B只是富集数据的数据集。它很大(几个GBs),因此不能作为广播流。 流B有一个与之相关联的运算符(FlatMap,但可以是任何实际的),它充当状态加载器,并将浓缩数据作为列表状态加载到RocksDB中。 null
我想知道在Flink中是否可以在运营商之间共享状态。 比方说,我在一个操作符上按键进行分区,我需要一个分区的状态(无论出于什么原因)(图1.a),或者我需要下游操作符中操作符的状态(图1.b)。 我知道可以将记录广播到所有分区。因此,如果在记录中包含操作符的内部状态,则可以与下游操作符共享内部状态 然而,这可能是一个昂贵的操作,而不是简单地让op1专门请求op2状态。 可查询状态的最新发展是朝着这
有没有办法以可共享的格式(比如Json)指定Apache Flink CEP模式? 我的用例是:开发一个规则引擎,允许用户自定义CEP模式,而无需编写大量Java代码,并轻松地与其他人共享模式。
我计划使用cassandra作为我的应用程序的nosql数据存储。我的用例之一是更新用户的“余额”。假设每个用户的余额存储为一个关键UID_balance。现在,如果我的应用程序想要更新多个用户的余额,我将如何处理原子性? 我想,在某个时刻,应用程序基本上将执行以下操作: 现在,这里有几个问题: 与cassandra的连接可能会中断,导致代码只更新少数用户的余额 在步骤2和4之间,可能有另一个进程
看来我在路由之间共享信息时遇到了问题。 传递信息的骆驼模式是什么? 我查看了交易所属性,但我认为这在路线之间并没有停留...... 例如: 一个文件有一个文件有一些配置,我有一个路由来读取这个文件,还有几个其他路由将根据配置进行操作, 我该如何完成此操作? 我曾想过将值放入单例bean中,但这看起来有点难看。。。
虽然消息传递是一个很好的处理并发的方式,但并不是唯一一个。再一次思考一下 Go 编程语言文档中口号的这一部分:“通过共享内存通讯”: What would communicating by sharing memory look like? In addition, why would message passing enthusiasts not use it and do the opposi