当前位置: 首页 > 知识库问答 >
问题:

“广播状态”解除了Flink的CEP库的“动态模式”功能的实现,这意味着什么?

龙俭
2023-03-14
public static void ConnectBroadToKeyedStream() throws Exception {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);

    List<Tuple1<String>>
            controlData = new ArrayList<Tuple1<String>>();
    controlData.add(new Tuple1<String>("DROP"));
    controlData.add(new Tuple1<String>("IGNORE"));
    DataStream<Tuple1<String>> control = env.fromCollection(controlData);//.keyBy(0);

    List<Tuple1<String>>
            dataStreamData = new ArrayList<Tuple1<String>>();
    dataStreamData.add(new Tuple1<String>("data"));
    dataStreamData.add(new Tuple1<String>("DROP"));
    dataStreamData.add(new Tuple1<String>("artisans"));
    dataStreamData.add(new Tuple1<String>("IGNORE"));
    dataStreamData.add(new Tuple1<String>("IGNORE"));
    dataStreamData.add(new Tuple1<String>("IGNORE"));
    dataStreamData.add(new Tuple1<String>("IGNORE"));

    // DataStream<String> data2 = env.fromElements("data", "DROP", "artisans", "IGNORE");
    DataStream<Tuple1<String>> keyedDataStream = env.fromCollection(dataStreamData).keyBy(0);

    DataStream<String> result = control
            .broadcast()
            .connect(keyedDataStream)
            .flatMap(new MyCoFlatMap());
    result.print();
    env.execute();
}

private static final class MyCoFlatMap
        implements CoFlatMapFunction<Tuple1<String>, Tuple1<String>, String> {
    HashSet blacklist = new HashSet();

    @Override
    public void flatMap1(Tuple1<String> control_value, Collector<String> out) {
        blacklist.add(control_value);
        out.collect("listed " + control_value);
    }

    @Override
    public void flatMap2(Tuple1<String> data_value, Collector<String> out) {

        if (blacklist.contains(data_value)) {
            out.collect("skipped " + data_value);
        } else {
            out.collect("passed " + data_value);
        }
    }
}
1> passed (data)
1> passed (DROP)
3> passed (artisans)
3> passed (IGNORE)
3> passed (IGNORE)
3> passed (IGNORE)
3> passed (IGNORE)
3> listed (DROP)
3> listed (IGNORE)
1> listed (DROP)
1> listed (IGNORE)
2> listed (DROP)
2> listed (IGNORE)

共有1个答案

上官锦
2023-03-14

在没有广播状态的情况下,两个Flink数据流不能以有状态的方式一起处理,除非它们以完全相同的方式进行键控。广播流可以连接到键控流,但如果您随后尝试在RichCoFlatMap中使用键控状态,则会失败。

经常需要的是能够使一个流具有动态的“规则”,这些规则将应用于另一个流上的每个事件,而不管关键字是什么。需要有一种新的托管Flink状态,可以在其中存储这些规则。对于广播状态,现在可以用一种简单的方式来完成。

有了这个特性,就可以开始支持CEP中的动态模式了。

 类似资料:
  • 从Flink 1.5发布公告中,我们知道Flink现在支持“广播状态”,并描述为“广播状态解除了Flink的CEP库的“动态模式”功能的实现。”。 这是否意味着目前我们可以在没有Flink CEP的情况下使用“广播状态”来实现“动态模式”?我也不知道在有或没有广播状态的情况下为Flink CEP实现“动态模式”有什么区别?我将不胜感激如果有人能举一个带有代码的例子来解释区别。 ==========

  • 在广播模式的文档中,提到没有RocksDB状态后端: 如果应用程序使用rocksdb作为状态后端,这将如何影响保存点行为?这是否意味着状态在保存点期间不存储,因此不会恢复?

  • 我尝试在我的flink应用程序中使用广播状态模式,但经过一些研究,我做了以下工作: 在中,我读取数据,并根据来自的数据对数据进行一些逻辑处理并发出一些元素。基本上,我使用就像广播状态模式一样。我没有专门使用广播,因为没有简单的方法可以从访问我的某些状态。由于我的配置流被用作清理状态的指示符,我在我的中拥有。 流是<代码>。keyBy所以我不希望出现并行性问题 我的问题是,还需要广播哪些案例?在什么

  • 我想创建一个

  • 看着新的Azure cosmos数据库,我对它的多模型特性有点困惑。具体而言,这是否意味着: a)相同的底层数据库/存储可以以多种方式并发查询,以便我可以对相同的集合使用gremlin图查询和mongodb api。 或 b)这是否意味着您可以在预配Cosmos DB时选择不同的模型(图、键值、列、文档),这就是从那时起存储数据的方式。 小册子让它听起来像a),但使用Azure仪表板创建cosmo

  • 我们试图构建一个用例,其中来自流的数据通过计算公式运行,但公式本身也应该(很少)是可更新的。从阅读文档来看,在我看来,Flink broadcast state自然适合这种情况。 作为一个实验,我构建了一个简化的版本:假设我有一个整数流,第二个流包含这些整数的乘法因子(我可以随意发送值)。第二个流的频率很低,很容易在事件之间的几天或几周内出现。目前,这两个都实现为简单的套接字服务器,最终产品将使用