当前位置: 首页 > 知识库问答 >
问题:

当限制Kafka批大小时,如何使spark streaming在每个批中提交?

刘兴朝
2023-03-14

Kafka大约有5000万张唱片库存(即将消耗)。主题是3个分区。

zhihu_comment   0          10906153        28668062        17761909        -               -               -
zhihu_comment   1          10972464        30271728        19299264        -               -               -
zhihu_comment   2          10906395        28662007        17755612        -               -               -

我的消费应用程序:

public final class SparkConsumer {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {
    String brokers = "device1:9092,device2:9092,device3:9092";
    String groupId = "spark";
    String topics = "zhihu_comment";

    // Create context with a certain seconds batch interval
    SparkConf sparkConf = new SparkConf().setAppName("TestKafkaStreaming");
    sparkConf.set("spark.streaming.backpressure.enabled", "true");
    sparkConf.set("spark.streaming.backpressure.initialRate", "10000");
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10000");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

    Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    kafkaParams.put("enable.auto.commit", true);
    kafkaParams.put("max.poll.records", "500");


    // Create direct kafka stream with brokers and topics
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
            jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

    // Get the lines, split them into words, count the words and print
    JavaDStream<String> lines = messages.map(ConsumerRecord::value);
    lines.count().print();

    jssc.start();
    jssc.awaitTermination();
  }
}

我限制了spark streaming的消耗大小,在我的例子中,我将MaxRatePerPartition设置为10000,这意味着在我的例子中,它每批消耗30000条记录。

zhihu_comment   0          28700537        28700676        139             consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a /192.168.0.102  consumer-1
zhihu_comment   1          30305102        30305224        122             consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a /192.168.0.102  consumer-1
zhihu_comment   2          28695033        28695146        113             consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a /192.168.0.102  consumer-1

有什么方法可以让spark streaming在每个批处理中提交?

Spark streaming日志,证明它每批消耗的记录num:

20/05/04 22:28:13 INFO scheduler.DAGScheduler: Job 15 finished: print at SparkConsumer.java:65, took 0.012606 s
-------------------------------------------
Time: 1588602490000 ms
-------------------------------------------
300000

20/05/04 22:28:13 INFO scheduler.JobScheduler: Finished job streaming job 1588602490000 ms.0 from job set of time 1588602490000 ms

共有1个答案

傅泉
2023-03-14

您需要禁用

kafkaParams.put("enable.auto.commit", false);

而是使用

messages.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // do here some transformations and action on the rdd, typically like:
  // rdd.foreachPartition(it -> {
  //   it.foreach(row -> ...)
  // })

  // some time later, after outputs have completed
  ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
});

如Spark+Kafka集成指南所述。

 类似资料:
  • 是否可以限制Kafka消费者返回Spark Streaming的批的大小? 我这么问是因为我得到的第一批记录有上亿条记录,处理和检查它们需要很长时间。

  • 问题内容: 提交时是否有限制文件大小的选项? 例如:文件大小超过500K会产生警告。文件大小超过10M将会停止提交。 我完全知道这个问题,从技术上讲,它是一个重复的问题,但是答案仅是 push上的解决方案,对于我的要求而言为时已晚。 问题答案: 此预提交的挂钩将执行文件大小检查: .git / hooks / pre-commit 上面的脚本必须另存为启用执行权限()。 默认的软(警告)和硬(错误

  • 我用的是Kafka1.0,我增加了批量。大小=100K,用于优化我的制作人性能。但我发现,无论我设定的批次是什么,都没有任何效果。尺寸=100K或1000K或仅1K。此外,我还设定了我的逗留时间。ms=5,但这使性能更差。当我调试Kafka producer的源代码时,如下所示: 我发现了结果的价值。纽巴奇总是正确的,我想这就是为什么这一批。大小没有起任何作用,因为它每次都会唤醒发送者,而不是在b

  • partition/data只有15G,kafka日志文件夹是-/data/var/kafka/kafka-logs data/var/kafka/kafka-logs下的大多数文件夹大小为4K-40K 但两个文件夹的大小非常大--5G-7G,这导致/数据是100%

  • 我有一个关于优化kafka异步生产者吞吐量的问题:配置批处理。大小和逗留。ms在使用异步生成器时会产生影响吗? 我这样问是因为我认为这些参数只会影响同步生产者,因为它将等待代理确认。在异步生产者的情况下,这不会产生影响吗? 此外,是否有任何配置参数可以优化异步生成器?

  • 我们正在使用Spring云流霍克斯顿。SR4使用来自Kafka主题的消息。我们启用了spring.cloud.stream.bindings.。consumer.batch-Mode=true,每次轮询获取2000条记录。我想知道是否有一种方法可以手动确认/提交整个批次。