我尝试使用 kafka 实现一个简单的生产者消费者示例,并使用以下属性实现了:
Properties configProperties = new Properties();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:" + portNumber);
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
// Belirtilen property ayarlarına sahip kafka producer oluşturulur
org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties);
然而,当我在另一个项目(html" target="_blank">数据可视化软件的插件)中尝试完全相同的配置时,我得到了以下错误:
.... // Here there is some other stuff but I thing the important one is the below one
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/clients/producer/Producer
at App.MyControlPanel.<init>(MyControlPanel.java:130)
at App.CytoVisProject.<init>(CytoVisProject.java:29)
... 96 more
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.producer.Producer
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 98 more
在我说它工作的第一个版本中,我使用了“mvn clean compile assembly:single”,但是在第二个版本中,我为整个项目创建了一个jar文件。因为可视化软件需要一个jar文件来安装插件。因为每件事都是一样的(至少我找不到任何不同,我用相同的代码),我想问题是关于建立项目的方式。这里发生了什么?“mvn clean compile assembly:single”和用IntelliJ构建jar文件有什么区别?为什么我会得到这个错误,如何解决这个问题?非常感谢你的帮助!
正如我在第一个答案的最后一条评论中所说,我有一个插件,它的目标是清单和转换。在这里:
<plugin>
<groupId>com.springsource.bundlor</groupId>
<artifactId>com.springsource.bundlor.maven</artifactId>
<version>1.0.0.M2</version>
<configuration>
<outputManifest>C:\Users\USER\AppData\Local\Temp\archetype2tmp/META-INF/MANIFEST.MF</outputManifest>
<failOnWarnings>false</failOnWarnings>
<removeNullHeaders>true</removeNullHeaders>
<manifestHeaders><![CDATA[Bundle-ManifestVersion: 2
Bundle-Name: CytoVisProject
Bundle-SymbolicName: CytoVisProject
Spring-DM-Version: ${pom.version}
]]></manifestHeaders>
</configuration>
<!-- generate the manifest automatically during packaging -->
<executions>
<execution>
<id>bundle-manifest</id>
<phase>package</phase>
<goals>
<goal>manifest</goal>
<goal>transform</goal>
</goals>
</execution>
</executions>
</plugin>
如果我使用如下所示的阴影插件:
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<configuration>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
它不起作用,因为我需要在我的插件中使用清单和转换作为目标。如何将kafka类添加到IntelliJ创建的jar文件中以解决此问题(我不确定这是否可以解决)?
此外,请确保您没有对标记为提供范围的kafka-client的依赖项。
在编译时< code > Kafka-clients-0 . 10 . 0 . 0 . jar 是可用的,因此代码编译成功,但在运行时它丢失了,因此出现了错误。
你有两个选择:
我找到了一个更简单的解决方案。我已经改变了
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
对此
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
之后,我的代码也作为预编译插件的一部分运行。
我创建了一个简单的Kafka生产者 启动捆绑包时,遇到以下错误: 我已尝试设置和如下所示: 也喜欢,但还是出现了同样的错误。我在这里做错了什么。
我们的生产环境中出现了随机的和: 我们偶尔会在我的生产者日志中得到这个异常: 主题:XXXXXX:5608 ms的过期记录自批量创建加上逗留时间以来已经过去。 此类错误消息中的毫秒数不断变化。有时是5秒,有时是13秒! 我们很少能得到: 集群由3个经纪人和3个动物园管理员组成。生产者服务器和Kafka集群在同一个网络中。 我在打同步电话。有一个web服务,多个用户请求调用它来发送数据。Kafka
一、生产者发送消息的过程 首先介绍一下 Kafka 生产者发送消息的过程: Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区。在发送 ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。 接下来,数据被传给分区器。如果之前已经在 Prod
Kafka文件说,幂等生产者是可能的,与相同的生产者会话,我无法理解这一点。 比方说,Kafka为每条消息添加序列号,最后一个序列号在Kafka中维护(不确定它维护在哪里)。 它是如何生成序列号的,它保存在哪里? 为什么当制作人崩溃并再次出现时,它不能保持序列? 我怎样才能使它在制作人会话之间真正幂等?
Kafka为每条消息生成偏移量。假设,我正在生成消息5,偏移量将从1到5。 但是,在事务生产者中,比如说,我产生了5条消息并提交,然后是5条消息但中止,然后是5条消息提交。 > 那么,最后提交的5条消息的偏移量是6到10还是11到15? 如果我不放弃或不promise呢。这些信息还会被发布吗? Kafka是如何忽略未promise的补偿的?因此,kafka提交日志是基于偏移量的。它是否使用事务使用
我们使用sping-cloud-stream-binder-kafka(3.0.3.RELEASE)向我们的Kafka集群(2.4.1)发送消息。时不时地,其中一个生产者线程会收到NOT_LEADER_FOR_PARTITION异常,甚至超过重试(当前设置为12,由依赖sping-retry激活)。我们限制了重试,因为我们发送了大约1kmsg/s(每个生产者实例),并且担心缓冲区的大小。这样我们会