当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream-Solace PubSub+-消费者并发性

宇文飞翮
2023-03-14

我正在使用Spring Cloud Stream 3.0.6(Cloud:hoxton.sr6,Boot 2.3.0.release)和Solace PubSub+。我不能让并发消费者工作。无论我配置什么,总是有一个线程依次执行每个传入消息。

以下是我的StreamListener代码:

    @StreamListener(JobTriggerEventConsumerBinding.INPUT)
    protected void onJobTriggerEvent(org.springframework.messaging.Message<JobExecutionTriggerEvent> message, 
                                     JobExecutionTriggerEvent event, 
                                     MessageHeaders headers) throws InterruptedException {
        
        log.info("Processing on thread: " + Thread.currentThread().getId());
      
        Thread.sleep(5000);
        
        log.info("Received the event!");
        log.info("-- Raw message:    {}", message);
        log.info("-- Headers:        {}", headers);
        log.info("-- Event:          {}", event);
        log.info("-- Event Contents: {}", event.getMessage());
    }
spring:
  cloud:
    stream:
      default:
        group: defaultConsumers
        consumer:
          concurrency: 3
      bindings:
        jobTriggers:
          group: jobTriggerConsumers 
          consumer:
            concurrency: 3
            max-attempts: 1
      solace: 
        bindings:
          jobTriggers:
            consumer:
              requeue-rejected: true
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream</artifactId>
    </dependency>

    <!-- Dependency to Solace PubSub+ Spring Cloud Stream integration (binder) -->
    <dependency>
      <groupId>com.solace.spring.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-solace</artifactId>
      <version>2.0.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-cloud-connectors</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

这里会有什么问题?

  • 安慰pubsub+活页夹
  • 本地运行的Solace PubSub+实例的Docker组合文件:
# docker-compose -f PubSubStandard_singleNode.yml up
version: '3.3'

services:
  primary:
    container_name: pubSubStandardSingleNode
    image: solace/solace-pubsub-standard:latest
    shm_size: 1g
    ulimits:
      core: 1
      nofile:
        soft: 2448
        hard: 38048
    ports:
    #Port Mappings:  Ports are mapped straight through from host to
    #container.  This may result in port collisions on commonly used
    #ports that will cause failure of the container to start.
      #Web transport
      - '80:80'
      #Web transport over TLS
      - '443:443'
      #SEMP over TLS
      - '943:943'
      #MQTT Default VPN
      #- '1883:1883'
      #AMQP Default VPN over TLS
      - '5671:5671'
      #AMQP Default VPN
      - '5672:5672'
      #MQTT Default VPN over WebSockets
      #- '8000:8000'
      #MQTT Default VPN over WebSockets / TLS
      #- '8443:8443'
      #MQTT Default VPN over TLS
      #- '8883:8883'
      #SEMP / PubSub+ Manager
      - '8080:8080'
      #REST Default VPN
      #- '9000:9000'
      #REST Default VPN over TLS
      #- '9443:9443'
      #SMF
      - '55555:55555'
      #SMF Compressed
      #- '55003:55003'
      #SMF over TLS
      - '55443:55443'
    environment:
      - username_admin_globalaccesslevel=admin
      - username_admin_password=admin
      - system_scaling_maxconnectioncount=100

共有1个答案

水品
2023-03-14

好的,我自己来回答那个问题。

用RabbitMQ绑定器和一个正在运行的rabbit实例尝试上面的配置,并发工作得很好。所以我认为一定是安慰活页夹在制造问题。

在谷歌搜索之后,我确实找到了确认:https://github.com/solaceproducts/solace-spring-cloud/issues/7

<dependency>
 <groupId>com.solace.spring.cloud</groupId>
 <artifactId>spring-cloud-starter-stream-solace</artifactId>
 <version>2.1.1</version
</dependency>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.solace.spring.cloud</groupId>
      <artifactId>solace-spring-cloud-bom</artifactId>
      <version>1.1.1</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
 </dependencies>
</dependencyManagement>

这适用于Spring Cloudhoxton.sr6

 类似资料:
  • 使用Spring-Cloud-Stream的kafka绑定器,如何配置并发消息消费者(在单个消费者jvm中)?如果我没有理解错的话,在使用kafka时并发使用消息需要分区,但是s-c-s文档指出,要使用分区,您需要通过partitionKeyExpression或PartitionKeyExtractorClass在生成器中指定分区选择。Kafka博士提到循环分区。 s-c-s文档根本没有提到sp

  • 我对RabbitMQ很陌生,所以如果我的问题听起来很琐碎,请原谅。我想在RabbitMQ上发布消息,它将由RabbitMQ消费者处理。 我的消费者机器是一个多核机器(最好是azure上的工作者角色)。但QueueBasicConsumer一次推送一条消息。我如何编程来利用我可以同时处理多个消息的所有核心。 一种解决方案是在多个线程中打开多个通道,然后在那里处理消息。但在这种情况下,我将如何决定线程

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 主要内容:1 start启动服务定时清理过期消息,1.1 cleanExpireMsg清理过期消息,1.2cleanExpiredMsg清理过期消息,2 submitConsumeRequest提交消费请求,2.2 submitConsumeRequestLater延迟提交,2.2 consumeMessageBatchMaxSize和pullBatchSize,3 ConsumeRequest执行消费任务,,,,基于RocketMQ release-4.9.3,深入的介绍了ConsumeMes

  • 我花了几个小时想弄清楚发生了什么,但没能找到解决办法。 这是我在一台机器上的设置: 1名zookeeper跑步 我正在使用kafka控制台生成器插入消息。如果我检查复制偏移量(

  • 使用StreamBridge,我将包含两种不同类型的对象的消息发送到单个Kafka主题。有没有办法定义一个能够使用两种类型消息的Spring Cloud Stream的功能消费者?