当前位置: 首页 > 知识库问答 >
问题:

Flink:在CoFlatMapFunction中共享状态

葛高澹
2023-03-14

使用CoFlatMapFunction时有点卡住了。如果我把它放在DataStreambefore窗口上,它似乎可以正常工作,但如果放在窗口的“apply”函数之后,它就会失败。

我正在测试两个流,主功能在flatMap1不断摄取数据和控制流模型在flatMap2更改模型的要求。

我能够在flatMap2中正确设置并查看b0/b1,但是flatMap1始终可以看到b0和b1在初始化时设置为0。

我是不是漏了什么明显的东西?

public static class applyModel implements CoFlatMapFunction<Features, Model, EnrichedFeatures> {
    private static final long serialVersionUID = 1L;

    Double b0;
    Double b1;

    public applyModel(){
        b0=0.0;
        b1=0.0;
    }

    @Override
    public void flatMap1(Features value, Collector<EnrichedFeatures> out) {
        System.out.print("Main: " + this + "\n");
    }

    @Override
    public void flatMap2(Model value, Collector<EnrichedFeatures> out) {
        System.out.print("Old Model: " + this + "\n");
        b0 = value.getB0();
        b1 = value.getB1();
        System.out.print("New Model: " + this + "\n");
    }

    @Override
    public String toString(){
        return "CoFlatMapFunction: {b0: " + b0 + ", b1: " + b1 + "}";
    }
}

共有1个答案

符渊
2023-03-14

以下是邮件列表中的答案。。。

CoFlatMapFunction是否打算并行执行

如果是,则需要某种方法来确定地将哪个记录分配给哪个并行实例。在某种程度上,CoFlatMapFunction在模型和会话窗口的结果之间进行并行(分区)连接,因此您需要某种形式的键来选择元素转到哪个分区。这有意义吗?

如果不是,请尝试显式地将其设置为parallelism 1。

你好斯蒂芬

所有人都可以只读访问的全局状态可以通过广播()实现。

可供所有人读取和更新的全局状态目前不可用。一致的操作将是相当昂贵的,需要某种形式的分布式通信/共识。

相反,我鼓励您采用以下方法:

1)如果可以对状态进行分区,请使用keyBy()。mapBackState()-这可以本地化状态操作,并使其非常快。

2)如果你的状态不是按键组织的,你的状态很可能很小,你可能可以使用非并行操作。

3)如果一些操作更新了状态,另一个操作访问了状态,您通常可以通过迭代和CoFlatMapFunction来实现(一边是原始输入,另一边是反馈输入)。

所有的方法最终都将状态访问和修改本地化,如果可能的话,这是一个很好的模式。

你好斯蒂芬

 类似资料:
  • 这个问题在这里已经被问到了,但是两年过去了,我想知道是否有什么改变。 我有一个用例,我希望在两个Flink操作符之间共享状态: > A流是主流,它连续流动 流B只是富集数据的数据集。它很大(几个GBs),因此不能作为广播流。 流B有一个与之相关联的运算符(FlatMap,但可以是任何实际的),它充当状态加载器,并将浓缩数据作为列表状态加载到RocksDB中。 null

  • 我想知道在Flink中是否可以在运营商之间共享状态。 比方说,我在一个操作符上按键进行分区,我需要一个分区的状态(无论出于什么原因)(图1.a),或者我需要下游操作符中操作符的状态(图1.b)。 我知道可以将记录广播到所有分区。因此,如果在记录中包含操作符的内部状态,则可以与下游操作符共享内部状态 然而,这可能是一个昂贵的操作,而不是简单地让op1专门请求op2状态。 可查询状态的最新发展是朝着这

  • 有没有办法以可共享的格式(比如Json)指定Apache Flink CEP模式? 我的用例是:开发一个规则引擎,允许用户自定义CEP模式,而无需编写大量Java代码,并轻松地与其他人共享模式。

  • 我计划使用cassandra作为我的应用程序的nosql数据存储。我的用例之一是更新用户的“余额”。假设每个用户的余额存储为一个关键UID_balance。现在,如果我的应用程序想要更新多个用户的余额,我将如何处理原子性? 我想,在某个时刻,应用程序基本上将执行以下操作: 现在,这里有几个问题: 与cassandra的连接可能会中断,导致代码只更新少数用户的余额 在步骤2和4之间,可能有另一个进程

  • 看来我在路由之间共享信息时遇到了问题。 传递信息的骆驼模式是什么? 我查看了交易所属性,但我认为这在路线之间并没有停留...... 例如: 一个文件有一个文件有一些配置,我有一个路由来读取这个文件,还有几个其他路由将根据配置进行操作, 我该如何完成此操作? 我曾想过将值放入单例bean中,但这看起来有点难看。。。

  • 虽然消息传递是一个很好的处理并发的方式,但并不是唯一一个。再一次思考一下 Go 编程语言文档中口号的这一部分:“通过共享内存通讯”: What would communicating by sharing memory look like? In addition, why would message passing enthusiasts not use it and do the opposi