当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream动态目的地使用本机编码的Avro不起作用

谯皓君
2023-03-14

我一直在尝试使用Spring Cloud Stream的动态目的地功能以Avro格式发布消息。然而,由于我使用的是本机编码(合流Avro序列化器),消息转换器无法处理这种情况。显然,当我使用静态目的地时,我能够通过在“绑定”级别使用“使用本机编码:true”参数来管理本机编码。然而,有了动态的目的地,我似乎没有这样的能力。

private boolean publishMessage(byte[] record, String target, String contentType, Schema schema) {
    return resolver.resolveDestination(target)
        .send(MessageBuilder
            .createMessage(record, new MessageHeaders(
                Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
  }

如果我对“application/*avro”内容类型使用以下方法,并且记录为byte[]格式,则会引发以下异常:

error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@5c778504]; nested exception is org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema: \"bytes\"

如果缺少本机编码属性,通常会发生此异常。

如果我尝试在使用以下方法发布消息之前将字节数组反序列化为通用记录,则无法为其找到合适的消息转换器。

public static GenericRecord bytesToGenericAvro(byte[] bytes, Schema schema) {
    DatumReader<GenericRecord>
        datumReader = new GenericDatumReader<>(schema);

    GenericRecord record = null;
    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    bais.reset();

    BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(bais, null);
    try {
      record = datumReader.read(null, binaryDecoder);
    } catch (IOException e) {
      log.error("Unable to deserialize byte array to avro generic record", e.getMessage());
    } finally {
      try {
        bais.close();
      } catch (IOException e) {
        log.warn("Unable to close ByteArrayInputStream", e.getMessage());
      }
    }
    return record;
  }

更新:添加此bean后仍然面临相同的问题。当Spring Cloud Stream尝试将消息转换为Avro时,会引发异常!

  @Bean
  public NewDestinationBindingCallback<KafkaProducerProperties> dynamicBindingConfigurer() {
    return ((channelName, channel, producerProperties, extendedProducerProperties) -> {
      producerProperties.setUseNativeEncoding(true);
      producerProperties.setErrorChannelEnabled(true);
      producerProperties.setPartitionCount(3);
    });
  }

例外:

failed to send Message to channel 'output1'; nested exception is java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload={...}, headers={contentType=application/*+avro, id=c22bf171-c6ae-cedb-b0be-3aa0fcbdf762, timestamp=1567053746112}]' to outbound message.
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor.doPreSend(MessageConverterConfigurer.java:388)
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$AbstractContentTypeInterceptor.preSend(MessageConverterConfigurer.java:422)
    at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:608)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:443)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
    at com.example.controller.PublisherController.publishMessage(PublisherController.java:90)
    at com.example.controller.PublisherController.replayRecord(PublisherController.java:72)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:104)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:892)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1039)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005)
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:897)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:634)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:741)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:118)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:118)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:118)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:118)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:490)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408)
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:853)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1587)
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.lang.Thread.run(Thread.java:748)

共有1个答案

应涵容
2023-03-14

您可以通过添加NewDestinationBindingCallback bean来修改动态绑定的绑定属性,并将其传递给解析器。请参阅文档。

如果预先知道频道名称,则可以像配置任何其他目标一样配置生产者属性。或者,如果注册一个NewDestinationBindingCallback

void configure(String channelName, MessageChannel channel, ProducerProperties producerProperties,
        T extendedProducerProperties);

以下示例显示了如何使用RabbitMQ绑定器:

@Bean
public NewDestinationBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
    return (name, channel, props, extended) -> {
        props.setRequiredGroups("bindThisQueue");
        extended.setQueueNameGroupOnly(true);
        extended.setAutoBindDlq(true);
        extended.setDeadLetterQueueName("myDLQ");
    };
}

如果需要支持具有多个绑定器类型的动态目标,请将Object用于泛型类型,并根据需要强制转换扩展参数。

编辑

这是解析器中的一个bug;在创建和配置通道之前,不会调用回调来更新属性。它适用于大多数属性,但不适用于此属性。

以下是解决方法:

@SpringBootApplication
@EnableBinding(Sink.class)
public class So57688303Application {

    public static void main(String[] args) {
        SpringApplication.run(So57688303Application.class, args);
    }

    @Bean
    public NewDestinationBindingCallback<KafkaProducerProperties> dynamicBindingConfigurer() {
        return ((channelName, channel, producerProperties, extendedProducerProperties) -> {
            producerProperties.setUseNativeEncoding(true);
            producerProperties.setErrorChannelEnabled(true);
            producerProperties.setPartitionCount(3);
            extendedProducerProperties.getConfiguration().put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    MySerializer.class.getName());
        });
    }


    @Bean
    public ApplicationRunner runner(BinderAwareChannelResolver resolver) {
        return args -> {
            MessageChannel channel = resolver.resolveDestination("dynamic");
            ((AbstractMessageChannel) channel).removeInterceptor(0); // only need to do this on the first resolution
            channel.send(new GenericMessage<>("foo"));
        };
    }

    public static class MySerializer implements Serializer<String> {

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
        }

        @Override
        public byte[] serialize(String topic, String data) {
            System.out.println("In my serializer with data of type " + data.getClass().getSimpleName());
            return data.getBytes();
        }

        @Override
        public void close() {
        }

    }

 }

In my serializer with data of type String
 类似资料:
  • 我已经完成了翻译我的应用到土耳其使用基地本地化。但是,我还需要翻译info.plist文件中的键。我做了我为其他所有事情所做的一切: 去找档案检查员检查土耳其语: 然后我转到new strings资源,通过将这个键添加到本地化文件中来完成我的翻译: 其他都是土耳其语,翻译得很好。只是位置许可不能翻译。我还看到了如何本地化iOS info.plist文件中的字符串?并将键(当然,作为,不是字面上的句

  • 问题内容: 最近,我更改了一些要通过ajax显示的页面,但对于为什么utf8编码现在在框内显示一个问号却不知道,我有些困惑。 举个例子。最初的页面是index.php。charset已显式设置为utf8,位于中。然后我用php查询数据库 这是原始的index.php页面: 但是,当我进行更改以添加通过ajax填充“ main_container”的菜单时,所有utf8编码均停止工作。这是新的代码:

  • 问题内容: 我正在将项目从Ant转换为Maven,并且在处理UTF-8字符的特定单元测试中遇到问题。问题与以下字符串有关: 问题是单元测试失败,因为字符串的读取方式如下: java类另存为UTF-8,并且我还在pom.xml中将构建编码指定为UTF-8。 这是我的 pom.xml 的摘录: 我在这里想念什么吗?如果有人可以在这里帮助我,那就太好了。 更新资料 关于测试代码: 以下逻辑不是真正相关的

  • 我有这样一个csv文件 我在读书

  • 问题内容: 该代码非常有用: 但是这段代码 不起作用 : 它引发此错误: ZMQError:没有这样的设备 为什么,zeromq无法使用localhost接口? 它只能在同一台计算机上的IPC上运行吗? 问题答案: 问题在于: 尝试更改为:

  • 使用StreamBridge,我将包含两种不同类型的对象的消息发送到单个Kafka主题。有没有办法定义一个能够使用两种类型消息的Spring Cloud Stream的功能消费者?