我们刚刚升级到Spring-Cloud-Stream的3 . 0 . 0-版本,遇到了以下问题:
像这样使用函数式风格时:
public class EventProcessor {
private final PriceValidator priceValidator;
@Bean
public Function<Flux<EnrichedValidationRequest>, Flux<ValidationResult>> validate() {
return enrichedValidationRequestFlux -> enrichedValidationRequestFlux
.map(ProcessingContext::new)
.flatMap(priceValidator::validateAndMap);
}
}
application.yaml如下所示:
spring.cloud.stream:
default-binder: kafka
kafka:
binder:
brokers: ${kafka.broker.prod}
auto-create-topics: false
function.definition: validate
# INPUT: enrichedValidationRequests
spring.cloud.stream.bindings.validate-in-0:
destination: ${kafka.topic.${spring.application.name}.input.enrichedValidationRequests}
group: ${spring.application.name}.${STAGE:NOT_SET}
consumer:
useNativeDecoding: true
spring.cloud.stream.kafka.bindings.validate-in-0:
consumer:
configuration:
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: de.pricevalidator.deserializer.EnrichedValidationRequestDeserializer
# OUTPUT: validationResults
spring.cloud.stream.bindings.validate-out-0:
destination: validationResultsTmp
producer:
useNativeEncoding: true
spring.cloud.stream.kafka.bindings.validate-out-0:
producer:
compression.type: lz4
messageKeyExpression: payload.offerKey
configuration:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: de.pricevalidator.serializer.ValidationResultSerializer
似乎序列化做了两次——当我们截取kafka主题中产生的消息时,消费者只是将它们显示为JSON(字符串),但现在它是一个不可读的字节[]。此外,生产中的下游消费者不能再反序列化消息。奇怪的是,输入消息的反序列化似乎工作得很好,无论我们在消费者属性中放入什么(无论是在binder上还是在默认的kafka级别上)我们都有一种感觉,这个错误“回来了”,但我们无法在代码中找到确切的位置:https://github . com/spring-cloud/spring-cloud-stream/issues/1536
我们的(丑陋的)解决方法:
@Slf4j
@Configuration
public class KafkaMessageConverterConfiguration {
@ConditionalOnProperty(value = "spring.cloud.stream.default-binder", havingValue = "kafka")
@Bean
public MessageConverter validationResultConverter(BinderTypeRegistry binder, ObjectMapper objectMapper) {
return new AbstractMessageConverter(MimeType.valueOf("application/json")) {
@Override
protected boolean supports(final Class<?> clazz) {
return ValidationResult.class.isAssignableFrom(clazz);
}
@Override
protected Object convertToInternal(final Object payload, final MessageHeaders headers, final Object conversionHint) {
return payload;
}
};
}
}
有没有一种“合适的”方法来设置自定义序列化程序或像以前一样获得本机编码?
所以这是3.0.0后报告的问题https://github.com/spring-cloud/spring-cloud-stream/commit/74aee8102898dbff96a570d2d2624571b259e141.它已经解决,几天后将在3.0.1.RELEASE(Horsham.SR1)中提供。
因此,我实现了一个自定义SerDe,它从Confluent提供的扩展到每当与模式注册表通信超时时,都会尝试重试。我已将Spring Cloud Streams Kafka binders配置为默认使用: 今天我在日志中看到了这个错误: 这告诉我Kafka Streams使用的SerDe不是我上面定义的SerDe,而是基类SpecificAvroSerde(它包装SpecificAvroSerial
我试图用Spring的云流Kafka流来阅读Kafka。然后我在一分钟的时间窗口内汇总事件,并将其转移到不同的主题。然后,我需要从主题中读取聚合事件,并将其写入另一个主题,同时将该主题与另一个Kafka集群中的不同主题绑定。但我得到了下面的例外。 我按照链接中的示例,尝试了下面的代码。 应用属性 哈格。JAVA Transporter.java EGSRC处理器。JAVA
我们正在使用Avro输入/输出记录通过Spring Cloud Stream功能支持测试Kafka Streams的使用,但设置和,以便在执行Avro转换的地方使用自定义的。 默认的 和值的 。 当我们只使用KStream到KStream函数时,一切都没问题,例如: 但是当我们尝试一个稍微复杂一点的例子,涉及一个像这样的输入KTable: ( 类只有两个成员:) 收到第一条记录时,将引发此异常:
我有一个rest postendpoint,它使用Spring Cloud Stream Kafka绑定器来使用数据和写到Kafka。现在我们没有任何错误处理到位。但是我们希望通过在没有将数据写入Kafka时添加一个额外的检查来使这个endpoint容错。当数据没有写入Kafka时,我们打算发送一个异常信息对象。我试图用这种方法使用全局错误来实现这一点 我的怀疑有两个方面: 当我们写入Kafka的
我们在Spring Boot的基础上开发了一个内部公司框架,我们希望通过Spring Cloud Stream支持Kafka Streams。我们需要自动向所有出站消息注入一些头。我们通过标准的Spring Cloud Stream Kafka Binder注册了一个定制的,实现了这一点,但这不适用于Kafka Streams,因为它们似乎遵循不同的路径。 对于Spring Cloud Strea
我们有一个Kafka过程,在这个过程中,我们消费来自一个主题的消息,然后进行一些充实,然后我们将消息发布到另一个主题。以下是事件 消费者-消费信息 我正在使用Spring Cloud kafka活页夹版本3.0.0-RELEASE,事情进展顺利。最近我们引入了幂等生产者,并包含了transactionIdPrefix属性,我们观察到我们开始出现性能问题。以下是统计数据。 在transactionI