当前位置: 首页 > 知识库问答 >
问题:

如何在春云流kafka中存储发送到主题失败的数据

强宾白
2023-03-14

我在Kafka中配置了3个代理运行在不同的端口上。我用的是春云流Kafka

brokers:  localhost:9092,localhost:9093,localhost:9094.

我正在创建一个获得连续数据流的数据管道。我在kafka topic中存储3个代理运行的数据流。到目前为止没有问题。我担心的是假设3个经纪人倒下了5分钟,然后在那个时候我无法获得关于kafka主题的数据。将会有5分钟的数据丢失。从Spring开机我会得到警告

2020-10-06 11:44:20.840  WARN 2906 --- [ad | producer-2] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-2] Connection to node 0 (/192.168.1.78:9092) could not be established. Broker may not be available. 

有没有一种方法可以在所有代理都停机时临时存储数据,并在代理再次启动时从临时存储恢复写入主题?

共有1个答案

谷涵容
2023-03-14

您可以使用创建器用于将数据发送到群集的内部缓冲区。KafkaProducer 有一个隐藏式队列和一个专用的 I/O 线程,用于将数据实际发送到群集。

结合生产者配置重试(默认设置为0),您可能需要增加buffer.memory,如下所述

生产者可以用来缓冲等待发送到服务器的记录的内存总字节。如果记录发送的速度快于它们传递到服务器的速度,生产者将阻止

此设置应大致对应于生产者将使用的总内存,但不是硬限制,因为并非生产者使用的所有内存都用于缓冲。一些额外的内存将用于压缩(如果启用压缩)以及维护飞行中的请求。

然而,我认为让生产者自己处理集群的完全故障通常不是一个好主意。Kafka本身就是为了应对个别经纪人的失败而设计的,但如果你的所有经纪人同时无法控制地倒闭,你可能会遇到比丢失个别生产者的一些数据更大的问题。

如果在一段时间内只有一个代理是不可达的,那么什么都不用做,因为Kafka会在内部将主题的分区领导者切换到另一个代理(当然,如果分区是复制的)。

 类似资料:
  • 在使用ErrorHandlingDeserializer处理Avro组合的错误时,我无法发布到Dlq主题。以下是发布时的错误。 主题Topic_DLT在60000毫秒后不在元数据中。错误KafkaConsumerDestination{consumerDestination Name='Topic‘,partitions=6,dlqName='TOIC_DLT‘}。container-0-C-1

  • 当一个DLQ被设置为一个Spring云流Kafka消费者时,DLQ写入的主题可以被分区吗?我有一个要求,使密钥等于一个特定的字段,我想知道这将如何与Spring云流。

  • 我有多个冗余的应用程序实例,希望消费一个主题的所有事件,并存储它们独立的磁盘查找(通过一个rocksdb)。 为了便于讨论,让我们假设这些冗余消费者正在服务无状态http请求;因此,不使用kafka共享负载,而是使用kafka将数据从生产者复制到每个实例LocalStore中。 在查看生成的主题时,每个消费应用程序创建了3个额外的主题: null null 下面是创建存储区的代码

  • 我有一个Kafka消费者。如果消费者未能阅读任何信息,我需要将其发送到死信主题。我使用的是Spring cloud Kafka stream,我在这样的配置中启用了DLQ。 但我的常规消费者话题与DLQ话题不同。有可能做到这一点吗?如果是,你能指导我完成配置吗?

  • Spring Cloud Kafka Streams与Spring Cloud Stream、Spring Cloud Function、Spring AMQP和Spring for Apache Kafka有什么区别?

  • 我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。 在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制: 我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,