我试图运行kafka生产者使用kafka-clients API。我有多个生产者与单独的线程运行,每个试图写数据到Kafka。问题是,当我增加并行运行的线程数量时,我会从kafka中得到一个中断的异常。例如,如果我并行运行20个线程,它不会引发任何异常,但当我并行运行100个线程时,我会得到以下异常:
线程“pool-910-thread-1”org.apache.kafka.common.errors.interruptexception:java.lang.interruptedexception
在org.apache.kafka.clients.producer.kafkaProducer.close(kafkaProducer.java:1154)
在org.apache.kafka.clients.producer.kafkaProducer.close(kafkaProducer.java:1154)
在Processor.runProducer(IEC104KafKareadMessageProcessor.java:45)
在com.t4e.iec104.connection.iec60870ReadListener.writeToJsonFile(IEC60870ReadListener.java:707)
在com.t4e.iec104.connection.iec60870ReadListener.newasdu(IEC60870ReadListener.java:75)
在org.openmuc.j60870.connection util.concurrent.ThreadPoolExecutor$worker.run(threadPoolExecutor.java:624)
在java.lang.thread.run(thread.java:748)
导致了y:java.lang.interruptedException
在java.lang.object.wait(本机方法)
在java.lang.thread.join(thread.java:1260)
在org.apache.kafka.clients.producer.kafkaProducer.close(kafkaProducer.java:1152)
下面是我的制作者代码:
private static final Logger logger = LoggerFactory.getLogger(IEC104KafkaReadMessageProcessor.class);
static KafkaProducerConfigReader kafkaConfig = new KafkaProducerConfigReader();
static String newLine = System.getProperty("line.separator");
/**
* @param message
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
public static synchronized RecordMetadata runProducer(String message) throws InterruptedException, ExecutionException {
Producer<Long, String> producer = ProducerCreator.createProducer();
ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(kafkaConfig.getTopicName(), message);
try {
RecordMetadata metadata = producer.send(record).get();
logger.info(("Record sent with key " + " to partition " + metadata.partition() + " with offset "
+ metadata.offset()));
return metadata;
} catch (ExecutionException e) {
logger.error("ExecutionException : Error in sending record to kafka");
throw new ExecutionException(e);
} catch (InterruptedException e) {
logger.error("InterruptedException : Error in sending record" + newLine);
throw new InterruptedException();
} finally {
logger.info(" Closing Kafka producer ");
producer.close();
}
}
问题可能不在您正在创建生产者的这段代码中。看着你们分享的日志,
线程“pool-910-thread-1”org.apache.kafka.common.errors.interruptexception:java.lang.interruptedexception
我可以看到有910个线程池处于活动状态。创建这么多的池而不是创建线程保存一个池可能是一个更好的主意。您可能希望查看创建线程池的位置并对其进行控制。
我怀疑您代码中的线程泄漏会导致此中断异常。
我无法将KafkaProducer使用java从Windows(主机操作系统)上的eclipse发送到运行在Hortonworks沙箱上的kafka主题。我的java代码如下所示 当我运行这个java代码时没有错误,它只是打印消息的索引,在本例中只有0,然后终止,我无法在hortonworks沙箱的cmd接口上的console-consumer中看到0。 这是pom.xml依赖项 我可以从制片人那
我是新的Flink流处理,并需要一些帮助与FlinkKafka生产者,因为不能找到很多相关的搜索后一段时间。我目前正在阅读一个Kafka主题的流,然后在执行一些计算后,我想把这个写到新的Kafka中的一个分离主题。但我面临的问题是,我无法发送Kafka主题的关键。我使用的是Flink Kafka连接器,它给了我FlinkKafkaConsumer和flinkkafkaProducer。更详细的查看
我正在尝试使用kafka-avro-convore-生产者发布一条具有键(带有模式)和值(带有模式)的消息。kafka环境(kafka的conFluent 6.2.0版本、连接、zoomaster、模式注册表)都正确启动,我可以确认我的连接器已安装。问题是当我发送消息时,我的Sink连接器失败并出现我无法诊断的错误。 感谢您的帮助: 我生成一条AVRO消息,如下所示: 并在连接日志中接收以下错误:
我有一个用例“XML文件==>Kafka主题==>Build REST API to Query”来自Kafka主题的数据。我熟悉将数据转换为Avro格式,并编写到kafka主题。 您能建议如何发布XML吗?
前期回顾 其中channel.finishConnect()中完成建立连接,调用了 sender的run(),继续分析 其中步骤五和步骤七: 会把发往同个broker上面partition的数据组合成为一个请求,然后统一一次发送过去,这样子就减少了网络请求。调用send() 调用selector的send() 调用kafkachannel的setsend() 开始发送数据 sender里面的pol
使用Spring kafka模板,我有2个不同的生产者,他们使用相同的键向主题发送不同的消息,始终以相同的形式: 生产者1:发送密钥:1消息:abc分区0 生产者2:发送密钥:2消息:def 我有3个分区,所有消息都根据消息键发送到同一个分区。 现在,我需要确保,根据某些属性,特定消息将发送到特定分区,以便能够管理系统中的某些优先级。 问题是生产者2无法知道生产者1选择了哪个分区。 Kafka确保