当前位置: 首页 > 知识库问答 >
问题:

通过Kafka使用者使用消息时出错

邹英光
2023-03-14
    public void saveMessage(final IMMessage message) {

        for (NormalizedEvent event :
                message.getNormalizedEvents()) {

            event.setServiceId(message.getServiceId());

            ProducerRecord<String, NormalizedEvent> producerRecord = buildProducerRecord(null, event, TOPIC);
            ListenableFuture<SendResult<String, NormalizedEvent>> listenableFuture = kafkaTemplate.send(producerRecord);
            listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, NormalizedEvent>>() {

                @Override
                public void onFailure(Throwable ex) {
                    handleFailure(null, event, ex);
                }

                @Override
                public void onSuccess(SendResult<String, NormalizedEvent> result) {
                    properties.put("partitionId", result.getRecordMetadata().partition());
                    properties.put("offsetId", result.getRecordMetadata().offset());
                    handleSuccess(null, event, result);
                    imMessageDBService.setDBProperties(event, properties);
                }
            });

        }
    }

    private ProducerRecord<String, NormalizedEvent> buildProducerRecord(String key, NormalizedEvent value, String topic) {
        return new ProducerRecord<>(topic, key, value);
    }
 @Bean
    public ConsumerFactory<String, NormalizedEvent> userConsumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(NormalizedEvent.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, NormalizedEvent> userKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, NormalizedEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(userConsumerFactory());
        return factory;
    }
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1598) [spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) [spring-kafka-2.7.0.jar:2.7.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_291]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_291]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_291]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition SAP-S_4-HANA-2 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'com.sap.innovision.springkafkaproducer.model.NormalizedEvent' is not in the trusted packages: [java.util, java.lang, com.innovision.consumer.model, com.innovision.consumer.model.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:126) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:100) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:521) ~[spring-kafka-2.7.0.jar:2.7.0]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:130) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1283) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.6.0.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1410) [spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1249) [spring-kafka-2.7.0.jar:2.7.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) [spring-kafka-2.7.0.jar:2.7.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_291]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_291]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_291]

共有1个答案

沈单弓
2023-03-14

原因:java.lang.IllegalArgumentException:类'com.sap.Innovision.SpringKafkaProducer.Model.NormalizedEvent'不在受信任的包中:[java.util,java.lang,com.Innovision.Consumer.Model,com.Innovision.Consumer.Model.]。如果您认为这个类可以安全地反序列化,请提供它的名称。如果序列化仅由受信任的源执行,则还可以启用trust all()。

看起来事件在生产者和消费者中处于不同的包中。

默认情况下,反序列化程序在标头中使用类型信息,如果没有类型标头,则使用泛型参数作为回退。

在反序列化器上有两种解决方案:

/**
 * Set to false to ignore type information in headers and use the configured
 * target type instead.
 * Only applies if the preconfigured type mapper is used.
 * Default true.
 * @param useTypeHeaders false to ignore type headers.
 * @since 2.2.8
 */
public void setUseTypeHeaders(boolean useTypeHeaders) {
    if (!this.typeMapperExplicitlySet) {
        this.useTypeHeaders = useTypeHeaders;
        setUpTypePrecedence(Collections.emptyMap());
    }
}

或在序列化程序上:

/**
 * Set to false to disable adding type info headers.
 * @param addTypeInfo true to add headers.
 * @since 2.1
 */
public void setAddTypeInfo(boolean addTypeInfo) {
    this.addTypeInfo = addTypeInfo;
}

请参见文档和Javadocs。

 类似资料:
  • 如果我运行的Kafka集群的分区比我的单个消费者组拥有的消费者还多。对消息的排序或跨分区的消息的按时传递是否有任何保证? 简单示例: 2个分区,1个使用者 生产者通过一个密钥控制分区分配。 消息1进入并转到分区a 消息2进入并转到分区B 消息3进入并转到分区a 我知道消息1将在消息3之前被使用,因为它们在同一个分区中。但是第二条信息呢?是在消息3之前消费还是在消息3之后消费?还是会有变化?它可能在

  • 我在使用时遇到了困难,无法从开头或其他任何显式偏移量读取它。 为同一主题的使用者运行命令行工具,我确实看到带有选项的消息,否则它将挂起 我使用的是kafka-python 0.9.5,而代理运行的是Kafka8.2。不确定确切的问题是什么。 按照dpkp的建议设置_group_id=none_以模拟控制台使用者的行为。

  • 我正在使用来使用来自spring-boot应用程序中某个主题的消息,我需要定期运行该应用程序。spring-kafka版本是2.2.4.发行版。

  • 我已经更新了我的Kafka从版本0.10.2.0到版本2.1.0,现在Kafka不能消费消息。我使用Spring引导,这是我的配置: 我已经更改了组id,以避免旧组id出现问题。我当前的spring版本是2.1。2.释放。在我的应用程序中,我可以看到我的客户是如何不断地重新连接的 你知道这个问题吗?

  • 拥有发布者和N个消费者,如果消费者使用,那么他们将错过订阅主题之前发布到主题的所有消息...众所周知,使用的消费者不会重播订阅主题之前存在的消息... 所以我需要: null 我想使用者必须检查现有消息的主题,如果有消息就使用它们,然后启动使用。对我来说这是最好的方法...

  • 我已经启动了我的zookeeper和Kafka服务器。我开始制作Kafka,它发送10条主题为“xxx”的消息。然后阻止了我的Kafka制作人。现在我开始使用Kafka,并订阅了主题“xxx”。我的消费者使用我的Kafka制作人发送的10条消息,这10条消息现在没有运行。我需要我的Kafka使用者只能使用来自运行Kafka服务器的消息。有没有办法做到这一点?以下是我的消费者财产。