当前位置: 首页 > 知识库问答 >
问题:

kafka连接合流elasticsearch接收器(未找到类错误)

祁嘉木
2023-03-14

我对Kafka connect很陌生。我想把我的信息从Kafka主题推到弹性搜索。在阅读了可用的文档之后,我从发行版tar下载并编译了弹性搜索接收器。拉链(https://github.com/confluentinc/kafka-connect-elasticsearch/releases)

我添加了弹性搜索属性文件,并将上述jar包含在类路径中。当我在独立模式下运行kafka connect时,我会出现此错误

。/usr/bin/connect-standalone etc/schema-registry/connect-avro-standalone . properties etc/Kafka-connect-elastic search/quick start-elastic search . properties

[2016-11-13 00:05:38,768] ERROR Task elasticsearch-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142) java.lang.NoClassDefFoundError:io/searchbox/client/JestClientFactory
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:81)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:52)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:207)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at o

NoClassDefFoundError:io/search box/client/JestClientFactory

我检查了pom。xml,并且正确定义了Jest客户端依赖关系。我错过了什么吗?

任何建议都将非常感谢。

谢谢,拉杰什

共有2个答案

汤枫涟
2023-03-14

你也可以建造脂肪罐。添加描述符参考

<build>
  <plugins>
  ....
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
      ....
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
      ....
    </plugin>
  </plugins>
  ....
</build>
曾嘉瑞
2023-03-14

类路径中似乎确实缺少其中一个依赖项。Github 发布页面上的包不包含依赖项。

我建议使用ConFluent开源发行版并遵循快速入门。

 类似资料:
  • 我有一个kafka主题,有200万条消息,我的刷新大小是100000,默认分区为分布式模式,有4个工作者,我可以看到数据在几秒钟内立即写入HDFS(10到15秒)。 我看到创建了一个+tmp目录和文件夹,并且每次触发一个新连接器时都会创建主题。 kafka connect的行为是每次都写得这么快,还是已经将数据存储在HDFS中,并根据连接器属性将其移动到主题目录? 我需要清楚这是怎么发生的。如果我

  • 我有Kafka主题,有多种类型的消息流入并使用Kafka Connect写入弹性搜索。流看起来不错,直到我不得不将唯一的消息集分离到唯一的索引中。也就是说,我必须根据字段(JSON消息)为新的数据集获取新的索引。 我如何配置/定制Kafka connect以实现同样的功能?每个消息都包含一个表示消息类型和时间戳的字段。 示例 Json 如下所示: Sample1: {“log”:{“data”:“

  • 使用此Kafka Connect连接器: https://www.confluent.io/hub/confluentinc/kafka-connect-s3 我手动将其安装到我的kafka Connect Docker映像的插件中。我的目的是使用Kafka Connect将来自Kafka主题的Avro记录写入S3。 在运行时,使用Kafka Connect,我会得到以下错误: 在ConFluen

  • 我正在尝试设置Kafka Connect接收器,以便使用Datastax连接器将主题中的数据收集到Cassandra表中:https://downloads.Datastax.com/#AKC 运行一个直接在代理上运行的独立worker,运行Kafka 0.10.2.2-1: 但我有以下错误: 卡桑德拉或Kafka方面没有额外的错误。我在cassandra节点上看到活动连接,但没有任何东西到达密钥

  • 我正在尝试从kafka中的主题将数据插入postgres数据库。我正在使用以下命令加载 sink-quick start-MySQL . properties如下 我得到的错误是 Postgres jar文件已经在文件夹中。有人能提出建议吗?

  • 我们已经成功地使用了MySQL - 使用jdbc独立连接器的kafka数据摄取,但现在在分布式模式下使用相同的连接器(作为kafka connect服务)时面临问题。 用于独立连接器的命令,工作正常 - 现在,我们已经停止了这一项,并以分布式模式启动了kafka connect服务,如下所示 2 个节点当前正在运行具有相同连接服务。 连接服务已启动并正在运行,但它不会加载 下定义的连接器。 应该对