当前位置: 首页 > 知识库问答 >
问题:

Kafka:使用java更改特定主题的分区数

潘琨
2023-03-14

我是Kafka的新手,与新的KafkaProducer和KafkaConsumer合作,版本:0.9.0.1

在java中是否有任何方法可以在创建特定主题后更改/更新该主题的分区数。

我不是用zookeeper来制造话题。当发布请求到达时,我的KafkaProducer会自动创建主题。

共有1个答案

秦宏盛
2023-03-14

是的,有可能。您必须访问kafka2.11-0.9.0.1.jar中的Adminutilsscala类才能添加分区。

adminutils支持主题中的分区数只能增加。类路径中可能需要kafka2.11-0.9.0.1.jarzk-client-0.8.jarscala-library-2.11.8.jarscala-parser-combinators2.11-1.0.4.jarjar。

下面的部分代码借鉴了kafka-cloudera示例。

package org.apache.kafka.examples;

import java.io.Closeable;

import org.I0Itec.zkclient.ZkClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import kafka.admin.AdminOperationException;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode.Enforced$;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class Test {

    static final Logger logger = LogManager.getLogger();

    public Test() {
        // TODO Auto-generated constructor stub
    }

    public static void addPartitions(String zkServers, String topic, int partitions) {

        try (AutoZkClient zkClient = new AutoZkClient(zkServers)) {
            ZkUtils zkUtils = ZkUtils.apply(zkClient, false);

            if (AdminUtils.topicExists(zkUtils, topic)) {
                logger.info("Altering topic {}", topic); 
                try {
                    AdminUtils.addPartitions(zkUtils, topic, partitions, "", true, Enforced$.MODULE$);
                    logger.info("Topic {} altered with partitions : {}", topic, partitions); 
                } catch (AdminOperationException aoe) {
                    logger.info("Error while altering partitions for topic : {}", topic, aoe); 
                } 
            } else {
                logger.info("Topic {} doesn't exists", topic); 
            } 
        } 
    }

    // Just exists for Closeable convenience 
    private static final class AutoZkClient extends ZkClient implements Closeable { 

        static int sessionTimeout = 30_000;
        static int connectionTimeout = 6_000;

        AutoZkClient(String zkServers) {
            super(zkServers, sessionTimeout, connectionTimeout, ZKStringSerializer$.MODULE$);
        }
    }

    public static void main(String[] args) {

        addPartitions("localhost:2181", "hello", 20);
    }
}
 类似资料:
  • 问题内容: 我是Kafka的新手,正在使用新的KafkaProducer和KafkaConsumer,版本:0.9.0.1 在创建特定主题之后,java中是否有任何方法可以更改/更新特定主题的分区数。 我没有使用Zookeeper创建主题。当发布请求到达时,我的KafkaProducer会自动创建主题。 如果还不够,我还可以提供更多详细信息 问题答案: 是的,有可能。您必须在中访问scala类以添

  • Kafka主题分区偏移位置始终从0或随机值开始,如何确保使用者记录是分区中的第一条记录?有没有办法找出答案?如果有的话,请让我知道。谢谢。

  • 简单问题: 假设我有一个具有3个分区的主题:Topic:StateEvents P1、P2和P3。 让我们假设生产者生成20条消息: 1, 2, 3, ..........20 我的问题是: 当制作人生成这些消息时: 1)每个消息将只在且仅在1个分区?也就是说,1在P1,2在P2,3在P3,然后4在P1,5在P2,6在P3,以此类推? 2)如果#1为真,当消费者订阅时,它将订阅所有分区,以便获得所

  • 我创建了一个包含3个分区的主题 我使用Java制作人同步写入主题 我有一个Java的用户订阅并阅读它 我的键总是一组固定的3个不同的字符串(k1、k2、k3)。但是我的消息总是去分区1或分区2——k1和k2去分区1,k3去分区2。 为什么分区0未使用?

  • 通过Kafka文档和各种其他资源,我了解到Kafka中的消息被组织成主题。此外,主题可以分解为多个分区,每个分区可以托管在不同的服务器上。这提供了冗余和可伸缩性。 我不确定这里的“破碎”这个词是什么意思。这是否意味着,如果添加到主题的消息是,例如“1 2 3 4 5 6 7”,那么在将其分解为分区后,我们将有一个分区仅包含整个主题的子部分。就像一个分区有“1 2 3”,而另一个分区有“4 5 6”

  • 我对Kafka和Spring Boot是一种新的体验,并试图使我的应用程序从主题的特定分区读取。 单厂代码 这也是我的消费者工厂配置 当我试图运行程序时,它给我一个错误 分区Single上的偏移量提交失败。偏移量308处的Attendance-0:协调器不知道此成员。 和警告 失败:无法完成提交,因为组已重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间比配置的max.p