当前位置: 首页 > 知识库问答 >
问题:

Spring的云流Kafka发送消息

葛霄
2023-03-14

如何使用新的Spring Cloud Stream Kafka功能模型发送消息?

不推荐的方式是这样的。

public interface OutputTopic {

    @Output("output")
    MessageChannel output();
}

@Autowired
OutputTopic outputTopic;

public someMethod() {
    
    outputTopic.output().send(MessageBuilder.build());
}

但是我如何以函数式风格发送消息呢?

应用yml公司

spring:
  cloud:
    function:
      definition: process
    stream:
      bindings:
        process-out-0:
          destination: output
          binder: kafka
@Configuration
public class Configuration {

    @Bean
    Supplier<Message<String>> process() {
        return () -> {
            return MessageBuilder.withPayload("foo")
                    .setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes()).build();
        };
    }

我会自动连接MessageChannel,但对于process、process-out-0、output或类似的东西,没有MessageChannel Bean。或者我可以用供应商Bean发送消息吗?谁能给我举个例子吗?谢谢!

共有1个答案

窦涵忍
2023-03-14

您可以使用StreamBridge或反应器API-请参阅将任意数据发送到输出(例如外部事件驱动源)

 类似资料:
  • 我正在尝试用《Spring的云流》和《Kafka》。下面是示例代码。但它似乎没有任何作用。它总是创建一个名为“输出”的主题。但这些价值观尚未公布。 应用亚马尔 我的目标就是创造价值。 依赖性-2.2.6。释放

  • 我正在使用Spring Cloud Stream和RabbitMq在不同的微服务之间交换消息。 这是我发布消息的设置。 . 以及接收消息的设置 . 我能够成功地使用此设置交换消息。我希望,发送消息和接收消息的ID是相等的。但它们总是不同的UUID。 有没有一种方法可以让消息从生产者通过RabbitMq一直保持相同的ID到消费者?

  • 所以我不确定如何做到这一点。我已经使用Qukus和MicroProfile反应式消息传递框架以及javax.websocket库的东西做到了这一点,但我不确定如何将其移植到使用Kafka Streams。使用MP反应式消息传递,我可以在其他类中的一个通道上有一个@外出注释,然后使用我的WebSocket服务,我可以像这样从该通道注入。 Kafka流有可能做到这一点吗?

  • 是否可以记录来自流函数的入站消息?有什么拦截器能让我这么做吗?

  • 如何使用项目Reactor背压功能与Kafka粘合剂在斯佩林云流? 如果我使用这种方式,比发布者发送延迟1秒的消息,但消费者消费消息没有任何延迟。 有可能在春雨云流中使用cunsumer上的BackPereSure吗?