当前位置: 首页 > 知识库问答 >
问题:

将Kafka主题下沉到mongodb

纪鸿禧
2023-03-14

我有一个项目,我需要使用java从JSON文件中获取数据,并将其沉入kafka topic,然后将数据从topic沉入mongodb。我已经找到了kafka-mongodb连接器,但是文档只适用于使用汇合平台进行连接。我试过了:

  • 从Maven下载mongo-kafka-connect-1.2.0.jar。
  • 将文件放入 /kafka/plugins 中
  • 在connect-standalone.properties中添加了这一行“plugin.path=C:\kafka\plugins”。
  • 创建了MongoSinkConnector.properties。
name=mongo-sink
topics=test
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
key.ignore=true

# Specific global MongoDB Sink Connector configuration
connection.uri=mongodb://localhost:27017
database=student_kafka
collection=students
max.num.retries=3
retries.defer.timeout=5000
type.name=kafka-connect

然后我运行命令

.\bin\windows\connect单机版。bat.\config\connect单机版。properties.\config\MongoSinkConnector.properties

我犯了这个错误

[2020-08-09 20:18:30,329] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone)
java.util.concurrent.ExecutionException: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: com/mongodb/ConnectionString
        at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:115)
        at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:99)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:118)
Caused by: java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: com/mongodb/ConnectionString
        at com.mongodb.kafka.connect.sink.MongoSinkConfig.createConfigDef(MongoSinkConfig.java:248)
        at com.mongodb.kafka.connect.sink.MongoSinkConfig.<clinit>(MongoSinkConfig.java:139)
        at com.mongodb.kafka.connect.MongoSinkConnector.config(MongoSinkConnector.java:72)
        at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:366)
        at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$1(AbstractHerder.java:326)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NoClassDefFoundError: com/mongodb/ConnectionString
        ... 10 more
Caused by: java.lang.ClassNotFoundException: com.mongodb.ConnectionString
        at java.net.URLClassLoader.findClass(Unknown Source)
        at java.lang.ClassLoader.loadClass(Unknown Source)
        at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
        at java.lang.ClassLoader.loadClass(Unknown Source)
        ... 10 more

编辑:感谢博格丹·苏卡丘的帮助,我找到了解决这个问题的方法。

您需要将以下罐子添加到kafka/lib文件夹中。

    < li>mongodb-driver-3.12.7.jar和mongodb-driver-core-3.12.7.jar和mongo-java-driver-3.12.6.jar和mongo-Kafka-connect-1 . 0 . 1 . jar。 < li>PS:我在使用最新的mongo-kafka-connect时遇到了一些问题。所以我不得不使用这个版本。

共有1个答案

牧宁
2023-03-14

您缺少MongoDB驱动程序。MongoDB连接器jar仅包含与Kafka Connect相关的类,但它仍然需要驱动程序才能连接到MongoDB实例。您需要下载该驱动程序并将jar文件复制到您发布连接器的同一路径(C:\kafka\plugins)。

为了保持干净,您还应该在该插件目录中创建另一个文件夹(例如:C:\kafka\plugins\mongodb),并将与此连接器相关的所有内容移动到那里。

稍后编辑:

这让我相信kafka-连接-mongdbjar和mongodb-驱动程序是不够的。不过,你可以试一试。

 类似资料:
  • 我有一个服务器a,在服务器a中,我安装了kafka并启动了kafka和Zookeeper。我还创建了一个主题作为my_topic。现在我有一个应用程序B运行在服务器B中,应用程序B有一些数据,我想把这些数据推送到服务器A中的my_topic。我是否也需要在服务器B中安装kafka并在服务器B中创建一个生产者?如果是,如何将来自服务器B的消息推送到服务器A中的主题?介质是什么?

  • 我看过mongodb文档,它支持spark到mongo sink https://docs.mongodb.com/spark-connector/master/scala/streaming/

  • 阅读:Kafka Connect FileStreamSource忽略附加行 看来Kafka现在支持这一观点,他说: https://docs.confluent.io/5.5.0/connect/management/configuring.html#Standalone-示例 是否声明该文件被监视: 开始独立连接 将文件的所有内容添加到主题中,将新行添加到中不会将这些行添加到主题中。是否需要配

  • 我见过,但对于我的(简单的)用例来说,它似乎有些过头了。 我也知道,但我不想仅仅为此编写和维护代码。 我的问题是:有没有一种方法可以用kafka原生工具实现这个主题调度,而不用自己写一个Kafka-Consumer/Producer?

  • 问题内容: 我有我的自定义Java对象,希望利用JVM的内置序列化将其发送到Kafka主题,但是序列化失败并出现以下错误 org.apache.kafka.common.errors.SerializationException:无法将com.spring.kafka.Payload类的值转换为value.serializer中指定的org.apache.kafka.common.serializ

  • 我试图使用pyspark将每日批次的数据发送到Kafka主题,但我当前收到以下错误: Traceback(最近的最后一次调用): File", line 5, in File"/usr/local/rms/lib/hdp26_c5000/park2/python/pyspark/sql/readwriter.py", line 548, in保存自己。_jwrite.save()File"/usr