当前位置: 首页 > 知识库问答 >
问题:

FlinkSQL客户端连接到安全的kafka集群

慕容雅珺
2023-03-14

我想在由安全kafka集群的kafka主题支持的Flink SQL表上执行一个查询。我能够以编程方式执行查询,但无法通过Flink SQL客户端执行。我不知道如何通过Flink SQL客户端传递JAAS配置(java.security.auth.login.config)和其他系统属性。

FlinkSQL以编程方式查询

 private static void simpleExec_auth() {

        // Create the execution environment.
        final EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .withBuiltInCatalogName(
                        "default_catalog")
                .withBuiltInDatabaseName(
                        "default_database")
                .build();

        System.setProperty("java.security.auth.login.config","client_jaas.conf");
        System.setProperty("sun.security.jgss.native", "true");
        System.setProperty("sun.security.jgss.lib", "/usr/libexec/libgsswrap.so");
        System.setProperty("javax.security.auth.useSubjectCredsOnly","false");

        TableEnvironment tableEnvironment = TableEnvironment.create(settings);
        String createQuery = "CREATE TABLE  test_flink11 ( " + "`keyid` STRING, " + "`id` STRING, "
                + "`name` STRING, " + "`age` INT, " + "`color` STRING, " + "`rowtime` TIMESTAMP(3) METADATA FROM 'timestamp', " + "`proctime` AS PROCTIME(), " + "`address` STRING) " + "WITH ( "
                + "'connector' = 'kafka', "
                + "'topic' = 'test_flink10', "
                + "'scan.startup.mode' = 'latest-offset', "
                + "'properties.bootstrap.servers' = 'kafka01.nyc.com:9092', "
                + "'value.format' = 'avro-confluent', "
                + "'key.format' = 'avro-confluent', "
                + "'key.fields' = 'keyid', "
                + "'value.fields-include' = 'EXCEPT_KEY', "
                + "'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.sasl.kerberos.kinit.cmd' = '/usr/local/bin/skinit --quiet', 'properties.sasl.mechanism' = 'GSSAPI', "
                + "'key.avro-confluent.schema-registry.url' = 'http://kafka-schema-registry:5037', "
                + "'key.avro-confluent.schema-registry.subject' = 'test_flink6', "
                + "'value.avro-confluent.schema-registry.url' = 'http://kafka-schema-registry:5037', "
                + "'value.avro-confluent.schema-registry.subject' = 'test_flink4')";
        System.out.println(createQuery);
        tableEnvironment.executeSql(createQuery);
        TableResult result = tableEnvironment
                .executeSql("SELECT name,rowtime FROM test_flink11");
        result.print();
    }

这很好。

通过SQL客户端Flink SQL查询

运行此命令将导致以下错误。

Flink SQL> CREATE TABLE test_flink11 (`keyid` STRING,`id` STRING,`name` STRING,`address` STRING,`age` INT,`color` STRING) WITH('connector' = 'kafka', 'topic' = 'test_flink10','scan.startup.mode' = 'earliest-offset','properties.bootstrap.servers' = 'kafka01.nyc.com:9092','value.format' = 'avro-confluent','key.format' = 'avro-confluent','key.fields' = 'keyid', 'value.avro-confluent.schema-registry.url' = 'http://kafka-schema-registry:5037', 'value.avro-confluent.schema-registry.subject' = 'test_flink4', 'value.fields-include' = 'EXCEPT_KEY', 'key.avro-confluent.schema-registry.url' = 'http://kafka-schema-registry:5037', 'key.avro-confluent.schema-registry.subject' = 'test_flink6', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.sasl.kerberos.kinit.cmd' = '/usr/local/bin/skinit --quiet', 'properties.sasl.mechanism' = 'GSSAPI');

Flink SQL> select * from test_flink11;
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is /tmp/jaas-6309821891889949793.conf

/tmp/jaas-6309821891889949793.conf 中没有任何内容,除了以下注释

# We are using this file as an workaround for the Kafka and ZK SASL implementation
# since they explicitly look for java.security.auth.login.config property
# Please do not edit/delete this file - See FLINK-3929

SQL客户端运行命令

bin/sql-client.sh embedded --jar  flink-sql-connector-kafka_2.11-1.12.0.jar  --jar flink-sql-avro-confluent-registry-1.12.0.jar

Flink集群命令

bin/start-cluster.sh

如何为SQL客户端传递此java.security.auth.login.config和其他系统属性(我在上面的java代码片段中设置的)?

共有1个答案

吕晟睿
2023-03-14

flink-conf.yaml

security.kerberos.login.use-ticket-cache: true
security.kerberos.login.principal: XXXXX@HADOOP.COM
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /path/to/kafka.keytab
security.kerberos.login.principal: XXXX@HADOOP.COM
security.kerberos.login.contexts: Client,KafkaClient

我还没有真正测试这个解决方案是否可行,你可以尝试一下,希望它能帮助你。

 类似资料:
  • 执行kafka客户端的生产者/消费者连接池有意义吗? kafka是否在内部维护已初始化并准备好使用的连接对象列表? 我们希望最小化连接创建的时间,这样在发送/接收消息时就不会有额外的开销。 目前,我们正在使用apache共享池库来保持连接。 任何帮助都将不胜感激。

  • 我正在尝试使用Apache Camel和Qpid JMS客户端连接到在两个不同节点(VM)中运行的ActiveMQ Artemis主动-主动集群。我正在使用ActiveMQ Artemis 2.17.0。 我正在试图找出我的组织的远程URI配置应该是什么。阿帕奇。qpid。jms。JmsConnectionFactory实例。使用<代码>ampq://host1:5672,ampq://host2

  • 我已经使用KOPS安装了kubernetes集群。 从kops安装kubectl的节点开始,kubectl全部工作完美(假设节点A)。 我正在尝试从另一个安装了kubectl的服务器(节点B)连接到kubernetes集群。我已经将~/.kube从A节点复制到B节点,但当我尝试执行以下基本命令时: 我的配置文件是: 感谢任何帮助

  • 我试图通过Psycopg2连接到AWS postgresql RDS。当我将安全组的入站规则设置为通过端口5432上的postgresql接受所有通信量时,我就可以连接了。 但是,当我将此端口上的postgresql入站规则限制为只接受来自客户端IP的通信量时,我无法连接并超时。我得到了错误: psycopg2.operationalerror:无法连接到服务器:连接超时服务器是否运行在主机“[h

  • 我正在java应用程序中使用Jetty 9.4.18库编写WebSocket客户端。 我对WebSockets非常陌生,所以我开始使用Jetty文档中的两个示例类进行测试,连接到echo。websocket。组织机构 当我在没有SSL的情况下连接时,测试运行正常,但是如果连接到时失败 我总是遇到同样的异常: 看起来服务器在没有响应握手请求的情况下关闭。 我知道SslContextFactory,但

  • 我想找出将消息从Kafka路由到连接到负载平衡应用服务器集群的web套接字客户端的最佳方法。我知道spring-kafka有助于消费和发布消息到kafka主题,但是当连接到分布式kafka主题时,这在负载平衡的应用服务器场景中如何工作。以下是我希望满足的需求,其总体目标是在具有非常非常大的用户量的应用程序中促进对等消息传递: null 我认为,应用服务器负载平衡可以通过将具有特定路由密钥的用户(用