如果我们有一个主题在Kafka,它有5个分区,我们可以增加到30个分区。另外,在增加分区的数量之后,我们按照代理ID的顺序更改每个分区的领导者,并为该特定主题重新平衡集群。我们怎么能那样做?
我发现了它的工作原理。
1)首先查找已有主题的信息
2)找出集群中的所有代理及其ID
提供所有代理ID的列表
3)./kafka-topics.sh--alter--zookeeper localhost:2181--topic dummytopic--partitions 30
增加分区数
开始生成&&topic=dummytopic num_partitions=30brokerid_start=1022 replica_count=3./kafka-resignment-gen
这将生成一个json,我们可以将它用于expand-cluster-reassignment.json。看起来是这样的
{“版本”:1,“分区”:[{“话题”:“dummyTopic”,“分区”:0,“副本”:[1001,1002,1003]},{“话题”:“dummyTopic”,“分区”:1,“副本”:[1002,1003,1004]},{“话题”:“dummyTopic”,“分区”:2,“副本”:[1003,1004,1005]},{“话题”:“dummyTopic”,“分区”:3,“副本”:[1004,1005,1006]},{“话题”:“dummyTopic”,“分区”:4,“副本”:[1005,1006,1007]},{“1017]},{“topic”:“dummytopic”,“partition”:15,“replicas”:[1016,1017,1018]},{“topic”:“dummytopic”,“partition”:16,“replicas”:[1017,1018,1019]},{“topic”:“dummytopic”,“partition”:17,“replicas”:[1018,1019,1020]},{“topic”:“dummytopic”,“partition”:18,“replicas”:[1019,1020,1021]},{“topic”:“
./kafka-reassign-partitions.sh--zookeeper localhost:2181--reassignment-json-file expand-cluster-reassignment.json--execute
这将执行集群重新分配,并将分区领导者更改为您所期望的。
我在Databricks delta中有一个表,它是按< code>transaction_date分区的。我想将分区列更改为< code>view_date。我尝试删除该表,然后使用< code > PARTITIONED BY(view _ date)创建一个新的分区列。 然而,我的尝试失败了,因为实际文件位于S3中,即使我删除了一个配置单元表,分区也保持不变。是否有任何方法可以更改现有Del
可能是Kafka的复制品——该服务器不是该主题分区的领导者,但没有公认的答案,也没有明确的解决方案。 我有一个简单的java程序来向Kafka传达信息: 我得到了以下例外: 当我尝试使用时,我得到以下错误: 当我描述我的主题时,我有以下信息: 我试着创建一个新的主题,并按照《快速入门指南》中提到的那样生成消息,然后上述步骤都很有效。 我应该在或producer configuration(生产者配
我遇到了一件关于Kafka再平衡的奇怪事情。如果我增加某个主题的分区,而该主题是由一些java使用者(在同一个组中)订阅的,则不会发生使用者再平衡。在那之后,我试图通过启动一个新的消费者(或杀死一个消费者)来实现重新平衡,但在这个重新平衡中无法分配新增加的分区。我发现只有在停止所有使用者并启动它们之后,才能分配新分区。我不知道这是正常还是有任何解释。 下面是我在电脑上的测试: 1.启动Kafka,
我需要从一个Hive表中读取数据并将其插入到另一个Hive表中。两个表的架构是相同的。该表按日期分区 步骤1:从Spark的源表中读取数据。 第 2 步:按列(国家、日期)和分区数重新分区为 4。 我只得到每个国家代码1个分区
我正在使用名称:kafka2.12版本:2.3.0。根据流量/负载,我想更改主题的最大分区数。Kafka一上来,有没有可能做这种改变,用代码能做到吗?
在消费者之间重新平衡分区的代价有多大。我期待着每隔几秒钟就有一个新的消费者结束或加入同一个消费者群体。所以我只想知道一个再平衡操作的开销和延迟。 假设使用者C1具有分配给它的分区P1、P2、P3,并且它正在处理来自分区P1的消息M1。现在消费者C2加入了这个群体。C1和C2之间的分区是如何划分的。是否有可能拒绝C1的(可能需要一些时间将其消息提交给Kafka)对M1的提交,而M1将被视为一个新的消