当前位置: 首页 > 知识库问答 >
问题:

如何替代/过渡一个Kafka消费群体?

罗心思
2023-03-14

我有一个包含多个Kafka作品的资源库。我想将其中一个流提取到它自己的存储库中。但是,我不确定如何处理那个流的消费群体。我的意思是:在新的存储库中,流将有一个不同的< code>application.id。据我理解,消费者组的名称是基于< code>application.id设置的。如果我简单地关闭旧流,对于每个主题的每个分区,新流将从第零个偏移量开始,而不是从旧流停止的偏移量开始。这将导致输出主题中出现重复的消息。

是否有一些关于如何处理此问题的一般规则/最佳实践?我是否需要:

    < li >关闭旧流; < li >检查每个主题的每个分区的旧流的使用者组偏移量; < li >“告诉”新流从各自的偏移量开始。我怎么“讲”:)它?我的意思是——如果我使用< code > Kafka-console-consumer ,有< code>partition和< code>offset选项,但是流在后台启动自己的consumer,所以我不确定如何控制它。

流使用了相当多的输入主题(大约20个,幸运的是每个主题都有一个分区),因此我不确定如何准确地处理这个问题。

共有1个答案

梅安平
2023-03-14

(0)如果可能,我建议尝试保留application.id,所有问题都会消失。在这种情况下,您可以先停止旧应用程序,然后启动新应用程序,甚至可以先启动所有新实例,然后拆除所有旧实例。由于新旧应用程序都连接到同一个Kafka集群,因此将进行无缝切换。

(1) 只有当Kafka Streams应用程序是无状态的时,您想做的事情才有效;对于有状态的应用程序,需要更多的步骤来将状态传递到新的应用程序实例。或者,新实例将以空状态开始(根据应用程序的要求,这也可以)。

(2)首先,您需要停止所有旧的应用程序实例,并接收旧的< code>application.id的每个输入主题分区的最后提交的偏移量(通过< code > bin/Kafka-consumer-groups . sh 使用< code> - describe - group选项),然后,您需要使用新的< code>application.id提交这些偏移量(同样,您可以使用< code > bin/Kafka-consumer-groups . sh 使用< code> - to-offset选项)。(有关https://cwiki.apache.org/confluence/display/KAFKA/KIP-122:添加重置用户组偏移工具的详细信息)之后,您可以启动新的应用程序实例,该实例将为新的< code>application.id获取提交的偏移。

 类似资料:
  • 我是Kafka的新手,正在学习Kafka内部知识。请根据需要随时更正我的理解。。 这是我的实时场景..感谢所有的回复: 我有一个接收数据文件的实时FTP服务器…比如索赔文件。 我将把这些数据发布到一个主题中.让我们把这个主题称为claims_topic(2个分区). 我需要订阅这个claims_topic,阅读消息并将它们写入Oracle和Postgres表。让我们将oracle表称为Otable

  • 我在使用Kafka时遇到了一些问题。非常感谢任何帮助!我在docker swell中分别有zookeeper和kafka集群3个节点。您可以在下面看到Kafka代理配置。 我的情况: < li > 20x位制片人不断向Kafka主题传达信息 < li>1x消费者读取和记录消息 < li >终止kafka节点(docker容器停止),因此现在群集有2个Kafka代理节点(第3个节点将自动启动并加入群

  • 当我只打开一次处理时,我会得到以下错误。注意:我们的应用程序非常安全,我们只允许Kafka用户和消费者访问他们明确需要的资源。 只有一次处理kafka流是否在所有流任务中使用每个流任务的消费者组而不是消费者组?

  • 当一个组中只有一个消费者,并且认为消费者无法在session.time.out内进行轮询时,将触发重新平衡,但是在这种情况下,组中只有一个消费者,现在假设session.time.out是30秒和消费者民意调查后50秒组协调员将识别消费者后50秒,并允许它提交偏移或协调员将断开消费者和没有偏移得到提交,并将重新平衡消费者与新的消费者标识?如果上次提交的偏移量是345678,在下一次轮询中,它处理了

  • 我有两个Kafka集群说A和B,B是A的复制品。仅当 A 关闭且相反,我才希望使用来自集群 B 的消息。然而,使用来自两个集群的消息会导致重复的消息。那么,有什么办法可以将我的 kafka 使用者配置为仅从一个集群接收消息。 谢谢-

  • 嗨,我正在使用KafkaCLI,以清楚地了解Kafka的工作原理。我对消费者群体感到困惑。我用三个分区创建了主题。我将创建producer,为主题提供一些数据。第一次我添加了一些数据,如下所示。 现在我的理解是user1、user2、user3会随机到三个不同的分区。 创建消费群时,如下所示。 这将给我所有的user1、user2、user3。 现在,在一个消费者组中,我可以有许多消费者。如果消费