我使用的是Spring Cloud Stream和RabbitMQ,我在同一个应用程序中配置了生产者和消费者,但使用了两个不同的通道。问题是,我的应用程序有五个副本在Kubernetes上运行,但只有发送消息的pod会使用它们,其他四个不会使用。在RabbitMQ控制台上,没有收到任何消息,消息速率图不会更改。当我注释OrdersListener时,消息会成功发送到RabbitMQ集群。有人知道可能出了什么问题吗?
以下是制片人频道:
@Component
public interface OrdersChannel {
@Output("Orders")
MessageChannel publishOrders();
}
这是消费者渠道:
@Component
public interface OrdersListenerChannel {
String BINDING = "Orders";
@Input(BINDING)
SubscribableChannel input();
}
以下是生成消息的服务:
@Service
@RequiredArgsConstructor
@EnableBinding(OrdersChannel.class)
public class OrdersEventService {
private final OrdersChannel ordersChannel;
public void sendOrderEvents(final Set<Order> orders) {
orders.parallelStream().forEach(order ->
ordersChannel.publishOrders()
.send(message(OrderMessageDTOBuilder.build(order))));
}
private static <T> Message<T> message(T val) {
return MessageBuilder.withPayload(val).build();
}
}
下面是听众:
@Component
@RequiredArgsConstructor
@EnableBinding(OrdersListenerChannel.class)
public class OrdersListener {
private final OrderService orderService;
@StreamListener(OrdersListenerChannel.BINDING)
public void listen(final OrderMessageDTO orderMessageDTO) {
// orderService method call
}
}
我的application.properties是这样的:
spring.cloud.stream.bindings.Orders.destination=GeneralExchange
spring.cloud.stream.bindings.Orders.producer.requiredGroups=OrdersQueue
spring.cloud.stream.rabbit.bindings.Orders.producer.bindingRoutingKey=OrdersQueue
spring.cloud.stream.rabbit.bindings.Orders.producer.routingKeyExpression='OrdersQueue'
spring.cloud.stream.bindings.OrdersListenerChannel.group=OrdersQueue
spring.cloud.stream.bindings.OrdersListenerChannel.consumer.max-attempts=3
spring.cloud.stream.bindings.OrdersListenerChannel.consumer.concurrency=10
spring.cloud.stream.rabbit.bindings.OrdersListenerChannel.consumer.bind-queue=false
spring.cloud.stream.rabbit.bindings.OrdersListenerChannel.consumer.republish-to-dlq=false
所以,您要问几个问题。首先,您的配置和编程模型(基于注释)早已弃用,实际上将在下一个主要版本中删除。我们渴望切换到更简单的函数式编程模型。
关于你的副本。我认为这与你所观察到的没有任何关系。您实际上是在说您有五个消费者,但只有一个收到消息。这意味着您没有在RabbiotMQ中配置发布子模型。这类似于未配置或未理解交换类型和队列路由密钥之间的RabbitMQ关系。您的exchange必须是正确路由到绑定队列的类型(例如,作为路由密钥)
目前,我让生产者和消费者在同一个Spring Boot应用程序中进行配置,但触发消息的Spring云流没有通过Kafka(我正在使用Kafka控制台消费者监控消息),但消费者仍然收到消息(使用与生产者相同的线程)。 如果我在应用程序中删除consumerHandler(@StreamListener),制作人就会成功地将消息发送给Kafka。 这个有什么配置吗?默认情况下,我需要将Spring云流
我有一个生产者/消费者场景,我不希望一个生产者交付产品,多个消费者消费这些产品。然而,常见的情况是,交付的产品只被一个消费者消费,而其他消费者从未看到过这个特定的产品。我不想实现的是,一个产品被每个消费者消费一次,而没有任何形式的阻碍。 我的第一个想法是使用多个BlockingQueue,每个消费者使用一个,并使生产者将每个产品按顺序放入所有可用的BlockingQueues中。但是,如果其中一个
我刚开始使用ActiveMQ Artemis,并在我的机器上安装了Artemis2.17.0。创建了SpringBoot测试应用程序,其中存在JMS和MQTT发布者和接收者。还创建了小的RestController,这样我就可以使用JMS和MQTT生成器发送消息。接收器非常简单,只需创建一条日志消息到控制台。现在,当我使用MQTT生产者创建消息时,JMS和MQTT接收器都将消息获取并记录到控制台。
下面我用一篇关于临时排队的文章来解释我的想法,我只想知道我对还是错。 参考链接:如何使用JMS实现请求响应 “创建临时目的地、消费者、生产者和连接都是与代理同步的请求-响应操作,因此在处理每个请求时应避免,因为它会导致与JMS代理进行大量聊天。” 我不明白这句话在咒骂什么?在不同的线程中我们可以访问临时队列吗?一点道理都没有?有人能解释一下吗
我有三根线。线程1(T1)是生成器,它生成数据。线程2和线程3(T2和T3)分别等待T1的数据在单独的循环中处理。我正在考虑在线程之间共享BlockingQueue,并通过调用“Take”让T2和T3等待。
由于消息需求的排序,我们有一个主题和一个分区。我们有两个消费者运行在不同的服务器上,具有相同的配置集,即groupId、consumerId和consumerGroup。即 1主题- 当我们部署消费者时,相同的代码会部署在两台服务器上。当消息到来时,我们会注意到两个消费者都在消费消息,而不是只有一个处理。让消费者在两台独立的服务器上运行的原因是,如果一台服务器崩溃,至少其他服务器可以继续处理消息。