当前位置: 首页 > 知识库问答 >
问题:

如何从Java代码中删除Kafka主题的消费组?

邴英毅
2023-03-14

我有一个Kafka主题T。

用户可以选择创建新条目A、B、C或删除它们。在创建每个条目A、B或C时;我将为kafka主题T创建一个同名的消费者组。

在删除条目A、B、C时;我想删除相应的消费者群体,而不会以任何方式影响主题T——但我想从Java模块中实现这一点。这怎么可能呢?

共有2个答案

年健
2023-03-14

我没有足够的意见来评论,但如果你只是想删除一个消费者组,唯一的方法是删除zookeeper条目< code >/consumers/[group _ id]。这里有一个链接,应该会给你指出正确的方向:在zookeeper中删除一个Kafka消费群

郭元明
2023-03-14

目前,只有有限的JavaAPI。正在讨论添加JavaAdminClient(参见https://cwiki.apache.org/confluence/display/KAFKA/KIP-117:为Kafka管理操作添加公共AdminClient API)

对于基于Zookeeper的消费者群体(即v0.8的老消费者),您可以查看ConsumerGroupCommand的代码——这是Scala代码,但您仍然可以从Java调用它。

对于基于代理的消费者组(即,从v0.9开始的新消费者),您根本无法删除消费者组(目前,需要WIP来添加对此的支持)。但是,如果消费者组不再活跃,经纪人会自动删除它们。您可以通过broker设置<code>offsets.retention配置此“延迟”。分钟(如果一个组的所有提交偏移量都被删除,则该组将被删除)。

 类似资料:
  • 因此,我是使用Apache Kafka的新手,我正在尝试创建一个简单的应用程序,以便我可以尝试更好地理解API。我知道这个问题在这里被问了很多,但是我该如何清除存储在主题上的消息/记录? 我看到的大部分回答都是说更改消息保留时间或者删除

  • 距今已过去数小时,话题仍未删除。 我看到了一些建议,建议我将放在我的中,然后重新启动Kafka。我试过这个。没奏效。 (为什么默认不设置这个?) 我可以关闭kafka和zookeeper,运行,然后再次启动zookeeper和kafka。但这是相当激烈的。确实应该有一些方法来说服实际上删除一个主题?

  • 向源生成特殊的clear-message,这将导致聚合的消息变为空 将消息直接写入具有空数据的中间主题 另一种方式,也许kafka-streams已经有一个API调用了? 加分问题:如果我知道我不想让消息坐在中间话题中的时间超过6个月,我可以指示kafka-streams创建6M留存的中间话题,还是在我运行App之前我自己手动创建话题?

  • 我创建了一个制作人和一个消费者,使用“Kafka节点”包发送和消费Kafka主题的消息。生产者和消费者通过API进行调用。POST方法用于向主题发送消息,而GET方法用于在消费者处从主题获取消息。 当我向KAFKA发送消息后调用consumer API时,之前的所有消息都会在。 我只需要最后一条消息,这是生产者发送的。 如何在不使用任何数组或任何东西的情况下获取最后一条消息。 有没有办法删除这个话

  • 我的问题与单个消费者从多个话题消费有关。假设所有主题都加载了1M个记录,一个使用者必须处理这些记录。它将按照什么顺序从主题中读取(我的意思是首先读取哪个主题/分区,等等) Kafka内部资料的任何链接会有帮助吗?

  • 我们执行以下步骤以删除主题-hgpo.llo.prmt.processed 但即使在12小时后,主题文件夹仍未从/var/kafka/kafka-logs中删除 注意-我们set-delete.topic.enable=true 在/var/kafka/kafka-logs下,我们有许多主题文件夹,如: ..