当前位置: 首页 > 知识库问答 >
问题:

Flink和Beam SDK如何处理窗口-哪个更有效?

柳志专
2023-03-14

我将Apache Beam SDK与Flink SDK进行流处理比较,以确定使用Beam作为附加框架的成本/优势。

我有一个非常简单的设置,其中数据流从Kafka源读取并由运行Flink的节点集群并行处理。

根据我对这些SDK工作原理的理解,逐窗口处理数据流的最简单方法是:

>

  • 使用Apache Beam(在Flink上运行):

    1.1.创建一个Pipeline对象。

    1.2。创建Kafka记录的PCollection。

    1.3。应用窗口功能。

    1.4。将管道转换为“按窗口设置关键帧”。

    1.5。按键分组记录(窗口)。

    1.6。对窗口记录应用所需的任何函数。

    使用FlinkSDK

    2.1。从Kafka源创建数据流。

    2.2。通过提供键函数将其转换为键控流。

    2.3。应用窗口功能。

    2.4。对窗口记录应用所需的任何函数。

    虽然Flink解决方案在编程上更加简洁,但根据我的经验,它在高数据量时效率较低。我只能想象这种开销是由键提取函数引入的,因为Beam不需要这个步骤。

    我的问题是:我是在比较同类吗?这些过程是否不等效?有什么可以解释光束方式更有效,因为它使用Flink作为跑步者(所有其他条件都相同)?

    这是使用Beam SDK的代码

        PipelineOptions options = PipelineOptionsFactory.create();
    
        //Run with Flink
        FlinkPipelineOptions flinkPipelineOptions = options.as(FlinkPipelineOptions.class);
        flinkPipelineOptions.setRunner(FlinkRunner.class);
        flinkPipelineOptions.setStreaming(true);
        flinkPipelineOptions.setParallelism(-1); //Pick this up from the user interface at runtime
    
        // Create the Pipeline object with the options we defined above.
        Pipeline p = Pipeline.create(flinkPipelineOptions);
    
        // Create a PCollection of Kafka records
        PCollection<KafkaRecord<byte[], byte[]>> kafkaCollection = p.apply(KafkaIO.<Long, String>readBytes()
                .withBootstrapServers(KAFKA_IP + ":" + KAFKA_PORT)
                .withTopics(ImmutableList.of(REAL_ENERGY_TOPIC, IT_ENERGY_TOPIC))
                .updateConsumerProperties(ImmutableMap.of("group.id", CONSUMER_GROUP)));
    
        //Apply Windowing Function    
        PCollection<KafkaRecord<byte[], byte[]>> windowedKafkaCollection = kafkaCollection.apply(Window.into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1))));
    
        //Transform the pipeline to key by window
        PCollection<KV<IntervalWindow, KafkaRecord<byte[], byte[]>>> keyedByWindow =
                windowedKafkaCollection.apply(
                        ParDo.of(
                                new DoFn<KafkaRecord<byte[], byte[]>, KV<IntervalWindow, KafkaRecord<byte[], byte[]>>>() {
                                    @ProcessElement
                                    public void processElement(ProcessContext context, IntervalWindow window) {
                                        context.output(KV.of(window, context.element()));
                                    }
                                }));
        //Group records by key (window)
        PCollection<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>> groupedByWindow = keyedByWindow
                .apply(GroupByKey.<IntervalWindow, KafkaRecord<byte[], byte[]>>create());
    
        //Process windowed data
        PCollection<KV<IIntervalWindowResult, IPueResult>> processed = groupedByWindow
                .apply("filterAndProcess", ParDo.of(new PueCalculatorFn()));
    
        // Run the pipeline.
        p.run().waitUntilFinish();
    

    这是使用FlinkSDK的代码

    // Create a Streaming Execution Environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();    
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    env.setParallelism(6);
    
    //Connect to Kafka
    Properties properties = new Properties();   
    properties.setProperty("bootstrap.servers", KAFKA_IP + ":" + KAFKA_PORT);
    properties.setProperty("group.id", CONSUMER_GROUP);
    
    DataStream<ObjectNode> stream = env
                .addSource(new FlinkKafkaConsumer010<>(Arrays.asList(REAL_ENERGY_TOPIC, IT_ENERGY_TOPIC), new JSONDeserializationSchema(), properties));
    
    //Key by id
    stream.keyBy((KeySelector<ObjectNode, Integer>) jsonNode -> jsonNode.get("id").asInt())
    
            //Set the windowing function.
            .timeWindow(Time.seconds(5L), Time.seconds(1L))
    
            //Process Windowed Data
            .process(new PueCalculatorFn(), TypeInformation.of(ImmutablePair.class));
    
    // execute program
    env.execute("Using Flink SDK");
    

    非常感谢您提供的任何见解。

    我想我应该添加一些可能相关的指标。

    • taskmanager。2.
    • 2645765232
    • 2,827,676,598
    • 2422309148
    • 2428570491
    • 2,431,368,644
    • taskmanager。2.
    • 4435132862
    • 4766399314
    • 4425190393
    • 4096576110
    • 4092849114
    • 任务管理器.2
      • 93.00%
      • 92.00%
      • 91.00%
      • 90.00%
      • 90.00%
      • 92.00%
      • 任务管理器.2
        • 52.0%
        • 71.0%
        • 72.0%
        • 40.0%
        • 56.0%
        • 26.0%

        Beam似乎使用了更多的网络,而Flink使用了更多的CPU。这是否表明Beam以更有效的方式并行处理?

        我很确定PueCalculatorFn类是等效的,但我将在这里共享代码,看看这两个进程之间是否有明显的差异。

        public class PueCalculatorFn extends DoFn<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>, KV<IIntervalWindowResult, IPueResult>> implements Serializable {
        private transient List<IKafkaConsumption> realEnergyRecords;
        private transient List<IKafkaConsumption> itEnergyRecords;
        
        @ProcessElement
        public void procesElement(DoFn<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>, KV<IIntervalWindowResult, IPueResult>>.ProcessContext c, BoundedWindow w) {
            KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>> element = c.element();
            Instant windowStart = Instant.ofEpochMilli(element.getKey().start().getMillis());
            Instant windowEnd = Instant.ofEpochMilli(element.getKey().end().getMillis());
            Iterable<KafkaRecord<byte[], byte[]>> records = element.getValue();
        
            //Calculate Pue
            IPueResult result = calculatePue(element.getKey(), records);
        
            //Create IntervalWindowResult object to return
            DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC"));
            IIntervalWindowResult intervalWindowResult = new IntervalWindowResult(formatter.format(windowStart),
                    formatter.format(windowEnd), realEnergyRecords, itEnergyRecords);
        
            //Return Pue keyed by Window
            c.output(KV.of(intervalWindowResult, result));
        }
        
        private PueResult calculatePue(IntervalWindow window, Iterable<KafkaRecord<byte[], byte[]>> records) {
            //Define accumulators to gather readings
            final DoubleAccumulator totalRealIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);
            final DoubleAccumulator totalItIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);
        
            //Declare variable to store the result
            BigDecimal pue = BigDecimal.ZERO;
        
            //Initialise transient lists
            realEnergyRecords = new ArrayList<>();
            itEnergyRecords = new ArrayList<>();
        
            //Transform the results into a stream
            Stream<KafkaRecord<byte[], byte[]>> streamOfRecords = StreamSupport.stream(records.spliterator(), false);
        
            //Iterate through each reading and add to the increment count
            streamOfRecords
                    .map(record -> {
                        byte[] valueBytes = record.getKV().getValue();
                        assert valueBytes != null;
                        String valueString = new String(valueBytes);
                        assert !valueString.isEmpty();
                        return KV.of(record, valueString);
                    }).map(kv -> {
                Gson gson = new GsonBuilder().registerTypeAdapter(KafkaConsumption.class, new KafkaConsumptionDeserialiser()).create();
                KafkaConsumption consumption = gson.fromJson(kv.getValue(), KafkaConsumption.class);
                return KV.of(kv.getKey(), consumption);
        
            }).forEach(consumptionRecord -> {
                        switch (consumptionRecord.getKey().getTopic()) {
                            case REAL_ENERGY_TOPIC:
                                totalRealIncrement.accumulate(consumptionRecord.getValue().getEnergyConsumed());
                                realEnergyRecords.add(consumptionRecord.getValue());
                                break;
                            case IT_ENERGY_TOPIC:
                                totalItIncrement.accumulate(consumptionRecord.getValue().getEnergyConsumed());
                                itEnergyRecords.add(consumptionRecord.getValue());
                                break;
                        }
                    }
            );
        
            assert totalRealIncrement.doubleValue() > 0.0;
            assert totalItIncrement.doubleValue() > 0.0;
        
            //Beware of division by zero
            if (totalItIncrement.doubleValue() != 0.0) {
                //Calculate PUE
                pue = BigDecimal.valueOf(totalRealIncrement.getThenReset()).divide(BigDecimal.valueOf(totalItIncrement.getThenReset()), 9, BigDecimal.ROUND_HALF_UP);
            }
        
            //Create a PueResult object to return
            IWindow intervalWindow = new Window(window.start().getMillis(), window.end().getMillis());
            return new PueResult(intervalWindow, pue.stripTrailingZeros());
        }
        
        @Override
        protected void finalize() throws Throwable {
            super.finalize();
            RecordSenderFactory.closeSender();
            WindowSenderFactory.closeSender();
        }
        } 
        
        public class PueCalculatorFn extends ProcessWindowFunction<ObjectNode, ImmutablePair, Integer, TimeWindow> {
        private transient List<KafkaConsumption> realEnergyRecords;
        private transient List<KafkaConsumption> itEnergyRecords;
        
        @Override
        public void process(Integer integer, Context context, Iterable<ObjectNode> iterable, Collector<ImmutablePair> collector) throws Exception {
            Instant windowStart = Instant.ofEpochMilli(context.window().getStart());
            Instant windowEnd = Instant.ofEpochMilli(context.window().getEnd());
            BigDecimal pue = calculatePue(iterable);
        
            //Create IntervalWindowResult object to return
            DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC"));
            IIntervalWindowResult intervalWindowResult = new IntervalWindowResult(formatter.format(windowStart),
                    formatter.format(windowEnd), realEnergyRecords
                    .stream()
                    .map(e -> (IKafkaConsumption) e)
                    .collect(Collectors.toList()), itEnergyRecords
                    .stream()
                    .map(e -> (IKafkaConsumption) e)
                    .collect(Collectors.toList()));
        
        
            //Create PueResult object to return
            IPueResult pueResult = new PueResult(new Window(windowStart.toEpochMilli(), windowEnd.toEpochMilli()), pue.stripTrailingZeros());
        
            //Collect result
            collector.collect(new ImmutablePair<>(intervalWindowResult, pueResult));
        
        }
        
        protected BigDecimal calculatePue(Iterable<ObjectNode> iterable) {
            //Define accumulators to gather readings
            final DoubleAccumulator totalRealIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);
            final DoubleAccumulator totalItIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);
        
            //Declare variable to store the result
            BigDecimal pue = BigDecimal.ZERO;
        
            //Initialise transient lists
            realEnergyRecords = new ArrayList<>();
            itEnergyRecords = new ArrayList<>();
        
            //Iterate through each reading and add to the increment count
            StreamSupport.stream(iterable.spliterator(), false)
                    .forEach(object -> {
                        switch (object.get("topic").textValue()) {
                            case REAL_ENERGY_TOPIC:
                                totalRealIncrement.accumulate(object.get("energyConsumed").asDouble());
                                realEnergyRecords.add(KafkaConsumptionDeserialiser.deserialize(object));
                                break;
                            case IT_ENERGY_TOPIC:
                                totalItIncrement.accumulate(object.get("energyConsumed").asDouble());
                                itEnergyRecords.add(KafkaConsumptionDeserialiser.deserialize(object));
                                break;
                        }
        
                    });
        
            assert totalRealIncrement.doubleValue() > 0.0;
            assert totalItIncrement.doubleValue() > 0.0;
        
            //Beware of division by zero
            if (totalItIncrement.doubleValue() != 0.0) {
                //Calculate PUE
                pue = BigDecimal.valueOf(totalRealIncrement.getThenReset()).divide(BigDecimal.valueOf(totalItIncrement.getThenReset()), 9, BigDecimal.ROUND_HALF_UP);
            }
            return pue;
        }
        
        }
        

        这是我在Beam示例中使用的自定义反序列化器。

        public class KafkaConsumptionDeserialiser implements JsonDeserializer<KafkaConsumption> {
        
        public KafkaConsumption deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
            if(jsonElement == null) {
                return null;
            } else {
                JsonObject jsonObject = jsonElement.getAsJsonObject();
                JsonElement id = jsonObject.get("id");
                JsonElement energyConsumed = jsonObject.get("energyConsumed");
                Gson gson = (new GsonBuilder()).registerTypeAdapter(Duration.class, new DurationDeserialiser()).registerTypeAdapter(ZonedDateTime.class, new ZonedDateTimeDeserialiser()).create();
                Duration duration = (Duration)gson.fromJson(jsonObject.get("duration"), Duration.class);
                JsonElement topic = jsonObject.get("topic");
                Instant eventTime = (Instant)gson.fromJson(jsonObject.get("eventTime"), Instant.class);
                return new KafkaConsumption(Integer.valueOf(id != null?id.getAsInt():0), Double.valueOf(energyConsumed != null?energyConsumed.getAsDouble():0.0D), duration, topic != null?topic.getAsString():"", eventTime);
            }
          }
        
        }
        
  • 共有2个答案

    董飞航
    2023-03-14

    值得一提的是,如果可以通过reduce()或aggregate()预聚合窗口处理,那么原生Flink作业的性能应该比当前更好。

    许多细节,例如状态后端的选择、序列化、检查点等,也会对性能产生很大影响。

    在这两种情况下是否使用相同的Flink——即相同的版本、相同的配置?

    百里光熙
    2023-03-14

    不确定您编写的Beam管道为什么更快,但从语义上讲,它与Flink作业不同。与Flink中窗口的工作方式类似,一旦在Beam中指定了窗口,以下所有操作都会自动将窗口考虑在内。您不需要按窗口分组。

    可以将梁管道定义简化如下:

    // Create the Pipeline object with the options we defined above.
    Pipeline p = Pipeline.create(flinkPipelineOptions);
    
    // Create a PCollection of Kafka records
    PCollection<KafkaRecord<byte[], byte[]>> kafkaCollection = ...
    
    //Apply Windowing Function
    PCollection<KafkaRecord<byte[], byte[]>> windowedKafkaCollection = kafkaCollection.apply(
     Window.into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1))));
    
    //Process windowed data
    PCollection<KV<IIntervalWindowResult, IPueResult>> processed = windowedKafkaCollection
        .apply("filterAndProcess", ParDo.of(new PueCalculatorFn()));
    
    // Run the pipeline.
    p.run().waitUntilFinish();
    

    至于性能,这取决于许多因素,但请记住,Beam是Flink之上的抽象层。总的来说,如果你在Flink上看到Beam的性能提高,我会感到惊讶。

    编辑:为了进一步澄清,您不会在Beam管道中的JSON“id”字段上进行分组,而您会在Flink片段中进行分组。

     类似资料:
    • 我正在学习如何使用Flink处理流数据。 根据我的理解,我可以多次使用函数进行各种转换。 表示数据源不断向Flink发送字符串。所有字符串都是JSON格式的数据,如下所示: 下面是我的代码: 正如您所看到的,我的示例非常简单:获取并反序列化数据-->将string转换为Json对象-->将Json对象转换为string并获取所需内容(这里只需要)。 就目前而言,似乎一切都很好。我确实从日志文件中获

    • 我有一个流(KafkaMSG流到一个主题上),有一个flinkKafka消费者,我注意到一个有趣的行为,我想解决这个问题。 当数据正在流入时,如果它在窗口“完成”之前停止,或者如果数据结束(在几个窗口之后)并且没有到达窗口的末尾,则管道的其余部分不会触发。 流程示例: 我正在使用的Flink Kafka消费者010与env时间特性设置为事件时间。和consumer.assign时间戳和水印(新周期

    • 我想在一个操作符中接收和处理三个流。例如,Storm中实现的代码如下: <代码>生成器。setBolt(“C\u螺栓”,C\u螺栓(),parallelism\u提示)。字段分组(“A\u bolt”,“TRAINING”,新字段(“word”))。字段分组(“B\U螺栓”,“分析”,新字段(“word”))。所有分组(“A\U螺栓”、“总和”) 在Flink中,实现了和的处理: 但我不知道如何添

    • 问题内容: 我正在尝试从第二个窗口切换到第三个窗口。但是无法处理第三个窗口。有人可以帮助我解决此问题。我已经使用比较窗口标题的逻辑,但是它不起作用。代码======================= 错误堆栈跟踪: 问题答案: 这是切换到 并单击 按钮的完整代码块: 我的IDE控制台上的输出是:

    • 我有这样的场景,当点击一个按钮时,它打开了一个基于PDF文件的窗口: 我使用的是Gecko驱动程序版本-21.0Firefox版本-61.0.1 Selenium独立服务器-3.13 我无法切换到基于PDF文件的窗口获取错误: 我想用最新的壁虎驱动程序-21.0来处理它

    • 主要内容:1.窗口概述,2.窗口分类,3.细分,4.窗口Api,5.窗口分配器 Window Assigners,6.窗口函数,7.TopN 实例1.窗口概述 Flink 是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。 在 Flink 中, 窗口就是用来处理无界流的核心。我们很容易把窗口想象成一个固定位置的“框”,数据源源不断地流过来,到某个时间点窗口