当前位置: 首页 > 知识库问答 >
问题:

多重消费者Spring Kafka

袁炳
2023-03-14

问题是Spring Kafka侦听器只配置了主题名。

@Service
public class Consumer {

    @KafkaListener(topics = "${app.topic}")
    public void receive(@Payload String message,
                        @Headers MessageHeaders headers) {
        System.out.println("Received message="+message);
        headers.keySet().forEach(key -> System.out.println(key+"->"+headers.get(key)));
    }
}

我似乎可以让Kafka产生100个消费者来处理来自“队列”(日志)的消息。怎么能做到呢?

共有1个答案

羊舌庆
2023-03-14

请查看以下答案,以了解Apache中的Kafka使用者Kafka为什么不能有比分区更多的使用者实例?

要在单个使用者组中正确地分发消息,您必须有多个分区。一旦为负载找到了正确的分区数量,我将使用spring cloud streaming来更好地管理并发性和使用者组分配。

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>

水槽样品

@SpringBootApplication
@EnableBinding(Sink.class)
public class LoggingConsumerApplication {

public static void main(String[] args) {
    SpringApplication.run(LoggingConsumerApplication.class, args);
}

@StreamListener(Sink.INPUT)
public void handle(Person person) {
    System.out.println("Received: " + person);
}

public static class Person {
    private String name;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String toString() {
        return this.name;
    }
}
}
cloud:
  stream:
    bindings:
      input:
        destination: <topic-name>
        group: <consumer-group>
        consumer:
          headerMode: raw
          partitioned: true
          concurrency: 20
 类似资料:
  • 然而,当在我的环境中测试此示例时,我得到了一个异常。

  • 问题内容: 我有一个JMS客户端,它正在生成消息并通过JMS队列发送到其唯一的使用者。 我想要的是不止一个消费者收到这些消息。我想到的第一件事是将队列转换为主题,以便现有用户和新用户都可以订阅并将相同的消息传递给他们。 显然,这将涉及在生产者和消费者方面修改当前的客户代码。 我还要查看其他选项,例如创建第二个队列,这样就不必修改现有的使用者。我相信这种方法有很多优点,例如(如果我错了,请纠正我)在

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我们观察到,其中一位消费者多次试图从Kafka主题中选取事件。我们在消费者应用程序方面有以下内容。spring.kafka.consumer.enable auto commit=false

  • null 我在这一页上读到以下内容: 使用者从任何单个分区读取,允许您以与消息生成类似的方式扩展消息消耗的吞吐量。 也可以将使用者组织为给定主题的使用者组-组内的每个使用者从唯一分区读取,并且组作为一个整体使用来自整个主题的所有消息。 如果使用者多于分区,则某些使用者将空闲,因为它们没有可从中读取的分区。 如果分区多于使用者,则使用者将从多个分区接收消息。 如果使用者和分区的数量相等,则每个使用者

  • 想要从使用的Spring启动应用程序的不同集群上创建同质。 即想要为已经定义的类创建一个 Kafka Consumer 对象,该对象侦听动态定义的多个集群。 例如:假设一个Spring启动应用程序S,其中包含kafkaconsumer的