Question:

Unable to register a Debezium (Kafka Connect) connector in an SSL-enabled Kafka cluster

呼延庆
2023-03-14

I am trying to register a MySQL Debezium connector in an SSL-enabled Kafka cluster. The curl command I am using for this is:

curl -k -X POST -H "Accept:application/json"  -H "Content-Type:application/json" https://<IPADDRESS>:8083/connectors/  -d '{ "name": "test-eds-extactor-profile", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "<DBHOSTNAME>", "database.port": "3306", "database.user": "debezium", "database.password": "*****", "database.server.id": "1", "database.server.name": "MySQL-Database-Docker", "database.history.kafka.bootstrap.servers": "<IPADDRESS>:9094", "database.history.kafka.topic": "dbhistory.profile" , "include.schema.changes": "true", "table.whitelist": "test_eds_extraction_src_db_mock.profile", "database.history.producer.security.protocol": "SASL_PLAINTEXT", "database.history.producer.ssl.keystore.location": "path/to/server.jks", "database.history.producer.ssl.keystore.password": "******", "database.history.producer.ssl.truststore.location": "path/to//server.jks", "database.history.producer.ssl.truststore.password": "******", "database.history.producer.ssl.key.password": "******", "database.history.consumer.security.protocol": "SASL_PLAINTEXT", "database.history.consumer.ssl.keystore.location": "path/to/server.jks", "database.history.consumer.ssl.keystore.password": "******", "database.history.consumer.ssl.truststore.location": "path/to/server.jks", "database.history.consumer.ssl.truststore.password": "******", "database.history.consumer.ssl.key.password": "******" } }'

Debezium is unable to create the database.history topic, and the task fails with the following error:

{"name":"test-eds-extactor-profile","connector":{"state":"RUNNING","worker_id":"<IPADDRESS>:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"<IPADDRESS>:8083","trace":"org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer\n\tat io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:273)\n\tat io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:47)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer\n\tat org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:429)\n\tat org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:297)\n\tat io.debezium.relational.history.KafkaDatabaseHistory.start(KafkaDatabaseHistory.java:171)\n\tat io.debezium.connector.mysql.MySqlSchema.start(MySqlSchema.java:161)\n\tat io.debezium.connector.mysql.MySqlTaskContext.start(MySqlTaskContext.java:255)\n\tat io.debezium.connector.mysql.MySqlConnectorTask.createAndStartTaskContext(MySqlConnectorTask.java:330)\n\tat io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:136)\n\t... 
9 more\nCaused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config\n\tat org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:153)\n\tat org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)\n\tat org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)\n\tat org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:108)\n\tat org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:437)\n\tat org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:418)\n\t... 15 more\nCaused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config\n\tat org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:301)\n\tat org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:92)\n\tat org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:60)\n\tat org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:111)\n\tat org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:142)\n\t... 20 more\n"}],"type":"source"}

Prettified error:

Failed to construct kafka producer
    at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:273)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:47)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:429)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:297)
    at io.debezium.relational.history.KafkaDatabaseHistory.start(KafkaDatabaseHistory.java:171)
    at io.debezium.connector.mysql.MySqlSchema.start(MySqlSchema.java:161)
    at io.debezium.connector.mysql.MySqlTaskContext.start(MySqlTaskContext.java:255)
    at io.debezium.connector.mysql.MySqlConnectorTask.createAndStartTaskContext(MySqlConnectorTask.java:330)
    at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:136)
    ... 9 more
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:153)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:140)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:108)
    at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:437)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:418)
    ... 15 more
Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config
    at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:301)
    at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:92)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:60)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:111)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:142)
    ... 20 more

2 answers

施阳曜
2023-03-14

This error indicates that the JAAS configuration is not visible to the Kafka client. To resolve it, you can export the following variable:

export KAFKA_OPTS="-Djava.security.auth.login.config=path/to/jaas.conf"
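
For illustration, here is a minimal sketch of what such a JAAS file might look like and how it could be exported. It assumes the cluster really does authenticate clients with SASL/GSSAPI (Kerberos), which the question does not confirm. The file location, keytab path, principal, and the service name `kafka` are all hypothetical placeholders and must be replaced with the values used by your brokers.

```shell
#!/usr/bin/env bash
set -euo pipefail

# Hypothetical location for the JAAS file; any path readable by the
# Kafka Connect worker process would do.
JAAS_FILE="${JAAS_FILE:-/tmp/kafka_client_jaas.conf}"

# Write a KafkaClient login section. The keytab path, principal and
# serviceName below are placeholders (assumptions), not values from the question.
cat > "$JAAS_FILE" <<'EOF'
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/connect.keytab"
  principal="connect@EXAMPLE.COM"
  serviceName="kafka";
};
EOF

# Point the JVM at the file; an absolute path avoids surprises when the
# worker is started from a different working directory.
export KAFKA_OPTS="-Djava.security.auth.login.config=${JAAS_FILE}"
echo "KAFKA_OPTS=${KAFKA_OPTS}"
```

The variable has to be set in the environment that launches the Kafka Connect worker, and the worker has to be restarted afterwards, because the JVM option is only read at startup.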

方风华
2023-03-14

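When registering the connector, you need to add the SSL properties to the JSON body. Note that the request in the question sets security.protocol to SASL_PLAINTEXT, which makes the client attempt a SASL (Kerberos) login; for an SSL-only cluster it should be SSL: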

database.history.producer.security.protocol=SSL
database.history.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
database.history.producer.ssl.keystore.password=test1234
database.history.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
database.history.producer.ssl.truststore.password=test1234
database.history.producer.ssl.key.password=test1234
database.history.consumer.security.protocol=SSL
database.history.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
database.history.consumer.ssl.keystore.password=test1234
database.history.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
database.history.consumer.ssl.truststore.password=test1234
database.history.consumer.ssl.key.password=test1234

Reference: https://debezium.io/docs/connectors/mysql/ (scroll to the end)
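
As a rough sketch, the properties listed in this answer could be folded into the registration request from the question like this. The Connect URL default, the payload file location, and the `register_connector` helper are assumptions made for illustration. The angle-bracket placeholders, masked passwords, and keystore paths are carried over from the question and the answer and need real values.

```shell
#!/usr/bin/env bash
set -euo pipefail

# Hypothetical defaults; override via the environment.
CONNECT_URL="${CONNECT_URL:-https://localhost:8083}"
PAYLOAD="${PAYLOAD:-/tmp/debezium-ssl-connector.json}"

# Connector definition with the history producer/consumer switched to SSL.
cat > "$PAYLOAD" <<'EOF'
{
  "name": "test-eds-extactor-profile",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "<DBHOSTNAME>",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "******",
    "database.server.id": "1",
    "database.server.name": "MySQL-Database-Docker",
    "database.history.kafka.bootstrap.servers": "<IPADDRESS>:9094",
    "database.history.kafka.topic": "dbhistory.profile",
    "include.schema.changes": "true",
    "table.whitelist": "test_eds_extraction_src_db_mock.profile",
    "database.history.producer.security.protocol": "SSL",
    "database.history.producer.ssl.keystore.location": "/var/private/ssl/kafka.server.keystore.jks",
    "database.history.producer.ssl.keystore.password": "******",
    "database.history.producer.ssl.truststore.location": "/var/private/ssl/kafka.server.truststore.jks",
    "database.history.producer.ssl.truststore.password": "******",
    "database.history.producer.ssl.key.password": "******",
    "database.history.consumer.security.protocol": "SSL",
    "database.history.consumer.ssl.keystore.location": "/var/private/ssl/kafka.server.keystore.jks",
    "database.history.consumer.ssl.keystore.password": "******",
    "database.history.consumer.ssl.truststore.location": "/var/private/ssl/kafka.server.truststore.jks",
    "database.history.consumer.ssl.truststore.password": "******",
    "database.history.consumer.ssl.key.password": "******"
  }
}
EOF

# Hypothetical helper: POST the payload to the Kafka Connect REST API.
# -k skips certificate verification, as in the original request.
register_connector() {
  curl -k -X POST \
    -H "Accept:application/json" \
    -H "Content-Type:application/json" \
    --data @"$PAYLOAD" \
    "${CONNECT_URL}/connectors/"
}

echo "Payload written to ${PAYLOAD}"
```

After filling in the placeholders, calling `register_connector` sends the request. Keeping the payload in a file rather than inline makes it easier to check that both the producer and the consumer settings use the same protocol.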
