本文基于kafka3.3.1使用kraft作为仲裁,进行测试,kafka with zookeeper使用也是一样的。
1.创建kafka_server_jaas.conf文件,写入如下内容
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-sec"
user_admin="admin-sec"
user_producer="prod-sec"
user_consumer="cons-sec";
};
username: 定义了一个公共的用户名密码,用于节点间通信
user_xxx
: 自定义用户,主要用于客户端连接kafka,所以可以使用的用户必须在此定义。等号后面是密码,xxx
是用户名。格式不能错。
注意倒数第一行和倒数第二行都有一个分号
2.修改kafka-server-start.sh
启动脚本的最后一行,将上述文件作为参数传递个kafka:exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf kafka.Kafka "$@"
。注意修改文件路径。
3.修改 kafka 配置文件下的server.properties
# 这两行主要是修改协议
listeners=SASL_PLAINTEXT://0.0.0.0:9193
advertised.listeners=SASL_PLAINTEXT://12.120.20.3:9092
# inter.broker.listener.name=PLAINTEXT # 如果这一行默认是开启的注释掉这一行
# 增加下面四行配置
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
allow.everyone.if.no.acl.found=true
4.重启kafka节点
这一节主要配置控制台生产者和消费者。主要用于测试,如果直接用程序调试这一节可以跳过。
5.消费者新建jaas文件kafka_client_consumer_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="consumer"
password="cons-sec";
};
6.生产者新建jaas文件kafka_client_producer_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="producer"
password="prod-sec";
};
7.修改生产者客户端脚本kafka-console-producer.sh
,指定配置文件.注意文件路径
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data/users/yulei/kafka_2.12-3.3.1/config/kraft/kafka_client_producer_jaas.conf kafka.tools.ConsoleProducer "$@"
8.修改消费者客户端脚本kafka-console-consumer.sh
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data/users/yulei/kafka_2.12-3.3.1/config/kraft/kafka_client_consumer_jaas.conf kafka.tools.ConsoleConsumer "$@"
9.创建生产者配置文件producer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
10.创建消费者配置文件consumer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
11.控制台启动消费者,生产者测试
bin/kafka-console-producer.sh --topic aa --bootstrap-server localhost:9092 --producer.config config/kraft/producer.propertis
bin/kafka-console-consumer.sh --topic aa --from-beginning --bootstrap-server localhost:8082 --consumer.config config/kraft/consumer.properties
SASL_PLAINTEXT
认证的kafkaspringboot集成kafka只需要修改配置文件即可,业务代码不需要修改
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 消费者主要添加如下四行,注意用户名密码与第一步中的kafka_server_jaas.conf相同
properties:
sasl.mechanism: PLAIN
security.protocol: SASL_PLAINTEXT
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="consumer" password="cons-sec";
producer:
# 生产者主要添加如下四行,注意用户名密码与第一步中的kafka_server_jaas.conf相同
properties:
sasl.mechanism: PLAIN
security.protocol: SASL_PLAINTEXT
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="producer" password="prod-sec";
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
flink集成带SASL_PLAINTEXT认证的kafka同样只需要下面三个配置,业务代码不需要修改。
/**
* 构建kafka数据源,结果格式为ImageResult
*
* @param address 集群地址
* @param topics topic
* @return 图片结果数据源
*/
private static KafkaSource<ImageResult> getImageResultSource(String address, String... topics) {
final Properties props = new Properties();
props.setProperty("security.protocol", "SASL_PLAINTEXT");
props.setProperty("sasl.mechanism", "PLAIN");
props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"consumer\" password=\"cons-sec\";");
return KafkaSource.<ImageResult>builder()
.setBootstrapServers(address)
.setTopics(topics)
.setGroupId("results-group")
.setProperties(props)
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleImageResultSchema())
.build();
}