当前位置: 首页 > 知识库问答 >
问题:

与flink并行性相关的对象实例&应用方法

彭炳
2023-03-14

>

  • 先让我问一下我的问题,然后你能澄清一下我对应用方法的设想吗?

    问题:如果我的应用程序在每一分钟的间隔内创建1.500.000(大约)条记录,并且flink job使用15++不同的运算符从kafka consumer读取这些记录,那么这个逻辑可能会产生延迟、背压等?(可以假设并行度为16)

    public class Sample{
      //op1 = 
         kafkaSource
                    .keyBy(something)
                    .timeWindow(Time.minutes(1))
                    .apply(new ApplySomething())
                    .name("Name")
                              .addSink(kafkaSink);
      //op2 = 
        kafkaSource
                    .keyBy(something2)
                    .timeWindow(Time.seconds(1)) // let's assume that this one second
                    .apply(new ApplySomething2())
                    .name("Name")
                              .addSink(kafkaSink);
     // ...
    
      //op16 = 
        kafkaSource
                    .keyBy(something16)
                    .timeWindow(Time.minutes(1)) 
                    .apply(new ApplySomething16())
                    .name("Name")
                              .addSink(kafkaSink);
    
    }
    // ..
    public class ApplySomething ... {
      private AnyObject object;
      private int threshold = 30, 40, 100 ...;
    
          @Override
        public void open(Configuration parameters) throws Exception{
            object = new AnyObject();
        }
    
        @Override
        public void apply(Tuple tuple, TimeWindow window, Iterable<Record> input, Collector<Result> out) throws Exception{
            int counter = 0;
            for (Record each : input){
              counter += each.getValue();
              if (counter > threshold){
                out.collec(each.getResult());
                return;
              }
            }
        }
    }
    
    • 如果是,是否应该使用带有状态的flatMap(rocksDB)而不是TimeWindow?
    • 我的预测是“是”。让我解释一下为什么我会这样想:
      • 如果并行度为16,则将有16个不同的indivudualapplySomething1()、ApplySomething2()...applySomething16()实例,并且每个applySomething..()类将有16个AnyObject()实例。
      • 应用程序工作时,如果keyby(something)分区数大于16(假设我的应用程序每天有1.00.000个不同的something),则一些applysomething..()实例将处理不同的键,因此一个apply()应该在处理之前等待其他实例循环。那么这会造成延迟吗?
  • 共有1个答案

    周和志
    2023-03-14

    闪烁的时间窗口与历元对齐(例如,如果你有一堆小时窗口,它们都会在小时触发)。因此,如果你真的打算在你的工作中有一堆不同的窗口,你应该配置它们有不同的偏移量,这样它们就不会同时被触发。那样做会分散负荷。看起来像这样

    .window(TumblingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(15))
    

    (或视情况使用TumblingEventTimeWindows)。这将创建一分钟长的窗口,每分钟后15秒触发。

    只要用例允许,就应该使用增量聚合(通过reduceaggrege),而不是使用WindowFunction(或ProcessWindowFunction),后者必须收集分配给列表中每个窗口的所有事件,然后将它们作为一种小型批处理进行处理。

    键控时间窗口将保持其状态在RocksDB中,假设您已经将RocksDB配置为您的状态后端。您不需要切换到使用richflatmap来访问rocksdb。(此外,由于flatMap不能使用计时器,我认为您最终会使用process函数。)

    当window操作符的任何并行实例忙于执行它的窗口函数(applysomethings)时,您认为该任务不会执行任何其他操作是正确的--因此它将(除非非常快地完成)产生暂时的背压。您将希望根据需要增加并行度,以便作业能够满足您对吞吐量和延迟的要求。

     类似资料:
    • JavaScript 在Object对象上面,提供了很多相关方法,处理面向对象编程的相关操作。本章介绍这些方法。 Object.getPrototypeOf() Object.getPrototypeOf方法返回参数对象的原型。这是获取原型对象的标准方法。 var F = function () {}; var f = new F(); Object.getPrototypeOf(f) === F

    • 为了操作一个对象,我们需要先获取这个对象的实例,而这肯定会涉及调用对象的构造方法。有关如何在扩展中调用PHP的函数与对象的方法这里不展开描述了。 首先我们先了解下一个object在PHP内核中到底是如何实现的。 typedef struct _zend_object_value { zend_object_handle handle; zend_object_handlers *h

    • 本文向大家介绍pandas的相关系数与协方差实例,包括了pandas的相关系数与协方差实例的使用技巧和注意事项,需要的朋友参考一下 1、输出百分比变化以及前后指定的行数 2、计算DataFrame列与列的相关系数和协方差 3、计算DataFrame与列或者Series的相关系数 注意:在使用DataFrame或Series在计算相关系数或者协方差的时候,都会计算索引重叠的、非NA的、按照索引对齐原

    • SQLAlchemy 1.4 / 2.0 Tutorial 此页是 SQLAlchemy 1.4/2.0教程 . 上一页: 使用ORM进行数据操作 |下一步: |next| 使用相关对象 在本节中,我们将介绍一个更重要的ORM概念,即ORM如何与引用其他对象的映射类交互。在本节中 声明映射类 ,映射的类示例使用了一个名为 relationship() . 这个构造定义了两个不同的映射类之间的链接,

    • 本文向大家介绍Java实现储存对象并按对象某属性排序的几种方法示例,包括了Java实现储存对象并按对象某属性排序的几种方法示例的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了Java实现储存对象并按对象某属性排序的几种方法。分享给大家供大家参考,具体如下: 在编程的时候,经常会出现对某一种类的对象们按照某属性进行自定义的排序,比如:学生对象按照age大小排序。 有一种方法就是把age单独提

    • 通常与 BOM 相关的常用对象有: Location 当前页面地址相关信息,如当前页面地址 Navigator 当前浏览器相关信息,如浏览器版本 Screen 包含屏幕相关信息,如屏幕的长宽 History 浏览器的历史相关信息,如返回上一页 其他的还有一些辅助方法,如 alert、confirm。 这些内容都可以通过 window 对象进行访问。 这些对象在访问时开头是小写的! 1. Locat