在使用ErrorHandlingDeserializer处理Avro组合的错误时,我无法发布到Dlq主题。以下是发布时的错误。
主题Topic_DLT在60000毫秒后不在元数据中。错误KafkaConsumerDestination{consumerDestination Name='Topic‘,partitions=6,dlqName='TOIC_DLT‘}。container-0-C-1 o.s.i.h.日志处理程序:250-org.springframework.messaging。MessageHandlingException:消息处理程序[org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder中发生错误$1@49abe531]; 嵌套异常为java.lang.RuntimeException:失败,failedMessage=GenericMessage
这是应用程序.yml
spring:
cloud:
stream:
bindings:
process-in-0:
destination: TOPIC
group: groupID
kafka:
binder:
brokers:
- xxx:9092
configuration:
security.protocol: SASL_SSL
sasl.mechanism: PLAIN
jaas:
loginModule: org.apache.kafka.common.security.plain.PlainLoginModule
options:
username: username
password: pwd
consumer-properties:
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
producer-properties:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
bindings:
process-in-0:
consumer:
configuration:
basic.auth.credentials.source: USER_INFO
schema.registry.url: registryUrl
schema.registry.basic.auth.user.info: user:pwd
security.protocol: SASL_SSL
sasl.mechanism: PLAIN
max-attempts: 1
dlqProducerProperties:
configuration:
basic.auth.credentials.source: USER_INFO
schema.registry.url: registryUrl
schema.registry.basic.auth.user.info: user:pwd
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
deserializationExceptionHandler: sendToDlq
ackEachRecord: true
enableDlq: true
dlqName: TOPIC_DLT
autoCommitOnError: true
autoCommitOffhtml" target="_blank">set: true
我使用以下依赖关系:
Spring云依赖 - 2021.0.1 Spring
启动启动器父级 - 2.6.3
Spring云流绑定器Kafka卡模式注册表客户端 - 5.3.0 卡
夫卡序列化程序 - 5.3.0
我不知道我到底错过了什么。
在经历了大量的留档后,我发现为了让Spring完成发布DLQ的工作,我们需要为原始主题和DLT主题拥有相同数量的分区。如果做不到,那么我们需要将dlq分区设置为1或手动提供Dlq分区函数bean。通过提供dlq分区:1,所有消息都将转到分区0。
我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。 在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制: 我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,
当一个DLQ被设置为一个Spring云流Kafka消费者时,DLQ写入的主题可以被分区吗?我有一个要求,使密钥等于一个特定的字段,我想知道这将如何与Spring云流。
Spring Cloud Kafka Streams与Spring Cloud Stream、Spring Cloud Function、Spring AMQP和Spring for Apache Kafka有什么区别?
我在Kafka中配置了3个代理运行在不同的端口上。我用的是春云流Kafka 我正在创建一个获得连续数据流的数据管道。我在kafka topic中存储3个代理运行的数据流。到目前为止没有问题。我担心的是假设3个经纪人倒下了5分钟,然后在那个时候我无法获得关于kafka主题的数据。将会有5分钟的数据丢失。从Spring开机我会得到警告 有没有一种方法可以在所有代理都停机时临时存储数据,并在代理再次启动
应用程序A写入用户对象(json)下面的Kafka主题: 应用程序 B 正在尝试使用此用户对象(用户.java驻留在应用程序 B 项目中)与以下 application.yml(多个绑定器): 下面是我的Spring Cloud Stream处理器类的外观: 但是,它没有将Message有效负载转换为“用户”类,而是不断抛出错误,未找到 父类“User事件”。 有什么想法吗?
我是Spring云流的新手。我正在使用兔子MQ粘合剂。问题是当我在主类中使用单个 bean 启动应用程序时,它就成功启动了。但是,如果我在主类中注册了 1 个以上的 bean,则应用程序无法从以下日志启动。 application.yml文件