当前位置: 首页 > 知识库问答 >
问题:

KafkaStreams应用程序停止处理消息取消订阅所有主题或模式以及分配的分区

洪楚
2023-03-14

我有一个KafkaStream应用程序,以前工作得很好。现在无论我用新的应用程序id重新启动它多少次,它都不会开始消耗主题,我收到了这个日志:

INFO org.apache.kafka.streams.KafkaStreams stream-client [score_redeemX-7e37d43e-12ab-4c66-984e-5f959d4e5e08] State transition from REBALANCING to RUNNING
INFO org.apache.kafka.clients.consumer.KafkaConsumer [Consumer clientId=score_redeemX-7e37d43e-12ab-4c66-984e-5f959d4e5e08-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
INFO org.apache.kafka.streams.processor.internals.StreamThread stream-thread [score_redeemX-7e37d43e-12ab-4c66-984e-5f959d4e5e08-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
INFO org.apache.kafka.clients.consumer.KafkaConsumer [Consumer clientId=score_redeemX-7e37d43e-12ab-4c66-984e-5f959d4e5e08-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions

当我将“日志级别”设置为“调试”时,应用程序会生成以下日志:

2020-05-09 18:45:20,335 DEBUG org.apache.kafka.streams.processor.internals.StreamThread stream-thread [score_redeem-0633c837-8b94-4ba3-8e59-67a4c71de337-StreamThread-2] Committed all active tasks [] and standby tasks [] in 0ms
2020-05-09 18:45:20,338 DEBUG org.apache.kafka.streams.processor.internals.StreamThread stream-thread [score_redeem-0633c837-8b94-4ba3-8e59-67a4c71de337-StreamThread-1] Committed all active tasks [] and standby tasks [] in 0ms
2020-05-09 18:45:20,339 DEBUG org.apache.kafka.streams.processor.internals.StreamThread stream-thread [score_redeem-0633c837-8b94-4ba3-8e59-67a4c71de337-StreamThread-3] Committed all active tasks [] and standby tasks [] in 0ms
2020-05-09 18:45:21,339 DEBUG org.apache.kafka.streams.processor.internals.StreamThread stream-thread [score_redeem-0633c837-8b94-4ba3-8e59-67a4c71de337-StreamThread-2] Committed all active tasks [] and standby tasks [] in 0ms
2020-05-09 18:45:21,341 DEBUG org.apache.kafka.streams.processor.internals.StreamThread stream-thread [score_redeem-0633c837-8b94-4ba3-8e59-67a4c71de337-StreamThread-1] Committed all active tasks [] and standby tasks [] in 0ms
2020-05-09 18:45:21,342 DEBUG org.apache.kafka.streams.processor.internals.StreamThread stream-thread [score_redeem-0633c837-8b94-4ba3-8e59-67a4c71de337-StreamThread-3] Committed all active tasks [] and standby tasks [] in 0ms
2020-05-09 18:45:22,343 DEBUG org.apache.kafka.streams.processor.internals.StreamThread stream-thread [score_redeem-0633c837-8b94-4ba3-8e59-67a4c71de337-StreamThread-2] Committed all active tasks [] and standby tasks [] in 0ms
2020-05-09 18:45:22,344 DEBUG org.apache.kafka.streams.processor.internals.StreamThread stream-thread [score_redeem-0633c837-8b94-4ba3-8e59-67a4c71de337-StreamThread-1] Committed all active tasks [] and standby tasks [] in 0ms
2020-05-09 18:45:22,345 DEBUG org.apache.kafka.streams.processor.internals.StreamThread stream-thread [score_redeem-0633c837-8b94-4ba3-8e59-67a4c71de337-StreamThread-3] Committed all active tasks [] and standby tasks [] in 0ms
2020-05-09 18:45:22,435 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator [Consumer clientId=score_redeem-0633c837-8b94-4ba3-8e59-67a4c71de337-StreamThread-3-consumer, groupId=score_redeem] Sending Heartbeat request to coordinator ###.###.###.###:9092 (id: 2147483642 rack: null)
2020-05-09 18:45:22,437 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator [Consumer clientId=score_redeem-0633c837-8b94-4ba3-8e59-67a4c71de337-StreamThread-3-consumer, groupId=score_redeem] Received successful Heartbeat response
2020-05-09 18:45:22,465 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator [Consumer clientId=score_redeem-0633c837-8b94-4ba3-8e59-67a4c71de337-StreamThread-2-consumer, groupId=score_redeem] Sending Heartbeat request to coordinator ###.###.###.###:9092 (id: 2147483642 rack: null)
2020-05-09 18:45:22,466 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator [Consumer clientId=score_redeem-0633c837-8b94-4ba3-8e59-67a4c71de337-StreamThread-2-consumer, groupId=score_redeem] Received successful Heartbeat response
2020-05-09 18:45:22,468 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator [Consumer clientId=score_redeem-0633c837-8b94-4ba3-8e59-67a4c71de337-StreamThread-1-consumer, groupId=score_redeem] Sending Heartbeat request to coordinator ###.###.###.###:9092 (id: 2147483642 rack: null)
2020-05-09 18:45:22,470 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator [Consumer clientId=score_redeem-0633c837-8b94-4ba3-8e59-67a4c71de337-StreamThread-1-consumer, groupId=score_redeem] Received successful Heartbeat response

没有分配任务,如日志所示:

2020-05-09 18:43:04,322 DEBUG org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor stream-thread [score_redeem-0633c837-8b94-4ba3-8e59-67a4c71de337-StreamThread-2-consumer] Constructed client metadata {0633c837-8b94-4ba3-8e59-67a4c71de337=ClientMetadata{hostInfo=null, consumers=[score_redeem-0633c837-8b94-4ba3-8e59-67a4c71de337-StreamThread-2-consumer-b830a8e5-8654-40b7-8142-d1754c32268a], state=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}} from the member subscriptions.

这是日志中显示领导和成员信息的部分:

2020-05-09 18:43:04,361 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator [Consumer clientId=score_redeem-0633c837-8b94-4ba3-8e59-67a4c71de337-StreamThread-3-consumer, groupId=score_redeem] Received successful JoinGroup response: JoinGroupResponse(throttleTimeMs=0, error=NONE, generationId=4, groupProtocol=stream, memberId=score_redeem-0633c837-8b94-4ba3-8e59-67a4c71de337-StreamThread-3-consumer-93905915-8b52-44b4-afd1-2a5c79a272a4, leaderId=score_redeem-0633c837-8b94-4ba3-8e59-67a4c71de337-StreamThread-2-consumer-b830a8e5-8654-40b7-8142-d1754c32268a, members=)
2020-05-09 18:43:04,362 DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator [Consumer clientId=score_redeem-0633c837-8b94-4ba3-8e59-67a4c71de337-StreamThread-3-consumer, groupId=score_redeem] Sending follower SyncGroup to coordinator ###.###.###.###:9092 (id: 2147483642 rack: null): (type=SyncGroupRequest, groupId=score_redeem, generationId=4, memberId=score_redeem-0633c837-8b94-4ba3-8e59-67a4c71de337-StreamThread-3-consumer-93905915-8b52-44b4-afd1-2a5c79a272a4, groupAssignment=)

我有很多应用程序正在运行,他们现在都经历了这种情况。我试着制作新的连接器,但仍然没有成功。但我可以通过kafka console consumer成功消费这些主题。

另一个奇怪但非常重要的一点是,当我用Kafka消费API重写应用程序时,它工作得很好!!!

共有1个答案

魏凡
2023-03-14

正如@bbejeck已经提到的,StreamThread-1-恢复-消费者日志取消订阅所有主题或模式和分配的分区,这是一个INFO级别的日志。它只是标记潜在状态恢复的结束,与读取输入主题无关。

消费者心跳日志只是记录预期行为。

其他日志:在0毫秒内提交所有活动任务[]和备用任务[]可能表明没有活动任务。这可以解释为什么没有进展。然而,它提出了一个问题:为什么没有任务。

在重新平衡过程中,每个实例都会记录任务分配。你能确认没有分配任务吗?此外,一个Kafka Streams实例(每个应用程序)将充当“组长”,从而记录有关其计算的任务的更多详细信息。(顺便问一下:你用的是什么版本?)

 类似资料:
  • 我正在构建一个由亚马逊服务提供支持的警报系统。 我每天将一个文件放到S3上,它生成一个lambda函数(我们称之为生成器函数)来处理该文件。 Generator基于此文件构建警报并将多条消息发布到SNS主题(让我们称之为发件箱)-由Generator计算的每个收件人一条消息。 我在发件箱中订阅了第二个lambda函数(我们称之为Courier),它应该接收每条消息并对其进行处理。 发电机代码: 以

  • 我有一个主题T,它有4个分区TP1、TP2、TP4和TP4。 假设我有8条消息M1到M8。现在当我的制作人将这些消息发送到主题T时,在以下场景下,Kafka经纪人将如何接收它们: 场景1:只有一个kafka broker实例具有前面提到的分区的主题T。 现在假设kafka broker实例1宕机,消费者会作何反应?我假设我的使用者正在读取broker实例1。

  • 我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到

  • 我一直在使用covid19api持有的数据实现Kafka生产者/消费者和流。 我试图从endpoint中提取每天的案例https://api.covid19api.com/all.然而,这个服务——以及这个API中的其他服务——拥有自疾病开始以来的所有数据(确诊、死亡和恢复病例),但积累了数据,而不是日常病例,这就是我最终要实现的。 使用transformValues和StoreBuilder(正

  • 微信文档:https://developers.weixin.qq.com/miniprogram/dev/api-backend/open-api/subscribe-message/subscribeMessage.addTemplate.html 组合模板并添加至帐号下的个人模板库 $tid = 563; // 模板标题 id,可通过接口获取,也可登录小程序后台查看获取 $kidLi

  • 开普勒消息目前分为三大类:公告,告警和通知。 通知中根据不同的操作事件类型,分为十几个事件。每个事件都跟项目操作相关。便于接收项目操作变更的通知。 分类 事件 公告 Alarm 告警 Proclaim 通知 Build,Apply,Audit,Delete,Rollback,Logging,Reboot,Command,Storage,Extend... 订阅界面: 用户中心,点击头像,下拉菜单→