当前位置: 首页 > 知识库问答 >
问题:

Kafka自定义AuthenticateCallbackHandler

司空海荣
2023-03-14

我试图实现一个准备在Kafka 2.0.0中发布的AuthenticateCallbackHandler,但没有成功-这是一个应该工作的设置吗?

在https://cwiki.apache.org/confluence/display/KAFKA/KIP-86:可配置的SASL回调处理器上我读到:

使用外部身份验证服务器进行SASL/PLAIN身份验证,使用Kafka中包含的PLAIN的SaslServer实现

定义一个实现身份验证回调处理程序的新类,该处理程序处理名称回调和普通身份验证回调,并将该类添加到代理的 sasl.server.callback.处理程序.class属性。将为代理创建此回调处理程序的单个实例。配置的回调处理程序负责验证客户端提供的密码,这可能使用外部身份验证服务器。

所以基本上我做的是创建一个类:

package com.example;

import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.plain.PlainAuthenticateCallback;

public class CustomAuthenticateCallbackHandler implements AuthenticateCallbackHandler {
    private List<AppConfigurationEntry> jaasConfigEntries;

    public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
        this.jaasConfigEntries = jaasConfigEntries;
    }

    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        String username = null;
        for (Callback callback: callbacks) {
            if (callback instanceof NameCallback)
                username = ((NameCallback) callback).getDefaultName();
            else if (callback instanceof PlainAuthenticateCallback) {
                PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback;
                boolean authenticated = authenticate(username, plainCallback.password());
                plainCallback.authenticated(authenticated);
            } else
                throw new UnsupportedCallbackException(callback);
        }
    }

    protected boolean authenticate(String username, char[] password) throws IOException {
        return username != null && username.equals("test") && new String(password).equals("test");
    }

    public void close() throws KafkaException {
    }
}

构建一个jar,并使其可用于Kafka,如以下docker-compose.yml所示:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.0-beta1-1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SASL_ENABLED: "false"

  kafka:
    image: confluentinc/cp-kafka:5.0.0-beta1-1
    depends_on:
      - zookeeper
    volumes:
      - ./security:/etc/kafka/secrets
      - ./jars:/etc/kafka/jars
    ports:
      - "9092:9092"
    environment:
      CLASSPATH: /etc/kafka/jars/*
      ZOOKEEPER_SASL_ENABLED: "false"
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://kafka:9092
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_PLAINTEXT
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "false"
      KAFKA_SUPER_USERS: User:admin
      KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/broker_jaas.conf
      KAFKA_SASL_SERVER_CALLBACK_HANDLER_CLASS: com.example.CustomAuthenticateCallbackHandler

broker_jaas:

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret"
  user_admin="admin-secret"
  ;
};

cli-client.properties:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="test" \
  password="test";

然后我使用以下命令对其进行测试:

 > kafka-console-producer --broker-list kafka:9092 --topic test-topic --producer.config /etc/kafka/secrets/cli-client.properties
 This is a message
 This is another message

但是,我收到一个错误,生产者无法验证:

[2018-05-17 19:49:06,955]ERROR[Producer clientId=consolt-制作者]连接到节点-1的身份验证失败,原因是:身份验证失败:无效的用户名或密码(org.apache.kafka.clients.NetworkClient)

共有1个答案

左丘阳晖
2023-03-14

我从Kafka的邮件列表中得到了以下答案,下周将会尝试:

这项功能将是Kafka 2.0.0版本一部分。

公关博士来了:https://github.com/apache/kafka/pull/4890

此处配置:https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java#L57

 类似资料:
  • 我正在学习kafka connect的教程,我想知道是否有可能接收一些类的类型的消息。 教程:https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1// 基于avro格式,我用Maven生成了一个类。 然后我用我的类型定义了消费者工厂: 和KafkaList

  • 我使用方法处理消息。 我已经找到了这个任务,它仍然是打开的。https://issues.apache.org/jira/browse/kafka-5632?src=confmacro

  • 目标是:开发一个自定义Kafka连接器,该连接器以循环方式从websocket读取消息。我试着给你们举一个我所认识到的例子: 我创建了一个接口IWebsocketClientEndpoint 以及实现上述接口的类: WebsocketClientEndpoint类专用于创建websocket并管理连接、断开连接、发送和接收消息。 目标是:如何在Kafka连接结构中调整我的websocket结构?我

  • 我正在尝试使用docker容器中的kafka connect和一个自定义连接器(PROGRESS _ DATADIRECT _ JDBC _ OE _ all . jar)来连接openedge数据库。 我将JAR文件放在插件路径(usr/share/java)中,但它不会作为连接器加载。 我可以通过将另一个(标准)连接器放在插件路径中来加载它。这行得通 有点不知道如何前进,我对Kafka很陌生。

  • 我创建了一个自定义的Log4J2Kafka附加器,因为我需要以协议缓冲区格式发送消息。当我运行应用程序时,我看到以下警告。如何使自定义追加器覆盖默认追加器? 插件[kafka]已经映射到类org.apache.logging.log4j.core.appender.mom.kafka.kafkaappender,忽略了类com.abc.appender.kafkaappender 注意:我阅读了h

  • 在spring-kafka中,如何从包中添加类作为自定义头字段来信任? 消息是这样发送的: 接收端如下所示: 我不断得到的例外是: