我对Kafka很陌生。我正在尝试发送一个消息到Kafka主题,其中包含头和有效载荷。
以下是错误:
"org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.cabservice.request.CabLocationPayload to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer\nCaused by: java.lang.ClassCastException: class com.cabservice.request.CabLocationPayload cannot be cast to class java.lang.String
public class CabLocationPayload {
private Header header;
private Payload payload;
@PostMapping(value=“/publish”)public void sendMessageToKafkaTopic(@RequestBody CabLocationPayload CabLocationPayload){
Header和Payload具有JSON的映射字段。
在Producer中更改VALUE_SERIALIZER_CLASS_CONFIG后,我可以看到数据。但ClassCastException仍然失败。
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplateString() {
return new KafkaTemplate<>(producerFactoryString());
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
当前错误为{2 09:41:20.108信息22561--[ad producer-1]org.apache.kafka.clients.metadata:[ProducerClientID=producer-1]群集ID:lWghv-b_RG-_hO-qOp_cjA 2021-04-22 09:41:20.123错误22561--[nio-9080-exec-2]o.a.cc.[.[.[/].[dispatcherServlet]:路径为[]的上下文中servlet[dispatcherServlet]的servlet.Service()引发异常[请求处理失败;嵌套异常为
java.lang.ClassCastException:类com.cabservice.request.cabLocationPayload不能强制转换为类java.lang.String(com.cabservice.request.cabLocationPayload位于加载程序org.springframework.boot.devtools.restart.ClassLoader.restartClassLoader@1144043d的未命名模块中;java.lang.String位于加载程序“bootstrap”的模块java.base中),位于
任何帮助都是非常感谢的。
Kafka配置“value.Serializer”配置应该是序列化程序子类,而不是您的对象类型
例如:
键:VALUE_SERIALIZER_CLASS_CONFIG,值:jsonserializer.class(来源:org.springframework.kafka.support.serializer)
@EnableKafka
@Configuration
public class KafkaProducerConfiguration {
@Bean
KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(getConfig());
}
private Map<String, Object> getConfig() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "brokers");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return config;
}
}
您必须将您的类替换为您想要消耗的类名。(对于本例:CabLocationPayload)
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConsumerConfiguration {
private Map<String, Object> consumerConfigs() {
final Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your brokers");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumeer-group-id");
return props;
}
@Bean
public ConsumerFactory<String, YourClass> kafkaListenerConsumerFactory() {
final ErrorHandlingDeserializer<YourClass> errorHandlingDeserializer = new ErrorHandlingDeserializer<>(new JsonDeserializer<>(YourClass.class, false));
return new DefaultKafkaConsumerFactory<>(this.consumerConfigs(), new StringDeserializer(), errorHandlingDeserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, YourClass> kafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<String, YourClass> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(this.kafkaListenerConsumerFactory());
return factory;
}
}
我是Kafka的新手,当我试图发送信息到我得到的主题下面的错误。有人能帮我一下吗? [2018-09-23 13:37:56,613]警告[Producer Clientid=Console-Producer]无法建立到节点-1的连接。代理可能不可用。(org.apache.kafka.clients.NetworkClient)
问题内容: 我需要转换从REST API获得的JSON数据,并将其转换为CSV以便进行分析。问题在于JSON数据不一定遵循相同的内容,因此我无法定义映射类型。这已经成为一项挑战,占用了我太多时间。我已经创建了一些代码,但是由于它在此行上引发了异常,因此它当然不起作用 错误是: 附加信息:无法将当前JSON对象(例如{“ name”:“ value”})反序列化为类型’System.Collecti
我使用Kafka Consumer API来构建Consumer。为了构建反序列化器,我实现了Deserializer类并提供了必要的实现。我收到此错误“Exception Raisedorg.apache.kafka.Common.Errors.SerializationException:错误反序列化分区staging.DataFeeds.PartnerHotel-0的键/值,偏移量为1920
我有一个非常大的JSON文件太大,无法使用ObjectMapper.read值()到JsonNode的问题。我想在Android上使用杰克逊库解析大型JSON时使用内存不足错误的解决方案,除了JSON文件是一个具有字段名称的单个对象,因此我无法创建一个模型POJO来反序列化。 对象内部的每个属性都有相同的格式,我可以忽略这些内部对象的许多属性(我已经有了一个POJO类来建模)。如果JSON文件是一
错误: java.lang.ClassNotFoundException:testprocedure.tp$3在java.net.URLClassLoader$1上运行(未知源)在java.net.URLClassLoader上运行(未知源)在java.security.accessController.doprivileged(本机方法)在java.net.URLClassLoader.find
问题内容: 我有我的自定义Java对象,希望利用JVM的内置序列化将其发送到Kafka主题,但是序列化失败并出现以下错误 org.apache.kafka.common.errors.SerializationException:无法将com.spring.kafka.Payload类的值转换为value.serializer中指定的org.apache.kafka.common.serializ