当前位置: 首页 > 知识库问答 >
问题:

ApacheFlink流媒体窗口字数

孙渝
2023-03-14

我有以下代码来计算socketTextStream中的单词。累积字数和时间窗字数都是必需的。该程序存在累积计数始终与窗口计数相同的问题。为什么会出现这个问题?根据加窗计数计算累积计数的正确方法是什么?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final HashMap<String, Integer> cumulateCounts = new HashMap<String, Integer>();

final DataStream<Tuple2<String, Integer>> counts = env
            .socketTextStream("localhost", 9999)
            .flatMap(new Splitter())
            .window(Time.of(5, TimeUnit.SECONDS))
            .groupBy(0).sum(1)
            .flatten();

counts.print();

counts.addSink(new SinkFunction<Tuple2<String, Integer>>() {
    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        String word = value.f0;
        Integer delta_count = value.f1;
        Integer count = cumulateCounts.get(word);
        if (count == null)
            count = 0;
        count = count + delta_count;
        cumulateCounts.put(word, count);
        System.out.println("(" + word + "," + count.toString() + ")");
    }
});

共有1个答案

墨承泽
2023-03-14

您应该首先分组,并在键控数据流上应用窗口(您的代码适用于Flink 0.9.1,但Flink 0.10.0中的新API对此要求严格):

final DataStream<Tuple2<String, Integer>> counts = env
        .socketTextStream("localhost", 9999)
        .flatMap(new Splitter())
        .groupBy(0)
        .window(Time.of(5, TimeUnit.SECONDS)).sum(1)
        .flatten();

如果在未设置关键帧的数据流上应用窗口,则在一台计算机上只有一个单线程窗口操作符(即,没有并行性)在整个数据流上构建窗口(在Flink 0.9.1中,此全局窗口可以通过groupBy())拆分为子窗口,但在Flink 0.10.0中,这将不再有效)。要计算字数,您需要为每个不同的键值构建一个窗口,即,首先为每个键值获取一个子流(通过groupBy()),并在每个子流上应用一个窗口操作符(因此,您可以为每个子流拥有一个自己的窗口操作符实例,允许并行执行)。

对于全局(累积)计数,您可以简单地应用group by(). sum()构造。首先,流被分成子流(每个键值一个)。其次,计算整个流的总和。因为流没有窗口化,所以计算(累积)中的总和并为每个传入的元组更新(更详细地说,总和的初始结果值为零,并且每个元组的结果更新为结果=tuple.value )。每次调用求和后,都会发出新的当前结果。

在您的代码中,您不应使用特殊的接收器函数,而应按如下操作:

counts.groupBy(0).sum(1).print();

 类似资料:
  • 流处理中的5秒翻滚窗口与微批处理时的5秒微批处理有何不同?两者都有一个5秒的非重叠窗口,在此期间它们处理记录并继续前进。 我知道流处理中有时间的概念:事件、摄取和处理时间。我们是否可以推断,使用微拍摄的流处理只不过是使用具有摄取时间或处理时间的翻滚窗口的流处理?

  • 首先,我是流处理框架的新手。我想对其中一些进行基准测试,所以我从Flink开始。 对于我的用例,我需要将窗口t中的事件与窗口t-1中的事件进行比较,两者的大小都是15分钟,然后进行一些聚合。 以下是我的用例的简化版本: 我们将分析的事件视为形式的元组。在窗口1中,我们有:(A,1),(B,2),(C,3),在窗口2中,我们有:(D,6)和(B,7)。然后,我需要将当前窗口中的事件与前一个窗口中的事

  • 我想在Apache Flink中做流媒体工作来做Kafka- 这应该是流式处理。

  • 在中,元素被分配给一个或多个实例。在滑动事件时间窗口的情况下,这发生在1中。 如果窗口的和,则将时间戳为0的元素分配到以下窗口: 窗口(开始=0,结束=5) 窗口(开始=-1,结束=4) 窗口(开始=-2,结束=3) 窗口(开始=-3,结束=2) 窗口(开始=-4,结束=1) 在一幅图片中: 有没有办法告诉Flink时间有开始,而在那之前,没有窗户?如果没有,从哪里开始寻求改变?在上述情况下,Fl

  • Streaming API用于通过令牌读取JSON令牌。 它将JSON内容读写为离散事件。 JsonReader和JsonWriter将数据读/写为令牌,称为JsonToken 。 它是处理JSON的三种方法中最强大的方法。 它具有最低的开销,并且在读/写操作中非常快。 它类似于XML的Stax解析器。 在本章中,我们将展示使用GSON流API来读取JSON数据。 Streaming API与to

  • 在我正在开发的Wordpress主题中,我有一个TinyMCEPopup来向编辑器添加短代码,一些短代码需要图像。我是否可以添加一个“添加媒体”按钮,打开Wordpress媒体上传器,允许用户选择或上传图像,即使我在TinyMCEPopup中?