从Flink 1.5发布公告中,我们知道Flink现在支持“广播状态”,并描述为“广播状态解除了Flink的CEP库的“动态模式”功能的实现。”。
这是否意味着目前我们可以在没有Flink CEP的情况下使用“广播状态”来实现“动态模式”?我也不知道在有或没有广播状态的情况下为Flink CEP实现“动态模式”有什么区别?我将不胜感激如果有人能举一个带有代码的例子来解释区别。
=============
使用键控数据流更新operator broadcast()测试广播数据流
在Flink 1.4.2中进行测试后,我发现广播数据流(由old operater broadcast()提供)可以与键控数据流连接,下面是测试代码,我们发现所有的控制流事件都广播到所有的operator实例。因此,旧的broadcast()似乎可以实现与新的“broadcast state”相同的功能。
public static void ConnectBroadToKeyedStream() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);
List<Tuple1<String>>
controlData = new ArrayList<Tuple1<String>>();
controlData.add(new Tuple1<String>("DROP"));
controlData.add(new Tuple1<String>("IGNORE"));
DataStream<Tuple1<String>> control = env.fromCollection(controlData);//.keyBy(0);
List<Tuple1<String>>
dataStreamData = new ArrayList<Tuple1<String>>();
dataStreamData.add(new Tuple1<String>("data"));
dataStreamData.add(new Tuple1<String>("DROP"));
dataStreamData.add(new Tuple1<String>("artisans"));
dataStreamData.add(new Tuple1<String>("IGNORE"));
dataStreamData.add(new Tuple1<String>("IGNORE"));
dataStreamData.add(new Tuple1<String>("IGNORE"));
dataStreamData.add(new Tuple1<String>("IGNORE"));
// DataStream<String> data2 = env.fromElements("data", "DROP", "artisans", "IGNORE");
DataStream<Tuple1<String>> keyedDataStream = env.fromCollection(dataStreamData).keyBy(0);
DataStream<String> result = control
.broadcast()
.connect(keyedDataStream)
.flatMap(new MyCoFlatMap());
result.print();
env.execute();
}
private static final class MyCoFlatMap
implements CoFlatMapFunction<Tuple1<String>, Tuple1<String>, String> {
HashSet blacklist = new HashSet();
@Override
public void flatMap1(Tuple1<String> control_value, Collector<String> out) {
blacklist.add(control_value);
out.collect("listed " + control_value);
}
@Override
public void flatMap2(Tuple1<String> data_value, Collector<String> out) {
if (blacklist.contains(data_value)) {
out.collect("skipped " + data_value);
} else {
out.collect("passed " + data_value);
}
}
}
以下是测试结果。
1> passed (data)
1> passed (DROP)
3> passed (artisans)
3> passed (IGNORE)
3> passed (IGNORE)
3> passed (IGNORE)
3> passed (IGNORE)
3> listed (DROP)
3> listed (IGNORE)
1> listed (DROP)
1> listed (IGNORE)
2> listed (DROP)
2> listed (IGNORE)
https://data-artisans.com/blog/apache-flink-1-5-0-release-announcement
下面是一个代码示例,它实现了flink的原始广播方法(无参数)和flink 1.5.0上新引入的广播状态。https://gist.github.com/syhily/932e0d1e0f12b3e951236d6c36e5a7ed
据我所知,广播状态可以在没有flink cep的情况下实现,就像上面显示的代码一样。
原始的DataStream
的广播
方法将创建一个DataStream
而不是BroadcastConnectedStream
。这将是最初的coGroup设计方案。在将指标流与广播规则流连接后,我们可以使用ConnectedStreams
中定义的更多流转换函数。例如keyBy
函数,这将使具有相同密钥的广播流和连接流被进程
编辑并粘贴在相同的并行CoProcessFunction
上。因此CoProcessFunction
可以有自己的本地存储。进程函数可以在其字段上具有自定义数据结构,而不是从ReadOnlyContext
访问的映射状态。
广播状态可以通过一组映射状态描述符(MapStateDescriptor)的广播(Broadcast)方法来实现,这意味着广播流可以多次与其他流连接。不同的已连接的BroadcastConnectedStream可以与进程中唯一的MapStateDescriptor共享自己的广播状态。
我认为这些将是广播与on参数和广播状态之间的关键区别。
如果没有广播状态,两个Flink数据流不能以有状态的方式一起处理,除非它们以完全相同的方式进行键控。广播流可以连接到键控流,但如果然后尝试在RichCoFlatMap中使用键控状态,则会失败。
经常需要的是能够使一个流具有动态“规则”,这些规则将应用于另一个流上的每个事件,而不管键是什么。需要一种新的托管Flink状态来存储这些规则。使用广播状态,现在可以以一种简单的方式完成此操作。
现在有了这个特性,就可以开始在CEP中支持动态模式了。
我想创建一个
我想这意味着查询已经执行,MySQL正在向客户机发送“结果”数据,但我想知道为什么要花这么多时间(长达一个小时)。 谢谢你。
在广播模式的文档中,提到没有RocksDB状态后端: 如果应用程序使用rocksdb作为状态后端,这将如何影响保存点行为?这是否意味着状态在保存点期间不存储,因此不会恢复?
看着新的Azure cosmos数据库,我对它的多模型特性有点困惑。具体而言,这是否意味着: a)相同的底层数据库/存储可以以多种方式并发查询,以便我可以对相同的集合使用gremlin图查询和mongodb api。 或 b)这是否意味着您可以在预配Cosmos DB时选择不同的模型(图、键值、列、文档),这就是从那时起存储数据的方式。 小册子让它听起来像a),但使用Azure仪表板创建cosmo