我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。
在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制:
spring:
cloud:
stream:
function:
definition: processor
bindings:
processor-in-0:
destination: input-topic
consumer:
dlqName: input-topic-dlq
processor-out-0:
destination: output-topic
kafka:
streams:
binder:
deserialization-exception-handler: sendToDlq
configuration:
metrics.recording.level: DEBUG
brokers:
- localhost:9092
我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,则将创建该主题。
如果我尝试从DLQ主题消费,我会收到如下错误:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input-topic-dlq --property print.value=true --property print.key=true --from-beginning
[2021-03-19 10:17:09,936] WARN [Consumer clientId=consumer-console-consumer-85295-1, groupId=console-consumer-85295] Error while fetching metadata with correlation id 2 : {input-topic-dlq=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
此时,当我查询Zookeeper ls/brokers/topics时,我看到创建的Topic。
现在,我尝试将非JSON消息发布到输入主题(我的默认反序列化器是JSON)。
但我看不到任何消息中的ininting-Topite-dlq主题创建。
奇怪的是,我可以在默认的“error.input topic dlq.appId”主题中看到消息。
我做错了什么吗?
我设法弄清楚了。Spring Cloud Stream Kafka Streams Binder的当前文档中似乎有一个拼写错误。
绑定的目标应该在spring.cloud.streams上。绑定
级别类似于您已经拥有的,但特定于实现的使用者属性应该在spring.cloud.streams.kafka.streams上。绑定
级别。
因此,您的配置应如下所示:
spring:
cloud:
stream:
function:
definition: processor
bindings:
processor-in-0:
destination: input-topic
processor-out-0:
destination: output-topic
kafka:
streams:
binder:
deserialization-exception-handler: sendToDlq
bindings:
processor-in-0:
consumer:
dlqName: input-topic-dlq
configuration:
metrics.recording.level: DEBUG
brokers:
- localhost:9092
Spring Cloud Kafka Streams与Spring Cloud Stream、Spring Cloud Function、Spring AMQP和Spring for Apache Kafka有什么区别?
我正试图按照GitHub的建议设置测试 其中StreamProcessor设置为 -->line从不使用在我看来应该在主题“output”上的消息,因为@StreamProcessor有@Sendto(“output”) 我希望能够测试流处理的消息。
我想在我的spring boot项目中使用Kafka Streams实时处理。所以我需要Kafka Streams配置,或者我想使用KStreams或KTable,但我在互联网上找不到示例。 我做了制作人和消费者现在我想流实时。
我有一个常见的任务问题,我可以找到任何解决方案或帮助(也许我需要传递一些属性来工作?)我使用本地服务器1.3.0.M2并创建简单的流 在日志中,我得到了这个: 2017-09-28 12:31:00.644 信息 5156 --- [ -C-1] o.. a.k.c.c.internals.AbstractCoordinator : 成功加入第 1 代的组测试 2017-09-28 12:31:0
现在我正在尝试用kafka创建消息服务功能以使用< code > spring-cloud-stream-bind-Kafka ,但效果不太好。 Spring罩1.4.2 当我使用此错误日志启动项目时失败 我在怀疑我的春靴版本。这么低配的版本。< br >我认为< code > spring-cloud-stream-binder-Kafka 在spring boot 2.0版本下无法使用或者其他
当一个DLQ被设置为一个Spring云流Kafka消费者时,DLQ写入的主题可以被分区吗?我有一个要求,使密钥等于一个特定的字段,我想知道这将如何与Spring云流。