如何使用新的Spring Cloud Stream Kafka功能模型发送消息?
不推荐的方式是这样的。
public interface OutputTopic {
@Output("output")
MessageChannel output();
}
@Autowired
OutputTopic outputTopic;
public someMethod() {
outputTopic.output().send(MessageBuilder.build());
}
但是我如何以函数式风格发送消息呢?
应用yml公司
spring:
cloud:
function:
definition: process
stream:
bindings:
process-out-0:
destination: output
binder: kafka
@Configuration
public class Configuration {
@Bean
Supplier<Message<String>> process() {
return () -> {
return MessageBuilder.withPayload("foo")
.setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes()).build();
};
}
我会自动连接MessageChannel,但对于process、process-out-0、output或类似的东西,没有MessageChannel Bean。或者我可以用供应商Bean发送消息吗?谁能给我举个例子吗?谢谢!
您可以使用StreamBridge
或反应器API-请参阅将任意数据发送到输出(例如外部事件驱动源)
我正在尝试用《Spring的云流》和《Kafka》。下面是示例代码。但它似乎没有任何作用。它总是创建一个名为“输出”的主题。但这些价值观尚未公布。 应用亚马尔 我的目标就是创造价值。 依赖性-2.2.6。释放
我正在使用Spring Cloud Stream和RabbitMq在不同的微服务之间交换消息。 这是我发布消息的设置。 . 以及接收消息的设置 . 我能够成功地使用此设置交换消息。我希望,发送消息和接收消息的ID是相等的。但它们总是不同的UUID。 有没有一种方法可以让消息从生产者通过RabbitMq一直保持相同的ID到消费者?
所以我不确定如何做到这一点。我已经使用Qukus和MicroProfile反应式消息传递框架以及javax.websocket库的东西做到了这一点,但我不确定如何将其移植到使用Kafka Streams。使用MP反应式消息传递,我可以在其他类中的一个通道上有一个@外出注释,然后使用我的WebSocket服务,我可以像这样从该通道注入。 Kafka流有可能做到这一点吗?
是否可以记录来自流函数的入站消息?有什么拦截器能让我这么做吗?
如何使用项目Reactor背压功能与Kafka粘合剂在斯佩林云流? 如果我使用这种方式,比发布者发送延迟1秒的消息,但消费者消费消息没有任何延迟。 有可能在春雨云流中使用cunsumer上的BackPereSure吗?