Docker部署kafka并使用Java客户端连接

裴劲
2023-12-01

docker部署kafka(单机)

注意:ip 换成你宿主机的真实ip !!!

环境搭建

第一步 搭建zookeeper环境
docker pull zookeeper
docker run -d --name zookeeper -p 2181:2181 -t zookeeper
第二步 创建kafka环境 (ip换成你宿主机的真实ip)
docker pull wurstmeister/kafka 
docker run  -d --name kafka -p 9092:9092  --env KAFKA_ADVERTISED_HOST_NAME=localhost  -e KAFKA_ZOOKEEPER_CONNECT=ip:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://ip:9092  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -e KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"  wurstmeister/kafka
第三步 验证kafka是否正确安装
docker exec -it kafka bash 
cd /opt/kafka_*/bin
# 创建topic
./kafka-topics.sh --create --zookeeper ip:2181 --replication-factor 1 --partitions 3 --topic test 
第四步 创建生产者去生产消息
./kafka-console-producer.sh --broker-list ip:9092 --topic test
第五步 创建消费者去消费消息
./kafka-console-consumer.sh --bootstrap-server ip:9092 --topic test
第六步 搭建kafka管理平台
docker pull sheepkiller/kafka-manager
docker run -it -d --rm  -p 9000:9000 -e ZK_HOSTS="ip:2181"  sheepkiller/kafka-manager

​ 创建成后,在浏览器中访问http://ip:9000

Java客户端远程连接

配置pom.xml文件
	<!-- 添加kafka依赖包(根据自己使用的版本选择版本号)-->
	<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.0.0</version>
    </dependency>
创建生产者
package com.frank.exercise.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducer {
    private final static String IP = "your ip"
        
    public static void main(String[] args){
        Properties prop = new Properties();
        //kafka连接地址
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,IP+":9092");
        //用于实现Serializer接口的密钥的串行器类。
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //生产者数据安全
        prop.put(ProducerConfig.ACKS_CONFIG,"-1");

        //创建生产者对象
        KafkaProducer<String,String> producer=new KafkaProducer<String, String>(prop);
        //生成10条数据
        for (int i = 0; i <10; i++) {
            //创建消息对象
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", "hello world" + i);
            //调用生产者消息发送方法
            producer.send(producerRecord);
            try {
                //每条消息间隔100毫秒
                Thread.sleep(100);
            } catch (InterruptedException e) {

                e.printStackTrace();
            }
        }
        System.out.println("game over!!!");

    }
}
在服务器端查看Topic分区内数据条数
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ip:9092 --topic test -time -1 --offsets 1
在服务器端消费数据
./kafka-console-consumer.sh --bootstrap-server ip:9092 --topic test
创建消费者
package com.frank.exercise.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

public class CustomConsumer {
    private final static String IP = "your ip"

    public static void main(String[] args) {
        Properties prop = new Properties();
        //kafka连接地址       
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, IP + ":9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息      
        prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        //false:非自动提交偏移量     \
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        //自动提交偏移量周期 
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        //earliest:拉取最早的数据        //latest:拉取最新的数据        //none:报错 
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //一个消费者组G1里只有一个消费者    
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "G1");
        //创建kafka消费者对象    
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        //设置自动分配topic与消费者对象      
        consumer.subscribe(Collections.singleton("test"));
        while (true) {
            //消费数据,一次10条    
            ConsumerRecords<String, String> poll = consumer.poll(10);
            //遍历输出       
            for (ConsumerRecord<String, String> record : poll) {
                System.out.println(record.offset() + "\t" + record.key() + "\t" + record.value());
            }
        }
    }
}

参考文章

  • https://blog.csdn.net/weixin_38468167/article/details/110823356
  • https://zhuanlan.zhihu.com/p/354217865
 类似资料: