当前位置: 首页 > 知识库问答 >
问题:

数据发布期间生成大量Kafka生产者网络线程,空指针异常Spring Kafka

孟福
2023-03-14

我正在使用Spring Kafka 2.3.9编写一个Kafka制作人,该制作人假设向一个主题发布大约200000条消息。例如,我有一个从数据库中提取的200000个对象的列表,我想将这些对象的json消息发布到一个主题。

我写的制作人在发布1000条消息方面做得很好。然后它创建了一些空指针错误(我已经包括了下面的屏幕截图)。

在调试过程中,我发现Kafka Producer网络线程的数量非常高。我数不清,但肯定有500多个。

我读过线程Kafka Producer thread,即使没有发送消息,也有大量线程,并通过在DefaultKafkaProducerFactory上使producerPerConsumerPartition属性为false进行了类似的配置。但它仍然没有减少Kafka生产者网络线程数。

下面是我的代码片段、错误和这些线程的图片。我不能发布所有的代码段,因为它来自一个真实的项目。

代码段

public DefaultKafkaProducerFactory<String, String> getProducerFactory() throws IOException, IllegalStateException {
        Map<String, Object> configProps = getProducerConfigMap();
        DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(configProps);
        //defaultKafkaProducerFactory.transactionCapable();
        defaultKafkaProducerFactory.setProducerPerConsumerPartition(false);
        defaultKafkaProducerFactory.setProducerPerThread(false);
        return defaultKafkaProducerFactory;
    }
public Map<String, Object> getProducerConfigMap() throws IOException, IllegalStateException {   
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapAddress());
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.RETRIES_CONFIG, kafkaProperties.getKafkaRetryConfig());
        configProps.put(ProducerConfig.ACKS_CONFIG, kafkaProperties.getKafkaAcknowledgementConfig());
        configProps.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProperties.getKafkaClientId());
        configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 512 * 1024 * 1024);
        configProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10 * 1000);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //updateSSLConfig(configProps);
        return configProps;
    }

@Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        ProducerFactory<String, String> producerFactory = getProducerFactory();
        KafkaTemplate<String, String> kt = new KafkaTemplate<String, String>(stringProducerFactory, true);
        kt.setCloseTimeout(java.time.Duration.ofSeconds(5));
        return kt;
    }

错误

2020-12-07 18:14:19.249  INFO 26651 --- [onPool-worker-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=kafka-client-09f48ec8-7a69-4b76-a4f4-a418e96ff68e-1] Closing the Kafka producer with timeoutMillis = 0 ms.
2020-12-07 18:14:19.254 ERROR 26651 --- [onPool-worker-1] c.w.p.r.g.xxxxxxxx.xxx.KafkaPublisher   : Exception happened publishing to topic. Failed to construct kafka producer
2020-12-07 18:14:19.273  INFO 26651 --- [           main] ConditionEvaluationReportLoggingListener : 

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2020-12-07 18:14:19.281 ERROR 26651 --- [           main] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalStateException: Failed to execute CommandLineRunner
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:787) [spring-boot-2.2.8.RELEASE.jar:2.2.8.RELEASE]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:768) [spring-boot-2.2.8.RELEASE.jar:2.2.8.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) [spring-boot-2.2.8.RELEASE.jar:2.2.8.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.2.8.RELEASE.jar:2.2.8.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) [spring-boot-2.2.8.RELEASE.jar:2.2.8.RELEASE]
    at xxx.xxx.xxx.Application.main(Application.java:46) [classes/:na]
Caused by: java.util.concurrent.CompletionException: java.lang.NullPointerException
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[na:1.8.0_144]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[na:1.8.0_144]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592) ~[na:1.8.0_144]
    at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) ~[na:1.8.0_144]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[na:1.8.0_144]
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[na:1.8.0_144]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[na:1.8.0_144]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[na:1.8.0_144]
Caused by: java.lang.NullPointerException: null
    at com.xxx.xxx.xxx.xxx.KafkaPublisher.publishData(KafkaPublisher.java:124) ~[classes/:na]
    at com.xxx.xxx.xxx.xxx.lambda$0(Publisher.java:39) ~[classes/:na]
    at java.util.ArrayList.forEach(ArrayList.java:1249) ~[na:1.8.0_144]
    at com.xxx.xxx.xxx.xxx.publishData(Publisher.java:38) ~[classes/:na]
    at com.xxx.xxx.xxx.xxx.Application.lambda$0(Application.java:75) [classes/:na]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) ~[na:1.8.0_144]
    ... 5 common frames omitted

以下是发布消息的代码-行号124是我们实际调用KafkaTemboard的时候

public void publishData(Object object) {

        ListenableFuture<SendResult<String, String>> future = null;

        // Convert the Object to JSON
        String json = convertObjectToJson(object);

        // Generate unique key for the message
        String key = UUID.randomUUID().toString();

        // Post the JSON to Kafka
        try {
            future = kafkaConfig.kafkaTemplate().send(kafkaProperties.getTopicName(), key, json);
            
        } catch (Exception e) {
            logger.error("Exception happened publishing to topic. {}", e.getMessage());
        }

        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

                @Override
                public void onSuccess(SendResult<String, String> result) {
                    logger.info("Sent message with key=[" + key + "]");
                }

                @Override
                public void onFailure(Throwable ex) {
                    logger.error("Unable to send message=[ {} due to {}", json, ex.getMessage());
                }
        });
        kafkaConfig.kafkaTemplate().flush();
}

============================

我不确定这个错误是否是由这些网络线程引起的。

发布数据后,我调用了KafkaTemplate flush方法。它不起作用。我也叫ProducerFactory closeThreadBoundProducer,reset,destroy方法。它们似乎都不起作用。

我是否缺少任何配置?

共有1个答案

蒋文光
2023-03-14

空指针问题实际上与Spring Kafka无关。我们从网络连接的不同位置读取主题名称。该网络连接在少数情况下失败,并导致空指针问题,最终导致上述错误。

 类似资料:
  • 我们的生产环境中出现了随机的和: 我们偶尔会在我的生产者日志中得到这个异常: 主题:XXXXXX:5608 ms的过期记录自批量创建加上逗留时间以来已经过去。 此类错误消息中的毫秒数不断变化。有时是5秒,有时是13秒! 我们很少能得到: 集群由3个经纪人和3个动物园管理员组成。生产者服务器和Kafka集群在同一个网络中。 我在打同步电话。有一个web服务,多个用户请求调用它来发送数据。Kafka

  • 我的理解是,Kafka制作人向Kafka经纪人群发送消息。我的问题是,Kafka制作人在网络分割期间的行为是什么?如果分区太长(并且卷太高),最终消息会丢失吗? 此外,如果系统在分区期间崩溃,Kafka队列中的所有消息都会丢失吗?

  • 我无法使用Spring Kafka集成发布消息,尽管我的Kafka Java客户端工作正常。 Java代码在Windows上运行,Kafka在Linux上运行。 我得到以下错误

  • 我尝试使用 kafka 实现一个简单的生产者消费者示例,并使用以下属性实现了: 然而,当我在另一个项目(数据可视化软件的插件)中尝试完全相同的配置时,我得到了以下错误: 在我说它工作的第一个版本中,我使用了“mvn clean compile assembly:single”,但是在第二个版本中,我为整个项目创建了一个jar文件。因为可视化软件需要一个jar文件来安装插件。因为每件事都是一样的(至

  • 我们使用sping-cloud-stream-binder-kafka(3.0.3.RELEASE)向我们的Kafka集群(2.4.1)发送消息。时不时地,其中一个生产者线程会收到NOT_LEADER_FOR_PARTITION异常,甚至超过重试(当前设置为12,由依赖sping-retry激活)。我们限制了重试,因为我们发送了大约1kmsg/s(每个生产者实例),并且担心缓冲区的大小。这样我们会

  • 我是Android Studio的新手,我以为我做得很好,但是昨晚遇到了一个错误,尽管我尽了最大努力在谷歌上搜索,我似乎还是无法修复。我的一个活动上的一个按钮“可能会产生java.lang.NullPointerExctive”,只是每次按下它都会失败。这可能只是在错误的地方订购一行代码等简单的事情,但是我对Android Studio太陌生了,我真的不知道我哪里出错了。 这是XML 还有清单文件