当前位置: 首页 > 知识库问答 >
问题:

Kafka Producer线程,即使没有发送消息,也有大量线程

国阳
2023-03-14

我目前分析了我的kafka制作人Spring boot应用程序,发现许多“kafka制作人-网络-线程”正在运行(总共47个)。即使没有发送数据,它也永远不会停止运行。我的应用程序看起来有点像这样:

var kafkaSender = KafkaSender(kafkaTemplate, applicationProperties)
kafkaSender.sendToKafka(json, rs.getString("KEY"))

与KafkaSender:

@Service
class KafkaSender(val kafkaTemplate: KafkaTemplate<String, String>, val applicationProperties: ApplicationProperties) {

@Transactional(transactionManager = "kafkaTransactionManager")
fun sendToKafka(message: String, stringKey: String) {
   kafkaTemplate.executeInTransaction { kt ->
       kt.send(applicationProperties.kafka.topic, System.currentTimeMillis().mod(10).toInt(), System.currentTimeMillis().rem(10).toString(),
               message)
   }
}

companion object {
    val log = LoggerFactory.getLogger(KafkaSender::class.java)!!
}
}

因为每次我想给kafka发送消息时,我都会引用一个新的KafkaSender,所以我认为会创建一个新的线程,然后将消息发送到Kafka队列。目前看起来像是产生了一批生产者,但从来没有清理过,即使他们没有任何事情可做。

这种行为是有意的吗?

在我看来,这种行为应该与数据源池几乎相同,使线程保持活动状态一段时间,但是当无事可做时,请清除它。

共有1个答案

巫马昆杰
2023-03-14

使用事务时,生产者缓存会按需增长,而不会减少。

如果您正在侦听器容器(使用者)线程上生成消息;每个主题/分区/消费者组都有一个生产者。这是解决僵尸Geofence问题所必需的,以便在发生重新平衡且分区移动到其他实例时,事务id将保持不变,以便代理可以正确处理这种情况。

如果您不关心僵尸防护问题(并且您可以处理重复交付),请将< code > DefaultKafkaProducerFactory 上的< code > producerPerConsumerPartition 属性设置为false,生产者的数量将会少得多。

编辑

从版本2.8开始,默认EOSMode现在是V2(又名BETA);这意味着不再需要每个主题/分区/组都有一个生产者,只要代理版本是2.5或更高版本。

 类似资料:
  • 我正在为android中的knx模块开发一个串口应用程序。我可以向knx modulde发送和接收赞扬。当从serialport接收到消息时,我想更改ui(例如按钮属性)。我用处理程序试过了,但我无法更改ui。帮我一把。 @覆盖公共空OnSerialsData(最终字节[]缓冲区,最终int大小){......} 它是我的串行端口侦听器函数,调用insine ReadThread。此线程从我的活动

  • 问题内容: 我需要向程序中运行的每个线程发送信息,并且每个线程都必须处理该信息。 我无法使用常规队列来执行此操作,因为那样一来,一旦一个线程从队列中删除了数据,所有其他线程将无法再看到它。 实现此目标的最佳方法是什么? 问题答案: 一种方法是在 每个 线程中都有一个队列,广播信息的功能负责将消息插入每个线程的队列中。 例如,这类似于消息队列在Windows中的工作方式。每个执行GUI操作的线程都有

  • 我是如何临时修复它的:考虑我已经实现了下面的方法 它向websocket会话发送一个TextMessage。我无法使整个方法同步,因为多个线程可以为不同的websocketSessions和消息调用它。我也不能将会话放在同步块中(尝试了但没有工作) 虽然,我这样解决了我的问题

  • 如果有一个写入线程和一个读取线程,我们仍然可以有一个具有易失性变量的竞争条件。就像下面的代码一样,在一种情况下,编写线程检查x值为零,没有发生上下文切换,读取线程也看到x值为0。在一种情况下,写入线程检查x的值为零,递增1,将其刷新到主存储器(因为x是易失性的),发生上下文切换,读取线程将x的值视为1。我只是想知道在这个用例中,挥发性是否足够,或者我们需要使用同步来避免竞争条件 我试过执行下面的代

  • 我正在尝试一个带有openfire、smack和android的聊天应用程序,其中脱机消息不起作用。如果两个用户都在线,则能够正确发送和接收消息。但是,如果用户A处于脱机状态,并且用户B发送了一条消息,那么一旦用户A处于联机状态,他就不会收到B发送的消息。尝试了stackoverflow中可能的解决方案,但均无效。使用以下代码检索脱机消息。 新线程(){公共无效运行(){

  • 我试图使用Netty 4.0.8创建一个相当简单的WebSocket服务器。我有基本的握手设置和工作。但是从一个单独的线程发送的消息似乎没有传到客户端。 客户机/服务器交互的工作方式是,客户机启动连接,然后通过WebSocket发送初始消息(“hello”)。服务器立即响应。此消息通过Chrome开发工具传递并可见。写入此消息后,我将存储在中。此初始化如下: 然后添加,如下所示: 在单独的线程(在