当前位置: 首页 > 知识库问答 >
问题:

Flink获取KeyedState状态值并在另一个流中使用

梁丘柏
2023-03-14

我知道键控状态属于its键,只有当前键访问它的状态值,其他键不能访问不同键的状态值。

下面是示例(我知道keyBy(sommething)对于两个流操作都是一样的):

public class Sample{
       streamA
                .keyBy(something)
                .timeWindow(Time.seconds(4))
                .process(new CustomMyProcessFunction())
                .name("CustomMyProcessFunction")
                .print();

       streamA
                .keyBy(something)
                .timeWindow(Time.seconds(1))
                .process(new CustomMyAnotherProcessFunction())
                .name("CustomMyProcessFunction")
                .print();
}

public class CustomMyProcessFunction extends ProcessWindowFunction<..>
{
    private Logger logger = LoggerFactory.getLogger(CustomMyProcessFunction.class);
    private transient ValueState<SimpleEntity> simpleEntityValueState;
    private SimpleEntity simpleEntity;

    @Override
    public void open(Configuration parameters) throws Exception
    {
        ValueStateDescriptor<SimpleEntity> simpleEntityValueStateDescriptor = new ValueStateDescriptor<SimpleEntity>(
                "sample",
                TypeInformation.of(SimpleEntity.class)
        );
        simpleEntityValueState = getRuntimeContext().getState(simpleEntityValueStateDescriptor);
    }

    @Override
    public void process(...) throws Exception
    {
        SimpleEntity value = simpleEntityValueState.value();
        if (value == null)
        {
            SimpleEntity newVal = new SimpleEntity("sample");
            logger.info("New Value put");
            simpleEntityValueState.update(newVal);
        }
        ...
    }
...
}

public class CustomMyAnotherProcessFunction extends ProcessWindowFunction<..>
{


    private transient ValueState<SimpleEntity> simpleEntityValueState;

    @Override
    public void open(Configuration parameters) throws Exception
    {

        ValueStateDescriptor<SimpleEntity> simpleEntityValueStateDescriptor = new ValueStateDescriptor<SimpleEntity>(
                "sample",
                TypeInformation.of(SimpleEntity.class)
        );
        simpleEntityValueState = getRuntimeContext().getState(simpleEntityValueStateDescriptor);
    }

    @Override
    public void process(...) throws Exception
    {
        SimpleEntity value = simpleEntityValueState.value();
        if (value != null)
            logger.info(value.toString()); // I expect that SimpleEntity("sample")
        out.collect(...);
    }
...
}

共有1个答案

葛鸿轩
2023-03-14

正如已经指出的,状态总是单个操作符实例的本地状态。无法共享。

但是,您可以做的是将状态更新从持有状态的运算符流式传送给其他需要它的运算符。通过边输出,您可以创建复杂的数据流,而不需要共享状态。

 类似资料:
  • 我有两条流: 测量 WhoMeasured(关于谁进行了测量的元数据) 这些是它们的案例类: 流包含大量数据。流几乎没有任何可用性。事实上,对于<code>who_measured_id</code>流中的每个<code>who_。这本质上是一个哈希表,由流填充。 在我的自定义窗口函数中 这是我的工作。现在你可能会看到,有一些东西不见了:两个流的结合。 因此,从本质上讲,这是一种查找表,当流中的新

  • A类是bean范围原型 B类是Bean范围 Singleton有A的getter和setter 在C类中,B是自动连线的,并设置A的对象 如何使用springAnnotations或XML在类D中使用B获取A的值

  • 我想将输入XML转换为输出XML。同样,使用xslt进行XML转换。 输入xml和支持xml文件位于本地路径中(仅限同一路径)。 XSl和saxon9。jar位于服务器路径中。 将在本地路径中创建输出xml(与输入xml路径相同)。 使用xslt2.0我可以得到一个输入的xml值,但不能得到支持的xml值(存在于本地) d:\测试 下面是我的xsl用于从supporting.xml获取值 有人能帮

  • 我正试图使它使field2将显示在Field1中输入的任何值。我正在使用onchange事件来完成此操作。 field1是一个输入文本框,因此如果field1=“Mary”,则field2应为“Mary”。 如果field1说“花生酱和果冻”,field2应该说“花生酱和果冻”。 如果field1为空,则field2应为空。 现在我有一个代码使field2复制field1,但是我必须给它一组预定的

  • 问题内容: 我有两节课 在ABC班 我想在另一个课程中使用它,让我们说DEF课程 但是它说java.lang.NullPointerException,你知道吗?谢谢 这是我的ABC类的代码,我想在另一类> _ <中使用它们 问题答案: 您需要在内存中分配ABC,否则它指向,因此NullPointerException: 编辑 :好的,如果ABC没有像这样的零参数构造函数: 您有两种选择来解决您的

  • 如何使用ajax将两个参数从两个不同的文本框传递到另一个页面。我应该使用哪个函数来做到这一点。 指数jsp 收到jsp:我希望这两个参数值出现在这个页面中。 请帮帮我。 谢谢。