假设Kafka消息包含flink窗口大小配置。
我想阅读Kafka的信息,并在flink中创建一个全局窗口。
问题陈述:
我们可以使用BroadcastStream处理上述情况吗?
或
是否有其他支持上述情况的方法?
Flink的窗口API不支持动态更改窗口大小。
您需要做的是使用进程函数实现自己的窗口。在这种情况下是KeyedBroadcastProcessFunction,其中广播窗口配置。
您可以检查Flink训练以获取如何使用KeyedProcessFunction实现时间窗口的示例(复制如下):
public class PseudoWindow extends KeyedProcessFunction<String, KeyedDataPoint<Double>, KeyedDataPoint<Integer>> {
// Keyed, managed state, with an entry for each window.
// There is a separate MapState object for each sensor.
private MapState<Long, Integer> countInWindow;
boolean eventTimeProcessing;
int durationMsec;
/**
* Create the KeyedProcessFunction.
* @param eventTime whether or not to use event time processing
* @param durationMsec window length
*/
public PseudoWindow(boolean eventTime, int durationMsec) {
this.eventTimeProcessing = eventTime;
this.durationMsec = durationMsec;
}
@Override
public void open(Configuration config) {
MapStateDescriptor<Long, Integer> countDesc =
new MapStateDescriptor<>("countInWindow", Long.class, Integer.class);
countInWindow = getRuntimeContext().getMapState(countDesc);
}
@Override
public void processElement(
KeyedDataPoint<Double> dataPoint,
Context ctx,
Collector<KeyedDataPoint<Integer>> out) throws Exception {
long endOfWindow = setTimer(dataPoint, ctx.timerService());
Integer count = countInWindow.get(endOfWindow);
if (count == null) {
count = 0;
}
count += 1;
countInWindow.put(endOfWindow, count);
}
public long setTimer(KeyedDataPoint<Double> dataPoint, TimerService timerService) {
long time;
if (eventTimeProcessing) {
time = dataPoint.getTimeStampMs();
} else {
time = System.currentTimeMillis();
}
long endOfWindow = (time - (time % durationMsec) + durationMsec - 1);
if (eventTimeProcessing) {
timerService.registerEventTimeTimer(endOfWindow);
} else {
timerService.registerProcessingTimeTimer(endOfWindow);
}
return endOfWindow;
}
@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<KeyedDataPoint<Integer>> out) throws Exception {
// Get the timestamp for this timer and use it to look up the count for that window
long ts = context.timestamp();
KeyedDataPoint<Integer> result = new KeyedDataPoint<>(context.getCurrentKey(), ts, countInWindow.get(ts));
out.collect(result);
countInWindow.remove(timestamp);
}
}
git 闪烁消息效果由特殊字符实现,该特殊字符可以在 Vim 等编辑器中输入,但同时需要终端支持。 制作方式 首先,输入 git commit 进入默认编辑器修改 commit 信息,进入 Vim 的插入模式, 输入 Ctrl + v,放手后再按 Esc 键即可得到形如 ^[ 的字符(实际上并不是); 紧接着再输入 [5m,之后再输入 commit 信息(这里假设内容为 COMMIT_MESSAG
2)我研究了循环分区的重新平衡。假设我建立了一个集群,如果我的源的并行度为1,如果我进行了重新平衡,我的数据是否会在机器之间进行重排以提高性能?如果是这样,是否有一个特定的端口将数据传送到集群中的其他节点? 3)状态维护有什么限制吗?我计划维护一些用户id相关的数据,这些数据可能会变得很大。我读到flink使用rocks db来维护状态。只是想检查一下是否有限制可以维护多少数据? 4)同样,如果数
作业并行性(4,8,16):[自动生成源]-->[Map1]-->[滚动窗口(10s)]-->[Map2]-->[接收器] Flink窗口性能eps 4p、8p、16p 作业以上的性能最高达到了每秒50k+-左右,不管我如何将集群缩放成4-16的并行度。 闪烁性能无窗口4p、8p 我已经删除了窗口的逻辑,以消除瓶颈性能的应用程序逻辑,但似乎窗口仍然导致我的整个流性能下降,即使该窗口只是一个通过函数
有人能帮我理解一下在flink中的窗口(会话)是什么时候和如何发生的吗?或者样品是如何加工的? 例如:假设定义的时间窗口是30秒,如果一个事件在t时间到达,另一个事件在t+30,那么这两个事件都将被处理,但是在t+31到达的事件将被忽略。 如果我说的不对,请纠正。 上面的问题是:如果一个事件在t时间到达,而另一个事件在t+3时间到达,是否还会等待整个30秒来汇总并最终确定结果? DTO: ====
我是Flink的新手,需要方法的帮助。我有时间颗粒度为5分钟的事件流。我想通过调用rest API来获取事件的元数据,其中包含过去1小时数据点的历史事件,即过去12点(5分钟时间颗粒度)。 e、 g事件的时间戳为10:00、10:05、10:10、10:15等,因此如果我想获取时间戳为11:00的事件元数据,我将调用send发送所有时间戳为10:00、10:05、10:10、10:15的事件。。1
我有一个流系统,在这里我可以获得点击流数据。 数据格式: 我怎样才能做到这一点呢?基本上,我必须维护窗口中所有事件的状态,然后,一旦我获得事件,我必须从该状态获取价格。我并不要求任何工作解决方案,只是要求如何维护窗口中所有事件的状态。我也有一些自定义的Reduce操作。 在:我将2个事件数据加入到列表中。