当前位置: 首页 > 知识库问答 >
问题:

Kafka-生产商批次数

干茂才
2023-03-14

有没有办法确定Kafka制作人为一组特定消息创建的批次数?例如,如果我在一个循环中发送10K条消息,有没有办法检查发送了多少批?我将“batch.size”设置为一个高值,我的期望是消息将被缓冲,并且在我的消费者中看到消息时会有延迟。然而,这似乎是打印几乎立即在我的消费者计划。

批处理时的默认值。尺寸是16384。这是字节数吗?

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;


public class KafkaProducerApp {

    public static void main(String[] args){
        Properties properties = new Properties();
        properties.put("bootstrap.servers","localhost:9092,localhost:9093,localhost:9094");
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("acks","0");
        properties.put("batch.size",33554432);

        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
        Map<Integer,Integer> partitionCount = new HashMap<Integer,Integer>();
        partitionCount.put(0,0);
        partitionCount.put(1,0);
        partitionCount.put(2,0);


        try{
            Date from = new Date();
            for(int i=0;i<10000;i++) {
                RecordMetadata ack = kafkaProducer.send(new ProducerRecord<String, String>("test_topic", Integer.toString(i), "MyMessage" + Integer.toString(i))).get();
                //RecordMetadata ack = kafkaProducer.send(new ProducerRecord<String,String>("test_topic",0,Integer.toString(i), "MyMessage" + Integer.toString(i))).get();
                System.out.println(" Offset = " + ack.offset());
                System.out.println(" Partition = " + ack.partition());
                partitionCount.put(ack.partition(),partitionCount.get(ack.partition())+1);

            }
            Date to = new Date();
            System.out.println(" partition 0 =" + partitionCount.get(0));
            System.out.println(" partition 1 =" + partitionCount.get(1));
            System.out.println(" partition 2 =" + partitionCount.get(2));
            System.out.println(" Elapsed Time = " + (to.getTime()-from.getTime())/1000);

        } catch (Exception ex){
            ex.printStackTrace();
        } finally {
            kafkaProducer.close();
        }



    }

}

共有1个答案

穆飞龙
2023-03-14

您需要的是产品请求的总数。

您可以使用JMX Mbeankafka.producer查看每秒产生请求的平均数量:type=生产者-指标,client-id=([-. w])

 类似资料:
  • 假设我有两个经纪人。 我读到Kafka制作人创建的制作人线程等于经纪人的数量。在这种情况下,我将有两个内部线程。 假设我有5个主题,每秒只收到200条消息。Kafka如何进行批处理? 一批大小=30条消息。[topic1=5,topic2=10,topic3=3,topic4=10,topic5=2消息]这些是最重要的消息和相应的主题。 Kafka是如何执行批处理的?

  • 我用的是阿帕奇·Kafka。我创建了一个war文件,其中生产者用Java编码,消费者用Scala编码。制作人正在从HTML页面获取数据。我可以看到,生产商发布的大部分数据都是关于消费者的,但有些数据缺失。 这是我的制片人代码 文件1 } 文件2 现在,我使用以下命令检查消费者的消息。 我是否缺少任何生产者配置?

  • 我需要设置Kafka生产商发送500 msg在一批不是由味精味精,但批量进口味精。我查过了https://github.com/dpkp/kafka-python/issues/479并尝试了但失败并出现错误: 我也试着像《代码》制作人一样通过考试。制作(主题,*消息)失败: 因此,我挖掘了更多信息,发现我必须在producer配置中将类型设置为async和batch。大小要大于默认值,但当我尝试

  • 我们正在使用Spring云流霍克斯顿。SR4使用来自Kafka主题的消息。我们启用了spring.cloud.stream.bindings.。consumer.batch-Mode=true,每次轮询获取2000条记录。我想知道是否有一种方法可以手动确认/提交整个批次。

  • 从这篇文章https://www.confluent.io/blog/transactions-apache-kafka/ 使用为至少一次交付语义配置的vanilla Kafka生产者和消费者,流处理应用程序可能会以以下方式完全丢失一次处理语义: 制片人。由于内部重试,send()可能导致重复写入消息B。这是由幂等生产者解决的,而不是本文其余部分的重点 2.我们可能会重新处理输入消息A,导致重复的

  • 一、生产者发送消息的过程 首先介绍一下 Kafka 生产者发送消息的过程: Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区。在发送 ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。 接下来,数据被传给分区器。如果之前已经在 Prod