Kafka大约有5000万张唱片库存(即将消耗)。主题是3个分区。
zhihu_comment 0 10906153 28668062 17761909 - - -
zhihu_comment 1 10972464 30271728 19299264 - - -
zhihu_comment 2 10906395 28662007 17755612 - - -
我的消费应用程序:
public final class SparkConsumer {
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) throws Exception {
String brokers = "device1:9092,device2:9092,device3:9092";
String groupId = "spark";
String topics = "zhihu_comment";
// Create context with a certain seconds batch interval
SparkConf sparkConf = new SparkConf().setAppName("TestKafkaStreaming");
sparkConf.set("spark.streaming.backpressure.enabled", "true");
sparkConf.set("spark.streaming.backpressure.initialRate", "10000");
sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10000");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaParams.put("enable.auto.commit", true);
kafkaParams.put("max.poll.records", "500");
// Create direct kafka stream with brokers and topics
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topicsSet, kafkaParams));
// Get the lines, split them into words, count the words and print
JavaDStream<String> lines = messages.map(ConsumerRecord::value);
lines.count().print();
jssc.start();
jssc.awaitTermination();
}
}
我限制了spark streaming的消耗大小,在我的例子中,我将MaxRatePerPartition
设置为10000,这意味着在我的例子中,它每批消耗30000条记录。
zhihu_comment 0 28700537 28700676 139 consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a /192.168.0.102 consumer-1
zhihu_comment 1 30305102 30305224 122 consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a /192.168.0.102 consumer-1
zhihu_comment 2 28695033 28695146 113 consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a /192.168.0.102 consumer-1
有什么方法可以让spark streaming在每个批处理中提交?
Spark streaming日志,证明它每批消耗的记录num:
20/05/04 22:28:13 INFO scheduler.DAGScheduler: Job 15 finished: print at SparkConsumer.java:65, took 0.012606 s
-------------------------------------------
Time: 1588602490000 ms
-------------------------------------------
300000
20/05/04 22:28:13 INFO scheduler.JobScheduler: Finished job streaming job 1588602490000 ms.0 from job set of time 1588602490000 ms
您需要禁用
kafkaParams.put("enable.auto.commit", false);
而是使用
messages.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// do here some transformations and action on the rdd, typically like:
// rdd.foreachPartition(it -> {
// it.foreach(row -> ...)
// })
// some time later, after outputs have completed
((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
});
如Spark+Kafka集成指南所述。
是否可以限制Kafka消费者返回Spark Streaming的批的大小? 我这么问是因为我得到的第一批记录有上亿条记录,处理和检查它们需要很长时间。
问题内容: 提交时是否有限制文件大小的选项? 例如:文件大小超过500K会产生警告。文件大小超过10M将会停止提交。 我完全知道这个问题,从技术上讲,它是一个重复的问题,但是答案仅是 push上的解决方案,对于我的要求而言为时已晚。 问题答案: 此预提交的挂钩将执行文件大小检查: .git / hooks / pre-commit 上面的脚本必须另存为启用执行权限()。 默认的软(警告)和硬(错误
我用的是Kafka1.0,我增加了批量。大小=100K,用于优化我的制作人性能。但我发现,无论我设定的批次是什么,都没有任何效果。尺寸=100K或1000K或仅1K。此外,我还设定了我的逗留时间。ms=5,但这使性能更差。当我调试Kafka producer的源代码时,如下所示: 我发现了结果的价值。纽巴奇总是正确的,我想这就是为什么这一批。大小没有起任何作用,因为它每次都会唤醒发送者,而不是在b
partition/data只有15G,kafka日志文件夹是-/data/var/kafka/kafka-logs data/var/kafka/kafka-logs下的大多数文件夹大小为4K-40K 但两个文件夹的大小非常大--5G-7G,这导致/数据是100%
我有一个关于优化kafka异步生产者吞吐量的问题:配置批处理。大小和逗留。ms在使用异步生成器时会产生影响吗? 我这样问是因为我认为这些参数只会影响同步生产者,因为它将等待代理确认。在异步生产者的情况下,这不会产生影响吗? 此外,是否有任何配置参数可以优化异步生成器?
我们正在使用Spring云流霍克斯顿。SR4使用来自Kafka主题的消息。我们启用了spring.cloud.stream.bindings.。consumer.batch-Mode=true,每次轮询获取2000条记录。我想知道是否有一种方法可以手动确认/提交整个批次。