当前位置: 首页 > 知识库问答 >
问题:

使用 Apache Flink 对 DataStream 进行排序

姚新霁
2023-03-14

我正在学习Flink,我从使用DataStream的简单字数统计开始。为了增强处理能力,我过滤了输出,以仅显示找到3个或更多单词的结果。

    DataStream<Tuple2<String, Integer>> dataStream = env
            .socketTextStream("localhost", 9000)
            .flatMap(new Splitter())
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .apply(new MyWindowFunction())
            .sum(1)
            .filter(word -> word.f1 >= 3);

我想创建一个WindowFunction,根据找到的单词值对输出进行排序。我试图实现的WindowFunction根本不编译。我正在努力定义WindowFunction接口的apply方法和参数。

public static class MyWindowFunction implements WindowFunction<
        Tuple2<String, Integer>, // input type
        Tuple2<String, Integer>, // output type
        Tuple2<String, Integer>, // key type
        TimeWindow> {

    void apply(Tuple2<String, Integer> key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) {

        String word = ((Tuple2<String, Integer>)key).f0;
        Integer count = ((Tuple2<String, Integer>)key).f1;

        .........
        out.collect(new Tuple2<>(word, count));
    }
}

共有2个答案

姚星腾
2023-03-14

只要Splitter类(应该是一个FlatMapFunction)正在发射Tuple2,. sum(1)方法将完成您需要的一切(无需使用Application()

那么 .sum(1) 将为您进行聚合。如果你需要的东西与 sum() 不同,你通常会使用 .reduce(new MyCustomReduceFunction()),因为这将是最有效和可扩展的方法,因为就不需要在内存中缓冲大量数据而言。

余弘新
2023-03-14

我正在更新这个答案,使用Flink 1.12.0。为了对中的流元素进行排序,我必须在使用< code>ReduceFunction对流进行计数后使用< code > KeyedProcessFunction 。然后,我必须将最后一个转换的并行度设置为< code>1,以便不改变我使用< code > KeyedProcessFunction 排序的元素的顺序。我使用的序列是< code>socketTextStream -

public class SocketWindowWordCountJava {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.socketTextStream("localhost", 9000)
                .flatMap(new SplitterFlatMap())
                .keyBy(new WordKeySelector())
                .reduce(new SumReducer())
                .keyBy(new WordKeySelector())
                .process(new SortKeyedProcessFunction(3 * 1000))
                .print().setParallelism(1);
        String executionPlan = env.getExecutionPlan();
        System.out.println("ExecutionPlan ........................ ");
        System.out.println(executionPlan);
        System.out.println("........................ ");
        env.execute("Window WordCount sorted");
    }
}

我用来对流进行排序的自定义项是SortKeyedProcessFunction,它扩展了keyedProcessFunctions。我使用<code>值状态

public class SortKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, Event> {
        private static final long serialVersionUID = 7289761960983988878L;
        // delay after which an alert flag is thrown
        private final long timeOut;
        // state to remember the last timer set
        private ValueState<List<Event>> listState = null;
        private ValueState<Long> lastTime = null;

        public SortKeyedProcessFunction(long timeOut) {
            this.timeOut = timeOut;
        }

        @Override
        public void open(Configuration conf) {
            // setup timer and HLL state
            ValueStateDescriptor<List<Event>> descriptor = new ValueStateDescriptor<>(
                    // state name
                    "sorted-events",
                    // type information of state
                    TypeInformation.of(new TypeHint<List<Event>>() {
                    }));
            listState = getRuntimeContext().getState(descriptor);

            ValueStateDescriptor<Long> descriptorLastTime = new ValueStateDescriptor<Long>(
                    "lastTime",
                    TypeInformation.of(new TypeHint<Long>() {
                    }));

            lastTime = getRuntimeContext().getState(descriptorLastTime);
        }

        @Override
        public void processElement(Tuple2<String, Integer> value, Context context, Collector<Event> collector) throws Exception {
            // get current time and compute timeout time
            long currentTime = context.timerService().currentProcessingTime();
            long timeoutTime = currentTime + timeOut;
            // register timer for timeout time
            context.timerService().registerProcessingTimeTimer(timeoutTime);

            List<Event> queue = listState.value();
            if (queue == null) {
                queue = new ArrayList<Event>();
            }
            Long current = lastTime.value();
            queue.add(new Event(value.f0, value.f1));
            lastTime.update(timeoutTime);
            listState.update(queue);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
            // System.out.println("onTimer: " + timestamp);
            // check if this was the last timer we registered
            System.out.println("timestamp: " + timestamp);
            List<Event> queue = listState.value();
            Long current = lastTime.value();

            if (timestamp == current.longValue()) {
                Collections.sort(queue);

                queue.forEach( e -> {
                    out.collect(e);
                });
                queue.clear();
                listState.clear();
            }
        }
    }

class Event implements Comparable<Event> {
    String value;
    Integer qtd;
    public Event(String value, Integer qtd) {
        this.value = value;
        this.qtd = qtd;
    }
    public String getValue() { return value; }
    public Integer getQtd() { return qtd; }
    @Override
    public String toString() {
        return "Event{" +"value='" + value + '\'' +", qtd=" + qtd +'}';
    }
    @Override
    public int compareTo(@NotNull Event event) {
        return this.getValue().compareTo(event.getValue());
    }
}

因此,当我使用$ nc -lk 9000并在控制台上键入单词时,我会在输出上按顺序看到它们

...
Event{value='soccer', qtd=7}
Event{value='swim', qtd=5}

...
Event{value='basketball', qtd=9}
Event{value='soccer', qtd=8}
Event{value='swim', qtd=6}

其他UDF用于流程序的其他转换,它们在这里是为了完整性。

public class SplitterFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 3121588720675797629L;
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(Tuple2.of(word, 1));
            }
        }
    }
    public class WordKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
        @Override
        public String getKey(Tuple2<String, Integer> value) throws Exception {
            return value.f0;
        }
    }
    public class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> event1, Tuple2<String, Integer> event2) throws Exception {
            return Tuple2.of(event1.f0, event1.f1 + event2.f1);
        }
    }
 类似资料:
  • 最近我尝试使用ApacheFlink进行快速批处理。我有一个表,它有一个列:value和一个不相关的索引列 基本上我想计算每5行值的平均值和范围。然后我将根据我刚才计算的平均值计算平均值和标准偏差。所以我想最好的方法是使用窗口。 看起来是这样的 但是我不知道用。我试过,但它说没有这样的输入。我只希望它在从源代码读取时按顺序分组。但是它必须是一个时间属性,所以我不能使用索引列作为排序。 我是否必须添

  • 问题内容: 在我可以排序的使用此声明: 我无法使用 Swift 重现相同的语句。我发现的只是使用。 问题答案: 您可以使用Swift的内置排序函数,也可以使用Swift数组,因为Swift数组是桥接的,因此可以直接从swift 调用。 使用Swift的功能: 或者,使用的:

  • 问题内容: 我有可能需要按1-n键排序的大文件。其中一些键可能是数字键,有些则可能不是数字键。这是一个固定宽度的柱状文件,因此没有定界符。 有没有一种很好的方法可以用Unix排序呢?使用一个键,就像使用“ -n”一样简单。我已经阅读了手册页并简短地搜索了Google,但是没有找到一个很好的例子。我将如何实现这一目标? 注意:由于文件大小可能,我排除了Perl。这将是不得已的办法。 问题答案: 使用

  • 问题内容: 是否可以使用排序数组,然后再将另一个相关数组定位为与排序数组相同,例如: 从这一点出发,我想对数组进行排序,这样,如果“人”有一个cellNo“ x”,则在对数组进行排序后,他将具有相同的“ cellNo”“ x” 问题答案: 我会采用另一种方法: 创建一个新对象: 创建一个比较器: 打电话一对阵列

  • 问题内容: 为什么我的打印输出数组未在以下代码中排序? 问题答案: 您需要两个循环来实现Bubble Sort。 样例代码:

  • 问题内容: 我有一组三个列表项,它们希望在页面加载时从高到低自动显示。理想情况下使用jquery或javascript。 每个列表项都需要有自己的ID,因为它们每个都有各自的背景图像。数字必须是文本节点,以便用户可以编辑它们。 问题答案: 这可能是最快的方法,因为它不使用jQuery: 像下面这样调用函数: 您可以以相同的方式对其他列表进行排序,如果列表类在同一页面上还有其他元素,则应给您的ul一