当前位置: 首页 > 知识库问答 >
问题:

无法连接到和描述Kafka群集。Apache kafka连接

艾晋
2023-03-14

我尝试了kafka-console-consumer.sh和kafka-console-producer.sh,它工作得很好。我能够看到生产者在消费者中发送的消息

1)我已经下载了s3连接器(https://docs.confluent.io/current/connect/kafka-connect-S3/index.html)

2)将文件解压缩到/home/ec2-user/plugins/

bootstrap.servers=<my brokers>
plugin.path=/home/ec2-user/kafka-plugins
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=<My Topic>
s3.region=us-east-1
s3.bucket.name=vk-ingestion-dev
s3.part.size=5242880
flush.size=1
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
schema.compatibility=NONE

当我使用以上两个prop文件运行connect-standlone.sh时,它会等待某个时间并抛出以下错误。

[AdminClient clientId=adminclient-1] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager:237)
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
[2019-10-22 19:28:36,789] INFO [AdminClient clientId=adminclient-1] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager:237)
org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.
[2019-10-22 19:28:36,796] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectStandalone:124)
org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
    at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
    at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:45)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:81)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:58)
    ... 2 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

有什么安全方面的东西需要我去找吗?

共有1个答案

淳于思淼
2023-03-14

添加以下ssl配置后,它工作了。

security.protocol=SSL
ssl.truststore.location=/tmp/kafka.client.truststore.jks

添加上述参数后,connector启动时没有错误,但数据没有上传到S3。

分别添加生产者和使用者配置参数。

producer.security.protocol=SSL
producer.ssl.truststore.location=/tmp/kafka.client.truststore.jks

consumer.security.protocol=SSL
consumer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
 类似资料:
  • 我在从Lambda函数连接Elasticache时遇到问题,我已完成以下操作: 创建了一个新的安全组 Internet正常工作,我通过打开URL验证stackoverflow.com。但是Elasticache自动发现超时。是否需要进行任何额外的配置? 更新I将安全组的入站规则添加到端口6379,现在仍然超时。

  • 我有一个Spring启动应用程序,它使用来自 Kafka 集群中某个主题(例如 topic1)的消息。这就是我的代码目前的样子。 现在我想从另一个Kafka集群中的不同主题开始消费。一种方法是为此创建另一个bean。但是有更好的方法吗?

  • 我试图在我本地minikube集群上的一个简单示例中使用fabric8io/kubernetes-client,在这里我获得了一个pod的IP 是完成执行的pod的名称,如果执行则可见。 是一个kubernetes API主机,我从中获得该主机。 MiniKube只有1个节点。 Fabric8IO/Kubernetes-客户端版本为4.9.1 Java 11 我将此代码作为作业部署在的同一集群上。

  • 我正在尝试从MAC连接到安全的Azure Service Fabric群集,但我遇到了一些SSL问题 我可以通过Web浏览器通过相同的证书验证 /Explorer,但当我尝试与我的命令行相同,我得到SSl错误。有什么建议吗? 错误: 请求中出错,SSLError:HTTPSConnectionPool(host='mylinuxx.centralindia.cloudapp.azure.com',

  • null NewJenkins.Values具有以下特性。 报告的错误是。 我在谷歌上搜索了一段时间,很多网站都提到了服务帐户设置,但没有任何工作。 还有别的步骤吗?

  • 我使用他们的web UI在EMR上创建了一个AWS Spark2.2集群(这里是新手)。我知道我需要连接到主节点,以便开始发出pyspark命令来学习Spark。但是,当我尝试连接到主节点时,它给我一个错误。在浏览了internet之后,我发现使用可能有助于调试正在进行的操作,但我找不到任何有用的信息。下面是我的ssh调试日志。 有人能指出这里的问题是什么吗?编辑:我已经尝试过将端口22添加到安全