我正在编写一个将处理事件消息的应用程序(发布到主题file-adafade-完成
)。我有多个endpoint应该使用这些消息(元数据-Reader
和配额检查器
),出于实用的原因,我想将这些endpoint一起部署在一个聚合包中。
有了Spring Cloud Stream,我可以使用Spring。云流动绑定。文件上载已完成。group=元数据读取器,为第一个endpoint设置消费者组;我还想在配额检查组下处理消息,但基于属性的配置只允许每个消息队列绑定一个消费者组。
有没有办法配置Spring Cloud Stream,以便我可以绑定到同一应用程序中多个消费者组下的单个消息队列?
AFAIK消费者群体是指集群中的独生子女消费者。如果您的两个endpoint位于同一个应用程序中,则没有理由提取新的消费者群。您只需在内部将传入消息发布或订阅到所有endpoint即可。
但基于属性的配置只允许每个消息队列绑定一个使用者组。
每个消息队列是指每个绑定吗?
此外,当您指定属性spring.cloud.stream.bindings.file-upload-completed.group=元数据-阅读器
时,文件-上传-完成
在这里表示绑定的目标名称(例如:频道名称),您可以定义尽可能多的频道及其与绑定到特定目标的特定消费者组的绑定(主题:文件-上传-完成
)
spring.cloud.stream.bindings.mychannel1.destination=file-upload-completed
spring.cloud.stream.bindings.mychannel1.group=metadata-reader
spring.cloud.stream.bindings.mychannel2.destination=file-upload-completed
spring.cloud.stream.bindings.mychannel2.group=metadata-reader
使用上述配置,您可以执行连接等操作,如下所示:
@StreamListener
public void receive(@Input(Processor.INPUT) SubscribableChannel input1,
@Input("mychannel2") SubscribableChannel input2) {
// perform operations (join etc.,)
}
由于消息需求的排序,我们有一个主题和一个分区。我们有两个消费者运行在不同的服务器上,具有相同的配置集,即groupId、consumerId和consumerGroup。即 1主题- 当我们部署消费者时,相同的代码会部署在两台服务器上。当消息到来时,我们会注意到两个消费者都在消费消息,而不是只有一个处理。让消费者在两台独立的服务器上运行的原因是,如果一台服务器崩溃,至少其他服务器可以继续处理消息。
我是一个学习Kafka的新学生,我遇到了一些关于理解多个消费者的基本问题,到目前为止,文章、文档等都没有太大的帮助。 我尝试做的一件事是编写我自己的高级Kafka生产者和消费者,并同时运行他们,发布100个简单的消息到一个主题,并让我的消费者检索他们。我成功地做到了这一点,但是当我试图引入第二个消费者来消费刚刚发布消息的同一主题时,它没有收到任何消息。 我的理解是,对于每个主题,您可以有来自不同消
问题内容: 我有一个JMS客户端,它正在生成消息并通过JMS队列发送到其唯一的使用者。 我想要的是不止一个消费者收到这些消息。我想到的第一件事是将队列转换为主题,以便现有用户和新用户都可以订阅并将相同的消息传递给他们。 显然,这将涉及在生产者和消费者方面修改当前的客户代码。 我还要查看其他选项,例如创建第二个队列,这样就不必修改现有的使用者。我相信这种方法有很多优点,例如(如果我错了,请纠正我)在
我有一个Kafka系统,看起来像这样(所有消费者都在一个消费者群体中): 在每个消费者中,我轮询消息,然后进行昂贵的计算(从1到60秒)。如果操作成功,我将提交消费者。 在我提交之前,另一个使用者是否会开始处理相同的消息?我需要保证,一旦消息被拾取,它就会被只执行一次 - 除非处理中途失败。
我是Guice的新手,正在为以下用例寻求帮助: 我开发了一个软件包(PCKG),其中该软件包的入门级依赖于其他类,例如: 在我的绑定模块中,我正在做: 注意我没有为A提供绑定信息,因为我想通过它的消费者类来提供它的绑定。(设计就是这样,所以我的要求是继续讨论主要问题,而不是设计)。 现在,我的消费者阶层正在做这样的事情: 同样在我的消费者包装中: Q1。我在消费者类中做的绑定正确吗?我很困惑,因为
我用java编写我所有的微服务。我想在Amazon SQS中使用多个消费者,但每个消费者在负载均衡器后面的AWS上有多个实例。 我使用SNS作为输入流 我在SNS之后使用SQS标准队列。 我在stackoverflow上发现了同样的问题(使用多个消费者的Amazon SQS) 此示例为 https://aws.amazon.com/fr/blogs/aws/queues-and-notificat