1.pom文件导入依赖
<!-- kafka --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>
2.application.yml文件配置
spring: cloud: stream: kafka: binder: brokers: xxx.xxx.xxx.xx:xxxx // Kafka的消息中间件服务器地址 bindings: xxx_output: // 通道名称 destination: xxx // 消息发往的目的地,对应topic 在发送消息的配置里面,group是不用配置的 // 如果我们需要传输json的信息,那么在发送消息端需要设置content-type为json(其实可以不写,默认content-type就是json) xxx_input: destination: xxx // 消息发往的目的地,对应topic group: xxx // 对应kafka的group
3.创建消息发送者
@EnableBinding(Source.class) // @EnableBinding 是绑定通道的,Soure.class是spring 提供的,表示这是一个可绑定的发布通道 @Service public class MqService { @Resource(name = KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT) private MessageChannel oesWorkbenchChannel; /** * 发送一条kafka消息 */ public boolean sendLifeData(Object object) { return MqUtils.send(oesWorkbenchChannel, object, KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT); } } // 发布通道 public interface Source { @Output(KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT) MessageChannel oesWorkbenchLifeDataOutput(); // 发布通道用MessageChannel }
4.创建消息监听者
@Slf4j @EnableBinding(Sink.class) public class WorkbenchStreamListener { @Resource private FileService fileService; @StreamListener(KafkaConstants.xxx_input) // 监听接受通道 public void receiveData(MoveMessage moveMessage) { } } // 接受通道 public interface Sink { @Input(KafkaConstants.OES_WORKBENCH_MOVE_INPUT) SubscribableChannel oesWorkbenchMoveInput(); // 接受通道用SubscribableChannel }
接下来就可以愉快的发送监听消息了
到此这篇关于spring-cloud-stream结合kafka使用详解的文章就介绍到这了,更多相关spring-cloud-stream整合kafka内容请搜索小牛知识库以前的文章或继续浏览下面的相关文章希望大家以后多多支持小牛知识库!
spring-cloud-stream-kafka-elasticsearch The goal of this project is to implement a "News" processing pipeline composed of five Spring Boot applications: producer-api, categorizer-service, collector-se
Spring Cloud Stream应用程序的配置是否正确。我们有Spring Cloud Stream文档解释与Kafka的SSL连接吗?
我试图用Spring的云流Kafka流来阅读Kafka。然后我在一分钟的时间窗口内汇总事件,并将其转移到不同的主题。然后,我需要从主题中读取聚合事件,并将其写入另一个主题,同时将该主题与另一个Kafka集群中的不同主题绑定。但我得到了下面的例外。 我按照链接中的示例,尝试了下面的代码。 应用属性 哈格。JAVA Transporter.java EGSRC处理器。JAVA
我试图创建一个简单的程序来打印一个Kafka主题的Kstream。我不断地得到一个NPE和完全没有想法。 我已经使用了spring cloud-stream-binder-kafka-streams依赖项,并且我正在使用spring cloud的最新版本“Finchley.m9”。 我写的代码是: Application.Properties具有: 当我启动服务时,我在控制台上不断得到以下错误:
我最近开始为Kafka研究Spring Cloud Stream,并且一直在努力使TestBinder与Kstreams一起工作。这是一个已知的限制,还是我忽略了什么? 这很好: 字符串处理器: 字符串测试: 但当我试图在流程中使用KStream时,我无法让TestBinder正常工作。 Kstream处理器: KStream测试: 正如您可能已经注意到的,我从Kstream处理器中省略了@Str
试图开发一个Spring云应用程序,使用kafka Kafka使用的配置是: 运行应用程序,我可以看到这些配置被选中 问题是以下错误消息: 如何配置这个“AdminClient”并将正确的主机/ip信息传递给它?查看了Spring Cloud Stream Kafka活页夹参考指南,但找不到答案。