
Installing kafka_2.13-2.7.0

何峰
2023-12-01

Kafka Cluster Installation

Cluster plan (host → broker.id):

cancer01       1

cancer02       2

cancer03       3

 

Note 1: Configure the kafka directory on cancer01 first, then copy it to the other hosts and adjust the per-host settings.

Note 2: ZooKeeper must be installed on every host, with the ZooKeeper cluster configured.
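
For reference, a minimal ZooKeeper cluster configuration might look like the following (a sketch with assumed paths; adjust dataDir to your environment). The three server.N lines match the hosts above, and each host's myid file must contain its own number:

# conf/zoo.cfg (identical on all three hosts)
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
clientPort=2181
server.1=cancer01:2888:3888
server.2=cancer02:2888:3888
server.3=cancer03:2888:3888

# on cancer01 (use 2 on cancer02, 3 on cancer03):
echo 1 > /usr/local/zookeeper/data/myid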

 

Extract:

tar -xzvf kafka_2.13-2.7.0.tgz

mv kafka_2.13-2.7.0 /usr/local/kafka

 

Change the owner:

chown -R hadoop:hadoop /usr/local/kafka

 

Configure the environment:

vim /etc/profile

export KAFKA_HOME=/usr/local/kafka

export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile

 

Edit the configuration file

(broker.id and host.name differ on each host; note the directory in the standard distribution is config, not conf)

vim /usr/local/kafka/config/server.properties

# Globally unique ID of the broker; must not repeat across the cluster
broker.id=1

# Port the broker listens on; producers and consumers connect here (legacy setting, see the listeners note below)
port=9092

# Number of threads handling network requests
num.network.threads=3

# Number of threads handling disk I/O
num.io.threads=8

# Default number of partitions per topic on this broker
num.partitions=3

# Number of threads per data directory used for log recovery and cleanup
num.recovery.threads.per.data.dir=1

# Send buffer size of the socket
socket.send.buffer.bytes=102400

# Receive buffer size of the socket
socket.receive.buffer.bytes=102400

# Maximum size of a single socket request
socket.request.max.bytes=104857600

# Directory where Kafka stores its message logs
log.dirs=/usr/local/kafka/logs

# Flush to disk after this many messages have accumulated in a partition buffer
log.flush.interval.messages=10000

# Flush to disk after a message has been buffered for this long (ms)
log.flush.interval.ms=3000

# Maximum time to retain a segment file before it is deleted
log.retention.hours=168

# Maximum time before a new segment file is rolled
log.roll.hours=168

# Size of each log segment file (default 1 GB)
log.segment.bytes=1073741824

# Interval (ms) at which segment sizes are checked against the retention policy
log.retention.check.interval.ms=300000

# Whether the log cleaner is enabled
log.cleaner.enable=true

# ZooKeeper ensemble the broker uses to store metadata
zookeeper.connect=192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181

# ZooKeeper connection timeout (ms)
zookeeper.connection.timeout.ms=6000

# Without delete.topic.enable=true, deleting a topic only marks it for deletion
delete.topic.enable=true

# host.name must be this machine's IP (important); otherwise clients will throw errors!
host.name=192.168.31.101

advertised.host.name=192.168.31.101

advertised.listeners=PLAINTEXT://192.168.31.101:9092

default.replication.factor=3

auto.create.topics.enable=true

message.max.bytes=5242880

replica.fetch.max.bytes=5242880

 

Note: listeners must be configured with an IP address. If it is set to localhost or the server's hostname, Java clients will throw exceptions when sending data: with advertised.host.name unset, Kafka does not (as the documentation of this era suggests) advertise the configured host.name, but instead advertises the machine's own hostname. A remote client with no matching hosts entry cannot resolve that hostname and so cannot connect. Also, when accessing a remote Kafka cluster from a Java client, make sure the relevant ports are open on every node in the cluster, or connections will time out.
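
Note also that host.name, advertised.host.name, and port are legacy options that Kafka 2.x deprecates in favor of listeners and advertised.listeners. A sketch of the equivalent modern per-host settings (here for cancer01; use the local IP on each host):

listeners=PLAINTEXT://192.168.31.101:9092
advertised.listeners=PLAINTEXT://192.168.31.101:9092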

 

vim /usr/local/kafka/config/producer.properties

metadata.broker.list=192.168.31.101:9092,192.168.31.102:9092,192.168.31.103:9092

vim /usr/local/kafka/config/consumer.properties

zookeeper.connect=192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181

(These two files are only sample configs for the command-line tools; the Java clients configure bootstrap.servers in code.)

 

Distribute to the other hosts:

scp -r /usr/local/kafka cancer02:/usr/local/

scp -r /usr/local/kafka cancer03:/usr/local/

 

Adjust the configuration on the other hosts

vim /etc/profile

vim /usr/local/kafka/config/server.properties

(On each host, set broker.id to its planned value and change host.name and the advertised.* settings to the local IP.)

 

Start ZooKeeper (on every host)

zkServer.sh start

zkServer.sh status

 

Start Kafka

kafka-server-start.sh $KAFKA_HOME/config/server.properties 1>/dev/null 2>&1 &

kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &

or (with -daemon the process detaches itself, so the redirection and trailing & are unnecessary)

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

kafka-server-start.sh -daemon ../config/server.properties

or

nohup kafka-server-start.sh ../config/server.properties &>> /usr/local/kafka/kafka.log &
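
To stop a broker cleanly (a controlled shutdown rather than kill -9), the distribution also ships a stop script:

kafka-server-stop.sh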

 

Verify

Create a topic:

kafka-topics.sh --create --zookeeper cancer01:2181 --replication-factor 1 --partitions 1 --topic test

Check that the topic was created:

kafka-topics.sh --list --zookeeper cancer01:2181
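
Since Kafka 2.2, kafka-topics.sh can also talk to the brokers directly through --bootstrap-server; the --zookeeper form still works in 2.7 but is deprecated. For example:

kafka-topics.sh --create --bootstrap-server cancer01:9092 --replication-factor 1 --partitions 1 --topic test

kafka-topics.sh --list --bootstrap-server cancer01:9092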

 

Using Kafka

Create a topic named "test" (first command: one partition, one replica, with per-topic config overrides; second command: three partitions, three replicas):

kafka-topics.sh --create --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --config max.message.bytes=12800000 --config flush.messages=1 --replication-factor 1 --partitions 1 --topic test

kafka-topics.sh --create --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --replication-factor 3 --partitions 3 --topic test

Option reference:

--create: create a topic

--topic: name of the new topic

--zookeeper: ZooKeeper connection URL for Kafka, the same value as zookeeper.connect in server.properties

--config: topic-level config overrides; for the parameter list see http://kafka.apache.org/082/documentation.html#brokerconfigs

--partitions: number of partitions to create (default 1)

--replication-factor: replication factor for each partition (default 1)

 

List the topics that have been created:

kafka-topics.sh --list --zookeeper localhost:2181

 

Show a topic's details

kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Option reference:

--describe: show detailed information

--zookeeper: ZooKeeper connection URL for Kafka, the same value as zookeeper.connect in server.properties

--topic: name of the topic to describe

 

Increase a topic's replicas (via partition reassignment)

kafka-reassign-partitions.sh --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --reassignment-json-file json/partitions-to-move.json --execute
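
The reassignment JSON file lists the desired replica assignment per partition. A minimal sketch of what json/partitions-to-move.json might contain (the topic name, partition numbers, and broker IDs here are placeholders for your own values):

{"version":1,"partitions":[
  {"topic":"test","partition":0,"replicas":[1,2,3]},
  {"topic":"test","partition":1,"replicas":[2,3,1]},
  {"topic":"test","partition":2,"replicas":[3,1,2]}
]}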

 

Add partitions to a topic

kafka-topics.sh --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --alter --partitions 20 --topic test

 

Modify topic settings

kafka-topics.sh --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --alter --topic test --config max.message.bytes=128000

kafka-topics.sh --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --alter --topic test --delete-config max.message.bytes

kafka-topics.sh --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --alter --topic test --partitions 10

kafka-topics.sh --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --alter --topic test --partitions 3   # fails: the partition count can only be increased, never decreased
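
In Kafka 2.x, changing topic configs through kafka-topics.sh --alter is deprecated in favor of kafka-configs.sh. A sketch of the equivalent commands:

kafka-configs.sh --bootstrap-server cancer01:9092 --entity-type topics --entity-name test --alter --add-config max.message.bytes=128000

kafka-configs.sh --bootstrap-server cancer01:9092 --entity-type topics --entity-name test --alter --delete-config max.message.bytes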

 

Delete a topic

By default Kafka topics cannot be deleted outright: a delete only marks the topic for deletion without actually removing it. To make deletion possible, there are two approaches:

Approach 1: run the delete command, then manually remove the topic's data on local disk and its metadata in ZooKeeper.

Approach 2: set delete.topic.enable=true in server.properties and restart Kafka; the delete command will then actually remove the topic.

kafka-topics.sh --delete --topic test --zookeeper cancer01:2181,cancer02:2181,cancer03:2181

kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic test --zookeeper cancer01:2181,cancer02:2181,cancer03:2181

(The kafka.admin.DeleteTopicCommand class only exists in old 0.8-era releases and has since been removed; on 2.7 use the kafka-topics.sh form.)
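
With delete.topic.enable=true (the default since Kafka 1.0), the deletion can also be issued directly against the brokers:

kafka-topics.sh --bootstrap-server cancer01:9092 --delete --topic test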

 

You can list all topics with: ./bin/kafka-topics.sh --zookeeper 【zookeeper server】 --list

To then truly delete a topic by hand:

(1) Open a ZooKeeper client, e.g. the one shipped with Kafka: ./bin/zookeeper-shell.sh 【zookeeper server】

(2) Find the topic's node: ls /brokers/topics

(3) Remove the node of the topic to delete: rmr /brokers/topics/【topic name】 (use deleteall instead of rmr on ZooKeeper 3.5+); the topic is now gone for good.

Topics still marked for deletion ("marked for deletion") can be listed in the ZooKeeper client with: ls /admin/delete_topics

Removing a topic's node under /admin/delete_topics clears the marked-for-deletion flag.

 

Produce messages

Kafka provides a command-line tool that reads messages from a file or from standard input and sends them to the Kafka cluster. Each line is one message.

kafka-console-producer.sh --broker-list cancer01:9092,cancer02:9092,cancer03:9092 --topic test

 

Consume messages

Kafka also provides a command-line consumer that prints stored messages to standard output.

kafka-console-consumer.sh --bootstrap-server cancer01:9092,cancer02:9092,cancer03:9092 --topic test --from-beginning

(Without --from-beginning, messages produced before the consumer started will not be shown.)

 

Take a broker offline

kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --broker #brokerId# --num.retries 3 --retry.interval.ms 60

(kafka.admin.ShutdownBroker is another old 0.8-era tool that no longer ships with modern releases; on 2.x, stopping a broker normally with kafka-server-stop.sh already performs a controlled shutdown, since controlled.shutdown.enable defaults to true.)

 

Check a consumer group's offsets (legacy command; see the note below)

kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --group test --topic test
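
kafka.tools.ConsumerOffsetChecker only understood the old ZooKeeper-based consumers and has been removed from modern releases; on Kafka 2.x the equivalent check is done with kafka-consumer-groups.sh:

kafka-consumer-groups.sh --bootstrap-server cancer01:9092 --describe --group test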

 

Create a topic with 3 replicas:

kafka-topics.sh --create --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

Check the topic's status:

kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

# Summary of all partitions
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:

# One line of detail per partition; we only have one partition, hence one line.
Topic: my-replicated-topic    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0

# "Leader": the node handling all reads and writes for this partition; leaders are assigned per partition more or less at random.
# "Replicas": the nodes holding a copy of this partition, listed regardless of whether they are the leader or even alive.
# "Isr": the in-sync replicas, i.e. the replicas that are alive and caught up with the leader.

The 1,2,0 in Replicas and Isr are the broker.id values of the three brokers.

 

Kill the leader to test cluster fault tolerance

First find out which broker is the leader:

kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

# Summary of all partitions
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:

# One line per partition; only one partition, hence one line.
Topic: my-replicated-topic    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0

The leader's broker.id is 1. Find the matching broker process:

[root@administrator bin]# jps -m

5130 Kafka ../config/server.properties

4861 QuorumPeerMain ../config/zookeeper.properties

1231 Bootstrap start start

7420 Kafka ../config/server-2.properties

7111 Kafka ../config/server-1.properties

9139 Jps -m

The leader's PID is 7111 (Kafka ../config/server-1.properties). Kill that process:

kill -9 7111

Describe the topic again to confirm a new leader was elected, here broker.id=0:

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3    Configs:

# One of the replicas became the new leader, and broker 1 is no longer in the in-sync replica set.
Topic: my-replicated-topic      Partition: 0    Leader: 0       Replicas: 1,0,2 Isr: 0,2

Consume the messages once more to confirm nothing was lost (note: the console consumer's old --zookeeper option was removed in Kafka 2.0, so connect with --bootstrap-server):

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

cluster message 1

cluster message 2

The messages are still there; failover succeeded!

 

Producer

There are three ways to send messages. Fire-and-forget: call the send API and ignore the result. Because Kafka is highly available, the message is usually written successfully, but messages can be lost when something goes wrong.

Synchronous send: send() returns a Future; call its get() method to find out whether the message was written successfully.

Asynchronous send: pass a callback to send(); the callback is invoked once the broker's response arrives.

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class MyProducer {

    private static KafkaProducer<String, String> producer;

    // Initialization
    static {
        Properties properties = new Properties();
        // Address of the broker(s) the producer connects to
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        // Serializers for message keys and values
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Custom partitioner (see CustomPartitioner below)
        properties.put("partitioner.class", "com.imooc.kafka.CustomPartitioner");
        producer = new KafkaProducer<>(properties);
    }

    /**
     * Create the topic:   .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181
     *                     --replication-factor 1 --partitions 1 --topic kafka-study
     * Create a consumer:  .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092
     *                     --topic kafka-study --from-beginning
     */
    // Fire-and-forget: send the message and do nothing with the result
    private static void sendMessageForgetResult() {
        ProducerRecord<String, String> record = new ProducerRecord<>("kafka-study", "name", "ForgetResult");
        producer.send(record);
        producer.close();
    }

    // Synchronous send: block on the returned Future to get the send result
    private static void sendMessageSync() throws Exception {
        ProducerRecord<String, String> record = new ProducerRecord<>("kafka-study", "name", "sync");
        RecordMetadata result = producer.send(record).get();
        System.out.println(result.topic());     // kafka-study
        System.out.println(result.partition()); // partition 0
        System.out.println(result.offset());    // offset grows by 1 per message sent
        producer.close();
    }

    /**
     * Create the topic:   .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181
     *                     --replication-factor 1 --partitions 3 --topic kafka-study-x
     * Create a consumer:  .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092
     *                     --topic kafka-study-x --from-beginning
     */
    private static void sendMessageCallback() {
        ProducerRecord<String, String> record = new ProducerRecord<>("kafka-study-x", "name", "callback");
        producer.send(record, new MyProducerCallback());
        // Send another message
        record = new ProducerRecord<>("kafka-study-x", "name-x", "callback");
        producer.send(record, new MyProducerCallback());
        producer.close();
    }

    // Asynchronous send with a callback.
    // Use case: each send has latency; with many messages there is no need to wait
    // synchronously, the callback is invoked automatically once the result arrives.
    private static class MyProducerCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                e.printStackTrace();
                return;
            }
            System.out.println("*** MyProducerCallback ***");
            System.out.println(recordMetadata.topic());
            System.out.println(recordMetadata.partition());
            System.out.println(recordMetadata.offset());
        }
    }

    public static void main(String[] args) throws Exception {
        //sendMessageForgetResult();
        //sendMessageSync();
        sendMessageCallback();
    }
}

Custom partitioner: decides which partition a message is stored in. (The default partitioner hashes the message key to pick a partition; messages without a key are spread across partitions round-robin, or with sticky partitioning since Kafka 2.4.)

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;

public class CustomPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Get all partitions of the topic
        List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
        int numPartitions = partitionInfos.size();
        // Messages must have a String key
        if (null == keyBytes || !(key instanceof String)) {
            throw new InvalidRecordException("kafka message must have key");
        }
        // With a single partition, everything goes to partition 0
        if (numPartitions == 1) { return 0; }
        // Messages whose key is "name" go to the last partition
        if (key.equals("name")) { return numPartitions - 1; }
        // All other keys are hashed across the remaining partitions
        return Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1);
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> map) {}
}

 

Kafka Consumers (Consumer Groups)

Offset-commit strategies covered below:

* automatic offset commit
* manual synchronous commit of the current offset
* manual asynchronous commit of the current offset
* manual asynchronous commit with a callback
* mixed synchronous and asynchronous commit

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MyConsumer {

    private static KafkaConsumer<String, String> consumer;
    private static Properties properties;

    // Initialization
    static {
        properties = new Properties();
        // Address of the broker(s) the consumer connects to
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        // Deserializers for message keys and values
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group
        properties.put("group.id", "KafkaStudy");
    }

    // Automatic offset commit: the consumer manages commits by itself
    private static void generalConsumeMessageAutoCommit() {
        properties.put("enable.auto.commit", true);
        consumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic
        consumer.subscribe(Collections.singleton("kafka-study-x"));
        try {
            while (true) {
                boolean flag = true;
                // Poll for records, waiting up to 100 ms
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                // Print every record
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format(
                            "topic = %s, partition = %s, key = %s, value = %s",
                            record.topic(), record.partition(), record.key(), record.value()
                    ));
                    // A "done" message signals the end of the stream
                    if (record.value().equals("done")) { flag = false; }
                }
                if (!flag) { break; }
            }
        } finally {
            consumer.close();
        }
    }

    // Manual synchronous commit: commit on demand, but the call blocks,
    // and a failed commit is retried until it succeeds or throws
    private static void generalConsumeMessageSyncCommit() {
        properties.put("enable.auto.commit", false);
        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("kafka-study-x"));
        while (true) {
            boolean flag = true;
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format(
                        "topic = %s, partition = %s, key = %s, value = %s",
                        record.topic(), record.partition(), record.key(), record.value()
                ));
                if (record.value().equals("done")) { flag = false; }
            }
            try {
                // Synchronous commit of the current offsets
                consumer.commitSync();
            } catch (CommitFailedException ex) {
                System.out.println("commit failed error: " + ex.getMessage());
            }
            if (!flag) { break; }
        }
    }

    // Manual asynchronous commit: fast, but failed commits are not retried
    private static void generalConsumeMessageAsyncCommit() {
        properties.put("enable.auto.commit", false);
        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("kafka-study-x"));
        while (true) {
            boolean flag = true;
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format(
                        "topic = %s, partition = %s, key = %s, value = %s",
                        record.topic(), record.partition(), record.key(), record.value()
                ));
                if (record.value().equals("done")) { flag = false; }
            }
            // Asynchronous commit of the current offsets
            consumer.commitAsync();
            if (!flag) { break; }
        }
    }

    // Manual asynchronous commit with a callback
    private static void generalConsumeMessageAsyncCommitWithCallback() {
        properties.put("enable.auto.commit", false);
        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("kafka-study-x"));
        while (true) {
            boolean flag = true;
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format(
                        "topic = %s, partition = %s, key = %s, value = %s",
                        record.topic(), record.partition(), record.key(), record.value()
                ));
                if (record.value().equals("done")) { flag = false; }
            }
            // Java 8 lambda as the commit callback
            consumer.commitAsync((map, e) -> {
                if (e != null) {
                    System.out.println("commit failed for offsets: " + e.getMessage());
                }
            });
            if (!flag) { break; }
        }
    }

    // Mixed synchronous and asynchronous commit
    @SuppressWarnings("all")
    private static void mixSyncAndAsyncCommit() {
        properties.put("enable.auto.commit", false);
        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("kafka-study-x"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format(
                            "topic = %s, partition = %s, key = %s, value = %s",
                            record.topic(), record.partition(),
                            record.key(), record.value()
                    ));
                }
                // Asynchronous commit in the normal path, for throughput
                consumer.commitAsync();
            }
        } catch (Exception ex) {
            System.out.println("commit async error: " + ex.getMessage());
        } finally {
            try {
                // On the way out, fall back to a synchronous commit
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }

    public static void main(String[] args) {
        // Automatic offset commit
        generalConsumeMessageAutoCommit();
        // Manual synchronous commit
        //generalConsumeMessageSyncCommit();
        // Manual asynchronous commit
        //generalConsumeMessageAsyncCommit();
        // Manual asynchronous commit with callback
        //generalConsumeMessageAsyncCommitWithCallback();
        // Mixed synchronous and asynchronous commit
        //mixSyncAndAsyncCommit();
    }
}

 

 
