当前位置: 首页 > 知识库问答 >
问题:

Kafka连接中的ACL配置不工作

宣弘新
2023-03-14

我为3节点Kafka集群设置了ACL,并能够通过生产者控制台和消费者控制台发送和接收主题。现在我想用ACL配置Kafka连接。我尝试了SASL_PLAINTEXT组合和连接。日志文件,它显示以下错误。它没有从源表同步到主题,请在我缺少任何配置的地方提供帮助。

错误日志

[2020-10-14 07:24:35,874] ERROR WorkerSourceTask{id=oracle-jdbc-source-mtx_domains_acl5-0} Failed to flush, timed out while waiting for producer to flush outstanding 1
messages (org.apache.kafka.connect.runtime.WorkerSourceTask:448)
[2020-10-14 07:24:35,874] ERROR WorkerSourceTask{id=oracle-jdbc-source-mtx_domains_acl5-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCo
mmitter:116)"

我的配置如下文件所示。我在jaas中提到过用户。conf文件并设置到环境中。

1: zookeeper.properties

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
zookeeper.set.acl=true
jaasLoginRenew=3600000

2:服务器属性

security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=true
listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://<server_name>:9092
host.name=server_ip

3:schema-registry.properties

kafkastore.security.protocol=SASL_PLAINTEXT
kafkastore.sasl.mechanism=PLAIN
metadataServerUrls=SASL_PLAINTEXT://<server_ip>:9092
zookeeper.set.acl=true
kafkastore.group.id=schema-registry-3

4:connect-avro-distributed.properties

sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT

5:源连接器脚本

curl -X POST -H "Content-Type: application/json" --data '{    "name":"oracle-jdbc-source-mtx_domains_acl5",    "config":{       "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",       "tasks.max":"1",       "connection.url":"jdbc:oracle:thin:@<ip>:<port>:<dbname>",       "connection.user":"<username>",        "connection.password":"password",     "numeric.mapping":"best_fit",       "table.whitelist":"TABLENAME",       "mode":"timestamp",       "timestamp.column.name":"CREATED_ON",      "topic.prefix":"",       "validate.non.null":"false",       "transforms":"createKey",       "transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",       "transforms.createKey.fields":"DOMAIN_CODE", "sasl.mechanism":"PLAIN", "security.protocol":"SASL_PLAINTEXT","producer.sasl.mechanism":"PLAIN", "producer.security.protocol":"SASL_PLAINTEXT","producer.request.timeout.ms":50000,
"producer.retry.backoff.ms":500, "offset.flush.timeout.ms":50000,"producer.buffer.memory":100,
"sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"producer\" password=\"producer\";",
"producer.sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"producer\" password=\"producer\";", "key.converter.schemas.enable":"true",       "value.converter.schemas.enable":"true","delete.enabled":"true","key.converter":"io.confluent.connect.avro.AvroConverter",       "key.converter.schema.registry.url":"http://localhost:8081",       "value.converter":"io.confluent.connect.avro.AvroConverter",       "value.converter.schema.registry.url":"http://localhost:8081"    } }' http://localhost:8083/connectors

共有1个答案

西门鹏程
2023-03-14

您需要将以下属性添加到连接分布式属性中

sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="connect" \
  password="connect-secret";

producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_PLAINTEXT
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="connect" \
  password="connect-secret";

consumer.sasl.mechanism=PLAIN
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="connect" \
  password="connect-secret";

来源:Kafka连接安全文档

 类似资料:
  • 设置 我在端口8082上有一个正在运行的服务器(wilfly swarm,keycloack authenticated),它承载了我的日志功能。我可以通过REST将日志线推送到这个服务器。在幕后,Kafka的制作人正在运行和传播给Kafka的信息。 我在2181口有动物园管理员 我有一个代理在端口9092运行 日志服务器在端口8082上运行 当我试图通过Java生成器(和控制台生成器)生成消息时

  • 那是我学习Kafka的初期。我正在检查我本地机器中的每一个Kafka属性/概念。 所以我遇到了属性,下面是我的理解。如果我误解了什么,请纠正我。 将消息发送到主题后,必须将消息写入至少关注者数。 还包括引导。 如果可用活动代理的数量(间接地,在同步副本中)少于指定的,则生产者将引发发布消息失败的异常。 以下是我创建上述场景所遵循的步骤 在本地启动了3个代理,代理ID为0、1和2 创建了主题insy

  • 我一直在测试kafka连接。但是对于每个连接器,我都必须去阅读连接器留档以了解连接器所需的配置。就我阅读kafka连接API留档而言,我已经看到API以获取连接器相关数据。 -返回Kafka Connect集群中安装的连接器插件列表。请注意,API仅检查处理请求的工作人员上的连接器,这意味着您可能会看到不一致的结果,尤其是在滚动升级期间,如果您添加了新的连接器罐。 根据配置定义验证提供的配置值。此

  • 我正在做一个Spring Boot应用程序,并试图以编程方式配置kafka,但由于某些原因,我仍然在从应用程序获取属性。yaml而不是我通过编程设置的

  • 我正在浏览Kafka连接,我试图得到一些概念。 假设我有kafka集群(节点k1、k2和k3)设置并且正在运行,现在我想在不同的节点上运行kafka连接工作器,比如分布式模式下的c1和c2。 很少有问题。 1) 要在分布式模式下运行或启动kafka connect,我需要使用命令,这在kaffa集群节点中可用,所以我需要从任何一个kafka集群节点启动kafka连接?或者我启动kafka conn

  • 我有一个kafka connect插件,部署在kafka集群中(在独立模式下,仅用于测试,目的是分布式完成)。这个Kafka连接插件使用curator连接到集群的zookeper,并从中提取一些信息,以决定如何处理这些消息。 代码如下: 在treeCache启动时超时,配置根路径存在于本地zookeeper中(已确认在zookeeper外壳中执行ls,对于我尝试使用的zkConnection字符串