我正在尝试使用Kafka Connect Elasticsearch连接器,但没有成功。它正在崩溃,并出现以下错误:
[2018-11-21 14:48:29,096] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:108)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches io.confluent.connect.elasticsearch.ElasticsearchSinkConnector , available connectors are: PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='1.0.1', encodedVersion=1.0.1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='1.0.1', encodedVersion=1.0.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='1.0.1', encodedVersion=1.0.1, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='1.0.1', encodedVersion=1.0.1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='1.0.1', encodedVersion=1.0.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='1.0.1', encodedVersion=1.0.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='1.0.1', encodedVersion=1.0.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='1.0.1', encodedVersion=1.0.1, type=source, typeName='source', location='classpath'}
我已经在kafka子文件夹中解压了插件的编译版本,并在connect-standalone.properties中有以下代码行:
plugin.path=/opt/kafka/plugins/kafka-connect-elasticsearch-5.0.1/src/main/java/io/confluent/connect/elasticsearch
我可以看到该文件夹中的各种连接器,但Kafka Connect不加载它们;但它确实加载了标准连接器,如下所示:
[2018-11-21 14:56:28,258] INFO Added plugin 'org.apache.kafka.connect.transforms.Cast$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:136)
[2018-11-21 14:56:28,259] INFO Added aliases 'FileStreamSinkConnector' and 'FileStreamSink' to plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:335)
[2018-11-21 14:56:28,260] INFO Added aliases 'FileStreamSourceConnector' and 'FileStreamSource' to plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:335)
如何正确注册连接器?
插件路径必须加载JAR文件,包含编译的代码,而不是源代码的原始Java类(< code>src/main/java)。
它还需要是包含这些插件的其他目录的父目录。
plugin.path=/opt/kafka-connect/plugins/
哪里?
$ ls - lR /opt/kafka-connect/plugins/
kafka-connect-elasticsearch-x.y.z/
file1.jar
file2.jar
etc
参考-手动安装社区连接器
ConFluent平台中的Kafka Connect启动脚本会自动(用于?)读取所有匹配共享/java/kafka-连接-*
的文件夹,所以这也是一种方法。至少,如果您在插件路径中也包含ConFluent包安装的共享/java
文件夹的路径,它会继续这样做
如果你对Maven不是很熟悉,或者即使你很熟悉,那么你实际上不能只是克隆Elasticsearch连接器存储库并构建主分支;它首先具有 Kafka 的先决条件,然后首先是公共 Confluent 存储库。否则,您必须签出与 Confluent 版本匹配的 Git 标签(如 5.0.1-post
)。
更简单的选择是使用Confluent Hub CLI获取包
如果这些都不起作用,那么下载Confluent平台并使用Kafka Connect脚本将是最简单的。这并不意味着您需要从中使用Kafka或Zookeeper配置
编译后的JAR需要对Kafka Connect可用。这里有几个选项:
>
使用融合平台,其中包括预先构建的Elasticsearch(和其他):https://www.confluent.io/download/.有zip、rpm/deb、Docker映像等可用。
自己构建 JAR。这通常涉及:
cd kafka-connect-elasticsearch-5.0.1
mvn clean package
然后获取生成的< code > Kafka-Connect-elastic search-5 . 0 . 1 . JAR JAR,并将其放在Kafka Connect with < code > plugin . path 中配置的路径中。
您可以在此处找到有关使用Kafka Connect的更多信息:
免责声明:我为ConFluent工作,并撰写了上述博客文章。
我昨天在docker中的kafka上手动运行了jdbc连接器,没有融合平台等,只是为了了解这些东西是如何在下面工作的。我不必在我这边构建jar或任何类似的东西。希望它与您相关-我所做的是(我将跳过docker部分如何使用连接器安装dir等):
>
将 zip 的内容放入属性文件中配置的路径中的目录(如下所示的第 3 点) -
plugin.path=/plugins
所以树看起来像这样:
/plugins/
└── jdbcconnector
└──assets
└──doc
└──etc
└──lib
注意依赖项所在的lib目录,其中一个是kafka-connect-jdbc-5.0.0.jar
现在您可以尝试运行连接器
./connect-standalone.sh connect-standalone.properties jdbc-connector-config.properties
独立连接。属性是kafka connect所需的通用属性,在我的例子中:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/plugins
rest.port=8086
rest.host.name=127.0.0.1
jdbc连接器配置。属性涉及更多,因为它只是这个特定连接器的配置,您需要深入了解连接器文档-对于jdbc源代码https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/source_config_options.html
我参考了以下链接来了解Kafka的HDFS连接https://docs.confluent.io/2.0.0/Connect/connect-hdfs/docs/index.html 我能够通过配置单元集成将数据从Kafka导出到HDFS。 现在我正尝试在Java程序的帮助下将avro记录写入Kafka 当我把Avro记录写到Kafka主题时,我在Connect中出现以下错误
我试图使用带有Datastax Spark-Cassandra连接器的Spark查询Cassandra。火花代码是 我发现了另一个帖子,看起来类似的火花工作卡珊德拉错误,但它是一个不同的类,无法找到,所以我不确定它是否有帮助。
我对flink/Java/Scala还比较陌生,所以这可能不是问题,但非常感谢您的帮助。我还没有找到一个将Flink Kafka连接器与Flink 1.13结合使用的示例(对我适用)。 我的项目在这里:https://github.com/sysarcher/flink-scala-tests 我想我无法使用我想试用的FlinkKafkaConsumer(链接)。 我正在使用IntelliJ Id
问题内容: 注意:我已经尝试过这里给出的其他解决方案,但是没有用 NodeJ的新手。我试图跟随AngularJS专业人士,并陷入设置NodeJs服务器。根据书,我安装了nodejs,然后使用npm install connect安装了connect软件包 然后将angularjs下载到nodejs文件夹旁边的文件夹中。然后编写server.js文件以连接到服务器。这是文件的内容: 当我使用以下命令
我试图在詹金斯安装Android设备连接器插件。 但我在我的Jenkins或那里找不到这个插件http://updates.jenkins-ci.org/download/plugins/. 我只找到了iOS设备连接器插件。 如何使用这个插件?
问题内容: 我只是不明白发生了什么。我的go应用程序无法连接到elasticsearch。该节点可用,已启动并正在运行。我在这里做错了什么? 这里有什么不对的地方?错误说 这是我在浏览器中命中GET请求时从elasticsearch返回的数据 } 问题答案: 当您继续在客户端中进行嗅探但群集没有可用节点时,通常会发生错误。您可以通过点击来检查集群的状态。 如果您不禁用嗅探功能,则Golang客户端