当前位置: 首页 > 知识库问答 >
问题:

Kafka生产者类未找到异常

郎仰岳
2023-03-14

我尝试使用 kafka 实现一个简单的生产者消费者示例,并使用以下属性实现了:

Properties configProperties = new Properties();
    configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:" + portNumber);
    configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
    configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

    // Belirtilen property ayarlarına sahip kafka producer oluşturulur
    org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties);

然而,当我在另一个项目(html" target="_blank">数据可视化软件的插件)中尝试完全相同的配置时,我得到了以下错误:

.... // Here there is some other stuff but I thing the important one is the below one
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/clients/producer/Producer
at App.MyControlPanel.<init>(MyControlPanel.java:130)
at App.CytoVisProject.<init>(CytoVisProject.java:29)
... 96 more
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.producer.Producer
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 98 more

在我说它工作的第一个版本中,我使用了“mvn clean compile assembly:single”,但是在第二个版本中,我为整个项目创建了一个jar文件。因为可视化软件需要一个jar文件来安装插件。因为每件事都是一样的(至少我找不到任何不同,我用相同的代码),我想问题是关于建立项目的方式。这里发生了什么?“mvn clean compile assembly:single”和用IntelliJ构建jar文件有什么区别?为什么我会得到这个错误,如何解决这个问题?非常感谢你的帮助!

正如我在第一个答案的最后一条评论中所说,我有一个插件,它的目标是清单和转换。在这里:

<plugin>
            <groupId>com.springsource.bundlor</groupId>
            <artifactId>com.springsource.bundlor.maven</artifactId>
            <version>1.0.0.M2</version>
            <configuration>
                <outputManifest>C:\Users\USER\AppData\Local\Temp\archetype2tmp/META-INF/MANIFEST.MF</outputManifest>
                <failOnWarnings>false</failOnWarnings>
                <removeNullHeaders>true</removeNullHeaders>
                <manifestHeaders><![CDATA[Bundle-ManifestVersion: 2
Bundle-Name: CytoVisProject
Bundle-SymbolicName: CytoVisProject
Spring-DM-Version: ${pom.version}
]]></manifestHeaders>
            </configuration>
            <!-- generate the manifest automatically during packaging -->
            <executions>
                <execution>
                    <id>bundle-manifest</id>
                    <phase>package</phase>
                    <goals>
                        <goal>manifest</goal>
                        <goal>transform</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

如果我使用如下所示的阴影插件:

<plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.0</version>
            <configuration>

            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

它不起作用,因为我需要在我的插件中使用清单和转换作为目标。如何将kafka类添加到IntelliJ创建的jar文件中以解决此问题(我不确定这是否可以解决)?

共有3个答案

惠泳
2023-03-14

此外,请确保您没有对标记为提供范围的kafka-client的依赖项。

段志
2023-03-14

在编译时< code > Kafka-clients-0 . 10 . 0 . 0 . jar 是可用的,因此代码编译成功,但在运行时它丢失了,因此出现了错误。

你有两个选择:

  • 在JAR中包含kafka JAR
  • 将kafka JAR添加到插件的类路径中
司马同
2023-03-14

我找到了一个更简单的解决方案。我已经改变了

kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

对此

kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

之后,我的代码也作为预编译插件的一部分运行。

 类似资料:
  • 我创建了一个简单的Kafka生产者 启动捆绑包时,遇到以下错误: 我已尝试设置和如下所示: 也喜欢,但还是出现了同样的错误。我在这里做错了什么。

  • 我们的生产环境中出现了随机的和: 我们偶尔会在我的生产者日志中得到这个异常: 主题:XXXXXX:5608 ms的过期记录自批量创建加上逗留时间以来已经过去。 此类错误消息中的毫秒数不断变化。有时是5秒,有时是13秒! 我们很少能得到: 集群由3个经纪人和3个动物园管理员组成。生产者服务器和Kafka集群在同一个网络中。 我在打同步电话。有一个web服务,多个用户请求调用它来发送数据。Kafka

  • 一、生产者发送消息的过程 首先介绍一下 Kafka 生产者发送消息的过程: Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区。在发送 ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。 接下来,数据被传给分区器。如果之前已经在 Prod

  • Kafka文件说,幂等生产者是可能的,与相同的生产者会话,我无法理解这一点。 比方说,Kafka为每条消息添加序列号,最后一个序列号在Kafka中维护(不确定它维护在哪里)。 它是如何生成序列号的,它保存在哪里? 为什么当制作人崩溃并再次出现时,它不能保持序列? 我怎样才能使它在制作人会话之间真正幂等?

  • Kafka为每条消息生成偏移量。假设,我正在生成消息5,偏移量将从1到5。 但是,在事务生产者中,比如说,我产生了5条消息并提交,然后是5条消息但中止,然后是5条消息提交。 > 那么,最后提交的5条消息的偏移量是6到10还是11到15? 如果我不放弃或不promise呢。这些信息还会被发布吗? Kafka是如何忽略未promise的补偿的?因此,kafka提交日志是基于偏移量的。它是否使用事务使用

  • 我们使用sping-cloud-stream-binder-kafka(3.0.3.RELEASE)向我们的Kafka集群(2.4.1)发送消息。时不时地,其中一个生产者线程会收到NOT_LEADER_FOR_PARTITION异常,甚至超过重试(当前设置为12,由依赖sping-retry激活)。我们限制了重试,因为我们发送了大约1kmsg/s(每个生产者实例),并且担心缓冲区的大小。这样我们会