在我的简单应用程序中,我试图实例化一个KafkaConsumer,我的代码几乎是javadoc代码的副本(“自动偏移提交”):
@Slf4j
public class MyKafkaConsumer {
public MyKafkaConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe( Arrays.asList("mytopic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
log.info( record.offset() + record.key() + record.value() );
//System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
如果我尝试实例化这个,我得到:
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:781)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:635)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:617)
at ...MyKafkaConsumer.<init>(SikomKafkaConsumer.java:23)
...
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.StringDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:680)
... 48 more
这个怎么修
您的Custom类需要实现org.apache.kafka.common.serialization.Deserializer。
喜欢
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.Serializable;
import java.util.Map;
//Developed by Arun Singh
public class Employee implements Serializable, Serializer, **Deserializer** {
@Override
public Object deserialize(String s, byte[] bytes) {
ObjectMapper mapper = new ObjectMapper();
Employee employee = null;
try {
//employee = mapper.readValue(bytes, Employee.class);
employee = mapper.readValue(bytes.toString(), Employee.class);
} catch (Exception e) {
e.printStackTrace();
}
return employee;
}
@Override
public Object deserialize(String topic, Headers headers, byte[] data) {
ObjectMapper mapper = new ObjectMapper();
Employee employee = null;
try {
//employee = mapper.readValue(bytes, Employee.class);
employee = mapper.readValue(data.toString(), Employee.class);
} catch (Exception e) {
e.printStackTrace();
}
return employee;
}
public void close() {
}
}
这可能是Kafka类加载的问题
将类加载器设置为null可能会有所帮助。
...
Thread currentThread = Thread.currentThread();
ClassLoader savedClassLoader = currentThread.getContextClassLoader();
currentThread.setContextClassLoader(null);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
currentThread.setContextClassLoader(savedClassLoader);
...
有完整的解释:
https://stackoverflow.com/a/50981469/1673775
不确定这是不是最终修复了您的错误,但请注意,在对1.1.x kafka客户端jar使用spring kafka测试(版本2.1.x,从版本2.1.5开始)时,您需要重写某些可传递依赖项,如下所示:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring.kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>${spring.kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.1</version>
<classifier>test</classifier>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.1.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.1.1</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
所以你的可传递依赖肯定有问题
我已经在kafka上工作了相当长的六个月,我对用户延迟和存储到主题分区中的数据有一些疑问。 问题1:最初,当我开始阅读Kafka并了解如何使用Kafka的功能时,我被教导说,一个只有一部分和一个复制因子的主题会创造奇迹。经过相当长的六个月的工作,将我的项目迁移到live之后,使用我的主题消息的消费者开始给我一个延迟。我阅读了许多关于消费者延迟的堆栈溢出答案,得出结论,如果我增加某个主题的分区和复制
为了实现Kafka消费者对消息的一次处理,我一次提交一条消息,如下所示 上面的代码将消息的处理异步委托给下面的另一个类。 但是,这仍然不能保证只发送一次,因为如果处理失败,它可能仍会提交其他消息,并且以前的消息将永远不会被处理和提交,我在这里的选项是什么?
我尝试在GitHub(https://github.com/onurtokat/kafka-clickstream-enrich)上模拟Gwen(Chen)Shapira的kafka-clickstream-rich kafka-stream项目。当我使用反序列化器使用消费者类消费一个主题时,我遇到了一个错误。自定义的Serde类有序列化器和反序列化器。但是,我试图理解为什么自定义serde用于反
我们有一个应用程序,它使用来自Kafka主题(3个分区)的消息,丰富数据,并将记录保存在DB(Spring JPA)中,然后将消息发布到另一个Kafka主题(在同一个代理上),所有这些都通过使用Camel 2.4.1和Spring Boot 2.1.7进行编排。释放 我们想为 kafka 消费者-生产者组合实现“exactly-once”语义。 消费者设置: 生产者设置: 豆接线: 骆驼路线: 但
对于这个示例: null 是否有类似这样的语句用于此检查?或者我应该使用
我已经创建了要将它们连接在一起的kstream。两个流的输出如下所示: 流1: 流2: 我想创建这两个Stream的连接流(内连接),所以我创建了以下KStream: 在这个KStream中,我只使用了一个连接,我正在更改输出消息的格式,仅此而已。 通过一个例子,我将解释我想做什么: 在窗口内发布以下消息: 流1 流2 加入流 出版的是什么 我想出版什么 总之,我只想在窗口中发布最新消息,而不是所