当前位置: 首页 > 知识库问答 >
问题:

我用kafka-producer在多线程中发送消息,但发生了消息丢失

邹普松
2023-03-14

我正在使用kafka-producer并将数据发送到kafka-cluster(由三个代理组成)中由replication-factor 3和partitions 1组成的主题'test-topic'。

我创造了五条线。每个线程发送了10,000条消息(每个消息大小为4000字节)。

我预计最新抵销50,000,但实际上是44,993.

约有5,000条信息丢失。

为什么会出现消息丢失?在我的代码下面...(Kafka-1.1.0版)

public class KafkaMessageSender {
    private final static Logger logger = 
 LoggerFactory.getLogger(KafkaMessageSender.class);
    private Properties props;
    private KafkaProducer<String, String> producer;
    private String topic;
    private AtomicInteger count;

    public KafkaMessageSender(AtomicInteger count, String bootstrapUrls, String topic) {
        logger.info("KafkaMessageSender initializing...");
        this.topic = topic;
        this.count = count;
        props = new Properties();
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapUrls);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); //16384
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
        logger.info("KafkaMessageSender initializing end");     

    }

    public void sendMessages() {
        producer.send(new ProducerRecord<String, String>(topic, Messages.MSG_4K)); //Messages.MSG_4K indicates 4000bytes message
        count.getAndIncrement();
        logger.info("count : "+count.get());
    }
}
public class KafkaMessageSenderMain {

    private final static Logger logger = LoggerFactory.getLogger(KafkaMessageSenderMain.class);

    final static String bootstrap_url = "ism1.solulink.co.kr:9092,ism2.solulink.co.kr:9092,ism3.solulink.co.kr:9092";
    final static String topic = "test-topic"; //topic name
    final static AtomicInteger count = new AtomicInteger(0);
    final static int MAX_LOOP = 10000; //message sending count
    final static int MAX_THREAD = 5;  //created number of threads

    public static void main(String[] args) {        
        long startTime = System.currentTimeMillis();
        ExecutorService executorService = Executors.newFixedThreadPool(MAX_THREAD);
        for(int i = 0; i < MAX_THREAD; i++) {
            executorService.execute(() ->{
                    KafkaMessageSender sender = new KafkaMessageSender(count, bootstrap_url, topic);
                    for(int j = 0; j < MAX_LOOP; j++) {
                        sender.sendMessages(); //send message
                    }
            });
        }

        executorService.shutdown();
        try {
            boolean flag = executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); 
            long endTime = System.currentTimeMillis();
            long procTime = (endTime - startTime);
            logger.info("all Threads is shutdown? : "+flag);
            logger.info("processTime : " + ((double)procTime/(double)1000L)+"sec");
        } catch (InterruptedException e) {
            logger.error("awaitTermination exception",e);
        }
    }
}

结果

结果图像

共有1个答案

欧阳俊逸
2023-03-14

你能像下面这样修改和运行你的代码,看看错误是什么吗?

producer.send(new ProducerRecord<String, String>(topic, Messages.MSG_4K), new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null)
                e.printStackTrace();
        }
});
 类似资料:
  • 主要内容:1 send源码入口,1.1 同步消息,1.2 单向消息,1.3 异步消息,2 sendDefaultImpl发送消息实现,2.1 makeSureStateOK确定生产者服务状态,2.2 checkMessage校验消息的合法性,2.3 tryToFindTopicPublishInfo查找topic的发布信息,2.4 计算发送次数timesTotal,2.5 selectOneMessageQueue选择消息队列,,,基于RocketMQ 4.9.3,详细的介绍了Producer发

  • 我目前分析了我的kafka制作人Spring boot应用程序,发现许多“kafka制作人-网络-线程”正在运行(总共47个)。即使没有发送数据,它也永远不会停止运行。我的应用程序看起来有点像这样: 与KafkaSender: 因为每次我想给kafka发送消息时,我都会引用一个新的KafkaSender,所以我认为会创建一个新的线程,然后将消息发送到Kafka队列。目前看起来像是产生了一批生产者,

  • 我是如何临时修复它的:考虑我已经实现了下面的方法 它向websocket会话发送一个TextMessage。我无法使整个方法同步,因为多个线程可以为不同的websocketSessions和消息调用它。我也不能将会话放在同步块中(尝试了但没有工作) 虽然,我这样解决了我的问题

  • 我正在为android中的knx模块开发一个串口应用程序。我可以向knx modulde发送和接收赞扬。当从serialport接收到消息时,我想更改ui(例如按钮属性)。我用处理程序试过了,但我无法更改ui。帮我一把。 @覆盖公共空OnSerialsData(最终字节[]缓冲区,最终int大小){......} 它是我的串行端口侦听器函数,调用insine ReadThread。此线程从我的活动

  • 如何在Kafka中发送同步消息 实现这一点的一种方法是设置properties参数 。 但是我想知道是否有一种甚至直接或替代的方式在Kafka中发送同步消息。(比如producer.sync发送(...)等等)。

  • 我有一个应用程序,它定期生成原始JSON消息数组。我能够使用avro-tools将其转换为Avro。我这样做是因为由于Kafka-Connect JDBC接收器的限制,我需要消息包含模式。我可以在记事本上打开这个文件,看到它包括模式和几行数据。 现在,我想将其发送到我的中央Kafka代理,然后使用Kafka Connect JDBC接收器将数据放入数据库。我很难理解我应该如何将这些Avro文件发送