Question:

Apache Flink - Connected stream ordering and backpressure

寿和通
2023-03-14

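The sample code below has two source functions: one produces the even numbers from 0 to 18, the other the odd numbers from 1 to 19. The two sources are connected, so the operator receives the union of both streams, and every result is printed.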

Sample code:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

// The class name is illustrative; the body of main() is the original snippet.
public class ConnectedStreamsExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env
            // Source A: emits the even numbers 0, 2, ..., 18, one per second.
            .addSource(new SourceFunction<Integer>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void run(SourceContext<Integer> ctx) throws Exception {
                    for (int i = 0; i < 20; i = i + 2) {
                        System.out.println(System.currentTimeMillis() + ":SourceA: " + i);
                        ctx.collect(i);
                        Thread.sleep(1000);
                    }
                }

                @Override
                public void cancel() {}
            })
            // Source B: emits the odd numbers 1, 3, ..., 19, one per second.
            .connect(env.addSource(new SourceFunction<Integer>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void run(SourceContext<Integer> ctx) throws Exception {
                    for (int i = 1; i < 20; i = i + 2) {
                        System.out.println(System.currentTimeMillis() + ":SourceB: " + i);
                        ctx.collect(i);
                        Thread.sleep(1000);
                    }
                }

                @Override
                public void cancel() {}
            }))
            // Slow two-input operator: each element takes 5 s (simulated with Thread.sleep).
            .process(new CoProcessFunction<Integer, Integer, Integer>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void processElement1(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                    System.out.println(System.currentTimeMillis() + ":OperatorA: " + value);
                    Thread.sleep(5000);
                    out.collect(value);
                }

                @Override
                public void processElement2(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                    System.out.println(System.currentTimeMillis() + ":OperatorB: " + value);
                    Thread.sleep(5000);
                    out.collect(value);
                }
            })
            .addSink(new SinkFunction<Integer>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void invoke(Integer value) throws Exception {
                    System.out.println(System.currentTimeMillis() + ":Sink: " + value);
                }
            });

        env.execute();
    }
}

Output

1578465207355:SourceB: 1
1578465207379:SourceA: 0
1578465207437:OperatorA: 0
1578465208360:SourceB: 3
1578465208380:SourceA: 2
1578465209364:SourceB: 5
1578465209383:SourceA: 4
1578465210366:SourceB: 7
1578465210386:SourceA: 6
1578465211369:SourceB: 9
1578465211390:SourceA: 8
1578465212370:SourceB: 11
1578465212394:SourceA: 10
1578465212440:Sink: 0
1578465212441:OperatorB: 1
1578465213375:SourceB: 13
1578465213399:SourceA: 12
1578465214379:SourceB: 15
1578465214401:SourceA: 14
1578465215383:SourceB: 17
1578465215406:SourceA: 16
1578465216388:SourceB: 19
1578465216409:SourceA: 18
1578465217441:Sink: 1
1578465217441:OperatorB: 3
1578465222446:Sink: 3
1578465222446:OperatorB: 5
1578465227448:Sink: 5
1578465227449:OperatorB: 7
1578465232452:Sink: 7
1578465232453:OperatorB: 9
1578465237453:Sink: 9
1578465237453:OperatorB: 11
1578465242456:Sink: 11
1578465242456:OperatorA: 2
1578465247462:Sink: 2
1578465247462:OperatorA: 4
1578465252467:Sink: 4
1578465252467:OperatorA: 6

Q1.

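I expected Flink to send items to the co-process function in the order in which they arrive on the connected streams. However, the output shows that the number 2 was produced by its source function before the number 11, yet 11 was sent to the co-process function before 2. Why does this happen?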

Q2.

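No backpressure happens on the connected streams. The source functions run to completion even though their elements are still being processed by the operator (simulated with Thread.sleep in the code above). Is there any way to get backpressure on connected streams?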

Code edit V2

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

// The class name is illustrative; the body of main() is the V2 snippet.
public class ConnectedStreamsBackpressureExample {

    public static void main(String[] args) throws Exception {
        // Shrink the network buffer pool so that backpressure should build up quickly.
        Configuration config = new Configuration();
        config.setInteger("taskmanager.network.numberOfBuffers", 4);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, config);

        env.setParallelism(1);

        env
            // Source A: emits the even numbers below 50000 as fast as possible.
            .addSource(new SourceFunction<Integer>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void run(SourceContext<Integer> ctx) throws Exception {
                    for (int i = 0; i < 50000; i = i + 2) {
                        System.out.println(System.currentTimeMillis() + ":SourceA: " + i);
                        ctx.collect(i);
                    }
                }

                @Override
                public void cancel() {}
            })
            // Source B: emits the odd numbers below 50000 as fast as possible.
            .connect(env.addSource(new SourceFunction<Integer>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void run(SourceContext<Integer> ctx) throws Exception {
                    for (int i = 1; i < 50000; i = i + 2) {
                        System.out.println(System.currentTimeMillis() + ":SourceB: " + i);
                        ctx.collect(i);
                    }
                }

                @Override
                public void cancel() {}
            }))
            // Slow two-input operator: each element takes 5 s (simulated with Thread.sleep).
            .process(new CoProcessFunction<Integer, Integer, Integer>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void processElement1(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                    System.out.println(System.currentTimeMillis() + ":OperatorA: " + value);
                    Thread.sleep(5000);
                    out.collect(value);
                }

                @Override
                public void processElement2(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                    System.out.println(System.currentTimeMillis() + ":OperatorB: " + value);
                    Thread.sleep(5000);
                    out.collect(value);
                }
            })
            .addSink(new SinkFunction<Integer>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void invoke(Integer value) throws Exception {
                    System.out.println(System.currentTimeMillis() + ":Sink: " + value);
                }
            });

        env.execute();
    }
}

Output

1578605461497:SourceB: 7279
1578605461497:SourceB: 7281
1578605466406:Sink: 1
1578605466406:OperatorB: 3 <---- only odd numbers (input B) in the output
1578605471411:Sink: 3
1578605471411:OperatorB: 5
1578605476414:Sink: 5
1578605476415:OperatorB: 7
1578605481415:Sink: 7
1578605481415:OperatorB: 9
1578605486417:Sink: 9
1578605486417:OperatorB: 11
1578605491422:Sink: 11
1578605491422:OperatorB: 13
1578605496427:Sink: 13
1578605496427:OperatorB: 15
1578605501432:Sink: 15
1578605501432:OperatorB: 17
1578605506434:Sink: 17
1578605506434:OperatorB: 19
1578605511435:Sink: 19
1578605511435:OperatorB: 21
1578605516435:Sink: 21
1578605516436:OperatorB: 23
1578605521436:Sink: 23
1578605521436:OperatorB: 25
1578605526440:Sink: 25
1578605526440:OperatorB: 27
1578605531443:Sink: 27
1578605531443:OperatorB: 29
1578605536447:Sink: 29
1578605536447:OperatorB: 31
1578605541452:Sink: 31
1578605541452:OperatorB: 33
1578605546457:Sink: 33
1578605546457:OperatorB: 35
1578605551457:Sink: 35
1578605551457:OperatorB: 37
1578605556460:Sink: 37
1578605556460:OperatorB: 39
1578605561518:Sink: 39
1578605561519:OperatorB: 41
1578605566536:Sink: 41
1578605566536:OperatorB: 43
1578605571547:Sink: 43
1578605571547:OperatorB: 45
1578605576554:Sink: 45
1578605576554:OperatorB: 47
1578605581561:Sink: 47
1578605581562:OperatorB: 49
1578605586568:Sink: 49
1578605586568:OperatorB: 51
1578605591576:Sink: 51
1578605591576:OperatorB: 53
1578605596580:Sink: 53
1578605596580:OperatorB: 55
1578605601586:Sink: 55
1578605601587:OperatorB: 57
1578605606592:Sink: 57
1578605606592:OperatorB: 59
1578605611596:Sink: 59
1578605611596:OperatorB: 61
1578605616602:Sink: 61
1578605616602:OperatorB: 63
1578605621606:Sink: 63
1578605621606:OperatorB: 65
1578605626608:Sink: 65
1578605626608:OperatorB: 67
1578605631613:Sink: 67
1578605631613:OperatorB: 69
1578605636618:Sink: 69
1578605636618:OperatorB: 71

1 Answer

柯波娃
2023-03-14

Q1

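It is important to understand that ordering guarantees apply only within a single channel. This freedom allows an operator with two inputs to actively choose which input to consume. Imagine a hash join: it first consumes one side completely to build a hash table, and only then streams the second side to probe that table.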
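To make the hash-join point concrete outside Flink, here is a minimal plain-Java sketch. It is not Flink code: the class `HashJoinSketch`, its `Row` type and the sample keys are hypothetical. The "operator" consumes its build input completely before reading a single probe record, so each input keeps its own order, while the relative order of the two inputs is decided by the operator rather than by arrival time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashJoinSketch {

    // A keyed record; the key/value shape is purely illustrative.
    static final class Row {
        final int key;
        final String value;

        Row(int key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Joins the two inputs on key. The whole build input is consumed before
    // the first probe record is read, regardless of when either input's
    // records "arrived".
    static List<String> hashJoin(List<Row> build, List<Row> probe) {
        // Phase 1: read the build side to the end and index it by key.
        Map<Integer, List<String>> table = new HashMap<>();
        for (Row r : build) {
            table.computeIfAbsent(r.key, k -> new ArrayList<>()).add(r.value);
        }
        // Phase 2: stream the probe side in its own channel order.
        List<String> joined = new ArrayList<>();
        for (Row r : probe) {
            for (String b : table.getOrDefault(r.key, Collections.emptyList())) {
                joined.add(b + "+" + r.value);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Row> build = Arrays.asList(new Row(1, "a1"), new Row(2, "a2"));
        List<Row> probe = Arrays.asList(new Row(2, "b2"), new Row(1, "b1"), new Row(3, "b3"));
        System.out.println(hashJoin(build, probe)); // [a2+b2, a1+b1]
    }
}
```

The probe side is processed in its own channel order (key 2, then key 1), and no probe record is looked at until every build record has been read.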

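In your case specifically, this means there is no ordering guarantee between the two connected channels, because they remain logically and physically separate.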

Do you have a use case that requires ordering between the two inputs?

Q2.

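You cannot observe backpressure because you have too little data. On every network channel there are buffers on both the sender side and the receiver side, and until you saturate both of them, no backpressure is applied to the sources.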
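A rough way to see why a couple of dozen records never trigger backpressure is the following single-threaded plain-Java model of one channel. It is not Flink code, and `BackpressureSketch` and `emittedBeforeBlocking` are hypothetical names. It assumes the downstream operator consumes nothing in the meantime and, as a simplification, counts one buffer slot per record; real Flink network buffers are memory segments that each hold many serialized records, so the real thresholds are much higher.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class BackpressureSketch {

    // Returns how many records a source can emit before it would have to
    // block, assuming the downstream operator consumes nothing meanwhile.
    static int emittedBeforeBlocking(int senderSlots, int receiverSlots, int recordsToEmit) {
        Deque<Integer> sender = new ArrayDeque<>();
        Deque<Integer> receiver = new ArrayDeque<>();
        int emitted = 0;
        while (emitted < recordsToEmit) {
            // "Network" step: move records to the receiver while it has room.
            while (!sender.isEmpty() && receiver.size() < receiverSlots) {
                receiver.addLast(sender.pollFirst());
            }
            if (sender.size() >= senderSlots) {
                break; // both sides are full: the source is now backpressured
            }
            sender.addLast(emitted++);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Few records, generous buffers: everything fits, no backpressure.
        System.out.println(emittedBeforeBlocking(32, 32, 10));    // 10
        // Many records, tiny buffers: the source stalls after 2 + 2 records.
        System.out.println(emittedBeforeBlocking(2, 2, 50_000));  // 4
    }
}
```

With buffers larger than the data set the source emits everything and never blocks, which matches the first experiment in the question; the source only stops once the sender-side and receiver-side buffers are both full.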

Edit: regarding your first comment

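Q1 The CoGroup processor alternates between its inputs on a best-effort basis to avoid starving either of them. However, while one of the inputs is idle, it reads only from the other input. Once the idle input becomes busy again, it may take some time (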

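Q2 I adapted your code, reducing the number of network buffers to 10 and removing the sleeps from the sources, and got the following output, which shows backpressure.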

1578560715990:SourceA: 0
1578560715990:SourceB: 1
...
1578560716041:OperatorA: 0 <-- blocks coprocessfunction
...
1578560716280:SourceB: 29127 <-- at this point network buffers are full
1578560721030:Sink: 0 <-- slow processing in coprocess function, no more inputs are generated because of backpressure
1578560721030:OperatorB: 1
1578560726034:Sink: 1
1578560726034:OperatorA: 2 <-- clear alternation between inputs
1578560731038:Sink: 2
1578560731039:OperatorB: 3
1578560736043:Sink: 3
1578560736043:OperatorA: 4
1578560741047:Sink: 4
1578560741047:OperatorB: 5
1578560746051:Sink: 5
...
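The best-effort alternation described in the Q1 edit, and visible in the log above, can be modeled with a small plain-Java sketch. This is an illustration under simplifying assumptions, not Flink's actual input-selection logic; `AlternatingReaderSketch` and `drain` are hypothetical. The reader prefers the two inputs in turn and falls back to the other input whenever the preferred one is empty, so a busy input is never left waiting on an idle one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class AlternatingReaderSketch {

    // Drains both inputs, alternating between them when both have data and
    // reading only from the non-empty one otherwise.
    static List<Integer> drain(Deque<Integer> input1, Deque<Integer> input2) {
        List<Integer> order = new ArrayList<>();
        boolean preferFirst = true;
        while (!input1.isEmpty() || !input2.isEmpty()) {
            Deque<Integer> preferred = preferFirst ? input1 : input2;
            Deque<Integer> other = preferFirst ? input2 : input1;
            // Fall back to the other input if the preferred one has nothing.
            Deque<Integer> chosen = preferred.isEmpty() ? other : preferred;
            order.add(chosen.pollFirst());
            preferFirst = !preferFirst;
        }
        return order;
    }

    public static void main(String[] args) {
        Deque<Integer> evens = new ArrayDeque<>(Arrays.asList(0, 2));
        Deque<Integer> odds = new ArrayDeque<>(Arrays.asList(1, 3, 5, 7, 9));
        System.out.println(drain(evens, odds)); // [0, 1, 2, 3, 5, 7, 9]
    }
}
```

While both inputs have buffered data the output alternates (0, 1, 2, 3), as in the backpressured run above; once the even input runs dry, only the odd input is read, which resembles the V2 output where only input B showed up.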