当前位置: 首页 > 知识库问答 >
问题:

kafka客户端正在向代理发生故障的分区发送请求

闽涵蓄
2023-03-14

我正在使用kafka节点模块向kafka发送消息。在集群环境中,我有一个主题,其中有3个分区,复制因子为3。

主题描述是-

Topic:clusterTopic      PartitionCount:3        ReplicationFactor:3    Configs:min.insync.replicas=2,segment.bytes=1073741824
        Topic: clusterTopic     Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: clusterTopic     Partition: 1    Leader: 2       Replicas: 2,3,1 Isr: 1,2,3
        Topic: clusterTopic     Partition: 2    Leader: 3       Replicas: 3,1,2 Isr: 1,2,3

生产者配置 -

        "requireAcks": 1,
        "attributes": 2,
        "partitionerType": 2,
        "retries": 2

当我发送数据时,它遵循分区类型作为循环(2)像轮循机制一样

当我按照以下步骤操作时

  • 获取连接到Kafka:9092,Kafka:9093的高级处理器实例
  • 发送消息
  • 手动停止Kafka服务器:9092
  • 尝试使用高级处理器发送另一条消息,send() 将触发回调并显示错误:超时错误:请求在 30000 毫秒后超时

我期望的是,如果一个分区不可访问(代理关闭时),生产者应该自动将数据发送到下一个可用分区,但由于异常,我正在丢失消息

例外情况如下-

  TimeoutError: Request timed out after 3000ms
    at new TimeoutError (\package\node_modules\kafka-node\lib\errors\TimeoutError.js:6:9)
    at Timeout.timeoutId._createTimeout [as _onTimeout] (\package\node_modules\kafka-node\lib\kafkaClient.js:980:14)
    at ontimeout (timers.js:424:11)
    at tryOnTimeout (timers.js:288:5)
    at listOnTimeout (timers.js:251:5)
    at Timer.processTimers (timers.js:211:10)
(node:56416) [DEP0079] DeprecationWarning: Custom inspection function on Objects via .inspect() is deprecated
  kafka-node:KafkaClient kafka-node-client reconnecting to kafka1:9092 +3s
  kafka-node:KafkaClient createBroker kafka1 9092 +1ms
  kafka-node:KafkaClient kafka-node-client reconnecting to kafka1:9092 +3s
  kafka-node:KafkaClient createBroker kafka1 9092 +0ms

共有1个答案

焦博实
2023-03-14

请发送引导服务器进行确认,但根据手头的信息,我相信您正在经历的情况如下:

  • 你min.insync.replicas设置为2
  • 您将acks设置为1

通过这些设置,生产者将把事件发送到领导者副本,并假定消息是安全的。

如果在发送后立即失败,并且在关注者赶上之前,您将丢失消息,因为您只等待一个ack。

但是,从代理的角度来看,您指定主题可用的要求是 2 个同步副本。默认情况下,仅允许在同步副本中被选为领导者。由于第一个主题的失败将导致关注者不同步,因此您的主题可能会被迫脱机。您可以在测试中验证这一点,它假设了一些设置。

要纠正,请尝试以下操作:

  1. 如果高可用性是最重要的,请设置min.insync。副本到1,acks到1
  2. 如果数据丢失不可接受,请设置min.insync。复制副本到2,备份到所有

您还可以将unclean.leader.election.enable设置为true以获得高可用性,因为这将允许不同步的副本被选为领导者,但也有可能丢失数据。

 类似资料:
  • 我正在使用 kafka模板向 kafka 主题发送消息。我遇到了一个要求,如果将消息发送到 kafka 主题时出现故障,那么我应该重试在具有相同偏移量的同一分区上发送消息。请帮助如何使用Kafka模板实现这一点?

  • 我使用的是Apache Artemis V2.12.0,在两个VM中启动了两个broker实例 broker.xml(myhost1)[myhost2的broker.xml与此类似,只是我使用的端口是61616] 步骤2:java客户机开始向代理发送消息 步骤3:从myhost1的控制台,我看到推送到队列中的消息 步骤4:停止myhost1中的代理实例 客户端代码执行日志消息:当客户端启动时,my

  • 我在firefox web Browser中使用Rest Client add on。我想测试一个处理HTTP POST请求并使用JSON的web服务。我如何使用Rest Client测试它? 如果在请求正文中添加json,将得到一个*HTTP 415不受支持的媒体类型错误*。 这样做的正确方法是什么?

  • 我有这个代码: 我一直在犯这样的错误: java:不兼容的类型:com。应用句子分类请求。无法将生成器转换为com。应用句子分类请求 我已经使用Maven插件生成了gRPC Java文件。在看了多个例子后,我不确定我的问题是什么。

  • 我希望在调试这个问题时得到一些帮助。如果我将以下JSON发送到后端,它将正常工作: 但是,如果我现在发送以下内容: 我得到了上面的错误。在我的后端代码中,我有以下代码: API 方法签名如下所示: 我认为我遇到的问题是JSON不能被解析为ZonedDateTime。有人对以下两个问题有什么建议吗?( json字符串格式ZonedDateTime自动接受什么时间,( 2)如何创建DTO来解析zone

  • 我有一个ASP.NET Core1.0Web API应用程序,并试图弄清楚如果我的控制器调用的函数出错,如何将异常消息传递给客户端。 我确实看到了一些使用的文档,但是为了使用它,我必须安装compat shim。在Core1.0中有没有一种新的方法来做这些事情? 这是我一直在尝试的垫片,但它不起作用: 当抛出时,我查看客户端,在内容中找不到我正在发送的消息。