我试图对Apache Kafka 2.5设置mongo-kafka连接器。我使用的是Kafka-原型连接器。
ExchangeMessage.proto
--------------------
syntax = "proto3";
package exchange_message_def;
option java_package = "kafka.message";
option java_outer_classname = "ExchangeMessage";
option optimize_for = SPEED;
message Order {
string oid = 1;
}
我为这个原型建立了一个jar文件,并放在插件路径中。但我得到以下错误。
我的Pom文件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.data</groupId>
<artifactId>data-exchange</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<skip.tests>false</skip.tests>
</properties>
<dependencies>
<!-- Unit testing -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!-- Protocol Buffers -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.12.2</version>
</dependency>
<dependency>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
<version>3.11.4</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>protoc-gen-grpc-java</artifactId>
<version>1.30.2</version>
<type>pom</type>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<finalName>${project.name}-${project.version}</finalName>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.5.5</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<finalName>${project.name}-${project.version}</finalName>
<descriptors>
<descriptor>src/assembly/dist.xml</descriptor>
</descriptors>
</configuration>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<phase>process-sources</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
<version>3.11.4</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<protocVersion>3.12.2</protocVersion>
<includeDirectories>
<include>src/main/proto</include>
</includeDirectories>
<inputDirectories>
<include>src/main/proto</include>
</inputDirectories>
<outputTargets>
<outputTarget>
<type>java</type>
<addSources>main</addSources>
<outputDirectory>src/main/java</outputDirectory>
</outputTarget>
<outputTarget>
<type>grpc-java</type>
<addSources>main</addSources>
<outputDirectory>src/main/grpc</outputDirectory>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.30.2</pluginArtifact>
</outputTarget>
<outputTarget>
<type>python</type>
<addSources>main</addSources>
<outputDirectory>src/main/python</outputDirectory>
</outputTarget>
<outputTarget>
<type>js</type>
<addSources>main</addSources>
<outputDirectory>src/main/js</outputDirectory>
</outputTarget>
</outputTargets>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Mongo接收器配置:
{
"name":"mongo-sink",
"config":{
"name": "mongo-sink",
"topics": "zz",
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max":"1",
"key.converter": "org.apache.kafka.connect.converters.IntegerConverter",
"value.converter": "com.blueapron.connect.protobuf.ProtobufConverter",
"value.converter.protoClassName":"kafka.message.ExchangeMessage$Order",
"key.converter.schemas.enable":false,
"value.converter.schemas.enable":true,
"connection.uri": "mongodb://user:password@localhost:27017",
"database":"data_db",
"collection":"zz",
"max.num.retries":"3",
"retries.defer.timeout":"5000",
"key.projection.type":"none",
"key.projection.list": "",
"value.projection.type":"none",
"value.projection.list": "",
"field.renamer.mapping":"[]",
"field.renamer.regex":"[]",
"document.id.strategy":"com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy",
"post.processor.chain":"com.mongodb.kafka.connect.sink.processor.DocumentIdAdder",
"delete.on.null.values":false,
"writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy",
"max.batch.size":"2",
"rate.limiting.timeout":"100",
"rate.limiting.every.n":"100",
"change.data.capture.handler":""
}
}
我得到以下错误:
组织。阿帕奇。Kafka。连接错误。ConnectException:原型类kafka。消息在com的类路径中找不到ExchangeMessage$Order。蓝停机坪。连接protobuf。原型转换器。在org上配置(ProtobufConverter.java:48)。阿帕奇。Kafka。连接运行时。隔离插件。org上的newConverter(Plugins.java:293)。阿帕奇。Kafka。连接运行时。工人startTask(Worker.java:442)位于org。阿帕奇。Kafka。连接运行时。分发。分配牧人。startTask(DistributedHerder.java:1147)位于org。阿帕奇。Kafka。连接运行时。分发。分配牧人。在org上访问$1600(DistributedHerder.java:126)。阿帕奇。Kafka。连接运行时。分发。分配给赫尔德12美元。在org上调用(DistributedHerder.java:1162)。阿帕奇。Kafka。连接运行时。分发。分配给赫尔德12美元。在java上调用(DistributedHerder.java:1158)。base/java。util。同时发生的未来任务。在java上运行(FutureTask.java:264)。base/java。util。同时发生的线程池执行器。java上的runWorker(ThreadPoolExecutor.java:1128)。base/java。util。同时发生的线程池执行器$Worker。在java上运行(ThreadPoolExecutor.java:628)。base/java。朗。丝线。运行(Thread.java:834)
之前,我复制了3 jar文件到Kafka插件路径:
为了解决这个问题,我做了以下:从插件路径中删除Blueapron jar文件,并通过在我的pom文件中添加以下内容作为依赖项,创建了我自定义jar文件的uber jar:
<dependency>
<groupId>com.sclasen</groupId>
<artifactId>kafka-connect-protobuf-converter</artifactId>
<version>2.0.1</version>
</dependency>
把我的uber罐和mongo连接器罐放在插件路径中。
我正在执行Java-Eclipse-TestNG,但在使用testng命令行运行测试时遇到了困难。我有多个源文件夹,当创建。class文件时,它们将转到/bin目录,文件夹结构与/src相同,如下所示。我已经设置了类路径(使用导出classpath=...)并尝试运行“java org.testng.testng testng.xml”,但它给我的结果是“不能在类路径中找到类:testcases.
问题内容: 我不太确定是什么原因导致的,因为清单中正确列出了它: 我还添加了来构建路径,并将其移至的顶部,但是它仍然给我同样的错误。 编辑:我完全重建了该项目,并且无法重现该错误。不知道是什么原因造成的。 问题答案: 我的项目有同样的问题。这是由于我的项目与我在项目中添加的库项目之间的android支持库版本冲突而发生的。将相同版本的android支持库放入您的项目和包含的库项目中,并清理构建 …
问题内容: 我目前正在一个项目中,我必须使用纯本地ndk。当我尝试从Irrlicht引擎源运行helloworld示例时,它起作用了。然后,按照该示例的相同格式尝试在我的项目中使用它。但是我得到了: 在运行我的项目时。 这是我的main.cpp文件: n Android.mk: 我在AndroidManifest.xml中给了Activity名称: 我在这里做什么错?如有必要,我将发布完整代码。
我目前正在做一个项目,在这个项目中我必须使用纯原生的NDK。当我尝试从Irrlicht引擎源代码运行helloworld示例时,它起到了作用。然后我尝试在我的项目中使用它,遵循该示例的相同格式。但我得到: 在运行我的项目时。 并且我在AndroidManifest.xml中给出了活动名称: 我在这里犯了什么错误?如果需要,我会发布完整的代码。