当前位置: 首页 > 知识库问答 >
问题:

带有融合模式注册表的Spring Cloud Stream Kafka无法将消息发送到通道输出

洪研
2023-03-14

我想用Confluent Schema Registry和Avro Schema evolution测试Spring Cloud Stream,以将其与我的应用程序集成。我发现Spring Cloud Stream不支持到融合模式注册中心的安全连接,实现仍然非常基础。因此,我决定将Confluent Schema Registry Client与Spring Kafka一起用于模式注册表部分,其余部分使用Spring Cloud Stream。以下是消费者和生产者的代码以及Avro模式:

消费者:

import com.example.Sensor;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@SpringBootApplication
@EnableBinding(Sink.class)
public class ConsumerApplication {

    private final Log logger = LogFactory.getLog(getClass());

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void process(Sensor data) {
        logger.info(data);
    }

    @Configuration
    static class ConfluentSchemaRegistryConfiguration {
        @Bean
        public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
            CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(endpoint, 100);
            return client;
        }
    }
}

应用yaml:

spring.kafka.properties.schema.registry.url: kafka-url:kafka-port
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: sensor-topic
      schemaRegistryClient:
        endpoint: https://user:password@schema-registry-url:schema-registry-port
      schema:
        avro:
          schema-locations: classpath:avro/sensor.avsc
      kafka:
        binder:
          brokers: SSL://kafka-url:kafka-port
          configuration:
            security.protocol: SSL
            ssl.truststore.type: JKS
            ssl.truststore.location: client.truststore.jks
            ssl.truststore.password: secret
            ssl.keystore.type: PKCS12
            ssl.keystore.location: client.keystore.p12
            ssl.keystore.password: secret
            ssl.key.password: secret
            key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            basic.auth.credentials.source: USER_INFO
            basic.auth.user.info: user:password
server.port: 9999

生产商:

import com.example.Sensor;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.Random;
import java.util.UUID;

@SpringBootApplication
@EnableBinding(Source.class)
@RestController
public class Producer1Application {

    @Autowired
    private Source source;

    private Random random = new Random();

    public static void main(String[] args) {
        SpringApplication.run(Producer1Application.class, args);
    }

    @RequestMapping(value = "/messages", method = RequestMethod.POST)
    public String sendMessage() {
        source.output().send(MessageBuilder.withPayload(randomSensor()).build());
        return "ok, have fun with v1 payload!";
    }

    private Sensor randomSensor() {
        Sensor sensor = new Sensor();
        sensor.setId(UUID.randomUUID().toString() + "-v1");
        sensor.setAcceleration(random.nextFloat() * 10);
        sensor.setVelocity(random.nextFloat() * 100);
        sensor.setTemperature(random.nextFloat() * 50);
        return sensor;
    }

    //Another convenience POST method for testing deterministic values
    @RequestMapping(value = "/messagesX", method = RequestMethod.POST)
    public String sendMessageX(@RequestParam(value="id") String id, @RequestParam(value="acceleration") float acceleration,
                               @RequestParam(value="velocity") float velocity, @RequestParam(value="temperature") float temperature) {
        Sensor sensor = new Sensor();
        sensor.setId(id + "-v1");
        sensor.setAcceleration(acceleration);
        sensor.setVelocity(velocity);
        sensor.setTemperature(temperature);
        source.output().send(MessageBuilder.withPayload(sensor).build());
        return "ok, have fun with v1 payload!";
    }

    @Configuration
    static class ConfluentSchemaRegistryConfiguration {
        @Bean
        public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
            CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(endpoint, 100);
            return client;
        }
    }

}

应用亚马尔

spring.kafka.properties.schema.registry.url: kafka-url:kafka-port
spring:
  cloud:
    stream:
      bindings:
        output:
          contentType: application/*+avro
          destination: sensor-topic
      schemaRegistryClient:
        endpoint: https://user:password@schema-registry-url:schema-registry-port
      schema:
        avro:
          schema-locations: classpath:avro/sensor.avsc
      kafka:
        binder:
          brokers: SSL://kafka-url:kafka-port
          configuration:
            security.protocol: SSL
            ssl.truststore.type: JKS
            ssl.truststore.location: client.truststore.jks
            ssl.truststore.password: secret
            ssl.keystore.type: PKCS12
            ssl.keystore.location: client.keystore.p12
            ssl.keystore.password: secret
            ssl.key.password: secret
            key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            basic.auth.credentials.source: USER_INFO
            basic.auth.user.info: user:password
server.port: 9009

使用者架构:

{
  "namespace" : "com.example",
  "type" : "record",
  "name" : "Sensor",
  "fields" : [
    {"name":"id","type":"string"},
    {"name":"internalTemperature", "type":"float", "default":0.0, "aliases":["temperature"]},
    {"name":"externalTemperature", "type":"float", "default":0.0},
    {"name":"acceleration", "type":"float","default":0.0},
    {"name":"velocity","type":"float","default":0.0}
  ]
}

生产者架构:

{
  "namespace" : "com.example",
  "type" : "record",
  "name" : "Sensor",
  "fields" : [
    {"name":"id","type":"string"},
    {"name":"temperature", "type":"float", "default":0.0},
    {"name":"acceleration", "type":"float","default":0.0},
    {"name":"velocity","type":"float","default":0.0}
  ]
}

我坚持以下例外。如果你能帮助我理解问题所在,我将不胜感激。

java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload={"id": "a50a7646-a1c9-49a2-a7f6-f09b77fc3116-v1", "temperature": 10.430967, "acceleration": 5.434994, "velocity": 70.19337}, headers={id=de6e40e4-ba8f-9dca-1753-c06a1751e2d4, contentType=application/*+avro, timestamp=1556160207806}]' to outbound message.
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor.doPreSend(MessageConverterConfigurer.java:325) ~[spring-cloud-stream-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$AbstractContentTypeInterceptor.preSend(MessageConverterConfigurer.java:353) ~[spring-cloud-stream-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:608) ~[spring-integration-core-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:443) ~[spring-integration-core-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at sample.producer1.Producer1Application.sendMessage(Producer1Application.java:39) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:189) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:892) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1038) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:908) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:660) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:741) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.springframework.boot.actuate.web.trace.servlet.HttpTraceFilter.doFilterInternal(HttpTraceFilter.java:90) ~[spring-boot-actuator-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.filterAndRecordMetrics(WebMvcMetricsFilter.java:117) ~[spring-boot-actuator-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:106) ~[spring-boot-actuator-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:200) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) [tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:490) [tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139) [tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) [tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) [tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408) [tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:834) [tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1415) [tomcat-embed-core-9.0.17.jar:9.0.17]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.17.jar:9.0.17]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_191]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_191]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.17.jar:9.0.17]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]

P、 S:我使用的是Confluent Client 5.2.1、Avro 1.8.2、Spring Boot 2.1.4。释放和Spring的云流鱼城。释放。

共有1个答案

艾安和
2023-03-14

我相信您使用的是Spring Cloud Stream版本2.0。如果这是真的,您可能会面临内容类型协商的问题。在错误消息中,GenericMessage中的content type标头是application/*avro。自2.0以来的默认内容类型是应用程序/json。

您需要添加一个org.springframework.messaging.converter.MessageConverter来处理配置中缺少的Avro内容。参考:https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#spring-cloud-stream-preface-content-type-negotiation-improvements

--更新--

转换器已经存在,只需要为出站消息显式启用,可以通过配置完成:

spring.cloud.stream.bindings.output.contentType=application/*+avro

消费者代码看起来很好,使用SchemaRegistryClient。

文档:https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_avro_schema_registry_client_message_converters

 类似资料:
  • 我现在一直在查看Spring Cloud模式注册表和汇合模式注册表。我可以看到一些区别,例如Spring Cloud模式注册表将模式保存在普通数据库中,默认情况下保存在h2中,而汇合模式注册表保存在kafka主题中。 spring云模式注册表的这种方法是否会对性能产生任何影响。据我所知,即使数据保留在主题上,以防汇合,查询它时仍然会有延迟。但会有重大影响吗? 我还可以看到,spring云模式注册表

  • 我使用来自Confluent的Kafka Connect来使用Kafka流并以拼花格式写入HDFS。我正在1个节点中使用架构注册表服务,它运行良好。现在我想将模式注册表分发到集群模式以处理故障转移。关于如何实现这一点的任何链接或片段都将非常有用。

  • 我们正在开发并尝试删除主题的模式,因为更改与旧模式不兼容。 我们删除了模式/主题,并尝试使用相同的主题名称创建新模式,成功创建了模式。 然而,当我们运行应用程序时,它仍然指向相同的模式ID。 旧模式ID(主题“topic1”):51 新架构ID(主题“topic1”):52 应用程序在反序列化消息时出错 <代码>root@bas:/#curl-khttps://schemaregistry:443

  • 如何使用Spring Kafka通过合流模式注册表读取AVRO消息?有样品吗?我在官方参考文件中找不到它。

  • 我正在与Avro/Kafka和Confluent的Avro模式注册中心合作。 我使用avsc文件和avdl制作了一些基本模式和具有基本类型的主题。 我正在查看Confluent制作的API文档,以尝试将模式发展到版本2。特别是本部分: https://docs.confluent.io/current/schema-registry/using.html#register-a-new-versio

  • 有可能将融合模式注册与AWS MSK集成吗?如果你以前这样做过,你能提供一些你实现它的方法/博客吗?