Question:

Different outputs from a Flink join of streams with an evictor

屠晟睿
2023-03-14

I am trying to run a basic join on Flink by joining two data streams in a local environment. Both source streams have the same element type (Tuple4<String, String, Long, Long>). After running the function below several times, I randomly get two different outputs (collected in the CollectTuple2Sink variable below; the corresponding debug logs are also shown below). I tried keeping both the parallelism and the maximum parallelism at 1, but the problem persists.

//Basic Function
    public void runBasicJoin() throws Exception {

        TumblingEventTimeWindows tsAssigner;
        //tried with getExecutionEnvironment as well
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        env.setMaxParallelism(1);
        //class declared below
        CollectTuple2Sink.VALUES.clear();

        Tuple4<String, String, Long, Long> input1 =
                new Tuple4<String, String, Long, Long>("key1", "val1", 1L, t(1));
        Tuple4<String, String, Long, Long> input2 =
                new Tuple4<String, String, Long, Long>("key1", "val2", 12L, t(2));
        Tuple4<String, String, Long, Long> input3 =
                new Tuple4<String, String, Long, Long>("key1", "val3", 3L, t(3));
        Tuple4<String, String, Long, Long> input4 =
                new Tuple4<String, String, Long, Long>("key2", "val4", 18L, t(4));
        Tuple4<String, String, Long, Long> input5 =
                new Tuple4<String, String, Long, Long>("key1", "val5", 11L, t(6));
        Tuple4<String, String, Long, Long> input6 =
                new Tuple4<String, String, Long, Long>("key1", "val6", -121L, t(7));
        Tuple4<String, String, Long, Long> input7 =
                new Tuple4<String, String, Long, Long>("key2", "val7", -111L, t(8));
        Tuple4<String, String, Long, Long> input8 =
                new Tuple4<String, String, Long, Long>("key2", "val8", 111L, t(9));

        @SuppressWarnings("unchecked")
        DataStream<Tuple4<String, String, Long, Long>> dataStream1 = env.addSource(new Tuple4Soruce(
                t(0), input1, input2, input3, input4, t(5),
                input5, input6, input7, input8, t(10)
        ));

        dataStream1.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple4<String, String, Long, Long>>() {
            @Override
            public long extractAscendingTimestamp(Tuple4<String, String, Long, Long> tuple4) {
                return tuple4.f3;
            }
        });

        @SuppressWarnings("unchecked")
        DataStream<Tuple4<String, String, Long, Long>> dataStream2 = env.addSource(new Tuple4Soruce(
                t(0), input1, input3, input3, input4, input4, input4, t(5),
                input5, input6, t(10), t(11)
        ));

        dataStream2.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple4<String, String, Long, Long>>() {
            @Override
            public long extractAscendingTimestamp(Tuple4<String, String, Long, Long> tuple4) {
                return tuple4.f3;
            }
        });

        tsAssigner = TumblingEventTimeWindows.of(Time.minutes(5));

        dataStream1.join(dataStream2)
                .where(new Tuple4KeySelector())
                .equalTo(new Tuple4KeySelector())
                .window(tsAssigner)
                .trigger(EventTimeTrigger.create())
                .evictor(CountEvictor.of(2))
                .apply(new Tuple4JoinFunction())
                .addSink(new CollectTuple2Sink());
        env.execute();
        System.out.println(CollectTuple2Sink.VALUES);

    }

    private static class CollectTuple2Sink
            implements SinkFunction<Tuple2<String, Long>> {

        public static final List<Tuple2<String, Long>> VALUES = new ArrayList<>();

        @Override
        public synchronized void invoke(Tuple2<String, Long> value)
                throws Exception {
            VALUES.add(value);
        }

    }
//join function --> takes the 2nd and 4th fields of the left Tuple4 and converts it into a Tuple2
    private static class Tuple4JoinFunction implements JoinFunction<Tuple4<String, String, Long, Long>, Tuple4<String, String, Long, Long>, Tuple2<String, Long>> {
        @Override
        public Tuple2<String, Long> join(Tuple4<String, String, Long, Long> tuple4, Tuple4<String, String, Long, Long> tuple42) throws Exception {
            return new Tuple2<>(tuple4.f1, tuple4.f3);
        }
    }
//key selector --> selects the 2nd field of the Tuple4
    private static class Tuple4KeySelector implements KeySelector<Tuple4<String, String, Long, Long>, String> {
        @Override
        public String getKey(Tuple4<String, String, Long, Long> tuple4) throws Exception {
            return tuple4.f1;
        }
    }

//source function --> emits a fixed sequence of Tuple4 events and Long watermarks
    private static class Tuple4Soruce
            implements SourceFunction, ResultTypeQueryable<Tuple4<String, String, Long, Long>> {
        private volatile boolean running = true;
        private Object[] testStream;
        private TypeInformation<Tuple4<String, String, Long, Long>> typeInformation =
                TypeInformation.of(new TypeHint<Tuple4<String, String, Long, Long>>() {
                });


        Tuple4Soruce(Object... eventsOrWatermarks) {
            this.testStream = eventsOrWatermarks;
        }

        @Override
        public void run(SourceContext ctx) throws Exception {
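            // Each entry is either a Tuple4 event, emitted with its f3 field as the
            // event timestamp, or a Long, emitted as a watermark.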
            for (int i = 0; (i < testStream.length) && running; i++) {
                if (testStream[i] instanceof Tuple4) {
                    Tuple4<String, String, Long, Long> tuple =
                            (Tuple4<String, String, Long, Long>) testStream[i];
                    ctx.collectWithTimestamp(tuple, tuple.f3);
                } else if (testStream[i] instanceof Long) {
                    Long ts = (Long) testStream[i];
                    ctx.emitWatermark(new Watermark(ts));
                } else {
                    throw new RuntimeException(testStream[i].toString());
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public TypeInformation<Tuple4<String, String, Long, Long>> getProducedType() {
            return typeInformation;
        }

    }
//util function --> generates event-time timestamps (n minutes after 2018-01-01 00:00)
    public long t(int n) {
        return new DateTime(2018, 1, 1, 0, 0).plusMinutes(n).getMillis();
    }
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1914335182] with leader session id 2a8bf59e-01fa-4e67-892c-83b10dd65be1.
01/09/2020 00:50:16 Job execution switched to status RUNNING.
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to SCHEDULED 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to SCHEDULED 
01/09/2020 00:50:16 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to SCHEDULED 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to DEPLOYING 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to DEPLOYING 
01/09/2020 00:50:16 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to DEPLOYING 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to RUNNING 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to RUNNING 
01/09/2020 00:50:16 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to RUNNING 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to FINISHED 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to FINISHED 
01/09/2020 00:50:16 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to FINISHED 
01/09/2020 00:50:16 Job execution switched to status FINISHED.
[(val1,1514745060000), (val5,1514745360000), (val6,1514745420000)]
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1448653751] with leader session id 291df2cb-96fd-4e3c-b46c-911d2ca11905.
01/09/2020 00:49:42 Job execution switched to status RUNNING.
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to SCHEDULED 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to SCHEDULED 
01/09/2020 00:49:42 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to SCHEDULED 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to DEPLOYING 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to DEPLOYING 
01/09/2020 00:49:42 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to DEPLOYING 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to RUNNING 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to RUNNING 
01/09/2020 00:49:42 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to RUNNING 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to FINISHED 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to FINISHED 
01/09/2020 00:49:42 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to FINISHED 
01/09/2020 00:49:42 Job execution switched to status FINISHED.
[(val1,1514745060000), (val3,1514745180000), (val4,1514745240000), (val5,1514745360000), (val6,1514745420000)]

The source function and the other definitions are taken from this tutorial. I also explored several ways of running the basic job with and without an evictor, based on the official Flink documentation. I tested a number of things without the evictor, and the output of every run was as expected. As soon as the evictor comes into the picture, things start to become non-deterministic.

Flink version 1.4.2

1 Answer

慕容文昌
2023-03-14

You haven't shared all of your code, but based on what I can see, my guess about what is happening is that the result depends on the ingestion order -- which is the case, for example, with count-based windowing -- and in that situation you cannot expect deterministic results.

A windowed join reads from two input streams, and while the events within each stream are processed in order, the two streams race against each other in a non-deterministic, uncontrollable way. The results are deterministic if and only if the triggering and processing of the windows is based purely on event time. If counting or processing time is involved, you cannot expect deterministic results.
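For illustration, here is a minimal sketch of one possible way to keep only the two most recent elements per key and window deterministically, reusing the Tuple4 element type, Tuple4KeySelector and CollectTuple2Sink from the question: drop the CountEvictor and do the selection by event time inside a coGroup (the join in the logs is in fact executed via CoGroupedStreams), so the result depends on the timestamps rather than on arrival order. It assumes the additional imports org.apache.flink.api.common.functions.CoGroupFunction, org.apache.flink.util.Collector and java.util.*.

        // Sketch only: a deterministic alternative to CountEvictor.of(2).
        // Instead of evicting by arrival order, select the two most recent
        // left-side elements per key and window by their event-time field (f3).
        dataStream1.coGroup(dataStream2)
                .where(new Tuple4KeySelector())
                .equalTo(new Tuple4KeySelector())
                .window(TumblingEventTimeWindows.of(Time.minutes(5)))
                .apply(new CoGroupFunction<Tuple4<String, String, Long, Long>,
                        Tuple4<String, String, Long, Long>, Tuple2<String, Long>>() {
                    @Override
                    public void coGroup(Iterable<Tuple4<String, String, Long, Long>> left,
                                        Iterable<Tuple4<String, String, Long, Long>> right,
                                        Collector<Tuple2<String, Long>> out) {
                        // Emit only when the right side matched, mimicking an inner join.
                        if (!right.iterator().hasNext()) {
                            return;
                        }
                        // Sort the left side by event time and keep the last two elements.
                        List<Tuple4<String, String, Long, Long>> sorted = new ArrayList<>();
                        for (Tuple4<String, String, Long, Long> t : left) {
                            sorted.add(t);
                        }
                        sorted.sort((a, b) -> Long.compare(a.f3, b.f3));
                        for (Tuple4<String, String, Long, Long> t
                                : sorted.subList(Math.max(0, sorted.size() - 2), sorted.size())) {
                            out.collect(new Tuple2<>(t.f1, t.f3));
                        }
                    }
                })
                .addSink(new CollectTuple2Sink());

Because the selection is driven by the f3 timestamp instead of the order in which the two sources happen to be consumed, repeated runs on the same input should produce the same output.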
