---
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: ${ZK_HOST}
hostname: ${ZK_HOST}
ports:
- "${ZK_PORT}:${ZK_PORT}"
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: ${ZK_PORT}
ZOOKEEPER_CLIENT_SECURE: 'true'
ZOOKEEPER_SSL_KEYSTORE_LOCATION: /etc/zookeeper/secrets/kafka.keystore.jks
ZOOKEEPER_SSL_KEYSTORE_PASSWORD: ${SSL_SECRET}
ZOOKEEPER_SSL_TRUSTSTORE_LOCATION: /etc/zookeeper/secrets/kafka.truststore.jks
ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
volumes:
- ./secrets:/etc/zookeeper/secrets
kafka-ssl:
image: confluentinc/cp-kafka:latest
container_name: ${BROKER_HOST}
hostname: ${BROKER_HOST}
ports:
- "${BROKER_PORT}:${BROKER_PORT}"
depends_on:
- ${ZK_HOST}
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: '${ZK_HOST}:${ZK_PORT}'
KAFKA_ADVERTISED_LISTENERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
KAFKA_SSL_KEYSTORE_FILENAME: kafka.keystore.jks
KAFKA_SSL_KEYSTORE_CREDENTIALS: cert_creds
KAFKA_SSL_KEY_CREDENTIALS: cert_creds
KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.truststore.jks
KAFKA_SSL_TRUSTSTORE_CREDENTIALS: cert_creds
KAFKA_SSL_CLIENT_AUTH: 'required'
KAFKA_SECURITY_PROTOCOL: SSL
KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./secrets:/etc/kafka/secrets
schema-registry:
image: confluentinc/cp-schema-registry
container_name: ${SR_HOST}
hostname: ${SR_HOST}
depends_on:
- ${ZK_HOST}
- ${BROKER_HOST}
ports:
- "${SR_PORT}:${SR_PORT}"
environment:
SCHEMA_REGISTRY_HOST_NAME: ${SR_HOST}
SCHEMA_REGISTRY_LISTENERS: 'https://0.0.0.0:${SR_PORT}'
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: '${ZK_HOST}:${ZK_PORT}'
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: SSL
SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYSTORE_LOCATION: /etc/schema-registry/secrets/kafka.keystore.jks
SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION: /etc/schema-registry/secrets/kafka.keystore.jks
SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYSTORE_PASSWORD: ${SSL_SECRET}
SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD: ${SSL_SECRET}
SCHEMA_REGISTRY_KAFKASTORE_SSL_KEY_PASSWORD: ${SSL_SECRET}
SCHEMA_REGISTRY_SSL_KEY_PASSWORD: ${SSL_SECRET}
SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTSTORE_LOCATION: /etc/schema-registry/secrets/kafka.truststore.jks
SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION: /etc/schema-registry/secrets/kafka.truststore.jks
SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: https
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
SCHEMA_REGISTRY_SSL_CLIENT_AUTH: 'true'
volumes:
- ./secrets:/etc/schema-registry/secrets
connect:
build:
context: .
dockerfile: Dockerfile
image: chethanuk/kafka-connect:5.3.1
hostname: ${SR_CON}
container_name: ${SR_CON}
depends_on:
- ${ZK_HOST}
- ${BROKER_HOST}
- ${SR_HOST}
ports:
- "${SR_CON_PORT}:${SR_CON_PORT}"
environment:
CONNECT_LISTENERS: 'https://0.0.0.0:${SR_CON_PORT}'
CONNECT_REST_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,DELETE,OPTIONS'
CONNECT_REST_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
CONNECT_BOOTSTRAP_SERVERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
CONNECT_REST_ADVERTISED_HOST_NAME: ${SR_CON}
CONNECT_REST_PORT: ${SR_CON_PORT}
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: https://${SR_HOST}:${SR_PORT}
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_ZOOKEEPER_CONNECT: '${ZK_HOST}:${ZK_PORT}'
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.1.jar
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
CONNECT_SSL_CLIENT_AUTH: 'true'
CONNECT_SECURITY_PROTOCOL: SSL
CONNECT_SSL_KEY_PASSWORD: ${SSL_SECRET}
CONNECT_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/kafka.truststore.jks
CONNECT_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
CONNECT_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/kafka.keystore.jks
CONNECT_SSL_KEYSTORE_PASSWORD: ${SSL_SECRET}
CONNECT_PRODUCER_SECURITY_PROTOCOL: SSL
CONNECT_PRODUCER_BOOTSTRAP_SERVERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
CONNECT_PRODUCER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/kafka.truststore.jks
CONNECT_PRODUCER_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
CONNECT_CONSUMER_SECURITY_PROTOCOL: SSL
CONNECT_CONSUMER_BOOTSTRAP_SERVERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
CONNECT_CONSUMER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/kafka.truststore.jks
CONNECT_CONSUMER_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
volumes:
- ./secrets:/etc/kafka/secrets
[2021-05-21 05:13:50,157] INFO Requested thread factory for connector MySqlConnector, id = myql named = db-history-config-check (io.debezium.util.Threads)
[2021-05-21 05:13:50,160] INFO ProducerConfig values:
acks = 1
batch.size = 32768
bootstrap.servers = [broker:29092]
buffer.memory = 1048576
client.dns.lookup = default
client.id = myql-dbhistory
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 10000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 1
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
(org.apache.kafka.clients.producer.ProducerConfig)
[2021-05-21 05:13:50,162] WARN Couldn't resolve server broker:29092 from bootstrap.servers as DNS resolution failed for broker (org.apache.kafka.clients.ClientUtils)
[2021-05-21 05:13:50,162] INFO [Producer clientId=myql-dbhistory] Closing the Kafka producer with timeoutMillis = 0 ms. (org.apache.kafka.clients.producer.KafkaProducer)
[2021-05-21 05:13:50,162] INFO WorkerSourceTask{id=zabbix-hosts-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-05-21 05:13:50,162] INFO WorkerSourceTask{id=zabbix-hosts-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-05-21 05:13:50,163] ERROR WorkerSourceTask{id=zabbix-hosts-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
at io.debezium.relational.history.KafkaDatabaseHistory.start(KafkaDatabaseHistory.java:235)
at io.debezium.relational.HistorizedRelationalDatabaseSchema.<init>(HistorizedRelationalDatabaseSchema.java:40)
at io.debezium.connector.mysql.MySqlDatabaseSchema.<init>(MySqlDatabaseSchema.java:90)
at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:94)
at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:130)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88)
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:407)
... 14 more
[2021-05-21 05:13:50,164] ERROR WorkerSourceTask{id=zabbix-hosts-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
变量
build
和image
不应一起使用。您没有显示您的Dockerfile,所以不清楚您在那里做什么,但它可以解释为什么没有实际加载变量
bootstrap.servers = [broker:29092]
在您的连接配置中,您没有使用kafka-ssl:9092
作为连接字符串
请注意,您的键和值序列化程序使用的是String,而不是Avro设置。拦截器列表为空,SSL设置似乎没有应用,等等
这是YAML文件中的Kafka Producer属性。当我启用SSL时,我的kafka生产者无法工作。它无法识别经纪人的主题。但当我使用PLAINTEXT时,我的Kafka制作人工作正常。我是否缺少SSL配置的内容。 PS:对于SSL和PLAINTEXT,Bootsrap服务器是不同的。 这是我的Kafka制作人配置 这是在Spring boot控制台上为kafka prodcuer返回的值
我正在尝试将KAFKA与Spring集成,我的JAVA应用程序正在与KAFKA服务器通信,当我使用HTTP运行应用程序时,我也会收到消息。 现在我想使用 Spring 在 KAFKA 上添加 SSL,我已经完成了在 SSL KAFKA 和 SPRING KAFKA 上指定的更改 当我使用命令行(使用 SSL)运行生产者和消费者时,通信会正常发生,但是当我更改 Java 应用程序的配置并尝试生成和使
我想让SSL和Kafka一起运行,让它更安全。我下载了Kafka并安装了它。我按照说明为SSL创建证书和信任库,没有任何问题。我将以下内容添加到我的config/server.properties中 启动Zookeeper后,我在启动kafak时收到此错误:[2017-12-07 16:02:52,155]ERROR[Controller id=0, targetBrokerId=0]连接到节点0
我需要捕获异步发送到Kafka时的异常。Kafka producer Api附带一个函数send(ProducerRecord记录、回调)。但当我针对以下两种情况进行测试时: Kafka经纪人倒下 主题没有预创建回调没有被调用。相反,我在代码中收到发送不成功的警告(如下所示)。 问题: > 那么回调是否只针对特定的异常调用? Kafka客户端何时尝试在异步发送时连接到Kafka代理:每次批处理发送
我试图配置SASL/PLAIN与SSL在我们的Kafka环境。SSL部分已经完成,但是我在启动动物园管理员时遇到了以下错误。 有人为动物园管理员和经纪人配置了SASL/PLAIN和SSL吗? 服务器代理配置 动物园管理员配置 JAAS动物园管理员配置文件 JAAS代理配置文件 启动错误 代理启动命令 环境变量 第1学期 在2号航站楼
从服务器上,我能够连接并从配置了SSL的远程kafka服务器主题获取数据。 如果我指向GCS上的证书,当证书指向Google存储桶时,它会抛出错误。 其次是:Truststore和Google Cloud Dataflow 更新的代码将SSL truststore、keystore位置指向本地机器的/tmp目录认证,以防KafkaIO需要从文件路径读取。它没有抛出FileNotFounderRor