当前位置: 首页 > 知识库问答 >
问题:

如何使用RMQ和spring cloud stream创建基于分区的生产者?

司承业
2023-03-14

我试图找到spring cloud stream的例子,它为RMQ创建了基于分区的生产者。我想看看它将如何为这些队列创建绑定,因为RMQ本身不支持主题的分区,但它将创建与分区数量相等的队列数量(我读到了这篇文章,可能是错的)。首先,我想了解如何在RMQ上使用spring cloud stream为基于分区的producer创建producer。

共有1个答案

狄宪
2023-03-14
@SpringBootApplication
@EnableBinding(Source.class)
public class So43614477Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(So43614477Application.class, args);
    }

    @Autowired
    private MessageChannel output;

    @Override
    public void run(String... args) throws Exception {
        output.send(MessageBuilder.withPayload("fiz").setHeader("whichPart", 0).build());
        output.send(MessageBuilder.withPayload("buz").setHeader("whichPart", 1).build());
    }

}

使用属性。。。

spring.cloud.stream.bindings.output.destination=foo
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['whichPart']
spring.cloud.stream.bindings.output.producer.partition-count=2

spring.cloud.stream.bindings.output.producer.required-groups=bar

我添加了必需组,以便您可以查看消费者队列是如何绑定的。

foo交换绑定:

表达式可以是根据消息计算的任何有效表达式(例如,有效负载。hashCode());然后根据分区计数对其进行修改,以确定最终分区。

 类似资料:
  • 如果我有3个由producer创建的分区,如果我在CF中部署3个实例,每个实例选择一个队列并使用索引处理消息,那么我可以使用cloud stream和rabbit mq开发示例消费者。 现在的问题是,如果我有10个分区,我似乎需要10个实例,这是浪费资源,我们可以让一个消费者监听多个分区吗。我之所以有基于分区的生产者,是因为对我来说,消息序列是处理事务的顺序。

  • 我有几个Samza工作运行所有阅读Kafka主题的消息,并为新主题编写新消息。为了发送新消息,我使用Samza内置的OutgoingMessageEnvelope。并使用MessageCollector发送新消息。它看起来是这样的:

  • null camel-kafka中是否有任何配置,我们可以使用它来增加kafka主题分区计数?

  • 我想用Intellij IDEA中的Gradle启动一个空项目,但它似乎不像Eclipse那样方便。 我所做的是文件->新项目->分级。并且显示了一个项目,但它没有完成(例如,它没有src文件夹)。 并且我在build.gradle中添加了。当我提到IntelliJ Idea中的所有任务时。没有可供我建立IntelliJ想法。 在Eclipse+gradle插件中,只需创建一个新的gradle项目

  • 所以,我想对我的spark数据帧执行某些操作,将它们写入DB,并在最后创建另一个数据帧。看起来是这样的: 这给我一个错误,因为map分区期望返回的类型,但这里是。我知道这在forEach分区中是可能的,但我也想做映射。单独做会有开销(额外的火花工作)。该怎么办? 谢谢

  • 问题内容: 我有一个基于Maven的Spring-WS客户端项目,我希望将其打包为一个jar。在日食中,一切正常运行。当我尝试将其打包为可执行jar时,由于Spring jar不包含在我的应用程序jar中,因此出现ClassNotFound异常。 因此,我添加了maven-shade-plugin以将我的所有依赖项都包含在应用程序jar中。当我查看我的应用程序jar时,会看到包含的所有依赖项中的所