当前位置: 首页 > 知识库问答 >
问题:

spring cloud流不使用Kafka频道绑定器发送消息

姜宏放
2023-03-14

我正在努力实现以下目标:

使用spring cloud Stream2.1.3.Release with Kafka binder将消息发送到输入通道,并实现发布订阅行为,其中每个消费者都将得到通知,并且能够处理发送到Kafka主题的消息。

我明白,在Kafka,如果每一个消费者都属于自己的消费者群体,就能从一个话题中读到每一条信息。在我的案例中,spring为我运行的spring boot应用程序的每个实例创建了一个匿名的、唯一的消费者组。spring boot应用程序只有一个配置为侦听输入通道的流侦听器。

测试示例案例:

配置了一个示例spring cloud流应用程序的输入通道,它绑定到一个Kafka主题。使用spring rest控制器向输入通道发送消息,期望该消息将被传递到运行的每个spring boot应用程序实例。在启动时的两个应用程序中,我可以看到正确分配了kafka分区。

问题:

但是,当我使用output().send()发送消息时,spring甚至不向配置的Kafka主题发送消息,而是在同一个线程中触发同一个应用程序实例的@StreamListener方法。

在调试期间,我看到spring代码有两个消息的处理程序。StreamListenerMessageHandler和KafkaProducerMessageHandler。spring只是简单地将它们链接起来,如果first handler以成功结束,那么它甚至不会更进一步。StreamListenerMessageHandler只是在同一个线程中调用我的@StreamListener方法,消息永远不会到达Kafka。

问题:

这是故意的吗?既然如此,那又是为什么呢?我怎样才能达到文章开头提到的行为呢?

ps.如果我使用KafkaTemplate和@kafkalistener方法,那么它可以按照我的要求工作。消息被发送到Kafka主题,两个应用实例接收消息并在Kafka listener注释方法中进行处理。

代码:

流侦听器方法的配置方式如下:

@SpringBootApplication
@EnableBinding(Processor.class)
@EnableTransactionManagement
public class ProcessorApplication {

private Logger logger = 
LoggerFactory.getLogger(this.getClass().getName());

private PersonRepository repository;

public ProcessorApplication(PersonRepository repository) {

    this.repository = repository;
}

public static void main(String[] args) {
    SpringApplication.run(ProcessorApplication.class, args);

}

@Transactional
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public PersonEvent process(PersonEvent data) {
    logger.info("Received event={}", data);
    Person person = new Person();
    person.setName(data.getName());

    logger.info("Saving person={}", person);

    Person savedPerson = repository.save(person);

    PersonEvent event = new PersonEvent();
    event.setName(savedPerson.getName());
    event.setType("PersonSaved");
    logger.info("Sent event={}", event);
    return event;
   }
  }

向输入通道发送消息:

@RestController()
@RequestMapping("/persons")
public class PersonController {

@Autowired
private Sink sink;


  @PostMapping("/events")
  public void createPerson(@RequestBody PersonEvent event) {

      sink.input().send(MessageBuilder.withPayload(event).build());
  }


}

spring cloud流配置:

spring:
  cloud.stream:
    bindings:
        output.destination: person-event-output
        input.destination: person-event-input

共有1个答案

邹英发
2023-03-14

sink.input().send

您将绕过绑定器,直接将其发送到流侦听器。

您需要将消息发送到kafka(发送到Person-Event-Input主题),然后每个流侦听器将从kafka接收消息。

您需要配置另一个output绑定并将其发送到那里,而不是直接发送到输入通道。

 类似资料:
  • 问题内容: 如何要求机器人将消息发送到与机器人接收命令不同的另一个通道(特定通道)? 假设bot 在channel中接收到消息,并且如果操作完成,则bot发送到channel 。 代码: 代码: 问题答案: 答案很简单:

  • 如何使用新的Spring Cloud Stream Kafka功能模型发送消息? 不推荐的方式是这样的。 但是我如何以函数式风格发送消息呢? 应用yml公司 我会自动连接MessageChannel,但对于process、process-out-0、output或类似的东西,没有MessageChannel Bean。或者我可以用供应商Bean发送消息吗?谁能给我举个例子吗?谢谢!

  • 我想向bot已加入的所有频道发送消息 当特定文件的散列结果与原始结果不同时,我希望将消息发送到bot加入的所有不协调通道。我知道如何向频道发送回复信息 用这个代码,对吗?但我想发送消息到所有的通道,机器人已加入当一些事件发生。

  • 我有一个YouTube频道。我想为我的网站添加新功能: 用户使用其google帐户进行身份验证。 用户创建自己的视频播客(使用此小部件),并将其上载到自己的youtube帐户。 之后,我使用YouTube API将用户视频移动到我的频道 有没有可能允许一些用户在我的频道上传他们账户的视频?也许用户可以通过Google Plus做到这一点? 附注:我看过一些文章

  • 使用Spring Integration Kafka,使用出站通道适配器,我尝试向名为“test”的主题发送消息 通过命令行终端,我启动了动物园管理员、kafka并创建了名为“test”的主题 Spring XML配置 JUnit测试代码 测试用例成功,在调试时,我发现channel.send()返回true 我使用下面的命令通过命令行检查了主题,但是我在测试主题中看不到任何消息。 bin/kaf

  • 如何在Kafka中发送同步消息 实现这一点的一种方法是设置properties参数 。 但是我想知道是否有一种甚至直接或替代的方式在Kafka中发送同步消息。(比如producer.sync发送(...)等等)。