当前位置: 首页 > 知识库问答 >
问题:

Spring Kafka Binder未收到任何消息,但已连接到主题

柯翔
2023-03-14

你好,我一直在使用Spring Kafka活页夹作为消费者。通过查看日志,我能够连接到主题,尽管我不确定它为什么不处理来自制作人的任何消息。

你知道可能遗漏了什么吗?非常感谢。

聚甲醛

<dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-config</artifactId>
    </dependency>

应用程序YML

cloud:
zookeeper:
  connect-string: port1.test.com:2181,port2.test.com:2181,port3.test.com:2181
stream:
  kafka:
    binder:
      brokers:
        - port1.test.com:6667
        - port2.test.com:6667
        - port3.test.com:6667
      auto-create-topics: false
      auto-add-partitions: false
      jaas:
        controlFlag: REQUIRED
        loginModule: com.sun.security.auth.module.Krb5LoginModule
        options:
          useKeyTab: true
          storeKey: true
          serviceName: kafka
          # Change location to your local location
          keyTab: C:\\Users\\src\\main\\resources\\kafka\\kafka_user.keytab
          principal: kafka_user@TEST.COM
          debug: true
      configuration:
        security:
          protocol: SASL_PLAINTEXT
  bindings:
    stream-input:
      binder: kafka
      destination: TOPIC
      group: service-dev
security:
  krb5conf:
    # Change location to your local location
    location: C:\\Users\\src\\main\\resources\\kafka\\krb5nonprod.conf

消费者阶层

public interface EventConsumer {

@Input("stream-input")
SubscribableChannel consumeMessage();
}

侦听器类

@Slf4j
@Component
@RequiredArgsConstructor
@EnableBinding(EventConsumer.class)
public class EventListener {

@StreamListener(target = "stream-input")
public void processMessage(Object msg) {

日志

Started Application in 75.471 seconds (JVM running for 184.663)
2021-09-29 19:45:01.342  INFO 30340 --- [container-0-C-1] org.apache.kafka.clients.Metadata        
: [Consumer clientId=consumer-2, groupId=service-dev] Cluster ID: qa_IFa70SravzxvdcDhHA
2021-09-29 19:45:01.390  INFO 30340 --- [container-0-C-1] 
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=service-dev] 
Discovered group coordinator port1.test.com:6667 (id: 2147482644 rack: null)
2021-09-29 19:45:01.399  INFO 30340 --- [container-0-C-1] 
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=service-dev] 
Revoking previously assigned partitions []
2021-09-29 19:45:01.400  INFO 30340 --- [container-0-C-1] 
o.s.c.s.b.k.KafkaMessageChannelBinder$1  : service-dev: partitions revoked: []
2021-09-29 19:45:01.401  INFO 30340 --- [container-0-C-1] 
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=service-dev] 
(Re-)joining group
2021-09-29 19:45:01.854  INFO 30340 --- [container-0-C-1] 
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=service-dev] 
(Re-)joining group
2021-09-29 19:45:04.387  INFO 30340 --- [container-0-C-1] 
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=service-dev] 
Successfully joined group with generation 36
2021-09-29 19:45:04.400  INFO 30340 --- [container-0-C-1] 
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=service-dev] 
Setting newly assigned partitions: TOPIC-0
2021-09-29 19:45:04.481  INFO 30340 --- [container-0-C-1] 
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=service-dev] 
Setting offset for partition TOPIC-0 to the committed offset FetchPosition{offset=1076, 
offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=port1.test.com:6667 (id: 1003 
rack: /default-rack), epoch=2}}
2021-09-29 19:45:04.557  INFO 30340 --- [container-0-C-1] 
o.s.c.s.b.k.KafkaMessageChannelBinder$1  : service-dev: partitions assigned: [TOPIC-0]

从日志中可以看到,它能够连接到主题。虽然我不确定为什么我没有收到来自生产者的任何消息。是因为分区被撤销吗?这与为什么我没有收到任何消息有关吗?生产者来自第三方,他需要做些什么才能让我接收消息吗?很明显我能够连接到主题。谢谢!

共有1个答案

宋烨烁
2023-03-14

原木看起来不错;这意味着在该组的当前提交偏移量之后,没有更多的记录可读取

>Setting offset for partition TOPIC-0 to the committed offset FetchPosition{offset=1076 ...

如果要重读整个主题,可以设置resetOffsets kafka consumer绑定属性,或者将更改为没有提交偏移量的组。

https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.4/reference/html/spring-cloud-stream-binder-kafka.html#reset-抵消

 类似资料:
  • 我正在尝试设置一个基本的Java消费者来接收来自Kafka主题的消息。我在-https://cwiki.apache.org/confluence/display/KAFKA/Consumer组示例-并具有以下代码: 和 Kafka在有问题的EC2主机上运行,我可以使用kafka-console-producer.sh和kafka-console-consumer.sh工具发送和接收关于主题“测试

  • 当监听设备后,会返回接收到的消息数据。 请求方式: 无 返回值: "|4|2|5|message|" 返回接收到的消息 参数 message 返回的消息内容

  • 我的LocalBroadcastManager回调函数不接收消息。有人能告诉我为什么吗? 我尝试在我的三星Galaxy S3 mini(4.1.2-Jelly Bean,API 16)上实现这一点。 SenderClass: 主要活动 编辑:这是清单 显示

  • 我正在为云消息使用xamarin iOS FirebasePushNotificationPlugin,但我正在接收令牌,但我无法接收任何通知。< code > crossfirebase push notification。current . OnNotificationReceived 甚至不触发。 以下是我的appDelegate的外观。 以下是我收到通知的方式: 我已验证 .启用后台模式-

  • 当订阅的topic接收到消息后,会返回接收到的消息数据。 请求方式: 无 返回值: "|4|1|5|topic|message|\r" 返回接收到的消息 参数 topic 返回消息的topic message 返回的消息内容

  • 我正在使用以下在docker上运行kafka、zookeeper和kafdrop: 我有一个具有以下配置的Spring Boot Producer应用程序-: 在我的中,我有以下内容: 这是一个单独的应用程序,我在我的服务中这样称呼Kafka制作人: 在一个完全不同的spring引导应用程序中,我有一个像这样的使用者: 我可以看到消费者正在连接到代理,但是有消息的日志。下面是我能看到的完整日志: