当前位置: 首页 > 知识库问答 >
问题:

Kafka Connect找不到连接器

潘宝
2023-03-14

我正在尝试使用Kafka Connect Elasticsearch连接器,但没有成功。它正在崩溃,并出现以下错误:

[2018-11-21 14:48:29,096] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:108)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches io.confluent.connect.elasticsearch.ElasticsearchSinkConnector , available connectors are: PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='1.0.1', encodedVersion=1.0.1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='1.0.1', encodedVersion=1.0.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='1.0.1', encodedVersion=1.0.1, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='1.0.1', encodedVersion=1.0.1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='1.0.1', encodedVersion=1.0.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='1.0.1', encodedVersion=1.0.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='1.0.1', encodedVersion=1.0.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='1.0.1', encodedVersion=1.0.1, type=source, typeName='source', location='classpath'}

我已经在kafka子文件夹中解压了插件的编译版本,并在connect-standalone.properties中有以下代码行:

plugin.path=/opt/kafka/plugins/kafka-connect-elasticsearch-5.0.1/src/main/java/io/confluent/connect/elasticsearch

我可以看到该文件夹中的各种连接器,但Kafka Connect不加载它们;但它确实加载了标准连接器,如下所示:

[2018-11-21 14:56:28,258] INFO Added plugin 'org.apache.kafka.connect.transforms.Cast$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:136)
[2018-11-21 14:56:28,259] INFO Added aliases 'FileStreamSinkConnector' and 'FileStreamSink' to plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:335)
[2018-11-21 14:56:28,260] INFO Added aliases 'FileStreamSourceConnector' and 'FileStreamSource' to plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:335)

如何正确注册连接器?

共有3个答案

杨柏
2023-03-14

插件路径必须加载JAR文件,包含编译的代码,而不是源代码的原始Java类(< code>src/main/java)。

它还需要是包含这些插件的其他目录的父目录。

plugin.path=/opt/kafka-connect/plugins/

哪里?

$ ls - lR /opt/kafka-connect/plugins/
kafka-connect-elasticsearch-x.y.z/
    file1.jar
    file2.jar 
    etc

参考-手动安装社区连接器

ConFluent平台中的Kafka Connect启动脚本会自动(用于?)读取所有匹配共享/java/kafka-连接-*的文件夹,所以这也是一种方法。至少,如果您在插件路径中也包含ConFluent包安装的共享/java文件夹的路径,它会继续这样做

如果你对Maven不是很熟悉,或者即使你很熟悉,那么你实际上不能只是克隆Elasticsearch连接器存储库并构建主分支;它首先具有 Kafka 的先决条件,然后首先是公共 Confluent 存储库。否则,您必须签出与 Confluent 版本匹配的 Git 标签(如 5.0.1-post)。

更简单的选择是使用Confluent Hub CLI获取包

如果这些都不起作用,那么下载Confluent平台并使用Kafka Connect脚本将是最简单的。这并不意味着您需要从中使用Kafka或Zookeeper配置

那绪
2023-03-14

编译后的JAR需要对Kafka Connect可用。这里有几个选项:

>

  • 使用融合平台,其中包括预先构建的Elasticsearch(和其他):https://www.confluent.io/download/.有zip、rpm/deb、Docker映像等可用。

    自己构建 JAR。这通常涉及:

    cd kafka-connect-elasticsearch-5.0.1
    mvn clean package
    

    然后获取生成的< code > Kafka-Connect-elastic search-5 . 0 . 1 . JAR JAR,并将其放在Kafka Connect with < code > plugin . path 中配置的路径中。

    您可以在此处找到有关使用Kafka Connect的更多信息:

    • https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1/
    • https://www.confluent.io/blog/blogthe-simplest-useful-kafka-connect-data-pipeline-in-the-world-or-thereabouts-part-2/
    • https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/

    免责声明:我为ConFluent工作,并撰写了上述博客文章。

  • 柴瀚
    2023-03-14

    我昨天在docker中的kafka上手动运行了jdbc连接器,没有融合平台等,只是为了了解这些东西是如何在下面工作的。我不必在我这边构建jar或任何类似的东西。希望它与您相关-我所做的是(我将跳过docker部分如何使用连接器安装dir等):

    >

  • 从 https://www.confluent.io/connector/kafka-connect-jdbc/ 下载连接器,解压缩 ZIP
  • 将 zip 的内容放入属性文件中配置的路径中的目录(如下所示的第 3 点) -

    plugin.path=/plugins
    

    所以树看起来像这样:

    /plugins/
    └── jdbcconnector
        └──assets
        └──doc
        └──etc
        └──lib
    

    注意依赖项所在的lib目录,其中一个是kafka-connect-jdbc-5.0.0.jar

    现在您可以尝试运行连接器

    ./connect-standalone.sh connect-standalone.properties jdbc-connector-config.properties
    

    独立连接。属性是kafka connect所需的通用属性,在我的例子中:

    bootstrap.servers=localhost:9092
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    offset.storage.file.filename=/tmp/connect.offsets
    offset.flush.interval.ms=10000
    plugin.path=/plugins
    rest.port=8086
    rest.host.name=127.0.0.1
    

    jdbc连接器配置。属性涉及更多,因为它只是这个特定连接器的配置,您需要深入了解连接器文档-对于jdbc源代码https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/source_config_options.html

  •  类似资料:
    • 我参考了以下链接来了解Kafka的HDFS连接https://docs.confluent.io/2.0.0/Connect/connect-hdfs/docs/index.html 我能够通过配置单元集成将数据从Kafka导出到HDFS。 现在我正尝试在Java程序的帮助下将avro记录写入Kafka 当我把Avro记录写到Kafka主题时,我在Connect中出现以下错误

    • 我试图使用带有Datastax Spark-Cassandra连接器的Spark查询Cassandra。火花代码是 我发现了另一个帖子,看起来类似的火花工作卡珊德拉错误,但它是一个不同的类,无法找到,所以我不确定它是否有帮助。

    • 我对flink/Java/Scala还比较陌生,所以这可能不是问题,但非常感谢您的帮助。我还没有找到一个将Flink Kafka连接器与Flink 1.13结合使用的示例(对我适用)。 我的项目在这里:https://github.com/sysarcher/flink-scala-tests 我想我无法使用我想试用的FlinkKafkaConsumer(链接)。 我正在使用IntelliJ Id

    • 问题内容: 注意:我已经尝试过这里给出的其他解决方案,但是没有用 NodeJ的新手。我试图跟随AngularJS专业人士,并陷入设置NodeJs服务器。根据书,我安装了nodejs,然后使用npm install connect安装了connect软件包 然后将angularjs下载到nodejs文件夹旁边的文件夹中。然后编写server.js文件以连接到服务器。这是文件的内容: 当我使用以下命令

    • 我试图在詹金斯安装Android设备连接器插件。 但我在我的Jenkins或那里找不到这个插件http://updates.jenkins-ci.org/download/plugins/. 我只找到了iOS设备连接器插件。 如何使用这个插件?

    • 问题内容: 我只是不明白发生了什么。我的go应用程序无法连接到elasticsearch。该节点可用,已启动并正在运行。我在这里做错了什么? 这里有什么不对的地方?错误说 这是我在浏览器中命中GET请求时从elasticsearch返回的数据 } 问题答案: 当您继续在客户端中进行嗅探但群集没有可用节点时,通常会发生错误。您可以通过点击来检查集群的状态。 如果您不禁用嗅探功能,则Golang客户端