当前位置: 首页 > 知识库问答 >
问题:

kafka jdbc接收器连接器独立错误

蔺沛
2023-03-14

我正在尝试从kafka中的主题将数据插入postgres数据库。我正在使用以下命令加载

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/sink-quickstart-mysql.properties

sink-quick start-MySQL . properties如下

name=test-sink-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=third_topic
connection.url=jdbc:postgres://localhost:5432/postgres
connection.user=postgres
connection.password=postgres
auto.create=true

我得到的错误是

[2019-01-29 13:16:48,859] ERROR Failed to create job for /home/ashley/confluent-5.1.0/etc/kafka-connect-jdbc/sink-quickstart-mysql.properties (org.apache.kafka.connect.cli.ConnectStandalone:102) [2019-01-29 13:16:48,862] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:113) java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches io.confluent.connect.jdbc.JdbcSinkConnector, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='2.1.0-cp1', encodedVersion=2.1.0-cp1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='2.1.0-cp1', encodedVersion=2.1.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='2.1.0-cp1', encodedVersion=2.1.0-cp1, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='2.1.0-cp1', encodedVersion=2.1.0-cp1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='2.1.0-cp1', encodedVersion=2.1.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='2.1.0-cp1', encodedVersion=2.1.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='2.1.0-cp1', encodedVersion=2.1.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='2.1.0-cp1', encodedVersion=2.1.0-cp1, type=source, typeName='source', location='classpath'}     at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)  at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)     at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:110) Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches io.confluent.connect.jdbc.JdbcSinkConnector, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='2.1.0-cp1', encodedVersion=2.1.0-cp1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='2.1.0-cp1', encodedVersion=2.1.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='2.1.0-cp1', encodedVersion=2.1.0-cp1, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='2.1.0-cp1', encodedVersion=2.1.0-cp1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='2.1.0-cp1', encodedVersion=2.1.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='2.1.0-cp1', encodedVersion=2.1.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='2.1.0-cp1', encodedVersion=2.1.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='2.1.0-cp1', encodedVersion=2.1.0-cp1, type=source, typeName='source', location='classpath'}   at org.apache.kafka.connect.runtime.isolation.Plugins.newConnector(Plugins.java:179)    at org.apache.kafka.connect.runtime.AbstractHerder.getConnector(AbstractHerder.java:382)    at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:261)     at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:189)   at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107) [2019-01-29 13:16:48,886] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:65) [2019-01-29 13:16:48,886] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:223) [2019-01-29 13:16:48,894] INFO Stopped http_8083@dc4fee1{HTTP/1.1,[http/1.1]}{0.0.0.0:8083} (org.eclipse.jetty.server.AbstractConnector:341) [2019-01-29 13:16:48,895] INFO node0 Stopped scavenging (org.eclipse.jetty.server.session:167) [2019-01-29 13:16:48,930] INFO Stopped o.e.j.s.ServletContextHandler@3c46dcbe{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:1040) [2019-01-29 13:16:48,943] INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer:241) [2019-01-29 13:16:48,943] INFO Herder stopping (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:95) [2019-01-29 13:16:48,944] INFO Worker stopping (org.apache.kafka.connect.runtime.Worker:184) [2019-01-29 13:16:48,944] INFO Stopped FileOffsetBackingStore (org.apache.kafka.connect.storage.FileOffsetBackingStore:66) [2019-01-29 13:16:48,947] INFO Worker stopped (org.apache.kafka.connect.runtime.Worker:205) [2019-01-29 13:16:48,950] INFO Herder stopped (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:112) [2019-01-29 13:16:48,951] INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect:70)

Postgres jar文件已经在文件夹中。有人能提出建议吗?

共有1个答案

房学文
2023-03-14

以下行是日志中最重要的行:

java.util.concurrent。执行异常:org.apache.cafka.connect.errors。ConnectException:找不到任何实现连接器且名称与io.confluent.connect.jdbc匹配的类。JdbcSinkConnector,可用的连接器有:。。。

似乎您没有安装kafka connect jdbc连接器

检查 etc/schema-registry/connect-avro-standalone.properties 中的 plugin.path 属性,并确保 plugin.path行未被注释。

如果不使用Confluent Platform,则需要在该插件下创建。path目录,jdbc插件的另一个目录:例如kafka connect jdbc,并将所有需要的jar放在那里。例如kafka connection-jdbc-5.1.0.jar、其依赖项和jdbc驱动程序。

可以找到更多详细信息:https://docs.confluent.io/current/connect/userguide.html#installing-plugins

 类似资料:
  • 在我的程序中,我正在访问wep api。最多可以有7个不同的线程访问web api的不同服务器。每个线程负责一个服务器,每个服务器速率限制每个线程。每个线程更新相同的mysql数据库。线程数保持不变。 在我的示例中,是否需要连接池?我不应该只打开7个不同的连接,这些连接将在程序的生命周期中打开吗?

  • 我尝试使用以下配置启动JDBC接收器连接器: 但当连接器处于运行状态时,没有任务正在运行: 我多次面对这个问题,但我很困惑,因为它是随机发生的。我的问题与这个问题非常相似。如果有任何帮助,我将不胜感激! 更新。11/04/2019(不幸的是,现在我只有INFO级别日志) 最后,经过几次尝试,我通过更新现有连接器的配置crm_data-sink_db_hh启动了正在运行任务的连接器: 日志: 更新。

  • 我们试图在给定的节点上启动多个独立的kafka hdfs连接器。 对于每个连接器,我们分别将和设置为不同的端口和路径。 也是Kafka经纪人JMX港是@ 9999。 当我启动 kafka 独立连接器时,出现错误 错误:代理引发异常:java.rmi.server。ExportException:端口已在使用:9999;嵌套异常是:java.net。BindException:地址已在使用中(绑定失

  • 我正在尝试将来自主题的数据(json数据)写入MySql数据库。我想我需要一个JDBC接收器连接器。 我如何配置连接器以将主题中的json数据映射到如何将数据插入数据库。 我能找到的文件只有这个。 “接收器连接器需要了解架构,因此您应该使用合适的转换器,例如架构注册表附带的Avro转换器,或启用了架构的JSON转换器。如果存在Kafka记录键,则可以是基元类型或连接结构,记录值必须是连接结构。从连

  • 我有一个kafka主题,有200万条消息,我的刷新大小是100000,默认分区为分布式模式,有4个工作者,我可以看到数据在几秒钟内立即写入HDFS(10到15秒)。 我看到创建了一个+tmp目录和文件夹,并且每次触发一个新连接器时都会创建主题。 kafka connect的行为是每次都写得这么快,还是已经将数据存储在HDFS中,并根据连接器属性将其移动到主题目录? 我需要清楚这是怎么发生的。如果我

  • 我有一个Kafka连接接收器记录从Kafka主题到S3。它在工作,但太慢了。Kafka主题每秒接收约30000条消息。连接接收器无法跟上。我已经尝试增加Kafka连接器的任务。最大值从1到3,这会创建更多任务,但这似乎无助于提高消息/秒的速度。我试着增加Kafka连接工人的CPU分配,这似乎也没有帮助。 我还能试什么?哪些指标有助于监控以进一步识别瓶颈? 更新:Kafka主题有5个分区。Kafka