我想使用ErrorHandlingDeserializer来处理反序列化错误,并使用spring cloud stream binder kafka将错误记录发送到kafka binder的默认DLQ主题。
spring:
application:
name: my-service
cloud:
## ====================================================== Vault Setting Omitted ====================================================
vault:
## ====================================================== Vault Setting Omitted ====================================================
schemaRegistryClient:
enabled: true
endpoint: my-schema-registry.com
basic:
auth:
# user: This property is resolved from Vault.
# password: This property is resolved from Vault.
stream:
kafka:
heartbeat:
interval:
ms: 1000
binder:
brokers: my-kafka-broker.com:9094
## =============================================== SerDes ====================================================
consumer-properties:
allow.auto.create.topics: false
specific.avro.reader: true
schema.registry.url: ${spring.cloud.schemaRegistryClient.endpoint}
auto.register.schemas: false
basic.auth.credentials.source: USER_INFO
basic.auth.user.info: ${spring.cloud.schemaRegistryClient.basic.auth.user}:${spring.cloud.schemaRegistryClient.basic.auth.password}
producer-properties:
allow.auto.create.topics: false
key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
schema.reflection: true
schema.registry.url: ${spring.cloud.schemaRegistryClient.endpoint}
auto.register.schemas: false
basic.auth.credentials.source: USER_INFO
basic.auth.user.info: ${spring.cloud.schemaRegistryClient.basic.auth.user}:${spring.cloud.schemaRegistryClient.basic.auth.password}
## =========================================== Kafka Security Omitted ================================================
configuration:
## =========================================== Kafka Security Omitted ================================================
bindings:
my-in-0:
consumer:
enableDlq: true
autoCommitOnError: true
configuration:
key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.key.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
dlqProducerProperties:
configuration:
key.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
default:
binder: kafka
producer:
useNativeEncoding: true
consumer:
useNativeEncoding: true
function:
definition: myFunction
bindings:
my-in-0:
binder: kafka
destination: my-in-topic
content-type: application/*+avro
group: myGroup
my-out-0:
binder: kafka
destination: my-out-topic
content-type: application/*+avro
group: myGroup
禁用绑定中的dlq设置,并使用配置了死信发布恢复器的SeekTocurErrorHandler配置侦听器容器(使用ListenerContainerCustomizer bean)。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handlers
https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-信件
我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。 在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制: 我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,
当一个DLQ被设置为一个Spring云流Kafka消费者时,DLQ写入的主题可以被分区吗?我有一个要求,使密钥等于一个特定的字段,我想知道这将如何与Spring云流。
我正在努力实现以下目标: 使用spring cloud Stream2.1.3.Release with Kafka binder将消息发送到输入通道,并实现发布订阅行为,其中每个消费者都将得到通知,并且能够处理发送到Kafka主题的消息。 我明白,在Kafka,如果每一个消费者都属于自己的消费者群体,就能从一个话题中读到每一条信息。在我的案例中,spring为我运行的spring boot应用程
我在sping-boot应用程序中使用sping-kafka发送数据主题。我需要从oracle表中获取数据并发送它。 我从oracle表中获取列表。如何将它们发送到主题? 即。 > 有没有办法将它们作为列表发送?如果是,如何发送?如果是,那么如何在消费者端反序列化它? 是否可以使用spring book和spring kafka以流式方式发送数据?如果是,请提供更多信息或样本/片段plz。。。 如
如何使用新的Spring Cloud Stream Kafka功能模型发送消息? 不推荐的方式是这样的。 但是我如何以函数式风格发送消息呢? 应用yml公司 我会自动连接MessageChannel,但对于process、process-out-0、output或类似的东西,没有MessageChannel Bean。或者我可以用供应商Bean发送消息吗?谁能给我举个例子吗?谢谢!
在使用ErrorHandlingDeserializer处理Avro组合的错误时,我无法发布到Dlq主题。以下是发布时的错误。 主题Topic_DLT在60000毫秒后不在元数据中。错误KafkaConsumerDestination{consumerDestination Name='Topic‘,partitions=6,dlqName='TOIC_DLT‘}。container-0-C-1