我们正在使用Kafka和Spring Cloud Stream,我们需要连接到Spring Boot组件中的Confluent Schema Registry,请参阅https://github.com/donalthurley/KafkaConsumeScsAndConfluent.
我们添加了以下配置来创建所需的 ConfluentSchemaRegistryClient bean see https://github.com/donalthurley/KafkaConsumeScsAndConfluent/blob/master/src/main/java/com/example/kafka/KafkaConfig.java,它应该覆盖 Spring Cloud Stream 中的默认模式注册表。
但是,在某些部署之后,我们间歇性地看到以下故障。
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel
根本原因显示此堆栈跟踪
Caused by: java.lang.NullPointerException
at org.springframework.cloud.stream.schema.client.DefaultSchemaRegistryClient.register(DefaultSchemaRegistryClient.java:71)
at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.resolveSchemaForWriting(AvroSchemaRegistryClientMessageConverter.java:238)
at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertToInternal(AbstractAvroMessageConverter.java:179)
at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:201)
at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:191)
at org.springframework.messaging.converter.CompositeMessageConverter.toMessage(CompositeMessageConverter.java:83)
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor.doPreSend(MessageConverterConfigurer.java:322)
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$AbstractContentTypeInterceptor.preSend(MessageConverterConfigurer.java:351)
at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:611)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
DefaultSchemaRegistryClient 由 AvroSchemaRegistryClientMessageConverter 调用的事实向我们表明,我们的 ConfluentSchemaRegistryClient bean 的接线存在问题。
在我们的配置中还需要其他东西来确保 ConfluentSchemaRegistryClient Bean 正确连接吗?
现在,我已经将@Enable组分注册客户端注释移动到项目应用程序类(请参见https://github.com/donalthurley/KafkaConsumeScsAndConfluent/commit/b4cf5427d7ab0a4fed619fe54b042890f5ccb594并重新部署),这修复了在我们的环境中部署时遇到的问题。
我一直在用@EnableSchemaRegistryClient注释来注释我的生产者和消费者类。
在我所有的本地测试中,这一直针对我的本地 docker 融合模式注册表工作。但是,在部署到我们的环境时,它大部分时间都在工作,但在一些部署后偶尔会失败。
我没有成功地在本地复制这个。
我还注意到,在本地测试中,如果删除了Confluent Schema Registry的配置,我会得到相同的空指针异常堆栈跟踪。
所以我想我看到的问题是,当项目应用程序类中不存在@EnableSchemaRegistryClient注释时,AvroSchemaRegistryClientMessageConverter bean没有与融合模式注册表bean连接。
我不明白为什么这是必要的,但我认为它可能已经解决了这个问题。
它对我有用。这就是我所做的:
>
@EnableSchemaRegistryClient
注释按如下方式设置属性:
spring.cloud.stream.kafka.bindings.channel.consumer.configuration.schema.registry.url=注册表的地址spring.cloud.stream.kafka.bindings.channel.consumer.configuration.specific.avro.reader=true
其中频道对应于您应用程序中的频道名称。
我认为最后一个属性非常重要,告诉 Spring 使用 Avro 序列化程序而不是默认序列化程序。
我使用的是Spring-Cloud-Stream埃尔姆赫斯特。RELEASE,因此如果使用其他版本,属性的名称可能会略有不同。
嘿,我想将ConFluent模式注册表与Avro Serializers一起使用:留档现在基本上是说:不要为多个不同的主题使用相同的模式 谁能解释一下原因吗?我重新搜索了源代码,它基本上将模式存储在Kafka主题中,如下所示(topicname,magicbytes,version- 因此,除了冗余之外,我看不到多次使用模式的问题?
在开始协议前,客户端在授权服务器注册。客户端在授权服务器上注册所通过的方式超出了本规范,但典型的涉及到最终用户与HTML注册表单的交互。 客户端注册不要求客户端与授权服务器之间的直接交互。在授权服务器支持时,注册可以依靠其他方式来建立信任关系并获取客户端的属性(如重定向URI、客户端类型)。例如,注册可以使用自发行或第三方发行声明或通过授权服务器使用信任通道执行客户端发现完成。 当注册客户端时,客
前面我们对基于 MINA 的服务端架构有了一个大体认识,现在我们看一下客户端的情况。客户端需要连接到一个服务端,发送消息并处理响应 客户端首先创建一个 IOConnector (用以连接 Socket 的 MINA Construct (构件)),开启一个服务器的绑定 在连接创建时,一个 Session 会被创建并关联到该连接 应用或者客户端写入 Session,导致数据在穿越 Filter Ch
我们在当前的基础架构中安装了普通的apache Kafka,并开始记录一些我们想要使用Kafka Connect处理的数据。目前,我们使用Avro作为消息格式,但我们的基础架构中没有模式注册表。将来,我们计划用Confluent替换当前堆栈,并使用Schema Registry和Connect,但在一段时间内,我们只需要为此部署Connect。 是否可以以某种方式配置连接接收器,以便它们使用显式a
我正在开发一个使用Kafka代理和模式注册表的GCP云数据流作业。我们的Kafka broker和Schema Registry需要TLS客户端证书。我在部署时面临着与模式注册中心的连接问题。任何建议都非常欢迎。 这是我为数据流工作做的。我为TLS配置创建消费者属性。 并通过update Consumer Properties更新使用者属性。 正如这个stackoverflow答案所建议的,我还将
我试图使用kafka-avro-console-producer 5.4.0-ccs不自动注册模式。我试着用: 但它仍然在注册模式。属性似乎正确:https://github.com/confluentinc/schema-registry/blob/a0a04628687a72ac6d01869d881a60fbde4177e7/avro-serializer/src/main/java/io/