当前位置: 首页 > 知识库问答 >
问题:

如何使用RMQ和spring cloud stream创建基于分区的消费者

羊舌承天
2023-03-14

如果我有3个由producer创建的分区,如果我在CF中部署3个实例,每个实例选择一个队列并使用索引处理消息,那么我可以使用cloud stream和rabbit mq开发示例消费者。

现在的问题是,如果我有10个分区,我似乎需要10个实例,这是浪费资源,我们可以让一个消费者监听多个分区吗。我之所以有基于分区的生产者,是因为对我来说,消息序列是处理事务的顺序。

共有2个答案

周学义
2023-03-14

为什么你会认为这是浪费资源?如果您的需求表明您需要有状态处理,并且您正在拆分为多个分区,那么您将需要N个消费者来处理N个分区。

如果在同一队列上混合不同分区的消息,您的排序将受到影响。除非您添加一些逻辑来基于特定元数据聚合消息。

王昆
2023-03-14

这里有一种方法。。。

@SpringBootApplication
@EnableBinding(TwoInputs.class)
public class So43661064Application {

    public static void main(String[] args) {
        SpringApplication.run(So43661064Application.class, args);
    }

    @StreamListener("input1")
    public void foo1(String in) {
        doFoo(in);
    }

    @StreamListener("input2")
    public void foo2(String in) {
        doFoo(in);
    }

    protected void doFoo(String in) {
        System.out.println(in);
    }

    public interface TwoInputs {

        @Input("input1")
        SubscribableChannel input1();

        @Input("input2")
        SubscribableChannel input2();

    }

}

spring.cloud.stream.bindings.input1.group=bar-0
spring.cloud.stream.bindings.input1.destination=foo
spring.cloud.stream.rabbit.bindings.input1.consumer.bindingRoutingKey=foo-0

spring.cloud.stream.bindings.input2.group=bar-1
spring.cloud.stream.bindings.input2.destination=foo
spring.cloud.stream.rabbit.bindings.input2.consumer.bindingRoutingKey=foo-1

这将消耗生产者在回答您的另一个问题时创建的2个分区。

目前没有办法让StreamListener直接侦听2个分区。

编辑

下面是另一种方法,使用exchange-

制作人

@SpringBootApplication
@EnableBinding(Source.class)
public class So43614477Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(So43614477Application.class, args).close();
    }

    @Autowired
    private MessageChannel output;

    @Autowired
    private AmqpAdmin admin;

    @Value("${spring.cloud.stream.bindings.output.producer.partition-count}")
    private int partitionCount;

    @Value("${spring.cloud.stream.bindings.output.destination}")
    private String destination;

    @Override
    public void run(String... args) throws Exception {
        for (int i = 0; i < this.partitionCount; i++) {
            String partition = this.destination + "-" + i;
            TopicExchange exchange = new TopicExchange(partition);
            this.admin.declareExchange(exchange);
            Binding binding = BindingBuilder.bind(exchange).to(new TopicExchange(this.destination))
                    .with(partition);
            this.admin.declareBinding(binding);
        }

        output.send(MessageBuilder.withPayload("fiz").setHeader("whichPart", 0).build());
        output.send(MessageBuilder.withPayload("buz").setHeader("whichPart", 1).build());
    }

}

spring.cloud.stream.bindings.output.destination=foo
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['whichPart']
spring.cloud.stream.bindings.output.producer.partition-count=2

消费者

@SpringBootApplication
@EnableBinding(Sink.class)
public class So43661064Application {

    public static void main(String[] args) {
        SpringApplication.run(So43661064Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void foo1(String in) {
        System.out.println(in);
    }

}

spring.cloud.stream.bindings.input.group=bar
spring.cloud.stream.bindings.input.destination=foo-0,foo-1

主exchange中的分区被路由到分区exchange,使用者获得一个要将其队列绑定到的exchange列表。

您可以在命令行中传递该列表。

 类似资料:
  • 我试图找到spring cloud stream的例子,它为RMQ创建了基于分区的生产者。我想看看它将如何为这些队列创建绑定,因为RMQ本身不支持主题的分区,但它将创建与分区数量相等的队列数量(我读到了这篇文章,可能是错的)。首先,我想了解如何在RMQ上使用spring cloud stream为基于分区的producer创建producer。

  • 这是一个关于Kafka和信息如何被消费的非常基本的问题,但不幸的是,我在这一点上找不到任何答案。 假设我想过度分区,那么我将得到比消费者多10倍的分区。过度分区是必需的,因为我希望能够扩展(在未来并行处理更多的消息)。 1 个主题分为 1000 个分区,由 100 个使用者使用 =- 我的问题是: > 消息是如何为每个消费者消费的:它是以循环方式完成的吗?如果不是,分发是如何完成的? 有没有保证消

  • TL;DR;我试图理解一个被分配了多个分区的单个使用者是如何处理reach分区的消费记录的。 例如: 在移动到下一个分区之前,会完全处理一个分区。 每次处理每个分区中的可用记录块。 从第一个可用分区处理一批N条记录 以循环旋转方式处理来自分区的N条记录 我找到了或分配程序的配置,但这只决定了使用者如何分配分区,而不是它如何从分配给它的分区中使用。 我开始深入研究KafkaConsumer源代码,#

  • 我试图提出一个设计,使用Kafka为多个处理代理并行处理来自Kafka主题的消息。 null 或者还有什么我遗漏的地方可能有助于我对这一点的理解?

  • 假设我有一个名为“MyTopic”的主题,它有3个分区P0、P1和P2。这些分区中的每一个都有一个leader,并且本主题的数据(消息)分布在这些分区中。 1.Producer将始终根据代理上的负载以循环方式写到分区的领导者。对吗? 2.制作人如何认识隔断的首领?

  • 谁能请解释和指导我链接或资源阅读关于Kafka消费者如何在下面的场景下工作。 > 一个有5个消费者的消费者组和3个分区的主题(Kafka是如何决定的) 一个消费者组有5个消费者,主题有10个分区(kafka如何分担负载) 两个消费者组和两个服务器的kafka集群,其中一个主题被划分在节点1和节点2之间,当来自不同组的消费者订阅到一个分区时,如何避免重复。 上面可能不是配置kafka时的最佳实践,但