当前位置: 首页 > 知识库问答 >
问题:

RichFlatMap中的状态管理有和没有键通过

华恩
2023-03-14

我有这样一个流应用程序:

DataStream<MyObject> stream1 = source
                .keyBy("clientip")
                .flatMap(new MyFlatMapFunction())
                .name("Stream1");

//...
public class MyFlatMapFunction extends RichFlatMapFunction<MyObject, MyObject> {

    private transient ValueState<Boolean> valueState;

    @Override
    public void open(Configuration parameters)
    {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(12))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp).cleanupInBackground()
                .build();

        ValueStateDescriptor<Boolean> valueStateeDescriptor = new ValueStateDescriptor<>(
                "valueState",
                Types.BOOLEAN);

        valueStateeDescriptor.enableTimeToLive(ttlConfig);
        valueState = getRuntimeContext().getState(valueState);
    }

    @Override
    public void flatMap(MyObject myObject, Collector<MyObject> collector) throws Exception
    {
       // get value from value state, check if it is matched with something
       // if matches some condition, then collector.collect(myObject)
       // update state for each myObject
    }
}

否:3台机器上有3名工人,并行度为16。总并行度为48。

在实现此代码时,我始终假设“如果ip地址1.2.3.4与条件匹配,则来自同一ip地址1.2.3.4的后续请求始终与条件匹配,直到状态被清除”。这句话对吗?

据我从flink docs了解,如果ip地址1.2.3.4转到机器1(通过生成clientip的哈希值),那么来自ip地址1.2.3.4的所有请求都会转到机器1?

方法在taskmanager jvm中调用一次。因此,flink创建了48个flatMapOperation实例(48个中的1-15个驻留在machine1中,48个中的16-32个驻留在machine2中,48个中的33-48个驻留在machine3中),每个flatMapInstance都将运行open方法。这意味着开放式方法需要48次?

最后,所有48个实例都访问相同的状态,但值不同(因为状态是本地的)。我的意思是,一部分实例组(假设机器1上的16个实例)将获得相同的状态值。

最后,如果在FlatMap之前没有keyBy,那么来自ip地址的请求1.2.3.4可以随机访问机器1、机器2或机器3?

共有1个答案

昝浩阔
2023-03-14
  1. 由于您执行了一个keyBy(“clientip”),因此该字段具有相同值的所有记录将由相同的MyFlatMapFunction子任务处理。因此,所有记录的集合被划分为48个子任务,并且假设IP地址的计数均匀分布,每个子任务将获得大约48条记录的1/48
  2. 是的,将有48个MyFlatMapFunction的实例被实例化,因此有48个对open()的调用
  3. 所有48个实例都访问相同的状态。否,状态是每个唯一键,因此状态是按键值在48个子任务之间划分的
  4. 如果没有keyBy(),那么MyFlatMapFunction操作符的每个子任务都将从源中获取分区中的任何数据。这取决于您的数据源,例如,如果您正在阅读一个Kafka主题,而该主题有48个分区,那么从Kafka分区到MyFlatMapFunction子任务有一个1对1的映射。如果您的Kafka分区少于48个,那么您的一些MyFlatMapFunction子任务将无法获取任何数据。如果要将传入记录重新分发到所有子任务,则可以执行重新平衡()。但请注意,您将无法维护每个IP地址的状态
 类似资料:
  • 写rust 程序,不知道全局状态放那里,lazy_static 太难用了,难道要把sqlite 中吗

  • 我想知道是否有一种方法,使通知弹出状态栏上方,像传统的祝酒词消息,当第一次收到。默认情况下,当收到通知时,通知标签会显示在状态栏中,并暂时隐藏其他通知图标(如果有的话),直到通知标签显示完毕。然后,通知图标通常被添加到等待用户交互的图标的水平列表中。我想要最初的接收报价只是上升到状态栏之上,然后添加图标与其余的他们。

  • 来自服务器的数据以及是否挂起或导致错误 UI状态如切换,警报和错误消息 自定义主题,凭据和本地化 许多其他类型的状态 Redux using ng2-redux Angular Services and RxJS(推荐)

  • 管理应用程序状态是个难题。您需要在多个后端,Web workers和UI组件之间进行协调。 像Redux和Flux这样的模式旨在通过使这种协调更加明确来解决这个问题。在本文中,我将展示如何使用RxJS在几行代码中实现类似的模式。然后我将展示如何使用这种模式来实现一个简单的Angular 2应用。 在谈论架构模式时,我喜欢从描述其核心属性开始。你可以写在餐巾背上的东西。The devil, of c

  • 建议使用基于redux封装出来的rematch, anujs也自带了这个框架。 rematch的官网 https://github.com/rematch/rematch resolve: { alias: { react: "anujs", "react-dom": "anujs", rematch: "anujs/dist/Rematch.js

  • 我知道如何发送一个通知到状态栏,但当它第一次启动时,它接管了整个状态栏,你看不到任何其他东西,除了你的图标。有没有什么方法可以显示你的图标,而不是它接管整个状态栏几秒钟?