Kafka在线动态调整topic分区partition

慕河
2023-12-01

我们在创建kafka topic的时候有时候会对topic的分区进行调整,调整topic分区可以用kafka自带的脚本kafka-topics.sh

kafka-topics.sh可以创建 删除 修改分区的一些属性

# ./kafka-topics.sh 
[2019-11-28 16:07:05,424] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
Create, delete, describe, or change a topic.
Option                                   Description                            
------                                   -----------                            
--alter                                  Alter the number of partitions,        
                                           replica assignment, and/or           
                                           configuration for the topic.         
--config <String: name=value>            A topic configuration override for the 
                                           topic being created or altered.The   
                                           following is a list of valid         
                                           configurations:                      
                                             cleanup.policy                        
                                             compression.type                      
                                             delete.retention.ms                   
                                             file.delete.delay.ms                  
                                             flush.messages                        
                                             flush.ms                              
                                             follower.replication.throttled.       
                                           replicas                             
                                             index.interval.bytes                  
                                             leader.replication.throttled.replicas 
                                             max.message.bytes                     
                                             message.format.version                
                                             message.timestamp.difference.max.ms   
                                             message.timestamp.type                
                                             min.cleanable.dirty.ratio             
                                             min.compaction.lag.ms                 
                                             min.insync.replicas                   
                                             preallocate                           
                                             retention.bytes                       
                                             retention.ms                          
                                             segment.bytes                         
                                             segment.index.bytes                   
                                             segment.jitter.ms                     
                                             segment.ms                            
                                             unclean.leader.election.enable        
                                         See the Kafka documentation for full   
                                           details on the topic configs.        
--create                                 Create a new topic.                    
--delete                                 Delete a topic                         
--delete-config <String: name>           A topic configuration override to be   
                                           removed for an existing topic (see   
                                           the list of configurations under the 
                                           --config option).                    
--describe                               List details for the given topics.     
--disable-rack-aware                     Disable rack aware replica assignment  
--force                                  Suppress console prompts               
--help                                   Print usage information.               
--if-exists                              if set when altering or deleting       
                                           topics, the action will only execute 
                                           if the topic exists                  
--if-not-exists                          if set when creating topics, the       
                                           action will only execute if the      
                                           topic does not already exist         
--list                                   List all available topics.             
--partitions <Integer: # of partitions>  The number of partitions for the topic 
                                           being created or altered (WARNING:   
                                           If partitions are increased for a    
                                           topic that has a key, the partition  
                                           logic or ordering of the messages    
                                           will be affected                     
--replica-assignment <String:            A list of manual partition-to-broker   
  broker_id_for_part1_replica1 :           assignments for the topic being      
  broker_id_for_part1_replica2 ,           created or altered.                  
  broker_id_for_part2_replica1 :                                                
  broker_id_for_part2_replica2 , ...>                                           
--replication-factor <Integer:           The replication factor for each        
  replication factor>                      partition in the topic being created.
--topic <String: topic>                  The topic to be create, alter or       
                                           describe. Can also accept a regular  
                                           expression except for --create option
--topics-with-overrides                  if set when describing topics, only    
                                           show topics that have overridden     
                                           configs                              
--unavailable-partitions                 if set when describing topics, only    
                                           show partitions whose leader is not  
                                           available                            
--under-replicated-partitions            if set when describing topics, only    
                                           show under replicated partitions     
--zookeeper <String: hosts>              REQUIRED: The connection string for    
                                           the zookeeper connection in the form 
                                           host:port. Multiple hosts can be     
                                           given to allow fail-over.

 

上面是kafka的一些帮助文档

查看topic的详细信息

./kafka-topics.sh --describe --zookeeper x.x.x.x:2181 --topic test

Topic:test    PartitionCount:10    ReplicationFactor:3    Configs:
    Topic: test    Partition: 0    Leader: 1    Replicas: 1,3,4    Isr: 1,3,4
    Topic: test    Partition: 1    Leader: 2    Replicas: 2,4,0    Isr: 0,2,4
    Topic: test    Partition: 2    Leader: 3    Replicas: 3,0,1    Isr: 0,1,3
    Topic: test    Partition: 3    Leader: 4    Replicas: 4,1,2    Isr: 1,2,4
    Topic: test    Partition: 4    Leader: 0    Replicas: 0,2,3    Isr: 0,2,3
    Topic: test    Partition: 5    Leader: 1    Replicas: 1,4,0    Isr: 1,4,0
    Topic: test    Partition: 6    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: test    Partition: 7    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
    Topic: test    Partition: 8    Leader: 4    Replicas: 4,2,3    Isr: 4,2,3
    Topic: test    Partition: 9    Leader: 0    Replicas: 0,3,4    Isr: 0,3,4
 

从上面信息可以看出 test是一个3副本 10分区的topic

修改分区数为13

# ./kafka-topics.sh --alter --zookeeper x.x.x.x:2181 --topic test --partitions 13

[2019-11-28 16:19:23,703] INFO Topic update Map(test-6 -> Vector(2, 0, 1), test-11 -> ArrayBuffer(2, 0, 1), test-10 -> ArrayBuffer(1, 4, 0), test-0 -> Vector(1, 3, 4), test-7 -> Vector(3, 1, 2), test-5 -> Vector(1, 4, 0), test-8 -> Vector(4, 2, 3), test-1 -> Vector(2, 4, 0), test-4 -> Vector(0, 2, 3), test-9 -> Vector(0, 3, 4), test-3 -> Vector(4, 1, 2), test-12 -> ArrayBuffer(3, 1, 2), test-2 -> Vector(3, 0, 1)) (kafka.zk.AdminZkClient)
Adding partitions succeeded!

查看修改后的状态

./kafka-topics.sh --describe --zookeeper x.x.x.x:2181 --topic test

Topic:test    PartitionCount:13    ReplicationFactor:3    Configs:
    Topic: test    Partition: 0    Leader: 1    Replicas: 1,3,4    Isr: 1,3,4
    Topic: test    Partition: 1    Leader: 2    Replicas: 2,4,0    Isr: 0,2,4
    Topic: test    Partition: 2    Leader: 3    Replicas: 3,0,1    Isr: 0,1,3
    Topic: test    Partition: 3    Leader: 4    Replicas: 4,1,2    Isr: 1,2,4
    Topic: test    Partition: 4    Leader: 0    Replicas: 0,2,3    Isr: 0,2,3
    Topic: test    Partition: 5    Leader: 1    Replicas: 1,4,0    Isr: 1,4,0
    Topic: test    Partition: 6    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: test    Partition: 7    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
    Topic: test    Partition: 8    Leader: 4    Replicas: 4,2,3    Isr: 4,2,3
    Topic: test    Partition: 9    Leader: 0    Replicas: 0,3,4    Isr: 0,3,4
    Topic: test    Partition: 10    Leader: 1    Replicas: 1,4,0    Isr: 1,4,0
    Topic: test    Partition: 11    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: test    Partition: 12    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
已经变为13个分区了

扩容是没问题了 我们试试能不能减少分区数,我们把13改成10

./kafka-topics.sh --alter --zookeeper x.x.x.x:2181 --topic test --partitions 13

[2019-11-28 16:22:12,329] INFO [ZooKeeperClient] Connected. (kafka.zookeeper.ZooKeeperClient)
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Error while executing topic command : The number of partitions for a topic can only be increased. Topic test currently has 13 partitions, 10 would not be an increase.
[2019-11-28 16:22:12,607] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic test currently has 13 partitions, 10 would not be an increase.
报错了,报错可看出,只能增加

 

 

 类似资料: