当前位置: 首页 > 知识库问答 >
问题:

spring-cloud-stream kafka消费者并发

惠诚
2023-03-14

使用Spring-Cloud-Stream的kafka绑定器,如何配置并发消息消费者(在单个消费者jvm中)?如果我没有理解错的话,在使用kafka时并发使用消息需要分区,但是s-c-s文档指出,要使用分区,您需要通过partitionKeyExpression或PartitionKeyExtractorClass在生成器中指定分区选择。Kafka博士提到html" target="_blank">循环分区。

s-c-s文档根本没有提到spring.cloud.stream.bindings.*.concurrency,尽管这在我上面描述的用例中似乎很重要。带有生产者配置

spring:
  cloud:
    stream:
      bindings:
        customer-save:
          destination: customer-save
          group: customer-save
          content-type: application/json
          partitionCount: 3

和使用者配置

spring:
  cloud:
    stream:
      bindings:
        customer-save: 
          destination: customer-save
          group: customer-save
          content-type: application/x-java-object;type=foo.Customer
          partitioned: true
          concurrency: 3

当我没有指定partitionKeyExpression或PartitionKeyExtractorClass时,是否有一些默认的键提取和分区策略在生产者上使用?这是一种使用kafka设置s-c-s消费者的适当方法吗?在kafka中,您需要多个线程来消费消息,以提高消费者吞吐量?

共有1个答案

邵阳德
2023-03-14

由于您的生产者没有分区(没有partitionkeyexpression),生产者端将在3个分区上循环(如果这不是观察到的行为,请在Git Hub中打开一个票证)。如果您配置了PartitionKeyExpression,那么生成器将根据配置的逻辑有效地对数据进行分区。

在消费者端,我们确保线程/分区的亲和性,因为这是一个广受尊重的Kafka约定--我们确保给定分区上的消息是按顺序处理的--这可能是您所观察到的行为的原因。如果将消息A、B、C、D发送到分区0、1、2、0-D将不得不等到A被处理完,即使有另外两个线程可用。

增加吞吐量的一个选择是过分区(这是Kafka中相当典型的策略)。这将进一步分散消息,并增加消息发送到不同线程的机会。

如果您不关心排序,增加吞吐量的另一个选择是在下游异步处理消息:例如,通过将输入通道桥接到ExecutorChannel。

一般来说,partitied是指客户端接收分区数据的能力(Kafka客户端总是分区的,但此设置也适用于Rabbit和/或Redis)。它与属性instanceIndexinstanceCount结合使用,以确保主题的分区在多个应用程序实例之间正确划分(另请参见http://docs.spring.io/spring-cloud-stream/docs/1.0.0.m4/reference/htmlsingle/index.html#_instance_index_and_instance_count)

 类似资料:
  • 对于具有多个分区的主题- 1)单个SpringBoot实例是否使用多个线程来处理来自每个分区的每个消息(使用StreamListener注释的方法)? 2)是否可以为每个分区配置多个线程,或者我必须手动从监听器线程切换到工作池?

  • 我已经建立了一个生产者Spring云流应用程序和Kafka作为活页夹。以下是application.yml: 我有两个实例(同一个应用程序运行在一个jvm上)作为消费者。以下是application.yml: 我对Kafka群体的理解是,对于同一群体中的消费者来说,消息只会被消费一次。假设生产者应用程序产生消息A、B,而同一组中有两个消费者应用程序,则消息A将由消费者1读取,消息B、C将由消费者2

  • 我正在使用Spring Cloud Stream 3.0.6(Cloud:hoxton.sr6,Boot 2.3.0.release)和Solace PubSub+。我不能让并发消费者工作。无论我配置什么,总是有一个线程依次执行每个传入消息。 以下是我的代码: 这里会有什么问题? 安慰pubsub+活页夹 本地运行的Solace PubSub+实例的Docker组合文件:

  • 上文我们创建了注册中心,以及服务的提供者microservice-provider-user,并成功地将服务提供者注册到了注册中心上。 要想消费microservice-provider-user的服务是很简单的,我们只需要使用RestTemplate即可,或者例如HttpClient之类的http工具也是可以的。但是在集群环境下,我们必然是每个服务部署多个实例,那幺服务消费者消费服务提供者时的负

  • 我的应用程序由一个带有POST方法的REST控制器组成,用于提交我必须使用生产者发送到主题的数据。 这是控制器 使用Spring-Cloud-Stream版本 从3.1版开始,和注释被弃用,所以我尝试切换到新的方式来设置生产者,我就是这样工作的 最后在应用程序中。yaml我有这个 现在的问题是,当我启动应用程序时,方法被无限调用(我在主题中看到消息)。然后使用供应商似乎我被迫在供应商内部定义消息数

  • 在我们的spring boot应用程序中,我们注意到Kafka消费者偶尔会在prod env中随机消费两次消息。我们在PCF中部署了6个实例和6个分区。我们发现在同一主题中收到两次具有相同偏移量和分区的消息,这会导致重复,对我们来说是业务关键。我们在非生产环境中没有注意到这一点,在非生产环境中很难复制。我们最近转向Kafka,但我们无法找到根本问题。 我们使用的是spring cloud stre