当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Streaming-生产者的独立连接

虞华彩
2023-03-14

我有一个使用RabbitMQ的Spring Cloud Streaming transformer应用程序。它从兔子队列中读取数据,进行一些转换,然后写入兔子交换。我将应用程序部署到PCF,并绑定到Rabbit服务。

这工作正常,但是现在我需要一个单独的连接来消费和产生消息。(我想使用一个连接从Rabbit队列中读取,并使用不同的连接写入Rabbit交换)。我将如何配置这个?是否可以将我的应用程序绑定到2个不同的Rabbit服务,使用1作为生产者,1作为消费者?

共有1个答案

艾善
2023-03-14

好吧,从版本1.3开始,Rabbit Binder确实为生产者创建了一个单独的ConnectionFactory:https://docs.spring.io/spring-cloud-stream/docs/Ditmars.RELEASE/reference/htmlsingle/#_rabbitmq_binder

从1.3版开始,RabbitMessageChannelBinder为非事务性生产者创建一个内部ConnectionFactory副本,以避免消费者在共享时死锁,缓存的连接因代理上的内存警报而被阻塞。

所以,升级到Spring Cloud Stream Ditmars之后,这可能就足够了。

更新

如何使用不同的连接属性配置此内部ConnectionFactory副本?

不,那是不同的故事。您需要的是多活页夹支持:https://docs.spring.io/spring-cloud-stream/docs/Ditmars.RELEASE/reference/htmlsingle/#multiple-活页夹

您应该为不同的连接工厂声明几个块:

spring.cloud.stream.bindings.input.binder=rabbit1
spring.cloud.stream.bindings.output.binder=rabbit2

...

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: foo
          binder: rabbit1
        output:
          destination: bar
          binder: rabbit2
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host1>
        rabbit2:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host2>
 类似资料:
  • 我是Kafka的新手,我想验证我的设计。下面是我所拥有的。 我有一个生产者发布到一个主题,有一堆容器(部署我的web应用程序的地方),每个容器上都运行着一个消费者。这些消费者不在消费者组中,也不独立地消费消息。每个消费者都应该阅读主题中的所有消息。例如,假设主题m0,m1,m2上有3条消息,那么consumer1到consumerN应该独立地读取m0,m1,m2。每个使用者在处理读取的消息后立即提

  • 在我的程序中,我正在访问wep api。最多可以有7个不同的线程访问web api的不同服务器。每个线程负责一个服务器,每个服务器速率限制每个线程。每个线程更新相同的mysql数据库。线程数保持不变。 在我的示例中,是否需要连接池?我不应该只打开7个不同的连接,这些连接将在程序的生命周期中打开吗?

  • 我有一个消费者作为生产者消费者模式的一部分: 简化: 如果我移除 通过将线程设置为睡眠,CPU使用率攀升到极高的水平(13%),而不是0%。 此外,如果我实例化该类的多个实例,则每个实例的CPU使用率都会以13%的增量攀升。 大约每分钟(可能每30秒)都会向BlockingCollection添加一个新的LogItem,并将适用的消息写入文件。 有没有可能线程以某种方式阻止了其他线程的运行,而系统

  • 我有一组Kafka代理实例作为集群运行。我有一个客户正在生产数据给Kafka: 当我们使用tcpdump进行监控时,我可以看到只有到broker1和broker2的连接被建立,而对于broker3,没有来自我的生产者的连接。我有一个只有一个分区的单一主题。 我的问题是: > 为什么在我的情况下,我无法连接到broker3?或者至少我的网络监控没有显示我的制作人与broker3建立了连接? 如果我能

  • kafka-python(1.0.0)在连接到代理时抛出错误。同时 /usr/bin/kafka-console-producer和 /usr/bin/kafka-console-consumer正常工作。 Python应用程序过去也运行良好,但是在动物园管理员重新启动后,它不再能够连接。 我使用文档中的裸露骨骼示例: 我收到这个错误: 单步通过( /usr/lib/python2.6/site-

  • 问题内容: 我想向同一队列发送一批20k JMS消息。我使用10个线程将任务拆分,因此每个线程将处理2k条消息。我不需要交易。 我想知道是否建议建立一个连接,一个会话和10个生产者? 如果所有线程共享一个生产者,该怎么办?我的消息会损坏还是会同步发送(不会提高性能)? 如果我总是连接到同一队列,那么决定是创建新连接还是会话的一般指导方针是什么? 谢谢你,很抱歉一次问了很多。 问题答案: 如果某些消