当前位置: 首页 > 知识库问答 >
问题:

通过读取kafka中的详细信息创建动态闪烁窗口

广献
2023-03-14

假设Kafka消息包含flink窗口大小配置。

我想阅读Kafka的信息,并在flink中创建一个全局窗口。

问题陈述:

我们可以使用BroadcastStream处理上述情况吗?

是否有其他支持上述情况的方法

共有1个答案

明安阳
2023-03-14

Flink的窗口API不支持动态更改窗口大小。

您需要做的是使用进程函数实现自己的窗口。在这种情况下是KeyedBroadcastProcessFunction,其中广播窗口配置。

您可以检查Flink训练以获取如何使用KeyedProcessFunction实现时间窗口的示例(复制如下):

public class PseudoWindow extends KeyedProcessFunction<String, KeyedDataPoint<Double>, KeyedDataPoint<Integer>> {
    // Keyed, managed state, with an entry for each window.
    // There is a separate MapState object for each sensor.
    private MapState<Long, Integer> countInWindow;

    boolean eventTimeProcessing;
    int durationMsec;

    /**
     * Create the KeyedProcessFunction.
     * @param eventTime whether or not to use event time processing
     * @param durationMsec window length
     */
    public PseudoWindow(boolean eventTime, int durationMsec) {
        this.eventTimeProcessing = eventTime;
        this.durationMsec = durationMsec;
    }

    @Override
    public void open(Configuration config) {
        MapStateDescriptor<Long, Integer> countDesc =
                new MapStateDescriptor<>("countInWindow", Long.class, Integer.class);
        countInWindow = getRuntimeContext().getMapState(countDesc);
    }

    @Override
    public void processElement(
            KeyedDataPoint<Double> dataPoint,
            Context ctx,
            Collector<KeyedDataPoint<Integer>> out) throws Exception {

        long endOfWindow = setTimer(dataPoint, ctx.timerService());

        Integer count = countInWindow.get(endOfWindow);
        if (count == null) {
            count = 0;
        }
        count += 1;
        countInWindow.put(endOfWindow, count);
    }

    public long setTimer(KeyedDataPoint<Double> dataPoint, TimerService timerService) {
        long time;

        if (eventTimeProcessing) {
            time = dataPoint.getTimeStampMs();
        } else {
            time = System.currentTimeMillis();
        }
        long endOfWindow = (time - (time % durationMsec) + durationMsec - 1);

        if (eventTimeProcessing) {
            timerService.registerEventTimeTimer(endOfWindow);
        } else {
            timerService.registerProcessingTimeTimer(endOfWindow);
        }
        return endOfWindow;
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext context, Collector<KeyedDataPoint<Integer>> out) throws Exception {
        // Get the timestamp for this timer and use it to look up the count for that window
        long ts = context.timestamp();
        KeyedDataPoint<Integer> result = new KeyedDataPoint<>(context.getCurrentKey(), ts, countInWindow.get(ts));
        out.collect(result);
        countInWindow.remove(timestamp);
    }
} 

 类似资料:
  • git 闪烁消息效果由特殊字符实现,该特殊字符可以在 Vim 等编辑器中输入,但同时需要终端支持。 制作方式 首先,输入 git commit 进入默认编辑器修改 commit 信息,进入 Vim 的插入模式, 输入 Ctrl + v,放手后再按 Esc 键即可得到形如 ^[ 的字符(实际上并不是); 紧接着再输入 [5m,之后再输入 commit 信息(这里假设内容为 COMMIT_MESSAG

  • 2)我研究了循环分区的重新平衡。假设我建立了一个集群,如果我的源的并行度为1,如果我进行了重新平衡,我的数据是否会在机器之间进行重排以提高性能?如果是这样,是否有一个特定的端口将数据传送到集群中的其他节点? 3)状态维护有什么限制吗?我计划维护一些用户id相关的数据,这些数据可能会变得很大。我读到flink使用rocks db来维护状态。只是想检查一下是否有限制可以维护多少数据? 4)同样,如果数

  • 作业并行性(4,8,16):[自动生成源]-->[Map1]-->[滚动窗口(10s)]-->[Map2]-->[接收器] Flink窗口性能eps 4p、8p、16p 作业以上的性能最高达到了每秒50k+-左右,不管我如何将集群缩放成4-16的并行度。 闪烁性能无窗口4p、8p 我已经删除了窗口的逻辑,以消除瓶颈性能的应用程序逻辑,但似乎窗口仍然导致我的整个流性能下降,即使该窗口只是一个通过函数

  • 有人能帮我理解一下在flink中的窗口(会话)是什么时候和如何发生的吗?或者样品是如何加工的? 例如:假设定义的时间窗口是30秒,如果一个事件在t时间到达,另一个事件在t+30,那么这两个事件都将被处理,但是在t+31到达的事件将被忽略。 如果我说的不对,请纠正。 上面的问题是:如果一个事件在t时间到达,而另一个事件在t+3时间到达,是否还会等待整个30秒来汇总并最终确定结果? DTO: ====

  • 我是Flink的新手,需要方法的帮助。我有时间颗粒度为5分钟的事件流。我想通过调用rest API来获取事件的元数据,其中包含过去1小时数据点的历史事件,即过去12点(5分钟时间颗粒度)。 e、 g事件的时间戳为10:00、10:05、10:10、10:15等,因此如果我想获取时间戳为11:00的事件元数据,我将调用send发送所有时间戳为10:00、10:05、10:10、10:15的事件。。1

  • 我有一个流系统,在这里我可以获得点击流数据。 数据格式: 我怎样才能做到这一点呢?基本上,我必须维护窗口中所有事件的状态,然后,一旦我获得事件,我必须从该状态获取价格。我并不要求任何工作解决方案,只是要求如何维护窗口中所有事件的状态。我也有一些自定义的Reduce操作。 在:我将2个事件数据加入到列表中。