当前位置: 首页 > 知识库问答 >
问题:

Spring-Cloud-Stream-Kafka-Binder 函数式忽略自定义 De/Serializer 和/或使用本机编码?

康照
2023-03-14

我们刚刚升级到Spring-Cloud-Stream的3 . 0 . 0-版本,遇到了以下问题:

像这样使用函数式风格时:

public class EventProcessor {

    private final PriceValidator priceValidator;

    @Bean
    public Function<Flux<EnrichedValidationRequest>, Flux<ValidationResult>> validate() {
        return enrichedValidationRequestFlux -> enrichedValidationRequestFlux
                .map(ProcessingContext::new)
                .flatMap(priceValidator::validateAndMap);
    }
}

application.yaml如下所示:

spring.cloud.stream:
  default-binder: kafka
  kafka:
    binder:
      brokers: ${kafka.broker.prod}
      auto-create-topics: false
  function.definition: validate

# INPUT: enrichedValidationRequests
spring.cloud.stream.bindings.validate-in-0:
  destination: ${kafka.topic.${spring.application.name}.input.enrichedValidationRequests}
  group: ${spring.application.name}.${STAGE:NOT_SET}
  consumer:
    useNativeDecoding: true


spring.cloud.stream.kafka.bindings.validate-in-0:
  consumer:
    configuration:
      key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value.deserializer: de.pricevalidator.deserializer.EnrichedValidationRequestDeserializer


# OUTPUT: validationResults
spring.cloud.stream.bindings.validate-out-0:
  destination: validationResultsTmp
  producer:
    useNativeEncoding: true

spring.cloud.stream.kafka.bindings.validate-out-0:
  producer:
    compression.type: lz4
    messageKeyExpression: payload.offerKey
    configuration:
      key.serializer: org.apache.kafka.common.serialization.StringSerializer
      value.serializer: de.pricevalidator.serializer.ValidationResultSerializer

似乎序列化做了两次——当我们截取kafka主题中产生的消息时,消费者只是将它们显示为JSON(字符串),但现在它是一个不可读的字节[]。此外,生产中的下游消费者不能再反序列化消息。奇怪的是,输入消息的反序列化似乎工作得很好,无论我们在消费者属性中放入什么(无论是在binder上还是在默认的kafka级别上)我们都有一种感觉,这个错误“回来了”,但我们无法在代码中找到确切的位置:https://github . com/spring-cloud/spring-cloud-stream/issues/1536

我们的(丑陋的)解决方法:

@Slf4j
@Configuration
public class KafkaMessageConverterConfiguration {

    @ConditionalOnProperty(value = "spring.cloud.stream.default-binder", havingValue = "kafka")
    @Bean
    public MessageConverter validationResultConverter(BinderTypeRegistry binder, ObjectMapper objectMapper) {
        return new AbstractMessageConverter(MimeType.valueOf("application/json")) {
            @Override
            protected boolean supports(final Class<?> clazz) {
                return ValidationResult.class.isAssignableFrom(clazz);
            }

            @Override
            protected Object convertToInternal(final Object payload, final MessageHeaders headers, final Object conversionHint) {
                return payload;
            }
        };
    }
}

有没有一种“合适的”方法来设置自定义序列化程序或像以前一样获得本机编码?

共有1个答案

梁丘权
2023-03-14

所以这是3.0.0后报告的问题https://github.com/spring-cloud/spring-cloud-stream/commit/74aee8102898dbff96a570d2d2624571b259e141.它已经解决,几天后将在3.0.1.RELEASE(Horsham.SR1)中提供。

 类似资料:
  • 因此,我实现了一个自定义SerDe,它从Confluent提供的扩展到每当与模式注册表通信超时时,都会尝试重试。我已将Spring Cloud Streams Kafka binders配置为默认使用: 今天我在日志中看到了这个错误: 这告诉我Kafka Streams使用的SerDe不是我上面定义的SerDe,而是基类SpecificAvroSerde(它包装SpecificAvroSerial

  • 我试图用Spring的云流Kafka流来阅读Kafka。然后我在一分钟的时间窗口内汇总事件,并将其转移到不同的主题。然后,我需要从主题中读取聚合事件,并将其写入另一个主题,同时将该主题与另一个Kafka集群中的不同主题绑定。但我得到了下面的例外。 我按照链接中的示例,尝试了下面的代码。 应用属性 哈格。JAVA Transporter.java EGSRC处理器。JAVA

  • 我们正在使用Avro输入/输出记录通过Spring Cloud Stream功能支持测试Kafka Streams的使用,但设置和,以便在执行Avro转换的地方使用自定义的。 默认的 和值的 。 当我们只使用KStream到KStream函数时,一切都没问题,例如: 但是当我们尝试一个稍微复杂一点的例子,涉及一个像这样的输入KTable: ( 类只有两个成员:) 收到第一条记录时,将引发此异常:

  • 我有一个rest postendpoint,它使用Spring Cloud Stream Kafka绑定器来使用数据和写到Kafka。现在我们没有任何错误处理到位。但是我们希望通过在没有将数据写入Kafka时添加一个额外的检查来使这个endpoint容错。当数据没有写入Kafka时,我们打算发送一个异常信息对象。我试图用这种方法使用全局错误来实现这一点 我的怀疑有两个方面: 当我们写入Kafka的

  • 我们在Spring Boot的基础上开发了一个内部公司框架,我们希望通过Spring Cloud Stream支持Kafka Streams。我们需要自动向所有出站消息注入一些头。我们通过标准的Spring Cloud Stream Kafka Binder注册了一个定制的,实现了这一点,但这不适用于Kafka Streams,因为它们似乎遵循不同的路径。 对于Spring Cloud Strea

  • 我们有一个Kafka过程,在这个过程中,我们消费来自一个主题的消息,然后进行一些充实,然后我们将消息发布到另一个主题。以下是事件 消费者-消费信息 我正在使用Spring Cloud kafka活页夹版本3.0.0-RELEASE,事情进展顺利。最近我们引入了幂等生产者,并包含了transactionIdPrefix属性,我们观察到我们开始出现性能问题。以下是统计数据。 在transactionI