如何使用新的Spring Cloud Stream Kafka功能模型发送消息?
不推荐的方式是这样的。
public interface OutputTopic {
@Output("output")
MessageChannel output();
}
@Autowired
OutputTopic outputTopic;
public someMethod() {
outputTopic.output().send(MessageBuilder.build());
}
但是我如何以函数式风格发送消息呢?
应用yml公司
spring:
cloud:
function:
definition: process
stream:
bindings:
process-out-0:
destination: output
binder: kafka
@Configuration
public class Configuration {
@Bean
Supplier<Message<String>> process() {
return () -> {
return MessageBuilder.withPayload("foo")
.setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes()).build();
};
}
我会自动连接MessageChannel,但对于process、process-out-0、output或类似的东西,没有MessageChannel Bean。或者我可以用供应商Bean发送消息吗?谁能给我举个例子吗?谢谢!
您可以使用StreamBridge
或反应器API-请参阅将任意数据发送到输出(例如外部事件驱动源)
我正在尝试用《Spring的云流》和《Kafka》。下面是示例代码。但它似乎没有任何作用。它总是创建一个名为“输出”的主题。但这些价值观尚未公布。 应用亚马尔 我的目标就是创造价值。 依赖性-2.2.6。释放
我正在使用Spring Cloud Stream和RabbitMq在不同的微服务之间交换消息。 这是我发布消息的设置。 . 以及接收消息的设置 . 我能够成功地使用此设置交换消息。我希望,发送消息和接收消息的ID是相等的。但它们总是不同的UUID。 有没有一种方法可以让消息从生产者通过RabbitMq一直保持相同的ID到消费者?
所以我不确定如何做到这一点。我已经使用Qukus和MicroProfile反应式消息传递框架以及javax.websocket库的东西做到了这一点,但我不确定如何将其移植到使用Kafka Streams。使用MP反应式消息传递,我可以在其他类中的一个通道上有一个@外出注释,然后使用我的WebSocket服务,我可以像这样从该通道注入。 Kafka流有可能做到这一点吗?
是否可以记录来自流函数的入站消息?有什么拦截器能让我这么做吗?
我正在努力实现以下目标: 使用spring cloud Stream2.1.3.Release with Kafka binder将消息发送到输入通道,并实现发布订阅行为,其中每个消费者都将得到通知,并且能够处理发送到Kafka主题的消息。 我明白,在Kafka,如果每一个消费者都属于自己的消费者群体,就能从一个话题中读到每一条信息。在我的案例中,spring为我运行的spring boot应用程