我有一个ProcessWindowFunction
用于处理TumblingEventTimeWindows,其中我使用状态存储在多个滚动窗口中保存一些值。我的问题是,这个状态存储没有在滚动窗口中保存,也就是说,如果我首先在windows[0,999]中存储一些东西,然后从windows[1000,1999]访问这个存储,那么这个存储是空的。我知道这里所述的全局状态和每个窗口状态。我要使用全局状态。我还尝试创建一个最低限度的工作示例来调查这一点:
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import javax.annotation.Nullable;
public class twStateStoreTest {
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000L);
final DataStream<Element> elements = env.fromElements(
Element.from(1, 500),
Element.from(1, 1000),
Element.from(1, 1500),
Element.from(1, 2000),
Element.from(99, 9999)
).
assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Element>() {
long w;
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(w);
}
@Override
public long extractTimestamp(Element element, long previousElementTimestamp) {
w = element.getTimestamp();
return w;
}
});
elements
.keyBy(new KeySelector<Element, Integer>() {
@Override
public Integer getKey(Element element) throws Exception {
return element.value;
}
})
.window(TumblingEventTimeWindows.of(Time.milliseconds(1000L)))
.process(new MyProcessWindowFn()).
print();
// execute program
env.execute("Flink Streaming Java API Skeleton");
}
static class MyProcessWindowFn extends ProcessWindowFunction<Element, String, Integer, TimeWindow> {
MapState<Integer, Integer> stateStore;
@Override
public void open(Configuration parameters) throws Exception {
stateStore = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("stateStore", Integer.class, Integer.class));
}
@Override
public void process(Integer key, Context context, Iterable<Element> elements, Collector<String> out) throws Exception {
if (stateStore.get(key) == null) {
stateStore.put(key, 1);
}else {
int previous = stateStore.get(key);
stateStore.put(key, previous+1);
}
out.collect("State store for " + elements.toString() + " is " + stateStore.entries().toString()
+ " for window : " + context.window());
}
}
static class Element {
private final long timestamp;
private final int value;
public Element(long timestamp, int value) {
this.timestamp = timestamp;
this.value = value;
}
public long getTimestamp() {
return timestamp;
}
public int getValue() {
return value;
}
public static Element from(int value, long timestamp) {
return new Element(timestamp, value);
}
}
}
在这里,我试图计算process()
函数为键调用的次数。这个例子起作用,并且状态确实存储在滚动窗口中。我已经确保这个示例完全镜像了实际的processWindow函数,去掉了其他不必要的代码。
但是在实际的ProcessWindowFunction中并没有跨窗口保留状态!
是不是有什么我明显漏掉的线索?对于使用如下定义的MapState的processWindowFunction,不在EventTimeTumblingWindows之间保留状态是否还有其他原因:
private MapState<UserDefinedEnum, Boolean> activeSessionStore;
@Override
public void open(Configuration parameters) throws Exception {
activeSessionStore = getRuntimeContext().getMapState(new MapStateDescriptor<IUEventType, Boolean>(
"name", UserDefinedEnum.class, Boolean.class));
}
public class IUFeatureStateCombiner extends ProcessWindowFunction<IUSessionMessage, IUSessionMessage, IUMonitorFeatureKey, TimeWindow> {
private final static MapStateDescriptor<IUEventType, Boolean> desc = new MapStateDescriptor<IUEventType, Boolean>(
"store", IUEventType.class, Boolean.class);
private final Logger LOGGER = LoggerFactory.getLogger(IUFeatureStateCombiner.class);
@Override
public void process(IUMonitorFeatureKey iuMonitorFeatureKey, Context context, Iterable<IUSessionMessage> elements, Collector<IUSessionMessage> out) throws Exception {
...
MapState<IUEventType, Boolean> activeSessionStore = context.globalState().getMapState(desc);
Iterable<Entry<IUEventType, Boolean>> lastFeatureStates = activeSessionStore.entries(); // <-------- This returns an empty iterable
// even though I populated activeSessionStore with some values in the previous invocation of process()
... do something based on lastFeatureStates....
activeSessionStore.put(...);
}
@Override
public void clear(Context context) throws Exception {
context.globalState().getMapState(desc).clear();
}
}
inputStream.keyBy(IUSessionMessage::getMonitorFeatureKey).
window(TumblingEventTimeWindows.of(Time.milliseconds(1000L))).
process(new IUFeatureStateCombiner())
编辑:问题已解决,不应该调用clear()方法,因为这是一个全局状态。
你想做更像这样的事。请记住,这些是每个键的状态存储--每个键都有一个单独的映射--所以您在哪里做statestore.get(key)
,这实际上是没有意义的。也许您所需要的只是valueState
,如果您只需要为每个键存储一个整数。
static class MyProcessWindowFn extends ProcessWindowFunction<Element, String, Integer, TimeWindow> {
private final static MapStateDescriptor mapDesc = new MapStateDescriptor<Integer, Integer>("stateStore", Integer.class, Integer.class);
@Override
public void process(Integer key, Context context, Iterable<Element> elements, Collector<String> out) throws Exception {
MapState<Integer, Integer> stateStore = context.globalState.getMapState(mapDesc);
...
}
}
请注意,全局状态存储从未被清除。因此,如果您有一个无界的密钥空间,您最终会遇到问题。您可以在状态描述符上配置状态TTL来处理这个问题。
我在工作中使用ProcessWindowFunction并保持StateValue。我的目标是将值保持在超过1个窗口的状态,这意味着状态不会在每个窗口的末尾被清除。我有两个问题: 我怎样才能清除状态?有没有设置触发器并用它来清除状态的选项?(当在ProcessFunction中使用状态时,我能够设置触发器以执行此清除,即使没有新事件) 有没有一种方法来构建一个单元测试来检查我的ProcessWin
我目前正在考虑将opengl状态存储为某种适当类型的全局thread_local变量。那个设计有多糟糕?有什么陷阱吗?
全局状态存储与普通状态存储有何不同? 全局状态存储是否在不同机器上运行的所有实例中都有数据副本?由于全局状态存储不使用任何更改日志主题进行恢复,因此在重新启动时它的行为在我的场景中全局存储的源主题没有键。
问题内容: 广泛的讨论问题。是否已经有任何库可以让我在Java中存储应用程序的执行状态? 例如,我有一个处理文件的应用程序,现在该应用程序可能在某个时刻被迫关闭。我想存储所有已处理文件和未处理文件的信息,以及处理正在进行的阶段正在进行的流程。 是否已经有抽象此功能的库,或者我将不得不从头开始实现它? 问题答案: 似乎您正在寻找的是可以使用Java Serialization API 执行的序列化。
我已经阅读了有状态流处理概述,如果理解正确的话,RocksDB被用作键值存储的默认实现的主要原因之一是这样一个事实,即与内存中的集合不同,它可以处理大于可用内存的数据,因为它可以刷新到磁盘。这两种类型的存储都可以在应用程序重新启动时幸存下来,因为数据是作为Kafka主题备份的。 但还有其他不同吗?例如,我注意到我的持久状态存储为每个主题分区创建了一些。log文件,但它们都是空的。 简而言之,我想知
我是Fink新手,希望计算流的键控总会话持续时间: 表示会话开始,而表示会话结束。预期输出应为事件到达时每个键控的总持续时间。因此,上述数据的样本输出为 在我的实现中,我使用了一个和一个全局的来跟踪 然而,在调试过程中,我无法得到我想要的。 每次调用时,globalstate都是新的对象,并且没有在上一个窗口中计算的任何数据。 因此,我想问 如何在ProcessWindowFunction中获取g