我试图找到spring cloud stream的例子,它为RMQ创建了基于分区的生产者。我想看看它将如何为这些队列创建绑定,因为RMQ本身不支持主题的分区,但它将创建与分区数量相等的队列数量(我读到了这篇文章,可能是错的)。首先,我想了解如何在RMQ上使用spring cloud stream为基于分区的producer创建producer。
@SpringBootApplication
@EnableBinding(Source.class)
public class So43614477Application implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(So43614477Application.class, args);
}
@Autowired
private MessageChannel output;
@Override
public void run(String... args) throws Exception {
output.send(MessageBuilder.withPayload("fiz").setHeader("whichPart", 0).build());
output.send(MessageBuilder.withPayload("buz").setHeader("whichPart", 1).build());
}
}
使用属性。。。
spring.cloud.stream.bindings.output.destination=foo
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['whichPart']
spring.cloud.stream.bindings.output.producer.partition-count=2
spring.cloud.stream.bindings.output.producer.required-groups=bar
我添加了必需组
,以便您可以查看消费者队列是如何绑定的。
foo
交换绑定:
表达式可以是根据消息计算的任何有效表达式(例如,有效负载。hashCode());然后根据分区计数对其进行修改,以确定最终分区。
如果我有3个由producer创建的分区,如果我在CF中部署3个实例,每个实例选择一个队列并使用索引处理消息,那么我可以使用cloud stream和rabbit mq开发示例消费者。 现在的问题是,如果我有10个分区,我似乎需要10个实例,这是浪费资源,我们可以让一个消费者监听多个分区吗。我之所以有基于分区的生产者,是因为对我来说,消息序列是处理事务的顺序。
我有几个Samza工作运行所有阅读Kafka主题的消息,并为新主题编写新消息。为了发送新消息,我使用Samza内置的OutgoingMessageEnvelope。并使用MessageCollector发送新消息。它看起来是这样的:
null camel-kafka中是否有任何配置,我们可以使用它来增加kafka主题分区计数?
我想用Intellij IDEA中的Gradle启动一个空项目,但它似乎不像Eclipse那样方便。 我所做的是文件->新项目->分级。并且显示了一个项目,但它没有完成(例如,它没有src文件夹)。 并且我在build.gradle中添加了。当我提到IntelliJ Idea中的所有任务时。没有可供我建立IntelliJ想法。 在Eclipse+gradle插件中,只需创建一个新的gradle项目
所以,我想对我的spark数据帧执行某些操作,将它们写入DB,并在最后创建另一个数据帧。看起来是这样的: 这给我一个错误,因为map分区期望返回的类型,但这里是。我知道这在forEach分区中是可能的,但我也想做映射。单独做会有开销(额外的火花工作)。该怎么办? 谢谢
问题内容: 我有一个基于Maven的Spring-WS客户端项目,我希望将其打包为一个jar。在日食中,一切正常运行。当我尝试将其打包为可执行jar时,由于Spring jar不包含在我的应用程序jar中,因此出现ClassNotFound异常。 因此,我添加了maven-shade-plugin以将我的所有依赖项都包含在应用程序jar中。当我查看我的应用程序jar时,会看到包含的所有依赖项中的所