/**
 * Publishes every normalized event of the given message to Kafka and, on
 * successful delivery, records the partition/offset metadata in the DB.
 *
 * @param message the inbound message whose normalized events are sent to {@code TOPIC}
 */
public void saveMessage(final IMMessage message) {
    for (final NormalizedEvent event : message.getNormalizedEvents()) {
        event.setServiceId(message.getServiceId());
        ProducerRecord<String, NormalizedEvent> producerRecord = buildProducerRecord(null, event, TOPIC);
        ListenableFuture<SendResult<String, NormalizedEvent>> listenableFuture = kafkaTemplate.send(producerRecord);
        listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, NormalizedEvent>>() {
            @Override
            public void onFailure(Throwable ex) {
                handleFailure(null, event, ex);
            }

            @Override
            public void onSuccess(SendResult<String, NormalizedEvent> result) {
                // FIX: use a per-callback map instead of the shared 'properties'
                // field. These callbacks fire asynchronously on the producer's
                // I/O thread; with one shared map, the partition/offset of one
                // event can be overwritten by another before setDBProperties
                // reads them, attributing the wrong metadata to an event.
                // NOTE(review): if 'properties' carried other keys that
                // setDBProperties depends on, seed them here — confirm against
                // imMessageDBService.
                final Map<String, Object> recordProps = new HashMap<>();
                recordProps.put("partitionId", result.getRecordMetadata().partition());
                recordProps.put("offsetId", result.getRecordMetadata().offset());
                handleSuccess(null, event, result);
                imMessageDBService.setDBProperties(event, recordProps);
            }
        });
    }
}
/**
 * Wraps the given key/value pair in a {@link ProducerRecord} addressed to the
 * supplied topic. The key may be {@code null} (round-robin partitioning).
 */
private ProducerRecord<String, NormalizedEvent> buildProducerRecord(String key, NormalizedEvent value, String topic) {
    final ProducerRecord<String, NormalizedEvent> record = new ProducerRecord<>(topic, key, value);
    return record;
}
/**
 * Consumer factory for {@link NormalizedEvent} JSON payloads.
 *
 * <p>FIX: the previous version put {@code JsonDeserializer.TRUSTED_PACKAGES}
 * into the config map, but that entry is ignored when an explicit deserializer
 * instance is handed to {@link DefaultKafkaConsumerFactory} — config-map
 * settings only apply when the factory itself instantiates the deserializer.
 * The producer's event class also lives in a different package
 * (com.sap.innovision.springkafkaproducer.model) than the consumer's, so
 * honoring the {@code __TypeId__} header raised
 * "The class ... is not in the trusted packages". The deserializer instance is
 * now configured directly and told to ignore type headers.
 */
@Bean
public ConsumerFactory<String, NormalizedEvent> userConsumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // 'false' = useHeadersIfPresent: ignore the producer's __TypeId__ header and
    // always deserialize into the locally configured NormalizedEvent type.
    JsonDeserializer<NormalizedEvent> valueDeserializer = new JsonDeserializer<>(NormalizedEvent.class, false);
    // Belt and braces: trust all packages if a type header is ever consulted.
    valueDeserializer.addTrustedPackages("*");
    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), valueDeserializer);
}
/**
 * Listener-container factory for {@code @KafkaListener} methods consuming
 * {@link NormalizedEvent} records, backed by {@link #userConsumerFactory()}.
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, NormalizedEvent> userKafkaListenerFactory() {
    final ConcurrentKafkaListenerContainerFactory<String, NormalizedEvent> listenerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    listenerFactory.setConsumerFactory(userConsumerFactory());
    return listenerFactory;
}
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194) ~[spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1598) [spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) [spring-kafka-2.7.0.jar:2.7.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_291]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_291]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_291]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition SAP-S_4-HANA-2 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'com.sap.innovision.springkafkaproducer.model.NormalizedEvent' is not in the trusted packages: [java.util, java.lang, com.innovision.consumer.model, com.innovision.consumer.model.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:126) ~[spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:100) ~[spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:521) ~[spring-kafka-2.7.0.jar:2.7.0]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:130) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1283) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.6.0.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1410) [spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1249) [spring-kafka-2.7.0.jar:2.7.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) [spring-kafka-2.7.0.jar:2.7.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_291]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_291]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_291]
原因:java.lang.IllegalArgumentException:类'com.sap.innovision.springkafkaproducer.model.NormalizedEvent'不在受信任的包中:[java.util, java.lang, com.innovision.consumer.model, com.innovision.consumer.model.*]。如果您认为这个类可以安全地反序列化,请提供它的名称。如果序列化仅由受信任的源执行,则还可以启用 trust all (*)。
看起来事件在生产者和消费者中处于不同的包中。
默认情况下,反序列化程序在标头中使用类型信息,如果没有类型标头,则使用泛型参数作为回退。
在反序列化器上有两种解决方案:
// NOTE(review): quoted verbatim from Spring Kafka's JsonDeserializer (since
// 2.2.8) — kept unmodified as reference. Calling this with 'false' is the
// consumer-side way to ignore producer type headers and use the configured
// target type.
/**
* Set to false to ignore type information in headers and use the configured
* target type instead.
* Only applies if the preconfigured type mapper is used.
* Default true.
* @param useTypeHeaders false to ignore type headers.
* @since 2.2.8
*/
public void setUseTypeHeaders(boolean useTypeHeaders) {
if (!this.typeMapperExplicitlySet) {
this.useTypeHeaders = useTypeHeaders;
setUpTypePrecedence(Collections.emptyMap());
}
}
或在序列化程序上:
// NOTE(review): quoted verbatim from Spring Kafka's JsonSerializer — kept
// unmodified as reference. Calling this with 'false' is the producer-side
// alternative: stop emitting type-info headers altogether.
/**
* Set to false to disable adding type info headers.
* @param addTypeInfo true to add headers.
* @since 2.1
*/
public void setAddTypeInfo(boolean addTypeInfo) {
this.addTypeInfo = addTypeInfo;
}
请参见文档和Javadocs。
如果我运行的Kafka集群的分区比我的单个消费者组拥有的消费者还多。对消息的排序或跨分区的消息的按时传递是否有任何保证? 简单示例: 2个分区,1个使用者 生产者通过一个密钥控制分区分配。 消息1进入并转到分区a 消息2进入并转到分区B 消息3进入并转到分区a 我知道消息1将在消息3之前被使用,因为它们在同一个分区中。但是第二条信息呢?是在消息3之前消费还是在消息3之后消费?还是会有变化?它可能在
我在使用时遇到了困难,无法从开头或其他任何显式偏移量读取它。 为同一主题的使用者运行命令行工具,我确实看到带有选项的消息,否则它将挂起 我使用的是kafka-python 0.9.5,而代理运行的是Kafka8.2。不确定确切的问题是什么。 按照dpkp的建议设置_group_id=none_以模拟控制台使用者的行为。
我正在使用消费者来消费spring-boot应用程序中某个主题的消息,我需要定期运行该应用程序。spring-kafka版本是2.2.4.RELEASE。
我已经将我的Kafka从版本0.10.2.0更新到版本2.1.0,现在Kafka不能消费消息。我使用Spring Boot,这是我的配置: 我已经更改了组id,以避免旧组id出现问题。我当前的spring版本是2.1.2.RELEASE。在我的应用程序中,我可以看到我的消费者是如何不断地重新连接的 你知道这个问题吗?
拥有发布者和N个消费者,如果消费者使用,那么他们将错过订阅主题之前发布到主题的所有消息...众所周知,使用的消费者不会重播订阅主题之前存在的消息... 所以我需要: null 我想使用者必须检查现有消息的主题,如果有消息就使用它们,然后启动使用。对我来说这是最好的方法...
我已经启动了我的zookeeper和Kafka服务器。我开始制作Kafka,它发送10条主题为“xxx”的消息。然后阻止了我的Kafka制作人。现在我开始使用Kafka,并订阅了主题“xxx”。我的消费者使用我的Kafka制作人发送的10条消息,这10条消息现在没有运行。我需要我的Kafka使用者只能使用来自运行Kafka服务器的消息。有没有办法做到这一点?以下是我的消费者财产。