当前位置: 首页 > 知识库问答 >
问题:

如何利用Java在Apache Flink中对数据流执行平均操作

邵弘伟
2023-03-14

我正在尝试计算Flink中输入数据流(无窗口)的平均值

输入数据流来自套接字连接,形式为“键值”,如“x 5”

public class AvgViews {

DataStream<Tuple2<String, Double>> AvgViewStream = dataStream
                .map(new AvgViews.RowSplitter())
                .keyBy(0)
                //.??? 



    public static class RowSplitter implements
            MapFunction<String, Tuple3<String, Double, Integer>> {

        public Tuple3<String, Double, Integer> map(String row)
                throws Exception {
            String[] fields = row.split(" ");
            if (fields.length == 2) {
                return new Tuple3<String, Double, Integer>(
                        fields[0],
                        Double.parseDouble(fields[1]),
                        1);
            }
            return null;
        }
    }
}

共有1个答案

隗锐进
2023-03-14

您可以使用RichMap(或RichFlatMap)将Tuple2保持在键控状态。您需要将每个传入记录添加到状态中,并将平均值作为输出。

文档中的CountWindowAverage示例做了类似的事情,不过稍微复杂一些。

 类似资料:
  • 我有一个这样的数据集 我想选择第3列和第4列作为我的键和值,我如何在Apache Flink中执行平均操作。 我最多能做到“按键分组”。但是我无法对每个键的值执行平均运算。

  • 第1流: 流-2: 现在,我想对这两个流执行JOIN操作,并希望仅检索流-1中不存在于流-2中的行。我的输入流数据是AVRO格式 预期产出: 那么,我应该执行哪个连接操作,以及如何实现预期的输出呢?谁能帮我实现这个目标

  • 是否可以对流进行求和、平均并将其转换为新对象。我有个目标 现在我想得到这个对象列表的平均值和总和(代码总和价格和代码平均价格) 然后我想创建一个新对象(页脚 这就是我现在所拥有的,它可以工作,但是我要通过两次流。我想要一个方法,我可以通过一次流来做到这一点。 有没有更好的方法做到这一点而不必重复这一点。谢谢

  • 我想采取以下方法: 并使用Streams api更新它。这是我到目前为止得到的: 有没有办法在不流式传输两次(第二次获得计数)的情况下做到这一点?

  • 问题内容: 在使用流的Java 8中,当我一个接一个地链接方法时,将以流水线方式执行操作。 例: 输出:- 但是当我在javascript中尝试类似的时候,结果却不同。正如在javascript中一样,第一个操作完成,然后执行第二个操作。例如:- 输出:- JavaScript中有什么方法可以使其以管道方式执行操作,并且可以像Java程序一样获得输出? 这个问题仅询问如何像JavaScript那样

  • 我想在Apache Flink中制作一个流数据的时间窗口。我的数据看起来有点像这样: 但显然,Flink并不是将我的数据作为列表来阅读。它将其作为字符串读取,因此,我得到以下异常: 如何对字符串数据执行时间窗口,或者如何将此数据转换为元组?