当前位置: 首页 > 知识库问答 >
问题:

Spring云流KStream消费者并发没有影响?

姜弘新
2023-03-14
spring-cloud-dependencies:Finchley.SR1
springBootVersion = '2.0.5.RELEASE'
root@kafka:/# kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kstream_test --time -1
kstream_test:2:222
kstream_test:1:203
kstream_test:3:188
kstream_test:0:278
spring.cloud.stream.bindings.input:
  destination: kstream_test
  group: consumer-group-G1_test
  consumer:
    useNativeDecoding: true
    headerMode: raw
    startOffset: latest
    partitioned: true
    concurrency: 3
    @StreamListener
    @SendTo(MessagingStreams.OUTPUT)
    public KStream<?, ?> process(@Input(MessagingStreams.INPUT) KStream<?, ?> kstreams) {
        ......
        log.info("Got a message");
        ......
        return kstreams;
    }
2018-10-18 11:50:01.923  INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler     : Got a message
2018-10-18 11:50:01.923  INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler     : Got a message
2018-10-18 11:50:01.945  INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler     : Got a message
2018-10-18 11:50:01.956  INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler     : Got a message
2018-10-18 11:50:01.972  INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler     : Got a message

更新:

根据答案,下面的num.stream.threads配置在绑定器级别工作。

spring.cloud.stream.kafka.streams.binder.configuration:
 num.stream.threads: 3

共有1个答案

平学
2023-03-14

似乎需要设置num.stream.threads以增加并发...

/** {@code num.stream.threads} */
@SuppressWarnings("WeakerAccess")
public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";

...它默认为1。

绑定器应该根据...consumer.concurrency属性设置;请在绑定器上打开一个github发行版。

 类似资料:
  • 我想知道使用spring时“并发”的确切含义。云流动违约消费者并发属性。 文档(https://docs.spring.io/spring-cloud-stream/docs/Chelsea.RELEASE/reference/html/_configuration_options.html)表示“入站消费者的并发”,这可以通过多种方式解释。 幕后创建了什么类型的线程执行器? 谢啦!

  • 消费者使用Spring的JavaConfig类如下: Kafka主题侦听器使用@KafkaListener注释,如下所示: 我的pom包括依赖项: 现在当我打包到war并部署到tomcat时,它不会显示任何错误,即使在调试模式下也不会显示任何错误,只是部署war什么都没有。 请帮助我了解是否缺少触发kafkalistner的某些配置。 谢谢Gary我添加了上下文。xml和web。xml,但我得到了

  • 使用Spring-Cloud-Stream的kafka绑定器,如何配置并发消息消费者(在单个消费者jvm中)?如果我没有理解错的话,在使用kafka时并发使用消息需要分区,但是s-c-s文档指出,要使用分区,您需要通过partitionKeyExpression或PartitionKeyExtractorClass在生成器中指定分区选择。Kafka博士提到循环分区。 s-c-s文档根本没有提到sp

  • 我有一个服务,它从不同的Spring云流通道(绑定到EventHub/Kafka主题)生成和使用消息。有几种设置类似的服务。 配置如下所示 生产者/发布者代码如下所示 类似地,我还有多个其他发布者发布到不同的活动中心/主题。请注意,每个已发布的消息都有一个租户id标头。这是我的多租户应用程序特定于跟踪租户上下文的内容。还请注意,在发送消息时,我正在获取要发布到的频道。 我的消费者代码如下所示 同样

  • 我目前正在使用带有的Kafka绑定器的Spring Cloud Stream为我的Spring Boot微服务执行消息记录。 我有: 生产者将消息发布到订阅频道 在消息从生产者发布到流并被消费者收听的整个过程中,可以观察到preSend方法被触发了两次: 一次在生产者端-消息发布到流时 然而,出于日志记录的目的,我只需要在消费者端截获并记录消息。 是否有任何方法可以仅在一侧(例如消费者侧)截获SC

  • 使用Spring Cloud Stream版本Chelsea. SR2,RabbitMQ作为消息代理。要拥有多个消费者,我们使用属性并发(入站消费者的并发)。 如果我们将并发设置为50。它从1开始,慢慢地增加消费者计数。有没有任何可能的解决方案可以使用更高的数字而不是一个来启动初始消费者计数,以提高消费者性能。