Kafka新手。
Kafka版本:2.3.1
我正在尝试使用Spring cloud使用来自两个主题的Kafka消息。除了kafka活页夹和下面的一些简单配置之外,我没有做太多配置。每当(组协调器lbbb111a.uat.pncint.net:9092(id:2147483641机架:null)不可用或无效时,将尝试重新发现)发生时,已经处理的一堆消息会再次被处理。不确定发生了什么。
spring.cloud.stream.kafka.binder.brokers: xxxxx:9094
spring:
cloud:
stream:
default:
group: bbb-bl-kyc
bindings:
input:
destination: bbb.core.sar.blul.events,bbb.core.sar.bluloc.events
contentType: application/json
consumer:
headerMode: embeddedHeaders
spring.kafka.consumer.properties.spring.json.trusted.packages: "*"
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
#Custom Serializer configurations to secure data
spring.cloud.stream.kafka.binder.configuration:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: pnc.aop.core.kafka.serialization.MessageSecuredByteArraySerializer
value.deserializer: pnc.aop.core.kafka.serialization.MessageSecuredByteArrayDeserializer
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
2020-05-29 07:01:11.389 INFO 1 --- [container-0-C-1] p.a.b.k.service.KYCOrchestrationService : Done with Customer xxxx MS call response handling Confm Id: 159073553171893 Appln Id: HSUKQJDJNZNMWVZZ
2020-05-29 07:01:11.393 INFO 1 --- [container-0-C-1] p.a.b.kyc.service.DMSIntegrationService : Message written to the DMS topic successfully 159073553171893
2020-05-29 07:01:11.394 INFO 1 --- [container-0-C-1] p.a.b.k.s.AdminConsoleProducerService : Message written to Admin console Application Log topic successfully Confm Id: 159073553171893 Appln Id: HSUKQJDJNZNMWVZZ
2020-05-30 17:21:13.140 INFO 1 --- [ad | bbb-bl-kyc] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Group coordinator lbbb111a.uat.pncint.net:9092 (id: 2147483641 rack: null) is unavailable or invalid, will attempt rediscovery
2020-05-30 17:21:13.122 INFO 1 --- [ad | bbb-bl-kyc] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Group coordinator lbbb111a.uat.pncint.net:9092 (id: 2147483641 rack: null) is unavailable or invalid, will attempt rediscovery
2020-05-30 17:21:14.522 INFO 1 --- [ad | bbb-bl-kyc] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Discovered group coordinator lbbb111a.uat.pncint.net:9092 (id: 2147483641 rack: null)
2020-05-30 17:21:14.692 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Discovered group coordinator lbbb111a.uat.pncint.net:9092 (id: 2147483641 rack: null)
2020-05-30 17:21:15.151 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Attempt to heartbeat failed for since member id consumer-4-f5a03efd-75cd-425b-94e1-efd3d728d7ca is not valid.
2020-05-30 17:21:15.152 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Revoking previously assigned partitions [bbb.core.sar.bluloc.events-0]
2020-05-30 17:21:15.173 INFO 1 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : bbb-bl-kyc: partitions revoked: [bbb.core.sar.bluloc.events-0]
2020-05-30 17:21:15.141 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Attempt to heartbeat failed for since member id consumer-2-52012bae-1b22-4211-b107-803fb3765720 is not valid.
2020-05-30 17:21:15.175 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] (Re-)joining group
2020-05-30 17:21:15.176 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Revoking previously assigned partitions [bbb.core.sar.blul.events-0]
2020-05-30 17:21:15.184 INFO 1 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : bbb-bl-kyc: partitions revoked: [bbb.core.sar.blul.events-0]
2020-05-30 17:21:15.184 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] (Re-)joining group
2020-05-30 17:21:18.200 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Successfully joined group with generation 66
2020-05-30 17:21:18.200 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Successfully joined group with generation 66
2020-05-30 17:21:18.200 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Setting newly assigned partitions: bbb.core.sar.bluloc.events-0
2020-05-30 17:21:18.200 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Setting newly assigned partitions: bbb.core.sar.blul.events-0
2020-05-30 17:21:18.203 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Found no committed offset for partition bbb.core.sar.blul.events-0
2020-05-30 17:21:18.203 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Found no committed offset for partition bbb.core.sar.bluloc.events-0
2020-05-30 17:21:18.537 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-2, groupId=bbb-bl-kyc] Resetting offset for partition bbb.core.sar.blul.events-0 to offset 4.
2020-05-30 17:21:18.538 INFO 1 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-4, groupId=bbb-bl-kyc] Resetting offset for partition bbb.core.sar.bluloc.events-0 to offset 0.
2020-05-30 17:21:18.621 INFO 1 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : bbb-bl-kyc: partitions assigned: [bbb.core.sar.blul.events-0]
2020-05-30 17:21:18.625 INFO 1 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : bbb-bl-kyc: partitions assigned: [bbb.core.sar.bluloc.events-0]
2020-05-30 17:21:18.822 INFO 1 --- [container-0-C-1] p.a.b.k.stream.KYCbbbCoreEventsListener : Initiating KYC Orchestration 159071814927374
2020-05-30 17:21:18.826 INFO 1 --- [container-0-C-1] p.a.b.k.stream.KYCbbbCoreEventsListener : Initiating KYC Orchestration null
2020-05-30 17:21:18.928 INFO 1 --- [container-0-C-1] p.a.b.k.s.AdminConsoleProducerService : Message written to Admin console Application topic successfully Confm Id: null Appln Id: XQZ58K3H3XZADTAT
在不改变大部分消费者配置的情况下,您将至少拥有一次交付语义。
当组协调器暂时不可用时,您的消费者将无法提交它处理的消息。重新加入后,您的消费者将再次处理相同的消息(因为它们尚未提交),导致重复。
您可以在这里找到有关GroupCoordinator和交付语义的更多详细信息
我要求从主题中读取消息,对它们进行批处理,然后将批处理推送到外部系统。如果批处理因任何原因失败,我需要再次使用同一组消息并重复该过程。因此,对于每个批处理,每个分区的 from 和 to 偏移量都存储在数据库中。为了实现这一点,我通过向读取器分配分区来为每个分区创建一个Kafka使用者,基于先前存储的偏移量,使用者寻求该位置并开始读取。我已关闭自动提交,并且不提交来自使用者的偏移量。对于每个批处理
有什么方法可以阻止Kafka的消费者在一段时间内消费信息吗?我希望消费者停止一段时间,然后开始消费最后一条未消费的消息。
由于它是一个Spring Boot应用程序,默认偏移量设置为Latest。我在这里做错了什么,请帮我弄明白。
我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认
我看到一个问题,我的主题中的所有消息都被我的消费者重新阅读。我只有1个消费者,我在开发/测试时打开/关闭它。我注意到,有时在几天没有运行消费者之后,当我再次打开它时,它会突然重新阅读我的所有消息。 客户端 ID 和组 ID 始终保持不变。我显式调用提交同步,因为我的启用.我确实设置了 auto.offset.reset=最早,但据我所知,只有在服务器上删除了偏移量时,才应该启动。我正在使用 IBM
我有一个Kafka消费者,其中消息通过HTTP POST调用传递给另一个应用程序。我还使用手动提交偏移量 确认。确认(); 有一些HTTP返回错误代码,我们忽略错误并提交偏移量,还有一些错误代码我们不提交偏移量。问题是,kafka使用者仅在我重新启动使用者时才轮询未提交的消息。如果分区中有未提交的消息,是否还有轮询消息的地方?