当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream kafka流应用程序在反序列化异常时关闭

孟栋
2023-03-14

我正在使用

Spring靴:2.3.5.释放Spring云:Hoxton.SR8

我正在尝试Spring云流Kafka流应用程序。一切都运行良好,直到出现反序列化异常。应用程序每次都会关闭。

我想跳过不良记录,在Kafka主题中前进。但我无法实现这一点。配置

spring:
  application:
    name: statsprocessor.${ENV}.${INSTANCE_ID}
  cloud:
    stream:
      instance-index: ${INSTANCE_INDEX}
      instance-count: ${INSTANCE_COUNT}
      bindings:
        statsInput:
          destination: ${STORE_INPUT_TOPIC}
          group: statsprocessor.${ENV}
          consumer:
            concurrency: ${CONCURRENCY}
            partitioned: true
            useNativeDecoding: true
      kafka:
        streams:
          bindings:
            statsInput:
              consumer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: per.shades.framework.kafka.serdes.CotsEventSerde
                startOffset: earliest
                applicationId: statsprocessor.${ENV}
                autoCommitOnError: false
                dlqName: ${STORE_INPUT_DLQ}
                useNativeDecoding: true
                configuration:
                  client.id: statsprocessor.${ENV}.${INSTANCE_ID}
          binder:
            auto-add-partitions: true
            auto-create-topics: true
            deserializationExceptionHandler: logAndContinue
            brokers:
              - ${KAFKA_URI}
            configuration:
              num.stream.threads: ${CONCURRENCY}
              buffered.records.per.partition: 500
              cache.max.bytes.buffering: 10485760
              commit.interval.ms: 500
              state.dir: ${KAFKA_STATE_DIR}
              replication.factor: ${DEFAULT_KAFKA_TOPIC_REPLICATION_FACTOR}
              reconnect.backoff.ms: 15000
              retry.backoff.ms: 10000
              producer.linger.ms: 100
              producer.acks: all
              producer.retries: 3
              producer.batch.size: 16384
              consumer.max.poll.records: 100
              consumer.session.timeout.ms: 60000

我得到的错误是

    Exception in thread "statsprocessor.local.1-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: 
Exception caught in process. taskId=0_4, processor=KSTREAM-SOURCE-0000000000, topic=cots-event-store, partition=4, offset=0, stacktrace=org.apache.kafka.streams.errors.StreamsException:
 ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. 
Make sure the Processor can accept the deserialized input of type key: unknown because key is null, and value: per.shades.model.events.CotsEvent.

Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
    ....
    ....
    Caused by: java.lang.ClassCastException: class per.shades.model.events.CotsEvent cannot be cast to class per.shades.model.stats.StatsMetadata (per.shades.model.events.CotsEvent and per.shades.model.stats.StatsMetadata are in unnamed module of loader 'app')

现在我正在使用这个设置反序列化ExceptionHandler: logAnd继续。它仍然没有效果。根据留档,它应该简单地记录错误并继续处理。即它应该跳过不良记录。但这并没有发生。看到这个错误。

< code >所有流线程都已死亡。该实例将处于错误状态,应该关闭。

我还使用了

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer()
{
    return streamsBuilderFactoryBean ->
    {
        streamsBuilderFactoryBean.getStreamsConfiguration()                    .put(org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                        ContinueOnErrorHandler.class);
    };
}

处理程序类是

public class ContinueOnErrorHandler implements DeserializationExceptionHandler
{

    @Override
    public DeserializationHandlerResponse handle(ProcessorContext processorContext, ConsumerRecord<byte[], byte[]> consumerRecord, Exception e)
    {
        System.out.println(">>>>>>> We are here");
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> map)
    {

    }
}

但这也不管用。它不会被召唤。

我不想删除我的Kafka主题以摆脱不良记录。真的很难解决简单的反序列化错误。请帮助!!

编辑:绑定代码:

@Configuration
public interface StatsStreamBindings
{
    String statsInput = "statsInput";

    @Input(statsInput)
    KStream<String, StatsMetadata> statsInput();

}

处理器签名

public void aggregateStats(KStream<String, StatsMetadata> inputStream)

共有1个答案

漆雕稳
2023-03-14

由以下原因引起:java.lang.ClassCastException:class per.shades.model.events.CotsEvent 不能强制转换为类 per.shades.model.stats.StatsMetadata (per.shades.model.events.CotsEvent 和 per.shades.model.stats.StatsMetadata 位于加载器“app”的未命名模块中)

您得到的不是< code >反序列化异常,而是< code>ClassCastException调用处理器;这意味着反序列化程序已成功反序列化为< code>CotsEvent,但您的处理器需要< code>StatsMetadata。

DeserializationExceptionHandler仅处理DeserializationExceptions。

 类似资料:
  • 试图在Java中使用protobuf反序列化消息,并得到以下异常。 原因:com.google.protobuf.InvalidProtocolBufferException:在解析协议消息时,输入意外地在字段中间结束。这可能意味着输入被截断,或者嵌入的消息错误报告了自己的长度。在com.google.protobuf.InvalidProtocolBufferException.Truncate

  • 我在使用Jackson1.9.13(和Jackson2.5.0)进行序列化/反序列化时遇到了一个问题,现在已经解决了几天了,没有任何成功。 我的目标是使用@jsonanygetter&@jsonanysetter,我想动态地计算对象是否应该写入输出。我有一个用ObjectMapper序列化的JSON定义(并检查是否应该包含对象),然后将对象转换回字符串。 为此,我使用了一个“HidableSeri

  • 在反序列化包含JSON序列化LocalDate对象的JSON字符串时,我看到了一个异常(有关JSON片段,请参阅本问题末尾)。 这就是我反序列化的方式: 我看到以下异常消息: 下面是我试图反序列化的JSON片段:

  • 我们正在尝试使用具有net core后端的流,但出现反序列化错误 协议是版本3.14.0 grpc-web-gen是1.2.1 生成客户端的命令是:协议-我=。/原型。/原型/*. proto-js_out=import_style=通用js,二进制:。/dist--grpc-web_out=import_style=通用js dts,模式=grpcwebtext:。/dist 我们成功连接到en

  • 有没有一种方法让我忽略这些异常并在消费者处移动偏移量?我想,因为我使用手动偏移提交,我有这个问题。有人知道如何配置kafka-avro-serializer-6.0.0.jar来完成我想要的任务吗? 多谢了。

  • 我反映了JSON.NET JavaScriptDateTimeConverter类代码,复制了它,并将该类重命名为3DateTimeConverter,以便修改它以更精确和强类型的庄园格式化DateTime对象。 我让它根据JSON.NET输出强类型对象的方式输出一个类型,如: 运行JsonConverter的重写WriteJson方法来生成该值。 但是,当我尝试使用与相同转换器完全相同的设置反序