当前位置: 首页 > 知识库问答 >
问题:

我从多个生产者向kafka发送数据时得到InterruptedException

有凯泽
2023-03-14

我试图运行kafka生产者使用kafka-clients API。我有多个生产者与单独的线程运行,每个试图写数据到Kafka。问题是,当我增加并行运行的线程数量时,我会从kafka中得到一个中断的异常。例如,如果我并行运行20个线程,它不会引发任何异常,但当我并行运行100个线程时,我会得到以下异常:

线程“pool-910-thread-1”org.apache.kafka.common.errors.interruptexception:java.lang.interruptedexception
在org.apache.kafka.clients.producer.kafkaProducer.close(kafkaProducer.java:1154)
在org.apache.kafka.clients.producer.kafkaProducer.close(kafkaProducer.java:1154)
在Processor.runProducer(IEC104KafKareadMessageProcessor.java:45)
在com.t4e.iec104.connection.iec60870ReadListener.writeToJsonFile(IEC60870ReadListener.java:707)
在com.t4e.iec104.connection.iec60870ReadListener.newasdu(IEC60870ReadListener.java:75)
在org.openmuc.j60870.connection util.concurrent.ThreadPoolExecutor$worker.run(threadPoolExecutor.java:624)
在java.lang.thread.run(thread.java:748)
导致了y:java.lang.interruptedException
在java.lang.object.wait(本机方法)
在java.lang.thread.join(thread.java:1260)
在org.apache.kafka.clients.producer.kafkaProducer.close(kafkaProducer.java:1152)

下面是我的制作者代码

private static final Logger logger = LoggerFactory.getLogger(IEC104KafkaReadMessageProcessor.class);
    static KafkaProducerConfigReader kafkaConfig = new KafkaProducerConfigReader();
    static String newLine = System.getProperty("line.separator");

    /**
     * @param message
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     */
    public static synchronized RecordMetadata runProducer(String message) throws InterruptedException, ExecutionException {
        Producer<Long, String> producer = ProducerCreator.createProducer();
        ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(kafkaConfig.getTopicName(), message);
        try {
            RecordMetadata metadata = producer.send(record).get();
            logger.info(("Record sent with key " + " to partition " + metadata.partition() + " with offset "
                    + metadata.offset()));
            return metadata;
        } catch (ExecutionException e) {
            logger.error("ExecutionException : Error in sending record to kafka");
            throw new ExecutionException(e);
        } catch (InterruptedException e) {
            logger.error("InterruptedException : Error in sending record" + newLine);
            throw new InterruptedException();
        } finally {
            logger.info(" Closing Kafka producer ");
            producer.close();
        }
    }

共有1个答案

公英哲
2023-03-14

问题可能不在您正在创建生产者的这段代码中。看着你们分享的日志,

线程“pool-910-thread-1”org.apache.kafka.common.errors.interruptexception:java.lang.interruptedexception

我可以看到有910个线程池处于活动状态。创建这么多的池而不是创建线程保存一个池可能是一个更好的主意。您可能希望查看创建线程池的位置并对其进行控制。

我怀疑您代码中的线程泄漏会导致此中断异常

 类似资料:
  • 我无法将KafkaProducer使用java从Windows(主机操作系统)上的eclipse发送到运行在Hortonworks沙箱上的kafka主题。我的java代码如下所示 当我运行这个java代码时没有错误,它只是打印消息的索引,在本例中只有0,然后终止,我无法在hortonworks沙箱的cmd接口上的console-consumer中看到0。 这是pom.xml依赖项 我可以从制片人那

  • 我是新的Flink流处理,并需要一些帮助与FlinkKafka生产者,因为不能找到很多相关的搜索后一段时间。我目前正在阅读一个Kafka主题的流,然后在执行一些计算后,我想把这个写到新的Kafka中的一个分离主题。但我面临的问题是,我无法发送Kafka主题的关键。我使用的是Flink Kafka连接器,它给了我FlinkKafkaConsumer和flinkkafkaProducer。更详细的查看

  • 我正在尝试使用kafka-avro-convore-生产者发布一条具有键(带有模式)和值(带有模式)的消息。kafka环境(kafka的conFluent 6.2.0版本、连接、zoomaster、模式注册表)都正确启动,我可以确认我的连接器已安装。问题是当我发送消息时,我的Sink连接器失败并出现我无法诊断的错误。 感谢您的帮助: 我生成一条AVRO消息,如下所示: 并在连接日志中接收以下错误:

  • 我有一个用例“XML文件==>Kafka主题==>Build REST API to Query”来自Kafka主题的数据。我熟悉将数据转换为Avro格式,并编写到kafka主题。 您能建议如何发布XML吗?

  • 前期回顾 其中channel.finishConnect()中完成建立连接,调用了 sender的run(),继续分析 其中步骤五和步骤七: 会把发往同个broker上面partition的数据组合成为一个请求,然后统一一次发送过去,这样子就减少了网络请求。调用send() 调用selector的send() 调用kafkachannel的setsend() 开始发送数据 sender里面的pol

  • 使用Spring kafka模板,我有2个不同的生产者,他们使用相同的键向主题发送不同的消息,始终以相同的形式: 生产者1:发送密钥:1消息:abc分区0 生产者2:发送密钥:2消息:def 我有3个分区,所有消息都根据消息键发送到同一个分区。 现在,我需要确保,根据某些属性,特定消息将发送到特定分区,以便能够管理系统中的某些优先级。 问题是生产者2无法知道生产者1选择了哪个分区。 Kafka确保