当前位置: 首页 > 知识库问答 >
问题:

当kafka代理重新启动时,队列中的所有消息都无法传递到代理

昝光临
2023-03-14

我正在为Kafka工作客户:librdkafka。图书馆在这里https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_example.cpp.我的程序正在向代理写入2000000条消息。在此过程中,我重新启动了代理。有时,没有消息无法传递到代理。有时,大约100000条消息未能传递到代理。队列缓冲。最大消息数=100000。似乎out队列中的所有消息都丢失了?错误是RdKafka::消息传递报告:本地:未知分区。

我发现了新的问题:(1)有时,大约有200条消息被发送到代理两次。(2) 有时,已经向代理发送了一条消息,但是调用了dr_cb()。它告诉我该消息未能传递给代理。我想弄清楚这是经纪人的问题还是客户的问题。有人有类似的问题吗?事实上,我需要客户端和代理服务器之间可靠的传输和交付报告。我正在考虑使用C客户端。不确定这个问题是否会再次发生。。。

经纪人的日志是:

[2015-07-21 17:48:33,471]INFO 0成功当选为领导者(kafka.server.ZookeeperHeaderVotor)

[2015-07-21 17:48:33717]信息新领导者为0(kafka.server.zookeperLeaderElector$LeaderChangeListener)

[2015-07-21 17:48:33,718]处理请求时的错误[KafkaApi-0]错误Name: TopicMetadataRequest; Version: 0;相关ID: 5017; ClientId: rdkafka; Topics: test(kafka.server.KafkaApis)kafka.admin.Adminop异常:复制因子:比可用代理大1:0在kafka.admin.AdminUtils$.任务复制库(AdminUtils.scala:70)在kafka.admin.AdminUtils$. createTopic(AdminUtils.scala:171)在kafka.server.KafkaApis$$anonfuny$19.apply(KafkaApis.scala:520)在kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)在scala.collection.Traversablelike$$anonfun$map$1.apply(TraversableLike.scala:194)在scala.collection.Traversablelike$$anonfun$map$1.apply(TraversableLike.scala:194)在scala.collection.immutable.设置$Set1.foreach(Set. scala: 86)在scala.集合。Traversablelike$class. map(Traversablelike. scala: 194)在scala. Collection. immutable。设置$Set1. scala$集合$Setlike$$超级$地图(Set. scala: 73)在scala.集合。在scala. Collection. immutable上设置像$class. map(Setlike. scala: 93)。在kafka. server上设置$Set1. map(Set. scala: 73)。kafkaApis. getTopicMetadata(KafkaApis. scala: 503)在kafka. server.kafkaApis. handleTopicMetadataRequest(KafkaApis. scala: 542)at kafka. server.在kafka. server上处理(KafkaApis. scala: 62)。在java. lang运行(KafkaResestHandler. scala: 59)。线程.运行(线程. java: 745)

[2015-07-21 17:48:33743]信息注册经纪人0,路径为/brokers/ids/0,地址为cyclops-9803:9092。(kafka.utils.ZkUtils$)

[2015-07-21 17:48:33,759]INFO[Kafka Server 0],启动(kafka.server.KafkaServer)

[2015-07-21 17:48:33,803]INFO关闭到/127.0.0.1的套接字连接。(kafka.network.处理器)

[2015-07-21 17:48:33858]信息[broker 0上的ReplicaFetcherManager]删除了分区[test,0]的获取程序(kafka.server.ReplicaFetcherManager)

[2015-07-21 17:48:34000]信息[broker 0上的ReplicaFetcherManager]删除了分区[test,0]的获取程序(kafka.server.ReplicaFetcherManager)

[2015-07-21 17:48:34017]信息关闭与/127.0的插座连接。0.1. (Kafka.网络.处理器)

我的生产者配置是:

client.id=rdkafka

元数据。经纪人list=localhost:9092

message.max.bytes=4000000

接收消息最大字节数=100000000

元数据。要求超时。ms=900000

topic.metadata.refresh.interval.ms=-1

话题元数据。刷新快速的cnt=10

topic.metadata.refresh.fast.interval.ms=250

话题元数据。刷新稀疏=假

socket.timeout.ms=300000

socket.send.buffer.bytes=0

socket.receive.buffer.bytes=0

插座保持活力。启用=错误

插座最大故障数=10

broker.address.ttl=300000

经纪人住址家庭=任何

统计数字间隔ms=0

错误\u cb=0x5288a60

stats_cb=0x5288ba0

log_cb=0x54942a0

log_level=6

socket_cb=0x549e6c0

打开\u cb=0x54acf90

不透明=0x9167898

internal.termination.signal=0

queued.min.messages=100000

排队。最大消息数。千字节=1000000

fetch.wait.max.ms=100

fetch.message.max.bytes=1048576

取来最小字节数=1

取来错误退避。ms=500

queue.buffering.max.messages=100000

队列缓冲。最大ms=1000

message.send.max.retries=10

重试。退避。ms=100

压缩。编解码器=无

一批num.messages=1000

传送汇报只有错误=真

要求必修的。acks=1

enforce.isr.cnt=0

request.timeout.ms=5000

消息超时。ms=300000

produce.offset.report=假

汽车犯罪启用=真

汽车犯罪间隔ms=60000

auto.offset.reset=最大

抵消百货商店路径=。

offset.store.sync.interval.ms=-1

offset.store.method文件

消费回拨。最大消息数=0

消费者产出为:

[2015-07-22 20:57:21,052]WARN从代理[id: 0,主机: cyclops-9803,端口: 9092]获取主题元数据,相关id为1,主题[Set(test)]失败(kafka.client.ClientUtils$)java.nio.channels.ClosedChannelExcure

kafka.network.BlockingChannel.send(BlockingChannel.scala:100)

在Kafka。制作人同步制作人。liftedTree1$1(SyncProducer.scala:73)

在kafka.producer.同步roducer.kafka$生产者$同步生产者$$多发(同步roducer.scala:72)

在Kafka。制作人同步制作人。发送(SyncProducer.scala:113)

在kafka.client.ClientUtils$. finchTopicMetadata(ClientUtils.scala:58)

在Kafka。客户ClientUtils$。fetchTopicMetadata(ClientUtils.scala:93)

在kafka.consumer.消费者FetcherManager$领导FinderThread.do工作(消费者FetcherManager.scala:66)

在Kafka。乌提尔斯。可关闭线程。运行(ShutdownableThread.scala:60)

[2015-07-22 20:57:21073]警告[console-consumer-88480_cyclops-9803-1437598630859-416c8038-leader-finder-thread],未能找到集合的引线([test,0])(kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)kafka。常见的KafkaException:从代理[ArrayBuffer(id:0,主机:cyclops-9803,端口:9092)]获取主题[Set(test)]的主题元数据失败

在Kafka。客户ClientUtils$。fetchTopicMetadata(ClientUtils.scala:72)

在Kafka。客户ClientUtils$。fetchTopicMetadata(ClientUtils.scala:93)

在kafka.consumer.消费者FetcherManager$领导FinderThread.do工作(消费者FetcherManager.scala:66)

在Kafka。乌提尔斯。可关闭线程。运行(ShutdownableThread.scala:60)的原因是:java。尼奥。频道。ClosedChannel异常

kafka.network.BlockingChannel.send(BlockingChannel.scala:100)

在Kafka。制作人同步制作人。liftedTree1$1(SyncProducer.scala:73)

在kafka.producer.同步roducer.kafka$生产者$同步生产者$$多发(同步roducer.scala:72)

在Kafka。制作人同步制作人。发送(SyncProducer.scala:113)

在kafka.client.ClientUtils$. finchTopicMetadata(ClientUtils.scala:58)

欢迎提出任何建议。谢谢

共有1个答案

贺栋
2023-03-14

在asyn模式下,客户端应该处理此类问题。不知道如何确保out队列中的消息能够以100%的概率传递给代理。我们可以做的是确保消息在out队列中。如果传递失败,我们应该再次将消息放入队列。如果未能传递,则调用dr_cb()。在此函数中,尝试再次将消息放入out队列。也许这不是最好的办法。但是现在,我用这种方式。

 类似资料:
  • null 为了更具体地说明我试图实现的目标: 推迟发送,直到成功持久化接收到的消息--有效地扩大QoS级别,直到我的订阅应用程序保证消息得到处理--这是一个好的/有效的想法吗? 以及对于例如持久化错误(数据库超时),是否会发送,这将自动导致重新传递此类消息。 最诚挚的问候

  • 问题内容: 我注意到,在一些服务器上重新启动/关闭后,SQL Agent不会自动启动(SQL Server启动正常)。该服务设置为在Windows中自动启动。当我们手动启动SQL代理时,它会正常启动。 问题答案: 在以下路径中添加Windows级用户: 单击开始-运行 -Secpol.msc 转到: 本地策略-用户权限分配 确保您在其下运行SQL Agent Service的用户帐户具有以下分配:

  • 我有一个特定的要求,我需要将消息发送到服务器,而服务器并不总是可用的。 为此,我使用了特定于ActiveMQ的代理网络。 目标是有一个本地应用程序A(仅限生产者),它将消息推送到另一个中央应用程序B(仅限消费者)。然而,网络并不总是可用的。因此,应用程序A的代理必须存储消息并等待连接,然后才能将消息发送到应用程序B。所以基本上,A是一个代理,需要在消息可用时将消息转发给B Broker 的 B 配

  • 基本上,我有一个简单的源/汇图: 源将在我的行中插入代理“CMproduto”,并将从我的数据库中获取代理的特征。然后,我的代理将按条件对象将其处理到相应队列(第1/2/3行)的行进行排序,代理将通过条件逐一退出我的模型。但是,我的数据库可以更改(根据我的其余模拟),因此,我需要销毁所有3个队列中的所有当前代理(以及我的CMproduto种群),并通过“源”重新插入它们,但使用来自我更新的数据库的

  • 我有一个Kafka集群正在运行,当重新启动应用程序(消费者)时,它会跳过一些在应用程序关闭时推送到主题的消息。 当应用程序启动时,我可以看到它读取带有偏移量的消息,然后将偏移量推送到。然后当应用程序关闭时,带有偏移量的消息被推送到主题。重启应用程序后,它读取并将其偏移量设置为,因此跳过。 这是我的配置:

  • 我将sping-boot(2.1.6.RELEASE)与sping-kafka(2.2.7.RELEASE)一起使用,并且我使用KafkaTemplate向我的kafka集群发送消息。但是有时(通常是当我重新启动kafka代理或进行重新平衡时),我在发送消息时会看到这样的错误: 由于默认的Kafka生产者配置,我期望发送失败重试,但他们没有。默认Kafka生成器配置: 我的配置是这样的: 我发出这