当前位置: 首页 > 知识库问答 >
问题:

Kafka Connect将同一任务分配给多个工作人员

终翔
2023-03-14

我在分布式模式下使用 Kafka Connect。我现在多次观察到的一个奇怪行为是,一段时间后(可能是几个小时,可能是几天),似乎发生了平衡错误:相同的任务被分配给多个工人。因此,它们同时运行,并且根据连接器的性质,失败或产生“不可预测”的输出。

我能够用来重现该行为的最简单配置是:两个 Kafka Connect 工作线程,两个连接器,每个连接器只有一个任务。Kafka Connect 已部署到 Kubernetes 中。Kafka 本身在 Confluent Cloud 中。Kafka Connect 和 Kafka 的版本相同 (5.3.1)。

日志中的相关消息:

工人甲:

[2019-10-30 12:44:23,925] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Successfully joined group with generation 488 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:469)
[2019-10-30 12:44:23,926] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Joined group at generation 488 and got assignment: Assignment{error=0, leader='connect-1-d5c19893-b33c-4f07-85fb-db9736795759', leaderUrl='http://10.16.0.15:8083/', offset=250, connectorIds=[some-hdfs-sink, some-mqtt-source], taskIds=[some-hdfs-sink-0, some-mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)

工人 B:

[2019-10-30 12:44:23,930] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Successfully joined group with generation 488 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:469)
[2019-10-30 12:44:23,936] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Joined group at generation 488 and got assignment: Assignment{error=0, leader='connect-1-d5c19893-b33c-4f07-85fb-db9736795759', leaderUrl='http://10.16.0.15:8083/', offset=250, connectorIds=[some-mqtt-source], taskIds=[some-mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)

在上面的日志提取中,您可以观察到相同的任务(一些-mqtt-Source e-0)被分配给两个工作人员。在此消息之后,我还可以看到两个工作人员上的任务实例的日志消息。

这种行为不依赖于连接器(我在其他任务中也观察到了它)。它也不会在工人开始工作后立即发生,而是在一段时间后发生。

我的问题是,这种行为的原因是什么?

编辑1:我尝试运行3个工作人员,而不是两个,认为这可能是一个分布式共识问题。它似乎不是,拥有3个工作人员并不能解决问题。

编辑2:我注意到,就在工作人员A被分配一个最初在工作人员B上运行的任务之前,该工作人员(B)观察到加入组的错误。例如,如果任务在第N代中“重复”,工作人员B将不会在日志中显示“成功加入第N代组”消息。更重要的是,在第N-1代和第N 1代之间,工作人员B通常会记录错误,如尝试心跳失败,因为成员id组协调器bx-xxx-xxxxx.europe-west1.gcp.confluent.cloud:9092(id: 1234567890机架:null)不可用或无效。工人B通常会在第N代之后不久加入第N 1代(有时只需大约3秒)。现在很清楚是什么触发了该行为。然而:

>

  • 虽然我知道可能存在这样的临时问题,并且在一般情况下它们可能是正常的,但为什么在所有服务器成功加入下一代后重新平衡不能解决问题?尽管随之而来的是更多的恢复 - 它不能正确分配任务,并永远保留“重复项”(直到我重新启动工作线程)。

    在某些时期,再平衡几乎每几个小时发生一次,而在其他时期,每5分钟(精确到几秒)发生一次;原因可能是什么?什么是正常的?

    鉴于我使用的是Confluent云,“组协调器不可用或无效”错误的原因可能是什么,并且是否有任何配置参数可以在Kafka Connect中进行调整,以使其在此错误方面更具弹性?我知道有 session.timeout.msheartbeat.interval.ms,但文档是如此简约,甚至不清楚将这些参数更改为更小或更大的值的实际影响是什么。

    编辑3:我观察到这个问题对接收器任务并不重要:尽管相同的接收器任务被分配给多个工作人员,但相应的消费者被分配到不同的分区,因为他们通常应该这样做,一切都几乎正常工作——我只是得到了比我最初要求的更多的任务。然而,在源任务的情况下,行为是中断的——任务同时运行,并在源端争夺资源。

    编辑4:与此同时,我将Kafka Connect降级到2.2版(融合平台5.2.3)——一个“增量合作再平衡”之前的版本。它在过去的两天里工作正常。所以,我认为这种行为与新的再平衡机制有关。


  • 共有1个答案

    墨寂弦
    2023-03-14

    如评论中所述,Jira Kafka-9184 就是为了解决这个问题而制作的,并且已经解决了。

    该修补程序在2.3.2及更高版本中可用。

    因此,现在的答案是:升级到最新版本应该可以防止出现此问题。

     类似资料:
    • 我目前正在努力设置产品变化作为压缩产品目录的一种手段...我们目前有几千个单独的产品页面和变化,我正在寻找一种方法来压缩每个产品类型到一个单一的变化。销售的产品与车辆相关,因此适合每个车型的年份范围: WooCommerce的罐装可变产品为每种属性组合创造了新的变化。例如,“Body Style A”将创建两个相同的变体,我更希望有一个变体,在“年份”是2002年或2003年,而不是只指定一个值。

    • 假设我正在从S3文件夹中读取100个文件。每个文件的大小为10 MB。当我执行<code>df=spark.read时。parquet(s3路径),文件(或更确切地说分区)如何在任务之间分布?E、 g.在这种情况下,<code>df</code>将有100个分区,如果spark有10个任务正在运行以将该文件夹的内容读取到数据帧中,那么这些分区是如何分配给这10个任务的?它是以循环方式进行的,还是每

    • 因此,我昨天开始了一个问题:基于同一行中的值的多个pandas赋值,我想知道如何对一行数据进行排序,并将排序分配给同一行中的不同列。我已经按照Ed Chum的建议解决了这个问题:如何一次将一个函数应用于pandas数据帧中的多个列。 它确实起作用了,但我注意到我在这一过程中创建了错误的列。一旦我修复了这个bug,它就不再工作了。。。。 因此,我尝试在一个玩具示例上重现这个问题,但在玩具示例上也不起

    • 我有一个Excel工作簿,其中包含36个不同的工作表,我每两周收到一次,工作表在所有标签上都有共同的标题,并且每个标签上都有不同的唯一标题,但每条记录都有一个唯一的ID,可以有多个记录。 我要做的是从所有的工作表中提取唯一的id,然后将每个工作表中的数据提取到一个工作表中,其中包含所有的公共标题和唯一标题。 我正在考虑使用下面帖子中的代码将其导入Access。连接表并将其导出回Excel中的一个工

    • 我正在学习使用可拆分DOFN。我预计我的工作将分配给500名员工,但Dataflow只运行了1或2名员工。我是否错误地理解或实现了可拆分DoFn? 我的beam版本是2.16.0

    • 我有一个numpy数组,一个定义数组中范围的开始/结束索引列表,以及一个值列表,其中值的数量与范围的数量相同。在循环中执行此赋值当前非常慢,因此我想以矢量化的方式将值赋给数组中的相应范围。这可能吗? 这是一个具体的简化示例: <代码>a=np。零([10]) 下面是定义a中范围的开始索引和结束索引列表,如下所示: 这是我想分配给每个范围的值列表: <代码>值=[1、2、3、4] 我有两个问题。首先