当前位置: 首页 > 知识库问答 >
问题:

ProcessWindowFunction中的键控状态存储行为(Apache Flink Java)

韩志专
2023-03-14

我有一个ProcessWindowFunction用于处理TumblingEventTimeWindows,其中我使用状态存储在多个滚动窗口中保存一些值。我的问题是,这个状态存储没有在滚动窗口中保存,也就是说,如果我首先在windows[0,999]中存储一些东西,然后从windows[1000,1999]访问这个存储,那么这个存储是空的。我知道这里所述的全局状态和每个窗口状态。我要使用全局状态。我还尝试创建一个最低限度的工作示例来调查这一点:

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;


public class twStateStoreTest {


    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000L);

        final DataStream<Element> elements = env.fromElements(
                Element.from(1, 500),
                Element.from(1, 1000),
                Element.from(1, 1500),
                Element.from(1, 2000),

                Element.from(99, 9999)
                ).
                assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Element>() {
                    long w;
                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(w);
                    }

                    @Override
                    public long extractTimestamp(Element element, long previousElementTimestamp) {
                        w = element.getTimestamp();
                        return w;
                    }
                });

        elements
                .keyBy(new KeySelector<Element, Integer>() {
                    @Override
                    public Integer getKey(Element element) throws Exception {
                        return element.value;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.milliseconds(1000L)))
                .process(new MyProcessWindowFn()).
                print();

        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }

    static class MyProcessWindowFn extends ProcessWindowFunction<Element, String, Integer, TimeWindow> {
        MapState<Integer, Integer> stateStore;

        @Override
        public void open(Configuration parameters) throws Exception {
            stateStore = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("stateStore", Integer.class, Integer.class));
        }

        @Override
        public void process(Integer key, Context context, Iterable<Element> elements, Collector<String> out) throws Exception {

            if (stateStore.get(key) == null) {
                stateStore.put(key, 1);
            }else {
                int previous = stateStore.get(key);
                stateStore.put(key, previous+1);
            }
            out.collect("State store for " + elements.toString() + " is " + stateStore.entries().toString()
                    + " for window : " + context.window());
        }
    }




    static class Element {
        private final long timestamp;
        private final int value;

        public Element(long timestamp, int value) {
            this.timestamp = timestamp;
            this.value = value;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public int getValue() {
            return value;
        }

        public static Element from(int value, long timestamp) {
            return new Element(timestamp, value);
        }
    }


}

在这里,我试图计算process()函数为键调用的次数。这个例子起作用,并且状态确实存储在滚动窗口中。我已经确保这个示例完全镜像了实际的processWindow函数,去掉了其他不必要的代码。

但是在实际的ProcessWindowFunction中并没有跨窗口保留状态!

是不是有什么我明显漏掉的线索?对于使用如下定义的MapState的processWindowFunction,不在EventTimeTumblingWindows之间保留状态是否还有其他原因:

private MapState<UserDefinedEnum, Boolean> activeSessionStore;

@Override
    public void open(Configuration parameters) throws Exception {
        activeSessionStore = getRuntimeContext().getMapState(new MapStateDescriptor<IUEventType, Boolean>(
                                                "name", UserDefinedEnum.class, Boolean.class));
    }
public class IUFeatureStateCombiner extends ProcessWindowFunction<IUSessionMessage, IUSessionMessage, IUMonitorFeatureKey, TimeWindow> {

    private final static MapStateDescriptor<IUEventType, Boolean> desc =  new MapStateDescriptor<IUEventType, Boolean>(
            "store", IUEventType.class, Boolean.class);
    private final Logger LOGGER = LoggerFactory.getLogger(IUFeatureStateCombiner.class);

    @Override
    public void process(IUMonitorFeatureKey iuMonitorFeatureKey, Context context, Iterable<IUSessionMessage> elements, Collector<IUSessionMessage> out) throws Exception {
        ...

        MapState<IUEventType, Boolean> activeSessionStore = context.globalState().getMapState(desc);

        Iterable<Entry<IUEventType, Boolean>> lastFeatureStates = activeSessionStore.entries(); // <-------- This returns an empty iterable
        // even though I populated activeSessionStore with some values in the previous invocation of process()

        ... do something based on lastFeatureStates....

        activeSessionStore.put(...);
    }

    @Override
    public void clear(Context context) throws Exception {
        context.globalState().getMapState(desc).clear();
    }
}
inputStream.keyBy(IUSessionMessage::getMonitorFeatureKey).
window(TumblingEventTimeWindows.of(Time.milliseconds(1000L))).
            process(new IUFeatureStateCombiner())

编辑:问题已解决,不应该调用clear()方法,因为这是一个全局状态。

共有1个答案

张卓
2023-03-14

你想做更像这样的事。请记住,这些是每个键的状态存储--每个键都有一个单独的映射--所以您在哪里做statestore.get(key),这实际上是没有意义的。也许您所需要的只是valueState,如果您只需要为每个键存储一个整数。

static class MyProcessWindowFn extends ProcessWindowFunction<Element, String, Integer, TimeWindow> {
    private final static MapStateDescriptor mapDesc = new MapStateDescriptor<Integer, Integer>("stateStore", Integer.class, Integer.class);

    @Override
    public void process(Integer key, Context context, Iterable<Element> elements, Collector<String> out) throws Exception {

        MapState<Integer, Integer> stateStore = context.globalState.getMapState(mapDesc);

        ...
    }
}

请注意,全局状态存储从未被清除。因此,如果您有一个无界的密钥空间,您最终会遇到问题。您可以在状态描述符上配置状态TTL来处理这个问题。

 类似资料:
  • 我在工作中使用ProcessWindowFunction并保持StateValue。我的目标是将值保持在超过1个窗口的状态,这意味着状态不会在每个窗口的末尾被清除。我有两个问题: 我怎样才能清除状态?有没有设置触发器并用它来清除状态的选项?(当在ProcessFunction中使用状态时,我能够设置触发器以执行此清除,即使没有新事件) 有没有一种方法来构建一个单元测试来检查我的ProcessWin

  • 我目前正在考虑将opengl状态存储为某种适当类型的全局thread_local变量。那个设计有多糟糕?有什么陷阱吗?

  • 全局状态存储与普通状态存储有何不同? 全局状态存储是否在不同机器上运行的所有实例中都有数据副本?由于全局状态存储不使用任何更改日志主题进行恢复,因此在重新启动时它的行为在我的场景中全局存储的源主题没有键。

  • 问题内容: 广泛的讨论问题。是否已经有任何库可以让我在Java中存储应用程序的执行状态? 例如,我有一个处理文件的应用程序,现在该应用程序可能在某个时刻被迫关闭。我想存储所有已处理文件和未处理文件的信息,以及处理正在进行的阶段正在进行的流程。 是否已经有抽象此功能的库,或者我将不得不从头开始实现它? 问题答案: 似乎您正在寻找的是可以使用Java Serialization API 执行的序列化。

  • 我已经阅读了有状态流处理概述,如果理解正确的话,RocksDB被用作键值存储的默认实现的主要原因之一是这样一个事实,即与内存中的集合不同,它可以处理大于可用内存的数据,因为它可以刷新到磁盘。这两种类型的存储都可以在应用程序重新启动时幸存下来,因为数据是作为Kafka主题备份的。 但还有其他不同吗?例如,我注意到我的持久状态存储为每个主题分区创建了一些。log文件,但它们都是空的。 简而言之,我想知

  • 我是Fink新手,希望计算流的键控总会话持续时间: 表示会话开始,而表示会话结束。预期输出应为事件到达时每个键控的总持续时间。因此,上述数据的样本输出为 在我的实现中,我使用了一个和一个全局的来跟踪 然而,在调试过程中,我无法得到我想要的。 每次调用时,globalstate都是新的对象,并且没有在上一个窗口中计算的任何数据。 因此,我想问 如何在ProcessWindowFunction中获取g