Plan (hostname and broker.id for each machine):
cancer01 1
cancer02 2
cancer03 3
Note 1: configure the Kafka directory on cancer01 first, then copy it to the other hosts and adjust the per-host settings.
Note 2: ZooKeeper must be installed on every host and configured as a ZooKeeper cluster; a minimal configuration sketch follows.
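A minimal zoo.cfg sketch for that ZooKeeper cluster (the dataDir path is hypothetical; the three hosts and client port 2181 follow the plan above):
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
clientPort=2181
server.1=cancer01:2888:3888
server.2=cancer02:2888:3888
server.3=cancer03:2888:3888
Each host also needs a myid file under dataDir containing its own server number (1, 2 or 3).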
Extract the archive:
tar -xzvf kafka_2.13-2.7.0.tgz
mv kafka_2.13-2.7.0 /usr/local/kafka
Change ownership:
chown -R hadoop:hadoop /usr/local/kafka
Configure environment variables:
vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
Edit the configuration file
(broker.id and host.name differ on each host)
vim /usr/local/kafka/config/server.properties
# Globally unique broker id; must not be duplicated
broker.id=1
# Port the broker listens on; producers and consumers connect here
port=9092
# Number of threads handling network requests
num.network.threads=3
# Number of threads handling disk I/O
num.io.threads=8
# Default number of partitions per topic on this broker
num.partitions=3
# Number of threads per data directory used for log recovery and cleanup
num.recovery.threads.per.data.dir=1
# Send buffer size of the socket
socket.send.buffer.bytes=102400
# Receive buffer size of the socket
socket.receive.buffer.bytes=102400
# Maximum size of a socket request
socket.request.max.bytes=104857600
# Directory where Kafka message logs are stored
log.dirs=/usr/local/kafka/logs
# Flush to disk once this many messages have accumulated in a partition buffer
log.flush.interval.messages=10000
# Flush to disk once a message has been buffered for this long (ms)
log.flush.interval.ms=3000
# Maximum time a segment file is retained before deletion
log.retention.hours=168
# Maximum time before rolling a new segment file
log.roll.hours=168
# Size of each log segment file (default 1 GB)
log.segment.bytes=1073741824
# Interval for checking segment files against the retention policy (ms)
log.retention.check.interval.ms=300000
# Whether the log cleaner is enabled
log.cleaner.enable=true
# ZooKeeper ensemble the broker uses to store metadata
zookeeper.connect=192.168.31.101:2181,192.168.31.102:2181,192.168.31.103:2181
# ZooKeeper connection timeout (ms)
zookeeper.connection.timeout.ms=6000
# Topic deletion requires delete.topic.enable=true in server.properties; otherwise a delete only marks the topic
delete.topic.enable=true
# host.name must be this machine's IP address (important); otherwise clients will throw errors!
host.name=192.168.31.101
advertised.host.name=192.168.31.101
advertised.listeners=PLAINTEXT://192.168.31.101:9092
default.replication.factor=3
auto.create.topics.enable=true
message.max.bytes=5242880
replica.fetch.max.bytes=5242880
Note: the listener address must be configured as an IP address. If it is set to localhost or the server's hostname, Java clients will throw exceptions when sending data: without advertised.host.name set, Kafka does not advertise the configured host.name (as the official documentation suggests) but the machine's own hostname, and remote clients that have no hosts entry for that hostname cannot connect. When accessing a remote Kafka cluster from a Java client, also make sure the broker ports are open on every node of the cluster, otherwise connections will time out. A quick connectivity check is shown below.
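As an optional sanity check (not part of the original steps), the API-versions tool shipped with Kafka can be run from a remote machine to confirm that the advertised address is reachable:
kafka-broker-api-versions.sh --bootstrap-server 192.168.31.101:9092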
vim /usr/local/kafka/config/producer.properties
bootstrap.servers=192.168.31.101:9092,192.168.31.102:9092,192.168.31.103:9092
vim /usr/local/kafka/config/consumer.properties
bootstrap.servers=192.168.31.101:9092,192.168.31.102:9092,192.168.31.103:9092
Distribute to the other hosts
scp -r /usr/local/kafka cancer02:/usr/local/
scp -r /usr/local/kafka cancer03:/usr/local/
Modify the configuration on the other hosts
vim /etc/profile
vim /usr/local/kafka/config/server.properties
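For example (following the IP plan above, with cancer02 = 192.168.31.102 and cancer03 = 192.168.31.103), the per-host lines on cancer02 become:
broker.id=2
host.name=192.168.31.102
advertised.host.name=192.168.31.102
advertised.listeners=PLAINTEXT://192.168.31.102:9092
and on cancer03 use broker.id=3 with 192.168.31.103 in the same lines.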
Start ZooKeeper
zkServer.sh start
zkServer.sh status
Start Kafka
kafka-server-start.sh $KAFKA_HOME/config/server.properties 1>/dev/null 2>&1 &
kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &
or
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
kafka-server-start.sh -daemon ../config/server.properties
or
nohup kafka-server-start.sh ../config/server.properties &>> /usr/local/kafka/kafka.log &
Verification
Create a topic:
kafka-topics.sh --create --zookeeper cancer01:2181 --replication-factor 1 --partitions 1 --topic test
Check that the topic was created:
kafka-topics.sh --list --zookeeper cancer01:2181
Create a topic named "test" with a single partition and a single replica:
kafka-topics.sh --create --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --config max.message.bytes=12800000 --config flush.messages=1 --replication-factor 1 --partitions 1 --topic test
kafka-topics.sh --create --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --replication-factor 3 --partitions 3 --topic test
Option reference:
--create: create a topic
--topic: name of the topic to create
--zookeeper: ZooKeeper connection URL used by Kafka, the same value as the zookeeper.connect setting in server.properties
--config: topic-level configuration overrides; the parameter list is documented at http://kafka.apache.org/082/documentation.html#brokerconfigs
--partitions: number of partitions to create, default 1
--replication-factor: replication factor for each partition, default 1
List the topics that have been created:
kafka-topics.sh --list --zookeeper localhost:2181
Describe a topic:
kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Option reference:
--describe: show detailed information
--zookeeper: ZooKeeper connection URL used by Kafka, the same value as the zookeeper.connect setting in server.properties
--topic: name of the topic to describe
Increase the replicas of a topic:
kafka-reassign-partitions.sh --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --reassignment-json-file json/partitions-to-move.json --execute
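The reassignment plan is read from the JSON file named on the command line. Its contents depend on your topics; as a hedged sketch, a plan that places the single partition of topic test on brokers 1, 2 and 3 would look like:
{"version":1,
 "partitions":[
   {"topic":"test","partition":0,"replicas":[1,2,3]}
 ]}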
Add partitions to a topic:
kafka-topics.sh --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --alter --partitions 20 --topic test
Modify topic configuration:
kafka-topics.sh --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --alter --topic test --config max.message.bytes=128000
kafka-topics.sh --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --alter --topic test --delete-config max.message.bytes
kafka-topics.sh --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --alter --topic test --partitions 10
kafka-topics.sh --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --alter --topic test --partitions 3 ## the partition count can only be increased, never decreased, so this command will fail
Delete a topic
By default a Kafka topic cannot be deleted outright: the delete command only marks it for deletion and does not actually remove it. To really delete a topic there are two approaches:
Approach 1: run the delete command, then manually remove the topic's data on local disk and its metadata in ZooKeeper.
Approach 2: set delete.topic.enable=true in server.properties and restart Kafka; the delete command will then actually delete the topic.
kafka-topics.sh --delete --topic test --zookeeper cancer01:2181,cancer02:2181,cancer03:2181
kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic test --zookeeper cancer01:2181,cancer02:2181,cancer03:2181
You can list all topics with: ./bin/kafka-topics.sh --zookeeper <zookeeper server> --list
To actually delete the topic at this point, proceed as follows (a consolidated example session is shown after the steps):
(1) Log in to the ZooKeeper client: ./bin/zookeeper-client (zkCli.sh in the Apache ZooKeeper distribution)
(2) Locate the topic directories: ls /brokers/topics
(3) Find the topic to delete and run: rmr /brokers/topics/<topic name>; the topic is now completely removed.
Topics marked as "marked for deletion" can be listed in the ZooKeeper client with: ls /admin/delete_topics
If you delete the corresponding node there, the "marked for deletion" flag disappears.
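A minimal sketch of that session (assuming the topic is named test and using the zkCli.sh client shipped with ZooKeeper; newer ZooKeeper versions use deleteall instead of rmr):
zkCli.sh -server cancer01:2181
ls /brokers/topics
rmr /brokers/topics/test
ls /admin/delete_topics
rmr /admin/delete_topics/test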
Produce messages
Kafka ships with a command-line tool that reads messages from a file or from standard input and sends them to the Kafka cluster; each line becomes one message.
kafka-console-producer.sh --broker-list cancer01:9092,cancer02:9092,cancer03:9092 --topic test
Consume messages
Kafka also provides a command-line consumer that prints the stored messages to standard output.
kafka-console-consumer.sh --bootstrap-server cancer01:9092,cancer02:9092,cancer03:9092 --topic test --from-beginning
(Without --from-beginning, messages produced before the consumer started are not shown.)
Take a broker offline
kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --broker #brokerId# --num.retries 3 --retry.interval.ms 60
(This ShutdownBroker admin command comes from older Kafka releases; on recent versions simply stopping the broker process triggers a controlled shutdown.)
Check the offsets consumed by a consumer group:
kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --group test --topic test
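Note: the kafka.tools.ConsumerOffsetChecker class above also comes from older Kafka releases and is no longer present in newer ones; the same information is available through the consumer-groups tool, for example:
kafka-consumer-groups.sh --bootstrap-server cancer01:9092 --describe --group test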
Create a topic with 3 replicas:
kafka-topics.sh --create --zookeeper cancer01:2181,cancer02:2181,cancer03:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Check how the topic is running:
kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
//summary of all partitions
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
# one line of information per partition; since the topic has only one partition, there is only one line
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
# "leader": the node that handles all reads and writes for this partition; each node becomes the leader for a randomly selected portion of the partitions.
# "replicas": the list of nodes holding a replica of this partition, shown regardless of whether they are the leader or even alive.
# "isr": the "in-sync replicas", i.e. the replica nodes that are alive and caught up with the leader.
The values 1,2,0 in Replicas and Isr correspond to the broker.id of the three brokers.
Kill the leader to test cluster fault tolerance
First find out which broker is the leader:
kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
//summary of all partitions
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
//one line of information per partition; since the topic has only one partition, there is only one line
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
The leader's broker.id is 1; locate the corresponding broker process:
[root@administrator bin]# jps -m
5130 Kafka ../config/server.properties
4861 QuorumPeerMain ../config/zookeeper.properties
1231 Bootstrap start start
7420 Kafka ../config/server-2.properties
7111 Kafka ../config/server-1.properties
9139 Jps -m
The leader's PID (the Kafka process started with ../config/server-1.properties) is 7111; kill that process:
//kill the process
kill -9 7111
//describe again to confirm a new leader has been elected; the new leader is broker.id=0
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
//one of the replica nodes has become the new leader, and broker 1 is no longer in the in-sync replica set
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 1,0,2 Isr: 0,2
Consume the messages again to confirm that none were lost:
./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
cluster message 1
cluster message 2
The messages are still there; failover succeeded!
Producer
Ways of sending messages:
Fire-and-forget: call the send API and ignore the result. Because Kafka is highly available, the message is usually written successfully, but it can be lost in failure scenarios.
Synchronous send: send() returns a Future, and its get() method tells us whether the message was written successfully.
Asynchronous send: pass a callback to send(); the callback is invoked when the broker's response arrives.
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MyProducer {

    private static KafkaProducer<String, String> producer;

    // initialization
    static {
        Properties properties = new Properties();
        // broker address the producer connects to
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        // serializers for key and value
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // custom partitioner
        properties.put("partitioner.class", "com.imooc.kafka.CustomPartitioner");
        producer = new KafkaProducer<>(properties);
    }

    /**
     * Create the topic:  .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181
     *                    --replication-factor 1 --partitions 1 --topic kafka-study
     * Create a consumer: .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092
     *                    --topic kafka-study --from-beginning
     */
    // fire-and-forget: send the message and do nothing with the result
    private static void sendMessageForgetResult() {
        ProducerRecord<String, String> record =
                new ProducerRecord<>("kafka-study", "name", "ForgetResult");
        producer.send(record);
        producer.close();
    }

    // synchronous send: block until the send result is available
    private static void sendMessageSync() throws Exception {
        ProducerRecord<String, String> record =
                new ProducerRecord<>("kafka-study", "name", "sync");
        RecordMetadata result = producer.send(record).get();
        System.out.println(result.topic());      // kafka-study
        System.out.println(result.partition());  // partition 0
        System.out.println(result.offset());     // one message sent, so the offset advances by 1
        producer.close();
    }

    /**
     * Create the topic:  .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181
     *                    --replication-factor 1 --partitions 3 --topic kafka-study-x
     * Create a consumer: .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092
     *                    --topic kafka-study-x --from-beginning
     */
    private static void sendMessageCallback() {
        ProducerRecord<String, String> record =
                new ProducerRecord<>("kafka-study-x", "name", "callback");
        producer.send(record, new MyProducerCallback());
        // send another message
        record = new ProducerRecord<>("kafka-study-x", "name-x", "callback");
        producer.send(record, new MyProducerCallback());
        producer.close();
    }

    // asynchronous send callback
    // scenario: each send has latency; with many messages there is no need to wait synchronously,
    // other work can continue and the callback is invoked automatically
    private static class MyProducerCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                e.printStackTrace();
                return;
            }
            System.out.println("*** MyProducerCallback ***");
            System.out.println(recordMetadata.topic());
            System.out.println(recordMetadata.partition());
            System.out.println(recordMetadata.offset());
        }
    }

    public static void main(String[] args) throws Exception {
        //sendMessageForgetResult();
        //sendMessageSync();
        sendMessageCallback();
    }
}
Custom partitioner: decides which partition a message is written to. The default partitioner hashes the message key to choose a partition (messages without a key are spread across partitions in a round-robin fashion).
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class CustomPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // get all partitions of the topic
        List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
        int numPartitions = partitionInfos.size();
        // the message must have a String key
        if (null == keyBytes || !(key instanceof String)) {
            throw new InvalidRecordException("kafka message must have key");
        }
        // if there is only one partition, use partition 0
        if (numPartitions == 1) {
            return 0;
        }
        // if the key is "name", send to the last partition
        if (key.equals("name")) {
            return numPartitions - 1;
        }
        // otherwise hash the key across the remaining partitions
        return Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1);
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> map) {}
}
Kafka consumers (consumer groups)
* Automatic offset commit
* Manual synchronous commit of the current offset
* Manual asynchronous commit of the current offset
* Manual asynchronous commit with a callback
* Mixed synchronous and asynchronous commit
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer {

    private static KafkaConsumer<String, String> consumer;
    private static Properties properties;

    // initialization
    static {
        properties = new Properties();
        // broker address to connect to
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        // deserializers for key and value
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // consumer group id
        properties.put("group.id", "KafkaStudy");
    }

    // automatic offset commit: offsets are committed by the consumer itself
    private static void generalConsumeMessageAutoCommit() {
        properties.put("enable.auto.commit", true);
        consumer = new KafkaConsumer<>(properties);
        // subscribe to the topic
        consumer.subscribe(Collections.singleton("kafka-study-x"));
        try {
            while (true) {
                boolean flag = true;
                // poll for messages with a 100 ms timeout
                ConsumerRecords<String, String> records = consumer.poll(100);
                // print every message received
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format(
                            "topic = %s, partition = %s, key = %s, value = %s",
                            record.topic(), record.partition(), record.key(), record.value()));
                    // a message with value "done" signals the end of the stream
                    if (record.value().equals("done")) {
                        flag = false;
                    }
                }
                if (!flag) {
                    break;
                }
            }
        } finally {
            consumer.close();
        }
    }

    // manual synchronous commit: commit on demand, but the call blocks;
    // a failed commit is retried until it succeeds or an exception is thrown
    private static void generalConsumeMessageSyncCommit() {
        properties.put("enable.auto.commit", false);
        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("kafka-study-x"));
        while (true) {
            boolean flag = true;
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format(
                        "topic = %s, partition = %s, key = %s, value = %s",
                        record.topic(), record.partition(), record.key(), record.value()));
                if (record.value().equals("done")) {
                    flag = false;
                }
            }
            try {
                // synchronous commit
                consumer.commitSync();
            } catch (CommitFailedException ex) {
                System.out.println("commit failed error: " + ex.getMessage());
            }
            if (!flag) {
                break;
            }
        }
    }

    // manual asynchronous commit: fast, but failed commits are not retried
    private static void generalConsumeMessageAsyncCommit() {
        properties.put("enable.auto.commit", false);
        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("kafka-study-x"));
        while (true) {
            boolean flag = true;
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format(
                        "topic = %s, partition = %s, key = %s, value = %s",
                        record.topic(), record.partition(), record.key(), record.value()));
                if (record.value().equals("done")) {
                    flag = false;
                }
            }
            // asynchronous commit
            consumer.commitAsync();
            if (!flag) {
                break;
            }
        }
    }

    // manual asynchronous commit with a callback
    private static void generalConsumeMessageAsyncCommitWithCallback() {
        properties.put("enable.auto.commit", false);
        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("kafka-study-x"));
        while (true) {
            boolean flag = true;
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format(
                        "topic = %s, partition = %s, key = %s, value = %s",
                        record.topic(), record.partition(), record.key(), record.value()));
                if (record.value().equals("done")) {
                    flag = false;
                }
            }
            // Java 8 lambda as the commit callback
            consumer.commitAsync((map, e) -> {
                if (e != null) {
                    System.out.println("commit failed for offsets: " + e.getMessage());
                }
            });
            if (!flag) {
                break;
            }
        }
    }

    // mixed synchronous and asynchronous commit
    @SuppressWarnings("all")
    private static void mixSyncAndAsyncCommit() {
        properties.put("enable.auto.commit", false);
        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("kafka-study-x"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format(
                            "topic = %s, partition = %s, key = %s, value = %s",
                            record.topic(), record.partition(), record.key(), record.value()));
                }
                // asynchronous commit on the normal path, for performance
                consumer.commitAsync();
            }
        } catch (Exception ex) {
            System.out.println("commit async error: " + ex.getMessage());
        } finally {
            try {
                // if asynchronous commits failed, try one final synchronous commit
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }

    public static void main(String[] args) {
        // automatic offset commit
        generalConsumeMessageAutoCommit();
        // manual synchronous commit
        //generalConsumeMessageSyncCommit();
        // manual asynchronous commit
        //generalConsumeMessageAsyncCommit();
        // manual asynchronous commit with callback
        //generalConsumeMessageAsyncCommitWithCallback();
        // mixed synchronous and asynchronous commit
        //mixSyncAndAsyncCommit();
    }
}