我正在努力实现以下目标:
使用spring cloud Stream2.1.3.Release with Kafka binder将消息发送到输入通道,并实现发布订阅行为,其中每个消费者都将得到通知,并且能够处理发送到Kafka主题的消息。
我明白,在Kafka,如果每一个消费者都属于自己的消费者群体,就能从一个话题中读到每一条信息。在我的案例中,spring为我运行的spring boot应用程序的每个实例创建了一个匿名的、唯一的消费者组。spring boot应用程序只有一个配置为侦听输入通道的流侦听器。
测试示例案例:
配置了一个示例spring cloud流应用程序的输入通道,它绑定到一个Kafka主题。使用spring rest控制器向输入通道发送消息,期望该消息将被传递到运行的每个spring boot应用程序实例。在启动时的两个应用程序中,我可以看到正确分配了kafka分区。
问题:
但是,当我使用output().send()发送消息时,spring甚至不向配置的Kafka主题发送消息,而是在同一个线程中触发同一个应用程序实例的@StreamListener方法。
在调试期间,我看到spring代码有两个消息的处理程序。StreamListenerMessageHandler和KafkaProducerMessageHandler。spring只是简单地将它们链接起来,如果first handler以成功结束,那么它甚至不会更进一步。StreamListenerMessageHandler只是在同一个线程中调用我的@StreamListener方法,消息永远不会到达Kafka。
问题:
这是故意的吗?既然如此,那又是为什么呢?我怎样才能达到文章开头提到的行为呢?
ps.如果我使用KafkaTemplate和@kafkalistener方法,那么它可以按照我的要求工作。消息被发送到Kafka主题,两个应用实例接收消息并在Kafka listener注释方法中进行处理。
代码:
流侦听器方法的配置方式如下:
@SpringBootApplication
@EnableBinding(Processor.class)
@EnableTransactionManagement
public class ProcessorApplication {
private Logger logger =
LoggerFactory.getLogger(this.getClass().getName());
private PersonRepository repository;
public ProcessorApplication(PersonRepository repository) {
this.repository = repository;
}
public static void main(String[] args) {
SpringApplication.run(ProcessorApplication.class, args);
}
@Transactional
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public PersonEvent process(PersonEvent data) {
logger.info("Received event={}", data);
Person person = new Person();
person.setName(data.getName());
logger.info("Saving person={}", person);
Person savedPerson = repository.save(person);
PersonEvent event = new PersonEvent();
event.setName(savedPerson.getName());
event.setType("PersonSaved");
logger.info("Sent event={}", event);
return event;
}
}
向输入通道发送消息:
@RestController()
@RequestMapping("/persons")
public class PersonController {
@Autowired
private Sink sink;
@PostMapping("/events")
public void createPerson(@RequestBody PersonEvent event) {
sink.input().send(MessageBuilder.withPayload(event).build());
}
}
spring cloud流配置:
spring:
cloud.stream:
bindings:
output.destination: person-event-output
input.destination: person-event-input
sink.input().send
您将绕过绑定器,直接将其发送到流侦听器。
您需要将消息发送到kafka(发送到Person-Event-Input
主题),然后每个流侦听器将从kafka接收消息。
您需要配置另一个output
绑定并将其发送到那里,而不是直接发送到输入通道。
问题内容: 如何要求机器人将消息发送到与机器人接收命令不同的另一个通道(特定通道)? 假设bot 在channel中接收到消息,并且如果操作完成,则bot发送到channel 。 代码: 代码: 问题答案: 答案很简单:
如何使用新的Spring Cloud Stream Kafka功能模型发送消息? 不推荐的方式是这样的。 但是我如何以函数式风格发送消息呢? 应用yml公司 我会自动连接MessageChannel,但对于process、process-out-0、output或类似的东西,没有MessageChannel Bean。或者我可以用供应商Bean发送消息吗?谁能给我举个例子吗?谢谢!
我想向bot已加入的所有频道发送消息 当特定文件的散列结果与原始结果不同时,我希望将消息发送到bot加入的所有不协调通道。我知道如何向频道发送回复信息 用这个代码,对吗?但我想发送消息到所有的通道,机器人已加入当一些事件发生。
我有一个YouTube频道。我想为我的网站添加新功能: 用户使用其google帐户进行身份验证。 用户创建自己的视频播客(使用此小部件),并将其上载到自己的youtube帐户。 之后,我使用YouTube API将用户视频移动到我的频道 有没有可能允许一些用户在我的频道上传他们账户的视频?也许用户可以通过Google Plus做到这一点? 附注:我看过一些文章
使用Spring Integration Kafka,使用出站通道适配器,我尝试向名为“test”的主题发送消息 通过命令行终端,我启动了动物园管理员、kafka并创建了名为“test”的主题 Spring XML配置 JUnit测试代码 测试用例成功,在调试时,我发现channel.send()返回true 我使用下面的命令通过命令行检查了主题,但是我在测试主题中看不到任何消息。 bin/kaf
如何在Kafka中发送同步消息 实现这一点的一种方法是设置properties参数 。 但是我想知道是否有一种甚至直接或替代的方式在Kafka中发送同步消息。(比如producer.sync发送(...)等等)。