当前位置: 首页 > 知识库问答 >
问题:

mongo kafka连接源

颜举
2023-03-14

我使用kafka connect从mongo读取数据并将其写入kafka主题。

我正在使用 mongo kafka 源连接器。

我收到以下错误:

ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:115)
java.lang.NoClassDefFoundError: com/mongodb/ConnectionString
    at com.mongodb.kafka.connect.source.MongoSourceConfig.createConfigDef(MongoSourceConfig.java:209)
    at com.mongodb.kafka.connect.source.MongoSourceConfig.<clinit>(MongoSourceConfig.java:138)
    at com.mongodb.kafka.connect.MongoSourceConnector.config(MongoSourceConnector.java:56)
    at org.apache.kafka.connect.connector.Connector.validate(Connector.java:129)
    at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:282)
    at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:188)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:109)
Caused by: java.lang.ClassNotFoundException: com.mongodb.ConnectionString
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
    at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
    ... 7 more

罐子里好像有一个小盒子。为了得到这个罐子,我使用了两种不同的方法,但是我得到了同样的错误。首先,我使用了下载的from:maven资源库,然后我从github repo中克隆了源代码,并自己构建了jar。我将jar推到plugins.path中,当我解压缩生成的jar并遍历这些类时,我找不到提到的类:com.mongodb.ConnectionString

我使用了以下配置文件

工人.属性:

 rest.port=18083

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include 
# any combination of: 
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java/plugins

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
bootstrap.servers=127.0.0.1:9092

mongo连接器. properties:

name=mongo
tasks.max=1
connector.class =com.mongodb.kafka.connect.MongoSourceConnector
database=
collection=alerts
key.converter = org.apache.kafka.connect.storage.StringConverter
value.converter = org.apache.kafka.connect.storage.StringConverter

topic.prefix=someprefix
poll.max.batch.size=1000
poll.await.time.ms=5000

# Change stream options
pipeline=[]
batch.size=0
change.stream.full.document=updateLookup

然后我通过以下命令启动了连接器:

/usr/local/kafka/bin/connect-standalone.sh worker.properties mongo-connector.properties 

知道如何解决这个问题

共有2个答案

桑博远
2023-03-14

正如scalacode所指出的,我花了一些时间寻找解决方案,创造了这个答案,最简单的解决方案是从confluent下载jar,而不是从maven。

https://www.confluent.io/hub/mongodb/kafka-connect-mongodb

邓高韵
2023-03-14

您必须将连接器的JAR文件放在<code>插件下。路径,在您的情况下是/usr/share/java/plugins

这些说明已经存在于Confluent的文档中:

Kafka Connect插件是:

在单个JAR文件中包含插件及其第三方依赖项的所有类文件的uber JAR;或者文件系统上包含插件及其第三方依赖项的JAR文件的目录。但是,插件不应包含Kafka Connect运行时提供的任何库。

Kafka Connect使用其插件路径找到插件,该路径是Kafka Connect工作器配置中定义的目录的逗号分隔列表。要安装插件,请将插件目录或uber JAR(或解析到其中之一的符号链接)放在插件路径上列出的目录中,或者更新插件路径以包含包含插件的目录的绝对路径。

 类似资料:
  • 代码片段如下所示: 如果有人有决议,请帮忙?

  • 本文向大家介绍HTTP长连接、短连接?相关面试题,主要包含被问及HTTP长连接、短连接?时的应答技巧和注意事项,需要的朋友参考一下 在HTTP/1.0中默认使用短连接。也就是说,客户端和服务器每进行一次HTTP操作,就建立一次连接,任务结束就中断连接。当客户端浏览器访问的某个HTML或其他类型的Web页中包含有其他的Web资源(如JavaScript文件、图像文件、CSS文件等),每遇到这样一个W

  • 问题内容: 我在两个表之间有多对多关系。 表包含我的餐厅。 表包含不同的类别。 表包含两列,每列分别包含两个表的ID。 以下陈述是我能想到的,但没有给我我想要的输出。 我希望输出是有关餐厅的信息,并在最后一列中是类别的连接行。 问题答案: 要串联值,可以使用。xml路径解决方案有误,应使用和特殊字符。 您也可以使用变量解决方案

  • 有人能告诉我在maven中scm连接和developerConnection之间的区别吗? 我正在尝试使用,它需要其中之一。 [错误]未能执行goal org . Apache . maven . plugins:maven-release-plugin:2 . 3 . 2:在项目was-topology-legacy-dsl上准备(default-cli ):缺少必需的设置:必须指定scm连接或

  • 在Apache Flink流处理中,连接操作与连接有何不同,因此CoProcessFunction和ProcessJoinFunction有何不同,这是CoProcessFunction提供的onTimer函数吗?您能否提供一个适用于以相互排斥的方式连接/连接的示例用例。

  • 我有一个kafka connect插件,部署在kafka集群中(在独立模式下,仅用于测试,目的是分布式完成)。这个Kafka连接插件使用curator连接到集群的zookeper,并从中提取一些信息,以决定如何处理这些消息。 代码如下: 在treeCache启动时超时,配置根路径存在于本地zookeeper中(已确认在zookeeper外壳中执行ls,对于我尝试使用的zkConnection字符串

  • 圆盘 D72 直线导轨滑块连接器 A 连接片 3*6 连接片 7*9-B 连接片 45° 连接片 135° 连接片 I1 连接片 O1 8mm轴轮连接片 三角连接片 6*8 十字连接片 T型连接片 连接片0324-184

  • 连接是一个合并小矩阵成大矩阵的过程。事实上,你创建第一个矩阵时就是通过将它的各自独立的元素连接在一起的。中括号[]是连接的算子。例如,以4*4魔方A开始,构造 B = [A A+32; A+48 A+16] 结果是含有加入的四个子矩阵的8×8矩阵。 B = 16 3 2 13 48 35 34 45 5 10 11