当前位置: 首页 > 知识库问答 >
问题:

java包中sink连接器的实现

齐乐
2023-03-14

我已经启动了zoo的管理员,Kafka服务器,Kafka生产者和Kafka消费者,我已经把jdbc sql连接器罐下载从合流,并把罐子的路径,我已经提到plugin.path连接独立properties.and我已经运行connect-standalone.bat……\config\connect-standalone.properties…\config\sink-quickstart-mysql.properties没有任何错误,但它有许多警告,它没有开始,但我的数据没有得到反映tables.what我错过了?你能帮帮我吗?我有下面的警告

                                                                                                    org.reflections.ReflectionsException: could not get type for name io.netty.inter
nal.tcnative.SSLPrivateKeyMethod
        at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:312)
        at org.reflections.Reflections.expandSuperTypes(Reflections.java:382)
        at org.reflections.Reflections.<init>(Reflections.java:140)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$Inte
rnalReflections.<init>(DelegatingClassLoader.java:433)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scan
PluginPath(DelegatingClassLoader.java:325)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scan
UrlsAndAddPlugins(DelegatingClassLoader.java:261)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.init
PluginLoader(DelegatingClassLoader.java:209)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.init
Loaders(DelegatingClassLoader.java:202)
        at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.jav
a:60)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone
.java:79)
Caused by: java.lang.ClassNotFoundException: io.netty.internal.tcnative.SSLPriva
teKeyMethod
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:310)
        ... 9 more

共有1个答案

周正真
2023-03-14

无需自己编写源连接器,除非需要将 kafka 连接到某些外来数据源。像mysql这样的流行工具已经被很好地覆盖了。confluent已经有一个“jdbc-connector”,可以做你想要的。

https://docs . confluent . io/current/connect/Kafka-connect-JDBC/index . html

您将需要一个有效的kafka-连接安装,然后您可以使用HTTP POST将mysql表“连接”到kafka连接API。只需在tables.whitelist属性中指定一个逗号分隔的表列表,您希望将其用作源。例如,像这样的东西……

curl -X POST $KAFKA_CONNECT_API/connectors -H "Content-Type: application/json" -d '{
      "name": "jdbc_source_mysql_01",
      "config": {
              "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
              "connection.url": "jdbc:mysql://mysql:3306/test",
              "connection.user": "connect_user",
              "connection.password": "connect_password",
              "topic.prefix": "mysql-01-",
              "poll.interval.ms" : 3600000,
              "table.whitelist" : "test.accounts",
              "mode":"bulk"
              }
      }'
 类似资料:
  • 我正在实现一个连接池(JDBC连接和SMPP连接)。我知道有几个经过良好测试的连接池。但我只想自己实现。我在多线程环境中使用它。这更是我个人的兴趣所在。我的实现是这样的。我创建一个ConcurrentLinkedQueue并将连接推送到队列。每次线程请求连接时,都会从队列中弹出连接。作业完成后,线程将连接推回到队列。我的连接轮询实现类如下所示。 我只想知道这个实现有什么问题。请指教。我想为JDBC

  • 我们希望使用Kafka connect sink连接器将消息从Kafka复制到Mongo DB。在我们的用例中,我们有多个主题,每个主题都有一个分区(主题的名称可以用正则表达式表示,例如topic.XXX.name)。这些主题的数量在不断增加。我想知道Kafka connect架构是否适合这个用例。如果是这样,如何配置它的增益高可缩放性和并行性?任务是什么。最大值?工人数量?

  • 我正在阅读一个已经创建的Kafka主题,在这个主题上,一个单独的集群正在产生一些键和值。我的最终目标是以JSON格式写HDFS,为此我已经用Kafka HDFS Sink 5.3做了一段时间的实验。我面临的问题是,我无法将该主题的所有记录摄取并写入HDFS。到目前为止,如果我的主题包含每小时数百万条记录的数据,我只能写10万条记录。 以下是我用于kafka-connect-standalone.p

  • 这就是开发人员指南对动态连接器的描述https://docs.confluent.io/current/connect/devguide.html#dynamic-连接器 并非所有的连接器都有一组静态的分区,因此连接器实现还负责监控外部系统是否有任何可能需要重新配置的变化。例如,在JDBCSourceConnector示例中,连接器可能会为每个任务分配一组表。当创建一个新表时,它必须发现这一点,这

  • 我试图使用java包装器将RunDeck server作为windows服务运行,本文将指导您:http://www.phwitservices.com/2014/06/rundeck-windows-service

  • 在这篇文章runn-mongob-querys-contined-with-go中说mgo。DialSusInfo:创建一个会话,它维护一个到MongoDB的套接字连接池,但是当我在函数DialSusInfo的文档中查找时,我没有找到谈论池连接的东西,只有我在Dial Function Dial Function中找到了一些东西,上面写着:对于给定的集群,此方法通常只调用一次。然后在获得的会话上使