我正在使用kafka-producer并将数据发送到kafka-cluster(由三个代理组成)中由replication-factor 3和partitions 1组成的主题'test-topic'。
我创造了五条线。每个线程发送了10,000条消息(每个消息大小为4000字节)。
我预计最新抵销50,000,但实际上是44,993.
约有5,000条信息丢失。
为什么会出现消息丢失?在我的代码下面...(Kafka-1.1.0版)
public class KafkaMessageSender {
private final static Logger logger =
LoggerFactory.getLogger(KafkaMessageSender.class);
private Properties props;
private KafkaProducer<String, String> producer;
private String topic;
private AtomicInteger count;
public KafkaMessageSender(AtomicInteger count, String bootstrapUrls, String topic) {
logger.info("KafkaMessageSender initializing...");
this.topic = topic;
this.count = count;
props = new Properties();
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapUrls);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); //16384
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
logger.info("KafkaMessageSender initializing end");
}
public void sendMessages() {
producer.send(new ProducerRecord<String, String>(topic, Messages.MSG_4K)); //Messages.MSG_4K indicates 4000bytes message
count.getAndIncrement();
logger.info("count : "+count.get());
}
}
public class KafkaMessageSenderMain {
private final static Logger logger = LoggerFactory.getLogger(KafkaMessageSenderMain.class);
final static String bootstrap_url = "ism1.solulink.co.kr:9092,ism2.solulink.co.kr:9092,ism3.solulink.co.kr:9092";
final static String topic = "test-topic"; //topic name
final static AtomicInteger count = new AtomicInteger(0);
final static int MAX_LOOP = 10000; //message sending count
final static int MAX_THREAD = 5; //created number of threads
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(MAX_THREAD);
for(int i = 0; i < MAX_THREAD; i++) {
executorService.execute(() ->{
KafkaMessageSender sender = new KafkaMessageSender(count, bootstrap_url, topic);
for(int j = 0; j < MAX_LOOP; j++) {
sender.sendMessages(); //send message
}
});
}
executorService.shutdown();
try {
boolean flag = executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
long endTime = System.currentTimeMillis();
long procTime = (endTime - startTime);
logger.info("all Threads is shutdown? : "+flag);
logger.info("processTime : " + ((double)procTime/(double)1000L)+"sec");
} catch (InterruptedException e) {
logger.error("awaitTermination exception",e);
}
}
}
结果
结果图像
你能像下面这样修改和运行你的代码,看看错误是什么吗?
producer.send(new ProducerRecord<String, String>(topic, Messages.MSG_4K), new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null)
e.printStackTrace();
}
});
我目前分析了我的kafka制作人Spring boot应用程序,发现许多“kafka制作人-网络-线程”正在运行(总共47个)。即使没有发送数据,它也永远不会停止运行。我的应用程序看起来有点像这样: 与KafkaSender: 因为每次我想给kafka发送消息时,我都会引用一个新的KafkaSender,所以我认为会创建一个新的线程,然后将消息发送到Kafka队列。目前看起来像是产生了一批生产者,
主要内容:1 send源码入口,1.1 同步消息,1.2 单向消息,1.3 异步消息,2 sendDefaultImpl发送消息实现,2.1 makeSureStateOK确定生产者服务状态,2.2 checkMessage校验消息的合法性,2.3 tryToFindTopicPublishInfo查找topic的发布信息,2.4 计算发送次数timesTotal,2.5 selectOneMessageQueue选择消息队列,,,基于RocketMQ 4.9.3,详细的介绍了Producer发
我是如何临时修复它的:考虑我已经实现了下面的方法 它向websocket会话发送一个TextMessage。我无法使整个方法同步,因为多个线程可以为不同的websocketSessions和消息调用它。我也不能将会话放在同步块中(尝试了但没有工作) 虽然,我这样解决了我的问题
我正在为android中的knx模块开发一个串口应用程序。我可以向knx modulde发送和接收赞扬。当从serialport接收到消息时,我想更改ui(例如按钮属性)。我用处理程序试过了,但我无法更改ui。帮我一把。 @覆盖公共空OnSerialsData(最终字节[]缓冲区,最终int大小){......} 它是我的串行端口侦听器函数,调用insine ReadThread。此线程从我的活动
如何在Kafka中发送同步消息 实现这一点的一种方法是设置properties参数 。 但是我想知道是否有一种甚至直接或替代的方式在Kafka中发送同步消息。(比如producer.sync发送(...)等等)。
我有一个应用程序,它定期生成原始JSON消息数组。我能够使用avro-tools将其转换为Avro。我这样做是因为由于Kafka-Connect JDBC接收器的限制,我需要消息包含模式。我可以在记事本上打开这个文件,看到它包括模式和几行数据。 现在,我想将其发送到我的中央Kafka代理,然后使用Kafka Connect JDBC接收器将数据放入数据库。我很难理解我应该如何将这些Avro文件发送