当前位置: 首页 > 知识库问答 >
问题:

Spring的云流Kafka发送消息

葛霄
2023-03-14

如何使用新的Spring Cloud Stream Kafka功能模型发送消息?

不推荐的方式是这样的。

public interface OutputTopic {

    @Output("output")
    MessageChannel output();
}

@Autowired
OutputTopic outputTopic;

public someMethod() {
    
    outputTopic.output().send(MessageBuilder.build());
}

但是我如何以函数式风格发送消息呢?

应用yml公司

spring:
  cloud:
    function:
      definition: process
    stream:
      bindings:
        process-out-0:
          destination: output
          binder: kafka
@Configuration
public class Configuration {

    @Bean
    Supplier<Message<String>> process() {
        return () -> {
            return MessageBuilder.withPayload("foo")
                    .setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes()).build();
        };
    }

我会自动连接MessageChannel,但对于process、process-out-0、output或类似的东西,没有MessageChannel Bean。或者我可以用供应商Bean发送消息吗?谁能给我举个例子吗?谢谢!

共有1个答案

窦涵忍
2023-03-14

您可以使用StreamBridge或反应器API-请参阅将任意数据发送到输出(例如外部事件驱动源)

 类似资料:
  • 我正在尝试用《Spring的云流》和《Kafka》。下面是示例代码。但它似乎没有任何作用。它总是创建一个名为“输出”的主题。但这些价值观尚未公布。 应用亚马尔 我的目标就是创造价值。 依赖性-2.2.6。释放

  • 我正在使用Spring Cloud Stream和RabbitMq在不同的微服务之间交换消息。 这是我发布消息的设置。 . 以及接收消息的设置 . 我能够成功地使用此设置交换消息。我希望,发送消息和接收消息的ID是相等的。但它们总是不同的UUID。 有没有一种方法可以让消息从生产者通过RabbitMq一直保持相同的ID到消费者?

  • 所以我不确定如何做到这一点。我已经使用Qukus和MicroProfile反应式消息传递框架以及javax.websocket库的东西做到了这一点,但我不确定如何将其移植到使用Kafka Streams。使用MP反应式消息传递,我可以在其他类中的一个通道上有一个@外出注释,然后使用我的WebSocket服务,我可以像这样从该通道注入。 Kafka流有可能做到这一点吗?

  • 是否可以记录来自流函数的入站消息?有什么拦截器能让我这么做吗?

  • 我正在努力实现以下目标: 使用spring cloud Stream2.1.3.Release with Kafka binder将消息发送到输入通道,并实现发布订阅行为,其中每个消费者都将得到通知,并且能够处理发送到Kafka主题的消息。 我明白,在Kafka,如果每一个消费者都属于自己的消费者群体,就能从一个话题中读到每一条信息。在我的案例中,spring为我运行的spring boot应用程