当前位置: 首页 > 知识库问答 >
问题:

卡拉夫 - Kafka OSGI 捆绑包 - 制片人问题

长孙鸿波
2023-03-14

我正在尝试为Kafka制作人创建一个简单的捆绑包,在apache Karaf版本4.0.3中。

这是我的Java代码

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
//props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class","org.apache.kafka.clients.producer.internals.DefaultPartitioner");
Producer<String, String> producer = new KafkaProducer<String,String>(props,new StringSerializer(),new StringSerializer());
//for(int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<String, String>("test","data", outputData));

producer.close();

我已经在pom.xml中明确声明了各自的依赖关系

<dependency>
        <groupId>org.apache.servicemix.bundles</groupId>
        <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
        <version>0.9.0.0_1</version>
</dependency>

我也部署了那个kafka客户端包。

但在启动生成器时,我看到以下第一次尝试时的异常。

Exception in thread "pool-135-thread-1" java.lang.ExceptionInInitializerError
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:194)
    .
    .
    .
    at com.google.common.util.concurrent.Futures$6.run(Futures.java:1319)
    at com.gilt.gfc.guava.future.FutureConverters$ScalaFutureAdapter$$anonfun$addListener$1.apply(FutureConverters.scala:72)
    at com.gilt.gfc.guava.future.FutureConverters$ScalaFutureAdapter$$anonfun$addListener$1.apply(FutureConverters.scala:72)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.clients.producer.internals.DefaultPartitioner for configuration partitioner.class: Class org.apache.kafka.clients.producer.internals.DefaultPartitioner could not be found.
    at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:255)
    at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:78)
    at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:94)
    at org.apache.kafka.clients.producer.ProducerConfig.<clinit>(ProducerConfig.java:206)
    ... 12 more

然后连续这个。。。

Exception in thread "pool-136-thread-1" java.lang.NoClassDefFoundError: Could not initialize class org.apache.kafka.clients.producer.ProducerConfig
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:194)
.
.
.
at com.google.common.util.concurrent.Futures$6.run(Futures.java:1319)
at com.gilt.gfc.guava.future.FutureConverters$ScalaFutureAdapter$$anonfun$addListener$1.apply(FutureConverters.scala:72)
at com.gilt.gfc.guava.future.FutureConverters$ScalaFutureAdapter$$anonfun$addListener$1.apply(FutureConverters.scala:72)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745

有没有人对捆绑包提出类似的问题?

共有3个答案

公良天逸
2023-03-14

也许我对其他人有用,建议在JIRA工作

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
夹谷俊远
2023-03-14

使用Kafka客户端版本0.8.2.2_1,解决了问题。

叶健柏
2023-03-14

我在0.9.0中看到了同样的问题。事实证明设置了一个线程上下文加载器,在这种情况下,Kafka使用该类加载器来解决。所以线程上下文类加载器应该是:

    < li >可以解析所有Kafka相关内容的类加载器 < li> null

不知道这是否会伤害我,但补充道:

Thread.currentThread().setContextClassLoader(null);

成功了。

 类似资料:
  • 问题内容: 编辑:Symfony最佳做法回答了我的大部分问题。 关于我的Symfony2应用程序,我有几个问题。 它将有一个前端和一个后端,并且它们将使用一些通用代码(例如日期显示器,分页器,一些经常使用的模板等)。 因此,我创建了一个FrontendBundle和一个BackendBundle,它们分别包含各自的布局。第一个问题:为前端和后端创建捆绑包(这是甚至没有控制器的“通用”捆绑包)的优良

  • 我正在从log4j-slf4j-impl 2.12.0升级到2.17.1。在我之前的代码中,除了 现在我已经升级到2.17.1,我得到了以下错误: org . Apache . Felix . resolver . reason . reason异常:无法解析org . Apache . logging . log4j . slf4j-impl/2 . 17 . 1:缺少需求[org . Apac

  • 我有一个包含多个按钮的活动,当单击一个按钮时,将打开一个包含两个片段的新活动。 我试图根据按下的按钮在其中一个片段中显示一个回收器视图。问题是捆绑包为空,所以回收器视图不显示。 捆绑包在<code>onAttach 碎片 公共类MyFragment扩展片段{

  • 设置 多个独立的源系统将AVRO事件推送到Kafka主题中。KafkaS3接收器连接器从本主题读取AVRO事件,并写入S3拼花格式。 问题 我们的架构注册表中的 AVRO 架构不符合标准。例如,源系统中的十进制字段在架构注册表中具有基类型字符串和逻辑类型十进制。AVRO 中不允许这些类型的组合(十进制逻辑类型必须始终具有基本类型修复/字节。 这些不正确的AVRO模式导致不正确的拼花文件模式。E、

  • 我编写了一个定制的NiFi处理器,它使用一些Hadoop类,处理流文件,并在Avro之间序列化流文件。 处理器的pom.xml文件如下所示: 因为我已经将标记为,所以它不会绑定在生成的NAR文件中。现在,我可以做一个快速修复,完全删除作用域并创建NAR,但NiFi会抱怨next class not found错误。 我想知道:

  • 我试图在Karaf 3.0.0-RC1中使用H2数据库加载来获取Scala库,但我遇到了这个错误 有人知道我需要在POM和/或功能中添加什么吗。xml来让它工作吗? 谢谢,鲍勃