Question:

Kafka with SSL fails in the producer

姚建树
2023-03-14
---
  version: '3'
  services:
    zookeeper:
      image: confluentinc/cp-zookeeper:latest
      container_name: ${ZK_HOST}
      hostname: ${ZK_HOST}
      ports:
        - "${ZK_PORT}:${ZK_PORT}"
      environment:
        ZOOKEEPER_SERVER_ID: 1
        ZOOKEEPER_CLIENT_PORT: ${ZK_PORT}
        ZOOKEEPER_CLIENT_SECURE: 'true'
        ZOOKEEPER_SSL_KEYSTORE_LOCATION: /etc/zookeeper/secrets/kafka.keystore.jks
        ZOOKEEPER_SSL_KEYSTORE_PASSWORD: ${SSL_SECRET}
        ZOOKEEPER_SSL_TRUSTSTORE_LOCATION: /etc/zookeeper/secrets/kafka.truststore.jks
        ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
      volumes:
        - ./secrets:/etc/zookeeper/secrets

    kafka-ssl:
      image: confluentinc/cp-kafka:latest
      container_name: ${BROKER_HOST}
      hostname: ${BROKER_HOST}
      ports:
        - "${BROKER_PORT}:${BROKER_PORT}"
      depends_on:
        - ${ZK_HOST}
      environment:
        KAFKA_BROKER_ID: 1
        KAFKA_ZOOKEEPER_CONNECT: '${ZK_HOST}:${ZK_PORT}'
        KAFKA_ADVERTISED_LISTENERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
        KAFKA_SSL_KEYSTORE_FILENAME: kafka.keystore.jks
        KAFKA_SSL_KEYSTORE_CREDENTIALS: cert_creds
        KAFKA_SSL_KEY_CREDENTIALS: cert_creds
        KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.truststore.jks
        KAFKA_SSL_TRUSTSTORE_CREDENTIALS: cert_creds
        KAFKA_SSL_CLIENT_AUTH: 'required'
        KAFKA_SECURITY_PROTOCOL: SSL
        KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      volumes:
        - ./secrets:/etc/kafka/secrets
  
    schema-registry:
      image: confluentinc/cp-schema-registry
      container_name: ${SR_HOST}
      hostname: ${SR_HOST}
      depends_on:
        - ${ZK_HOST}
        - ${BROKER_HOST}
      ports:
        - "${SR_PORT}:${SR_PORT}"
      environment:
        SCHEMA_REGISTRY_HOST_NAME: ${SR_HOST}
        SCHEMA_REGISTRY_LISTENERS: 'https://0.0.0.0:${SR_PORT}'
        SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: '${ZK_HOST}:${ZK_PORT}'
        SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
        SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: SSL
        SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYSTORE_LOCATION: /etc/schema-registry/secrets/kafka.keystore.jks
        SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION: /etc/schema-registry/secrets/kafka.keystore.jks
        SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYSTORE_PASSWORD: ${SSL_SECRET}
        SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD: ${SSL_SECRET}
        SCHEMA_REGISTRY_KAFKASTORE_SSL_KEY_PASSWORD: ${SSL_SECRET}
        SCHEMA_REGISTRY_SSL_KEY_PASSWORD: ${SSL_SECRET}
        SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTSTORE_LOCATION: /etc/schema-registry/secrets/kafka.truststore.jks
        SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION: /etc/schema-registry/secrets/kafka.truststore.jks
        SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
        SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
        SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: https
        SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
        SCHEMA_REGISTRY_SSL_CLIENT_AUTH: 'true'
      volumes:
        - ./secrets:/etc/schema-registry/secrets

    connect:
      build:
        context: .
        dockerfile: Dockerfile
      image: chethanuk/kafka-connect:5.3.1
      hostname: ${SR_CON}
      container_name: ${SR_CON}
      depends_on:
        - ${ZK_HOST}
        - ${BROKER_HOST}
        - ${SR_HOST}
      ports:
        - "${SR_CON_PORT}:${SR_CON_PORT}"
      environment:
        CONNECT_LISTENERS: 'https://0.0.0.0:${SR_CON_PORT}'
        CONNECT_REST_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,DELETE,OPTIONS'
        CONNECT_REST_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
        CONNECT_BOOTSTRAP_SERVERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
        CONNECT_REST_ADVERTISED_HOST_NAME: ${SR_CON}
        CONNECT_REST_PORT: ${SR_CON_PORT}
        CONNECT_GROUP_ID: compose-connect-group
        CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
        CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
        CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
        CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
        CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
        CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
        CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
        CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
        CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
        CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: https://${SR_HOST}:${SR_PORT}
        CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
        CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
        CONNECT_ZOOKEEPER_CONNECT: '${ZK_HOST}:${ZK_PORT}'
        CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.1.jar
        CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
        CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
        CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
        CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
        CONNECT_SSL_CLIENT_AUTH: 'true'
        CONNECT_SECURITY_PROTOCOL: SSL
        CONNECT_SSL_KEY_PASSWORD: ${SSL_SECRET}
        CONNECT_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/kafka.truststore.jks
        CONNECT_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
        CONNECT_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/kafka.keystore.jks
        CONNECT_SSL_KEYSTORE_PASSWORD: ${SSL_SECRET}
        CONNECT_PRODUCER_SECURITY_PROTOCOL: SSL
        CONNECT_PRODUCER_BOOTSTRAP_SERVERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
        CONNECT_PRODUCER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/kafka.truststore.jks
        CONNECT_PRODUCER_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
        CONNECT_CONSUMER_SECURITY_PROTOCOL: SSL
        CONNECT_CONSUMER_BOOTSTRAP_SERVERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
        CONNECT_CONSUMER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/kafka.truststore.jks
        CONNECT_CONSUMER_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
      volumes:
        - ./secrets:/etc/kafka/secrets
[2021-05-21 05:13:50,157] INFO Requested thread factory for connector MySqlConnector, id = myql named = db-history-config-check (io.debezium.util.Threads)
[2021-05-21 05:13:50,160] INFO ProducerConfig values: 
    acks = 1
    batch.size = 32768
    bootstrap.servers = [broker:29092]
    buffer.memory = 1048576
    client.dns.lookup = default
    client.id = myql-dbhistory
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 10000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 1
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
 (org.apache.kafka.clients.producer.ProducerConfig)
[2021-05-21 05:13:50,162] WARN Couldn't resolve server broker:29092 from bootstrap.servers as DNS resolution failed for broker (org.apache.kafka.clients.ClientUtils)
[2021-05-21 05:13:50,162] INFO [Producer clientId=myql-dbhistory] Closing the Kafka producer with timeoutMillis = 0 ms. (org.apache.kafka.clients.producer.KafkaProducer)
[2021-05-21 05:13:50,162] INFO WorkerSourceTask{id=zabbix-hosts-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-05-21 05:13:50,162] INFO WorkerSourceTask{id=zabbix-hosts-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-05-21 05:13:50,163] ERROR WorkerSourceTask{id=zabbix-hosts-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
    at io.debezium.relational.history.KafkaDatabaseHistory.start(KafkaDatabaseHistory.java:235)
    at io.debezium.relational.HistorizedRelationalDatabaseSchema.<init>(HistorizedRelationalDatabaseSchema.java:40)
    at io.debezium.connector.mysql.MySqlDatabaseSchema.<init>(MySqlDatabaseSchema.java:90)
    at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:94)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:130)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88)
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:407)
    ... 14 more
[2021-05-21 05:13:50,164] ERROR WorkerSourceTask{id=zabbix-hosts-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)

Variables

  • SSL_SECRET=
  • ZK_HOST=zookeeper
  • ZK_PORT=2181
  • BROKER_HOST=kafka-ssl
  • BROKER_PORT=9092
  • SR_HOST=schema-registry
  • SR_PORT=8181
  • SR_CON=connect
  • SR_CON_PORT=8083
  • HOST=localhost

1 answer

郑琦
2023-03-14

build and image should not be used together. You have not shown your Dockerfile, so it is unclear what you are doing there, but it could explain why the variables were never actually loaded.
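For reference, a minimal sketch of the two alternatives (what the Dockerfile does is an assumption here; pick one form):

  # Option A: build the image locally from ./Dockerfile
  connect:
    build:
      context: .
      dockerfile: Dockerfile

  # Option B: run a prebuilt image as-is
  connect:
    image: chethanuk/kafka-connect:5.3.1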

bootstrap.servers = [broker:29092]

In your connector configuration you are not using kafka-ssl:9092 as the connection string.
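The failing producer (client.id = myql-dbhistory in the log, created in KafkaDatabaseHistory.start) is Debezium's database history producer, and it takes its bootstrap servers from the connector configuration, not from the worker's CONNECT_BOOTSTRAP_SERVERS. A sketch of the relevant part of the connector registration, assuming the connector name zabbix-hosts from the log and an illustrative history topic name:

  {
    "name": "zabbix-hosts",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.history.kafka.topic": "schema-changes.mysql",
      "database.history.kafka.bootstrap.servers": "kafka-ssl:9092"
    }
  }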

Also note that your key and value serializers are using String rather than the Avro settings, the interceptor list is empty, the SSL settings do not appear to have been applied, and so on.
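The missing SSL settings have the same cause: the database history producer and consumer only pick up connector properties prefixed with database.history.producer. and database.history.consumer. respectively. A sketch of the additional entries for the config block above, assuming the same keystore and truststore mounted at /etc/kafka/secrets (replace <SSL_SECRET> with the actual password):

  "database.history.producer.security.protocol": "SSL",
  "database.history.producer.ssl.truststore.location": "/etc/kafka/secrets/kafka.truststore.jks",
  "database.history.producer.ssl.truststore.password": "<SSL_SECRET>",
  "database.history.producer.ssl.keystore.location": "/etc/kafka/secrets/kafka.keystore.jks",
  "database.history.producer.ssl.keystore.password": "<SSL_SECRET>",
  "database.history.producer.ssl.key.password": "<SSL_SECRET>",
  "database.history.consumer.security.protocol": "SSL",
  "database.history.consumer.ssl.truststore.location": "/etc/kafka/secrets/kafka.truststore.jks",
  "database.history.consumer.ssl.truststore.password": "<SSL_SECRET>",
  "database.history.consumer.ssl.keystore.location": "/etc/kafka/secrets/kafka.keystore.jks",
  "database.history.consumer.ssl.keystore.password": "<SSL_SECRET>",
  "database.history.consumer.ssl.key.password": "<SSL_SECRET>"

The keystore entries matter because the broker sets KAFKA_SSL_CLIENT_AUTH: 'required', so these clients must present a client certificate as well.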
