当前位置: 首页 > 工具软件 > Kraft > 使用案例 >

[Kafka with kraft] SASL_PLAINTEXT认证

梁丘经艺
2023-12-01

本文基于kafka3.3.1使用kraft作为仲裁,进行测试,kafka with zookeeper使用也是一样的。

修改服务器配置

1.创建kafka_server_jaas.conf文件,写入如下内容

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-sec"
    user_admin="admin-sec"
    user_producer="prod-sec"
    user_consumer="cons-sec";
};
  • username: 定义了一个公共的用户名密码,用于节点间通信

  • user_xxx: 自定义用户,主要用于客户端连接kafka,所以可以使用的用户必须在此定义。等号后面是密码,xxx是用户名。格式不能错。

  • 注意倒数第一行和倒数第二行都有一个分号

2.修改kafka-server-start.sh启动脚本的最后一行,将上述文件作为参数传递个kafka:exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf kafka.Kafka "$@"。注意修改文件路径。

3.修改 kafka 配置文件下的server.properties

# 这两行主要是修改协议
listeners=SASL_PLAINTEXT://0.0.0.0:9193 
advertised.listeners=SASL_PLAINTEXT://12.120.20.3:9092

# inter.broker.listener.name=PLAINTEXT # 如果这一行默认是开启的注释掉这一行

# 增加下面四行配置
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
allow.everyone.if.no.acl.found=true

4.重启kafka节点

修改控制台生产者消费者配置

这一节主要配置控制台生产者和消费者。主要用于测试,如果直接用程序调试这一节可以跳过。

5.消费者新建jaas文件kafka_client_consumer_jaas.conf

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="consumer"
    password="cons-sec";
};

6.生产者新建jaas文件kafka_client_producer_jaas.conf

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="producer"
    password="prod-sec";
};

7.修改生产者客户端脚本kafka-console-producer.sh,指定配置文件.注意文件路径

exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data/users/yulei/kafka_2.12-3.3.1/config/kraft/kafka_client_producer_jaas.conf kafka.tools.ConsoleProducer "$@"

8.修改消费者客户端脚本kafka-console-consumer.sh

exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data/users/yulei/kafka_2.12-3.3.1/config/kraft/kafka_client_consumer_jaas.conf kafka.tools.ConsoleConsumer "$@"

9.创建生产者配置文件producer.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

10.创建消费者配置文件consumer.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

11.控制台启动消费者,生产者测试

bin/kafka-console-producer.sh --topic aa --bootstrap-server localhost:9092 --producer.config config/kraft/producer.propertis
bin/kafka-console-consumer.sh --topic aa --from-beginning --bootstrap-server localhost:8082 --consumer.config config/kraft/consumer.properties

Springboot集成带SASL_PLAINTEXT认证的kafka

springboot集成kafka只需要修改配置文件即可,业务代码不需要修改

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 消费者主要添加如下四行,注意用户名密码与第一步中的kafka_server_jaas.conf相同
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="consumer" password="cons-sec";
    producer:
    # 生产者主要添加如下四行,注意用户名密码与第一步中的kafka_server_jaas.conf相同
      properties:
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="producer" password="prod-sec";
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

Flink集成SASL_PLAINTEXT认证的kafka

flink集成带SASL_PLAINTEXT认证的kafka同样只需要下面三个配置,业务代码不需要修改。

/**
     * 构建kafka数据源,结果格式为ImageResult
     *
     * @param address 集群地址
     * @param topics  topic
     * @return 图片结果数据源
     */
    private static KafkaSource<ImageResult> getImageResultSource(String address, String... topics) {
        final Properties props = new Properties();
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"consumer\" password=\"cons-sec\";");

        return KafkaSource.<ImageResult>builder()
                .setBootstrapServers(address)
                .setTopics(topics)
                .setGroupId("results-group")
                .setProperties(props)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleImageResultSchema())
                .build();
    }

 类似资料: