当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream消费者没有阅读来自Kafka主题的消息

常翰
2023-03-14

我试图消费一个Kafka主题从Spring启动应用程序。我使用的是下面提到的版本的Spring云流

  • Spring boot starter父级:2.5.7
  • Spring云版本:2020.0.4

下面是代码和配置

application.yml

spring:
  zipkin:
    sender:
      type: kafka
  kafka:
    bootstrap-servers:
    - localhost:19091
  cloud:
    stream:
      bindings:
        audit-in-0:
          destination: com.tonitingaurav.kafka.log
          group: kafka-log-group
          consumer:
            concurrency: 10
            max-attempts: 3
      default-binder: kafka
      kafka:
        binder:
          brokers:
          - localhost:19091

消息消费者类

@Configuration
public class LogConsumer {

    @Bean
    Consumer<Log> audit(){
        return log -> {
            System.out.println(log.getMessage());
        };
    }
}

下面的消息发布者正在正确地发布消息。发布者是在不同的微服务中编写的。

@Component
public class LogEventPublisher {

    @Autowired
    @Qualifier(LogProducerKafkaConfig.KAFKA_LOG_PUBLISHER)
    MessageChannel messageChannel;

    public void logMessage(Log log) {
        Message<Log> message = MessageBuilder.withPayload(log).build();
        messageChannel.send(message);
    }

}

pom.xml

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>

共有1个答案

羊舌涵涤
2023-03-14

您已经在这里发布了一个非常类似的问题,并提供了两种不同解决方案的回复。此外,以下是您可以作为起点使用的示例:https://github.com/spring-cloud/spring-cloud-stream-samples

 类似资料:
  • 我有一个简单的java制作人,如下所示 我正在尝试读取如下数据 但消费者并没有从Kafka那里读到任何信息。如果我在处添加以下内容 然后消费者开始从题目开始阅读。但是每次消费者重新启动时,它都从我不想要的主题开始读取消息。如果我在启动消费程序时添加了以下配置 然后,它从主题中读取消息,但是如果消费者在处理所有消息之前重新启动,那么它不会读取未处理的消息。 有人可以让我知道出了什么问题,我该如何解决

  • 我们使用Akka流Kafka来生成和消费消息和Strimzi Kafka集群。以下是相关版本: 重构消息发出后,消费者停止工作。我们在主题中确实有一些信息,但消费者只是在无休止地等待。 以下是日志片段: 还有一些要点: 架构注册表配置正确且良好(否则生产者将无法工作)。 主题(和组协调器)很好,我可以通过这样的普通消费者消费消息: 这就是代码卡住的地方——我使用阻塞调用获取2条消息(甚至无法获取1

  • 生产者发送消息到一个有四个分区的主题。我们有一个消费者在消费来自这个主题的消息。应用程序在工作日一直运行周末例外:它不会在周末期间调用poll方法。 使用者配置:自动提交,自动提交时间为5s(默认)。 应用程序一直运行良好,直到一个星期天,当它重新开始调用poll方法。我们看到有数百万条消息从这个话题中被轮询出来。消费者基本上是轮询来自主题的所有消息。将新的偏移量与它在周末停止之前的偏移量进行比较

  • 因为我是新的Kafka,所以我能够从文件中读取记录,并通过生产者将消息发送到Kafka主题,但不能通过消费者消费相同的主题。 注意:您可以从任何文本文件中读取数据,我使用的是Kafka2.11-0.9。0.0版本 这是我的密码: 下面是输出:

  • 我在mac上运行Kafka和Flink作为docker容器。 我已经实现了Flink作业,它应该消耗来自Kafka主题的消息。我运行一个向主题发送消息的python生产者。 工作开始时没有问题,但没有收到任何消息。我相信这些消息被发送到了正确的主题,因为我有一个能够使用消息的python消费者。 flink作业(java): Flink作业日志: 生产者作业(python):(在主机上运行-不是d

  • 我使用confluent .net客户端。订阅者在重启(订阅者服务重启)后始终读取 Kafka 主题的所有消息。如何提交消费者已经实现的偏移并从中读取?也许一些消费者配置可以提供帮助...