当前位置: 首页 > 知识库问答 >
问题:

Kafka连接(融合5.0、4.1.2或3.0)未启动

徐星阑
2023-03-14

我们有一个Kafka集群(作为第3方托管服务),它启用了SSL。我们现在尝试使用第3方Sink(WePay BigQuery连接器)设置Kafka Connect(Conflow ent 5.0)。当在独立模式下启动Kafka连接时,一切都像魅力一样工作。不幸的是,当启用分布式模式时,Kafka Connect突然失败,并出现以下情况:

[2018-09-25 15:01:46,248] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser:109)
[2018-09-25 15:01:46,248] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser:110)
[2018-09-25 15:01:46,667] INFO Kafka cluster ID: Q9PaAEeWSbOavVmHTQS5sA (org.apache.kafka.connect.util.ConnectUtils:59)
[2018-09-25 15:01:46,685] INFO Logging initialized @10512ms to org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log:193)
[2018-09-25 15:01:46,726] INFO Added connector for http://:8083 (org.apache.kafka.connect.runtime.rest.RestServer:119)
[2018-09-25 15:01:46,760] INFO Advertised URI: http://192.168.4.207:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:267)
[2018-09-25 15:01:46,796] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser:109)
[2018-09-25 15:01:46,796] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser:110)
ERROR Stopping due to error 
(org.apache.kafka.connect.cli.ConnectDistributed:117)
java.lang.NoSuchMethodError: 
org.apache.kafka.common.metrics.Sensor.add 
(Lorg/apache/kafka/common/metrics/CompoundStat;)Z
at org.apache.kafka.connect.runtime.Worker$WorkerMetricsGroup.<init> . 
(Worker.java:731)
at org.apache.kafka.connect.runtime.Worker.<init>(Worker.java:112)
at 
org.apache.kafka.connect.cli.ConnectDistributed.main 
(ConnectDistributed.java:88)

尝试在Google上查找特定错误,但找不到任何内容。它看起来像是某个地方的版本问题(因此是NoSuchMachodError),但不知道从哪里开始。

与Confluent 4.1.2一起使用时,会出现不同的错误:

[2018-09-26 15:14:05,498] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed:112)
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.<init>(WorkerGroupMember.java:144)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.<init>(DistributedHerder.java:182)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.<init>(DistributedHerder.java:159)
    at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:95)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.<init>(Lorg/apache/kafka/common/utils/LogContext;Lorg/apache/kafka/clients/KafkaClient;Lorg/apache/kafka/clients/Metadata;Lorg/apache/kafka/common/utils/Time;JJI)V
    at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.<init>(WorkerGroupMember.java:114)
    ... 3 more

当我们使用相同但与Kafka Connect(ConFluent 3.0)一起使用时,会出现不同的错误:

[2018-09-26 10:04:24,588] INFO AvroDataConfig values: 
    schemas.cache.config = 1000
    enhanced.avro.schema.support = false
    connect.meta.data = true
 (io.confluent.connect.avro.AvroDataConfig:169)
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.kafka.common.utils.AppInfoParser.unregisterAppInfo(Ljava/lang/String;Ljava/lang/String;)V
    at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.stop(WorkerGroupMember.java:194)
    at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.<init>(WorkerGroupMember.java:122)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.<init>(DistributedHerder.java:150)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.<init>(DistributedHerder.java:132)
    at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:82)

这是分布式的。属性:

bootstrap.servers=*****
group.id=testGroup
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=****
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=****
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
security.protocol=SSL
ssl.truststore.location=truststore.jks
ssl.truststore.password=****
ssl.keystore.type=PKCS12
ssl.keystore.location=keystore.p12
ssl.keystore.password=****
ssl.key.password=****
plugin.path=/*/confluent-5.0.0/share/java

作为参考,standalone.properties:

bootstrap.servers=***
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=***
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=***
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=connect.offsets
consumer.security.protocol=SSL
consumer.ssl.truststore.location=truststore.jks
consumer.ssl.truststore.password=***
consumer.ssl.keystore.type=PKCS12
consumer.ssl.keystore.location=keystore.p12
consumer.ssl.keystore.password=***
consumer.ssl.key.password=***

任何帮助都将不胜感激。

共有1个答案

拓拔富
2023-03-14

我刚刚发现,你必须前缀kafka客户端配置在kafka连接属性文件:https://docs.confluent.io/current/connect/userguide.html#overriding-producer-and-consumer-settings

您的单机版。属性前缀是否与使用者配置:

consumer.security.protocol=SSL

但是你的分布。属性不会:

security.protocol=SSL
 类似资料:
  • 我正在使用合流kafka connect jdbc源将mysql表中的记录推送到我的kafka主题,但似乎日期列被转换为纪元时间。 这是我的配置: kafka主题中的输出: 我也在类似于“select from_unixtime(updated _ on)from temp”的查询中尝试了from _ unixtime(),但是那不行。 有没有办法推到YYYY-MM-DD HH:MM:SS格式的K

  • 我正在浏览Kafka连接,我试图得到一些概念。 假设我有kafka集群(节点k1、k2和k3)设置并且正在运行,现在我想在不同的节点上运行kafka连接工作器,比如分布式模式下的c1和c2。 很少有问题。 1) 要在分布式模式下运行或启动kafka connect,我需要使用命令,这在kaffa集群节点中可用,所以我需要从任何一个kafka集群节点启动kafka连接?或者我启动kafka conn

  • 对于托管在 Confluent Cloud 中的 Kafka 集群,会创建一个审核日志集群。似乎可以将接收器连接器挂接到此群集,并从“汇合审核日志事件”主题中排出事件。 但是,当我运行连接器执行相同操作时,我遇到了以下错误。 在我的connect-distributed.properties文件中,我的设置如下: 需要授予哪些额外的权限,以便连接器可以在集群中创建所需的主题?connect-dis

  • 我对Kafka connect很陌生。我想把我的信息从Kafka主题推到弹性搜索。在阅读了可用的文档之后,我从发行版tar下载并编译了弹性搜索接收器。拉链(https://github.com/confluentinc/kafka-connect-elasticsearch/releases) 我添加了弹性搜索属性文件,并将上述jar包含在类路径中。当我在独立模式下运行kafka connect时

  • 我们已经成功地使用了MySQL - 使用jdbc独立连接器的kafka数据摄取,但现在在分布式模式下使用相同的连接器(作为kafka connect服务)时面临问题。 用于独立连接器的命令,工作正常 - 现在,我们已经停止了这一项,并以分布式模式启动了kafka connect服务,如下所示 2 个节点当前正在运行具有相同连接服务。 连接服务已启动并正在运行,但它不会加载 下定义的连接器。 应该对

  • 这就是我想要做的:我在raspberry pi 3上安装了一个glassfish,我想用它作为netbeans的远程域。我知道我需要DAS。当我试图通过xxx通过webbrowser进入管理控制台时。xxx。xxx。xxx:4848并在控制台中输入我的凭据im收到此错误:管理控制台错误现在我已了解到,我需要启用安全管理员,以便使用以下命令通过远程主机访问控制台: 在此之后,我无法再访问管理控制台,