Question:

How do I configure a Kafka consumer in Java with a SASL mechanism and the SASL_SSL security protocol?

姜景焕
2023-03-14

I want to create a Kafka consumer that uses the security protocol SASL_SSL and the SASL mechanism PLAIN. Can anyone help me configure these details?

I have read a lot of documentation about how to configure the SASL details, but I still haven't figured it out. Here is the code I use to create the Kafka consumer:

Properties props = new Properties();
props.put("bootstrap.servers", "servers");
String consumeGroup = "consumer_group";
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\"");
props.put("group.id", consumeGroup);
props.put("client.id", "client_id");
props.put("security.protocol", "SASL_SSL");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "101");
props.put("max.partition.fetch.bytes", "135");
// props.put("auto.offset.reset", "earliest");
props.put("heartbeat.interval.ms", "3000");
props.put("session.timeout.ms", "6001");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaConsumer<String, String>(props);

Stack trace:

    14:56:12.767 [main] DEBUG o.a.k.clients.consumer.KafkaConsumer - Starting the Kafka consumer
    14:56:12.776 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(nodes = [Node(-2, kafka-events-nonprod-ds1.i, 9092), Node(-3, kafka-events-nonprod-ds1-3.io, 9092), Node(-1, kafka-events-nonprod-ds1-1.io, 9092)], partitions = [])
    14:56:12.789 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name connections-closed:client-id-client_id
    14:56:12.845 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name connections-created:client-id-client_id
    14:56:12.846 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received:client-id-client_id
    14:56:12.846 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-sent:client-id-client_id
    14:56:12.847 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-received:client-id-client_id
    14:56:12.847 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name select-time:client-id-client_id
    14:56:12.847 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name io-time:client-id-client_id
    14:56:12.861 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name heartbeat-latency
    14:56:12.862 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name join-latency
    14:56:12.862 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name sync-latency
    14:56:12.865 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name commit-latency
    14:56:12.873 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-fetched
    14:56:12.874 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name records-fetched
    14:56:12.879 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name fetch-latency
    14:56:12.881 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name records-lag
    14:56:12.882 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name fetch-throttle-time
    14:56:12.883 [main] WARN  o.a.k.c.consumer.ConsumerConfig - The configuration sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password" was supplied but isn't a known config.
    14:56:12.885 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.0
    14:56:12.885 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId : fc7243c2af4b2b4a
    14:56:12.886 [main] DEBUG o.a.k.clients.consumer.KafkaConsumer - Kafka consumer created
    14:56:12.887 [main] DEBUG o.a.k.clients.consumer.KafkaConsumer - Subscribed to topic(s): topic_name
    14:56:12.887 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Issuing group metadata request to broker -2
    14:56:12.918 [main] DEBUG o.apache.kafka.clients.NetworkClient - Initiating connection to node -2 at kafka-events-nonprod-ds1.i:9092.
    14:56:13.336 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--2.bytes-sent
    14:56:13.336 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--2.bytes-received
    14:56:13.337 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--2.latency
    14:56:13.339 [main] DEBUG o.apache.kafka.clients.NetworkClient - Completed connection to node -2
    14:56:13.343 [main] DEBUG o.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=1,client_id=client_id}, body={topics=[topic_name]}), isInitiatedByNetworkClient, createdTimeMs=1568193973342, sendTimeMs=0) to node -2
    14:56:13.986 [main] DEBUG o.a.kafka.common.network.Selector - Connection with kafka-events-nonprod-ds1-2.octanner.i/10.84.20.85 disconnected
    java.io.EOFException: null
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:99) ~[kafka-clients-0.9.0.0.jar:na]
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) ~[kafka-clients-0.9.0.0.jar:na]
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:160) ~[kafka-clients-0.9.0.0.jar:na]
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:141) ~[kafka-clients-0.9.0.0.jar:na]
        at org.apache.kafka.common.network.Selector.poll(Selector.java:286) ~[kafka-clients-0.9.0.0.jar:na]
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270) [kafka-clients-0.9.0.0.jar:na]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303) [kafka-clients-0.9.0.0.jar:na]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197) [kafka-clients-0.9.0.0.jar:na]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187) [kafka-clients-0.9.0.0.jar:na]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:126) [kafka-clients-0.9.0.0.jar:na]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:186) [kafka-clients-0.9.0.0.jar:na]
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:857) [kafka-clients-0.9.0.0.jar:na]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829) [kafka-clients-0.9.0.0.jar:na]
        at kafka.Consumer.processRecords(Consumer.java:54) [classes/:na]
        at kafka.Consumer.execute(Consumer.java:22) [classes/:na]
        at kafka.Consumer.main(Consumer.java:15) [classes/:na]

Deserialization function:

private static void processRecords(KafkaConsumer<String, Object> consumer) throws InterruptedException {
    while (true) {
        ConsumerRecords<String, Object> records = consumer.poll(TimeUnit.MINUTES.toMillis(1));
        long lastOffset = 0;
        for (ConsumerRecord<String, Object> record : records) {
            System.out.printf("\n\n\n\n\n\n\roffset = %d, key = %s\n\n\n\n\n\n", record.offset(), record.value());
            lastOffset = record.offset();
        }
        System.out.println("lastOffset read: " + lastOffset);
        process();
    }
}

1 Answer

岳安福
2023-03-14

Support for the PLAIN mechanism was added in Kafka 0.10. Kafka 0.9, the version you are using, only supports the GSSAPI mechanism. This is why the log shows the warning `The configuration sasl.jaas.config = ... was supplied but isn't a known config`.

Once you switch to a recent version, you only need to set at least the following configuration:

import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;

Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, <BROKERS>);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
// Note the trailing semicolon inside the JAAS config string -- it is required.
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";");

Note that support for `SaslConfigs.SASL_JAAS_CONFIG` was added in Kafka 0.10.2. Before that, you need to use a JAAS file. See the question "Kafka 'Login module not specified in JAAS config'" for details.
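For those older clients, the same login-module settings go in a standalone JAAS file rather than the `sasl.jaas.config` property. A minimal sketch (the file path, username, and password are placeholders):

```
// kafka_client_jaas.conf (hypothetical path)
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="username"
    password="password";
};
```

The file is then passed to the JVM at startup, e.g. `-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf`, and the client still sets `security.protocol=SASL_SSL` and `sasl.mechanism=PLAIN` in its properties.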

If at all possible, I recommend moving to the latest Kafka version.
