我们有一个kafka streams应用程序(2.0),它正在与kafka代理(1.1.0)通信。streams应用程序一直在毫无原因地重新处理整个日志-应用程序没有重新启动,没有重新平衡,只是闲坐着-在某些情况下,它正在处理消息,在另一些情况下,它正在等待接收消息(处理消息的时间不到6个小时)。我们已经做了大量的研究,通过将偏移保留分钟设置为1周,与邮件保留时间相同,排除了潜在原因。此外,如果消费者组偏移量在积极处理消息时被重置,那么这将是问题的根本原因,这是没有意义的。
在事件发生时,代理日志中没有任何有趣的内容:
[2019-02-21 09:02:20,009] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-02-21 09:12:20,009] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-02-21 09:12:51,084] INFO [ProducerStateManager partition=MY_TOPIC-1] Writing producer snapshot at offset 422924 (kafka.log.ProducerStateManager)
[2019-02-21 09:12:51,085] INFO [Log partition=MY_TOPIC-1, dir=/data1/kafka] Rolled new log segment at offset 422924 in 1 ms. (kafka.log.Log)
[2019-02-21 09:14:56,384] INFO [ProducerStateManager partition=MY_TOPIC-12] Writing producer snapshot at offset 295610 (kafka.log.ProducerStateManager)
[2019-02-21 09:14:56,384] INFO [Log partition=MY_TOPIC-12, dir=/data1/kafka] Rolled new log segment at offset 295610 in 1 ms. (kafka.log.Log)
[2019-02-21 09:15:19,365] INFO [ProducerStateManager partition=__transaction_state-8] Writing producer snapshot at offset 3939084 (kafka.log.ProducerStateManager)
[2019-02-21 09:15:19,365] INFO [Log partition=__transaction_state-8, dir=/data1/kafka] Rolled new log segment at offset 3939084 in 0 ms. (kafka.log.Log)
[2019-02-21 09:21:26,755] INFO [ProducerStateManager partition=MY_TOPIC-9] Writing producer snapshot at offset 319799 (kafka.log.ProducerStateManager)
[2019-02-21 09:21:26,755] INFO [Log partition=MY_TOPIC-9, dir=/data1/kafka] Rolled new log segment at offset 319799 in 1 ms. (kafka.log.Log)
[2019-02-21 09:22:20,009] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-02-21 09:23:31,283] INFO [ProducerStateManager partition=__consumer_offsets-17] Writing producer snapshot at offset 47345110 (kafka.log.ProducerStateManager)
[2019-02-21 09:23:31,297] INFO [Log partition=__consumer_offsets-17, dir=/data1/kafka] Rolled new log segment at offset 47345110 in 28 ms. (kafka.log.Log)
应用程序日志中绝对没有任何内容(即使日志级别设置为
DEBUG
)。
你知道是什么导致了这个问题吗?
将Kafka代理升级到2.0.0解决了此问题。
我花了几个小时想弄清楚发生了什么,但没能找到解决办法。 这是我在一台机器上的设置: 1名zookeeper跑步 我正在使用kafka控制台生成器插入消息。如果我检查复制偏移量(
我有一个生产者/消费者场景,我不希望一个生产者交付产品,多个消费者消费这些产品。然而,常见的情况是,交付的产品只被一个消费者消费,而其他消费者从未看到过这个特定的产品。我不想实现的是,一个产品被每个消费者消费一次,而没有任何形式的阻碍。 我的第一个想法是使用多个BlockingQueue,每个消费者使用一个,并使生产者将每个产品按顺序放入所有可用的BlockingQueues中。但是,如果其中一个
由于消息需求的排序,我们有一个主题和一个分区。我们有两个消费者运行在不同的服务器上,具有相同的配置集,即groupId、consumerId和consumerGroup。即 1主题- 当我们部署消费者时,相同的代码会部署在两台服务器上。当消息到来时,我们会注意到两个消费者都在消费消息,而不是只有一个处理。让消费者在两台独立的服务器上运行的原因是,如果一台服务器崩溃,至少其他服务器可以继续处理消息。
我有一个带注释的kafka消费者方法@kafkalistener。我已经在容器上设置了重试模板,并且重试配置是这样的,如果在处理消息时发生了一些异常,它将始终重试。我已将最大轮询记录设置为1。如果这种情况实时发生,并且消费者一直在重试消息,经纪人会认为该消费者已经死亡并触发重新平衡吗?或者,在重试时,消费者是否会对未能处理的相同消息进行投票?如果这是真的,因为民意调查正在进行,我的假设是不会有任何
想知道Kafka使用者(Java客户端)是否可以并行读取和处理多条消息...我的意思是使用多个线程...我应该使用rxJava吗?? 1)这样做是一个好的方法吗???2)而且根据我的理解,Kafka甚至把每一个线程都当作消费者...如果我错了,请纠正我... 3)并且还想让Java客户端作为守护进程服务在Linux中运行,这样它就可以连续运行,并且轮询Kafka的消息,读取和处理都是一样的...这
我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外: