我正在尝试设置Kafka Connect,目的是运行Elasticsearch chSinkConntor。
Kafka安装程序,由3个使用Kerberos、SSL和ACL保护的代理程序组成。
到目前为止,我一直在尝试使用docker/docker-com的连接器运行连接框架和elasticserch-server本地化(使用Kafka 2.4连接到远程kafka安装(Kafka 2.0.1-实际上是我们的生产环境)。
KAFKA_OPTS: -Djava.security.krb5.conf=/etc/kafka-connect/secrets/krb5.conf
CONNECT_BOOTSTRAP_SERVERS: srv-kafka-1.XXX.com:9093,srv-kafka-2.XXX.com:9093,srv-kafka-3.XXX.com:9093
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: user-grp
CONNECT_CONFIG_STORAGE_TOPIC: test.internal.connect.configs
CONNECT_OFFSET_STORAGE_TOPIC: test.internal.connect.offsets
CONNECT_STATUS_STORAGE_TOPIC: test.internal.connect.status
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_ZOOKEEPER_CONNECT: srv-kafka-1.XXX.com:2181,srv-kafka-2.XXX.com:2181,srv-kafka-3.XXX.com:2181
CONNECT_SECURITY_PROTOCOL: SASL_SSL
CONNECT_SASL_KERBEROS_SERVICE_NAME: "kafka"
CONNECT_SASL_JAAS_CONFIG: com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKey=true \
keyTab="/etc/kafka-connect/secrets/kafka-connect.keytab" \
principal="<principal>;
CONNECT_SASL_MECHANISM: GSSAPI
CONNECT_SSL_TRUSTSTORE_LOCATION: <path_to_truststore.jks>
CONNECT_SSL_TRUSTSTORE_PASSWORD: <PWD>
< br >当启动connect-framework时,一切似乎都工作正常,我可以看到声明kerberos身份验证成功的日志等。< br>
当我试图使用curl启动连接作业时,问题出现了。< br >
curl -X POST -H "Content-Type: application/json" --data '{ "name": "kafka-connect", "config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "tasks.max": 1, "topics": "test.output.outage", "key.ignore": true, "connection.url": "http://elasticsearch1:9200", "type.name": "kafka-connect" } }' http://localhost:8083/connectors
工作似乎启动时没有问题,但一旦它即将开始消耗从Kafka-主题我得到:< br >
kafka-connect | [2020-04-06 10:35:33,482] WARN [Consumer clientId=connector-consumer-user-grp-2-0, groupId=connect-user-2] Bootstrap broker srv-kafka-1.XXX.com:9093 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
在所有代理的连接日志中重复。
这个问题的性质是什么?与代理的通信似乎运行良好 - 连接作业按预期传达回 kafka,当连接框架重新启动时,作业似乎按预期恢复(即使仍然有故障)。
任何人都知道可能导致这种情况的原因吗?或者我应该如何调试它。
由于这是我们的生产环境,我更改服务器配置的可能性有限。但从我能看出的日志中没有任何内容似乎表明有问题。
提前致谢
根据文档,您还需要为 Kafka Connect 正在运行的连接器在使用者/生产者上配置安全性。您可以通过添加使用者
/生产者
前缀来执行此操作。因此,由于您使用的是 Docker,并且错误表明您正在创建一个接收器连接器(即需要使用者),请添加到您的配置中:
CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
CONNECT_CONSUMER_SASL_KERBEROS_SERVICE_NAME: "kafka"
CONNECT_CONSUMER_SASL_JAAS_CONFIG: com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKey=true \
keyTab="/etc/kafka-connect/secrets/kafka-connect.keytab" \
principal="<principal>;
CONNECT_CONSUMER_SASL_MECHANISM: GSSAPI
CONNECT_CONSUMER_SSL_TRUSTSTORE_LOCATION: <path_to_truststore.jks>
CONNECT_CONSUMER_SSL_TRUSTSTORE_PASSWORD: <PWD>
如果还要创建源连接器,则需要复制上述内容,但对于PRODUCER _
也是如此
断开mqtt连接,前提是必须已经通过Iot_id,Iot_pwd建立过一次mqtt连接。 请求方式: "|4|1|4|\r" 返回值: "|4|1|4|1|\r" 断开成功 "|4|1|4|2|\r" 断开失败 Arduino样例: softSerial.print("|4|1|4|\r");
问题内容: StackExchange.Redis的“ 基本用法”文档解释说,该方法是长期存在的,有望重新使用。 但是,当与服务器的连接断开时该怎么办?是否自动重新连接,或者是否有必要像此答案一样编写代码(引用该答案): 上面的代码是处理断开连接恢复的好方法,还是实际上会导致多个实例?同样,该属性应如何解释? [另外:我相信上面的代码是惰性初始化的一种非常糟糕的形式,尤其是在多线程环境中-请参阅
问题内容: 我正在使用HAProxy在子域上将请求发送到node.js应用程序。 我无法使WebSockets正常工作。到目前为止,我只能使客户端建立WebSocket连接,但是之后很快就会断开连接。 我在ubuntu上。 我一直在使用的各种版本和。客户端是Safari或Chrome的最新版本。HAProxy版本是1.4.8 这是我的HAProxy.cfg 我已经拖网和邮件列表,但无法获得任何建议
问题内容: 当我的MongoDB连接空闲几分钟后,下一个请求将错误结束。从命令行客户端,它看起来像这样: 我看到针对MongoHQ和MongoLab的沙箱实例的问题。 由于重新连接,下一个请求通过正常。这是我的网络应用程序中的一个问题,因为几分钟不活动后,在网络请求期间将出现此错误。有两件事让我感到惊讶: MongoDB连接被如此频繁和频繁地破坏,并且 驱动程序只是引发一个异常,而不是在重新连接后
我有一个kafka connect插件,部署在kafka集群中(在独立模式下,仅用于测试,目的是分布式完成)。这个Kafka连接插件使用curator连接到集群的zookeper,并从中提取一些信息,以决定如何处理这些消息。 代码如下: 在treeCache启动时超时,配置根路径存在于本地zookeeper中(已确认在zookeeper外壳中执行ls,对于我尝试使用的zkConnection字符串
我需要帮助消费者在Spring启动。当断开连接时,我需要停止应用程序,例如10分钟。当断开连接时 或者当无法连接时 我使用ConsumerFactory和ConcurrentKafkaListenerContainerFactory进行消费者的所有配置