我正在尝试实现一个Spring Cloud数据流,它从数据库中读取记录,将这些记录传递给转换为Avro模式的处理器,然后将其传递给接收器应用程序使用。
我有数据从SQL数据库流向我的源应用程序,并通过Kafka绑定器传递数据,没有问题,我遇到了问题,将数据从处理器发送到Sink应用程序序列化/反序列化与Avro。
我创建了一个名为ech的avro模式。avsc和已使用处理器内的avro maven插件为其生成了一个名为EchRecord的类。
我已将以下依赖项添加到处理器和接收器的pom中
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
<version>1.2.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.2</version>
</dependency>
我已将处理器的属性设置为
spring.cloud.stream.bindings.output.contentType=application/*+avro
spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled=true
spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990
在Sink端,属性看起来像spring.cloud.stream.schemaRealstryClient.endpoint=http://192.168.99.100:8990
处理器应用程序代码如下所示:
@EnableBinding(Processor.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public class EchProcessorApplication {
private static Logger logger = LoggerFactory.getLogger(EchProcessorApplication.class);
public static void main(String[] args) {
SpringApplication.run(EchProcessorApplication.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public EchRecord transform(List<Map<String, Object>> record) {
return EchRecord.newBuilder()
.setCallId(11111).build();;
}
在Sink端,它的代码如下所示:
@EnableBinding(Sink.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public class AvroLoggerApplication {
private static Logger LOGGER = LoggerFactory.getLogger(AvroLoggerApplication.class);
public static void main(String[] args) {
SpringApplication.run(AvroLoggerApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void logHandler(Object data) {
LOGGER.info("data='{}'", data.toString());
LOGGER.info("class='{}'", data.getClass());
}
}
我有一个Spring Schema注册表服务器正在运行并可由两个应用程序访问,我可以在查询注册表时看到架构已交付给服务器。
我可以查看是否在接收到的消息中正确设置了contentType的接收器应用程序上启用调试日志记录:contentType=应用程序/vnd。echrecord。v1 avro
在Sink应用程序中,我设置了一个带有@StreamListener注释的方法来检索接收对象并打印出数据和类类型的消息,它似乎在检索字节数组。
如何更改Sink应用程序的代码以将Avro消息反序列化为可以从中检索设置数据的内容?
这里有几件事要尝试。在生产端,由于您的类型已经是Avro类型(专用记录或通用记录),您不需要DynamicSchemaGener
标志,这是为基于反射的编写器设计的,主要用于测试,因为它会影响性能。
由于您的接收器可以看到您发布的正确类型,您现在需要的是将您的类型放在接收器上。因此,例如,在接收器上添加类型,并使用适当的类型对方法进行注释:EchRecord
,这将为您提供正确的类型。
您还可以将其设置为genericord,以便能够使用记录像对象容器一样访问它。获取(
本文向大家介绍springcloud实现注册中心Eureka,包括了springcloud实现注册中心Eureka的使用技巧和注意事项,需要的朋友参考一下 Eureka是Netflix开源的一款提供服务注册和发现的产品,它提供了完整的Service Registry和Service Discovery实现。也是springcloud体系中最重要最核心的组件之一。 背景介绍 服务中心 服务中心又称注
本文向大家介绍SpringCloud融入Python的实现,包括了SpringCloud融入Python的实现的使用技巧和注意事项,需要的朋友参考一下 前言 该篇文章分享如何将Python Web服务融入到Spring Cloud微服务体系中,并调用其服务,Python Web框架用的是Tornado 构建Python web服务 引入py-eureka-client客户端 manage.py 大
本文向大家介绍SpringCloud Eureka Provider及Consumer的实现,包括了SpringCloud Eureka Provider及Consumer的实现的使用技巧和注意事项,需要的朋友参考一下 Eureka-Provider 服务的提供者 新建一个服务提供者项目 1、导入pom文件 2、在启动类上加注解 上边那个@EnableDiscoverClient 注解加不加都行的
我相信,这不是实施这种转变的最佳方式。我想知道是否有更好的方法从数据流作业中进行查找。 跟进: 试图将用例实现为侧输入:
本文向大家介绍SpringCloud使用Feign实现服务调用,包括了SpringCloud使用Feign实现服务调用的使用技巧和注意事项,需要的朋友参考一下 Spring Cloud Feign简介 Spring Cloud Feign也是一个基础工具类,它整合了Spring Cloud Ribbon和Spring Cloud Hystrix,除了提供这两者的强大功能以外,它还提供了一种声明式的
本文向大家介绍SpringCloud Bus如何实现配置刷新,包括了SpringCloud Bus如何实现配置刷新的使用技巧和注意事项,需要的朋友参考一下 要想实现配置刷新,首先得有项目基础结构 项目一: 注册中心 项目二: 配置中心 项目三: 客户端 先启动注册中心 然后启动配置中心 然后在不同端口启动客户端的多个实例,这些实例都是通过bootstrap.properties连接到配置中心后,加