当前位置: 首页 > 知识库问答 >
问题:

如何对无水印闪烁的联合数据流进行排序

徐文斌
2023-03-14

flink流有多个数据流,然后我将这些数据流与org合并。阿帕奇。Flink。流式处理。api。数据流。DataStream#联合方法。然后,我得到了问题,数据流是无序的,我不能设置窗口来排序数据流中的数据。

在Apache Flink中对流的联合进行排序以识别用户会话

我得到了答案,但是com。利亚姆。学Flink。实例协会UnionStreamDemo。从未调用SortFunction#onTimer。

环境信息:flink版本1.7.0

一般来说,我希望对没有水印的联合数据流进行排序。

共有1个答案

相高谊
2023-03-14

您需要水印,以便排序函数知道何时可以安全地发出已排序的元素。没有水印,您会从流B中获得一条日期早于流A的前N条记录的记录,对吗?

但添加水印很容易,特别是如果您知道任何一个流的“事件时间”都在严格增加。下面是我写的一些代码,它将DavidAnderson在回答上面提到的另一个SO问题时发布的内容扩展到了其他SO问题,希望这能帮助您开始。

--肯

package com.scaleunlimited.flinksnippets;

import java.util.PriorityQueue;
import java.util.Random;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.util.Collector;
import org.junit.Test;

public class MergeAndSortStreamsTest {

    @Test
    public void testMergeAndSort() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Event> streamA = env.addSource(new EventSource("A"))
                .assignTimestampsAndWatermarks(new EventTSWAssigner());
        DataStream<Event> streamB = env.addSource(new EventSource("B"))
                .assignTimestampsAndWatermarks(new EventTSWAssigner());

        streamA.union(streamB)
        .keyBy(r -> r.getKey())
        .process(new SortByTimestampFunction())
        .print();

        env.execute();
    }

    private static class Event implements Comparable<Event> {
        private String _label;
        private long _timestamp;

        public Event(String label, long timestamp) {
            _label = label;
            _timestamp = timestamp;
        }

        public String getLabel() {
            return _label;
        }

        public void setLabel(String label) {
            _label = label;
        }

        public String getKey() {
            return "1";
        }

        public long getTimestamp() {
            return _timestamp;
        }

        public void setTimestamp(long timestamp) {
            _timestamp = timestamp;
        }

        @Override
        public String toString() {
            return String.format("%s @ %d", _label, _timestamp);
        }

        @Override
        public int compareTo(Event o) {
            return Long.compare(_timestamp, o._timestamp);
        }
    }

    @SuppressWarnings("serial")
    private static class EventTSWAssigner extends AscendingTimestampExtractor<Event> {

        @Override
        public long extractAscendingTimestamp(Event element) {
            return element.getTimestamp();
        }
    }

    @SuppressWarnings("serial")
    private static class SortByTimestampFunction extends KeyedProcessFunction<String, Event, Event> {
        private ValueState<PriorityQueue<Event>> queueState = null;

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<PriorityQueue<Event>> descriptor = new ValueStateDescriptor<>(
                    // state name
                    "sorted-events",
                    // type information of state
                    TypeInformation.of(new TypeHint<PriorityQueue<Event>>() {
                    }));
            queueState = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void processElement(Event event, Context context, Collector<Event> out) throws Exception {
            TimerService timerService = context.timerService();

            long currentWatermark = timerService.currentWatermark();
            System.out.format("processElement called with watermark %d\n", currentWatermark);
            if (context.timestamp() > currentWatermark) {
                PriorityQueue<Event> queue = queueState.value();
                if (queue == null) {
                    queue = new PriorityQueue<>(10);
                }

                queue.add(event);
                queueState.update(queue);
                timerService.registerEventTimeTimer(event.getTimestamp());
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext context, Collector<Event> out) throws Exception {
            PriorityQueue<Event> queue = queueState.value();
            long watermark = context.timerService().currentWatermark();
            System.out.format("onTimer called  with watermark %d\n", watermark);
            Event head = queue.peek();
            while (head != null && head.getTimestamp() <= watermark) {
                out.collect(head);
                queue.remove(head);
                head = queue.peek();
            }
        }
    }

    @SuppressWarnings("serial")
    private static class EventSource extends RichParallelSourceFunction<Event> {

        private String _prefix;

        private transient Random _rand;
        private transient boolean _running;
        private transient int _numEvents;

        public EventSource(String prefix) {
            _prefix = prefix;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            _rand = new Random(_prefix.hashCode() + getRuntimeContext().getIndexOfThisSubtask());
        }

        @Override
        public void cancel() {
            _running = false;
        }

        @Override
        public void run(SourceContext<Event> context) throws Exception {
            _running = true;
            _numEvents = 0;
            long timestamp = System.currentTimeMillis() + _rand.nextInt(10);

            while (_running && (_numEvents < 100)) {
                long deltaTime = timestamp - System.currentTimeMillis();
                if (deltaTime > 0) {
                    Thread.sleep(deltaTime);
                }

                context.collect(new Event(_prefix, timestamp));
                _numEvents++;

                // Generate a timestamp every 5...15 ms, average is 10.
                timestamp += (5 + _rand.nextInt(10));
            }
        }

    }
}
 类似资料:
  • 我需要从表中选择所有的vip并按兰德排序,然后添加按日期排序的其他数据。在第一个子查询中,一切正常,但在第二个子查询中,spa_date DESC不起作用。我知道UNION子查询中的ORDER BY子句会被忽略而没有限制(但ORDER BY rand()起作用),但我需要所有查询(1+2)中的限制,而不是子查询中的限制 问题: 我需要选择spa_vip=1的所有spa_id并按RAND()排序,然

  • 我正在使用flink 1.5.2解决CEP问题。 我的数据来自一个列表,当系统运行时,其他一些进程将向该列表添加新的事件对象。它不是套接字或网络消息。我一直在阅读官方网站的示例。以下是我想我应该做的步骤。 使用env创建数据流。fromCollection(列表) 定义图案图案 使用CEP获取PatternStream。模式(数据流,模式) 使用pattern\u流。选择(…实现选择接口…)以数据

  • 我正在阅读《Stream Processing with Apache Flink》一书,书中说:“从版本0.10.0开始,Kafka支持消息时间戳。当从Kafka版本0.10或更高版本读取时,如果应用程序以事件时间模式运行,使用者将自动提取消息时间戳作为事件时间戳*“因此在函数中,调用将默认返回Kafka消息时间戳?请提供一个简单的示例,说明如何实现AssignerWithPeriodicalW

  • 我需要根据一个键连接两个事件源。事件之间的间隔最长可达1年(即具有id1的event1可能在今天到达,而来自第二个事件源的具有id1的相应event2可能在一年后到达)。假设我只想输出连接的事件输出。 我正在探索在RocksDB后端使用Flink的选项(我遇到了表API,它们似乎适合我的用例)。我找不到做这种长窗口连接的引用体系结构。我希望系统一天能处理大约2亿个事件。 关于处理这种长窗口连接的任

  • 我有一个关于在Kinesis流中分片数据的问题。我想在向我的kinesis流发送用户数据时使用一个随机分区键,这样碎片中的数据是均匀分布的。为了使这个问题更简单,我想通过在Flink应用程序中键入用户ID来聚合用户数据。

  • 我想运行流作业。 当我尝试使用和Flink Web界面在本地运行该作业时,没有问题。 但是,我当前正在尝试使用Flink on YARN(部署在Google Dataproc上)运行我的作业,并且当我尝试取消它时,取消状态将永远持续,并且TaskManager中仍有一个插槽被占用。 这是我得到的日志: