当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Streams应用程序抛出嵌套异常是java.lang.域模型类的ClassCastExc的

呼延鸿畅
2023-03-14

嗨,我正在尝试最新的 Kafka Spring云流框架。但是,对于字符串和双精度,它工作正常,但是当我尝试发送Java POJO类时,它会抛出以下异常。

我已经尝试了各种序列化和反序列化配置,但似乎没有任何效果。我能够以json形式从供应商处生成消息,但由于错误,消费者无法处理它。

对这个问题的任何建议将不胜感激。谢谢

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@25625f0f]; nested exception is java.lang.ClassCastException: class [B cannot be cast to class com.msg.bt.Grade ([B is in module java.base of loader 'bootstrap'; com.msg.bt.Grade is in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @475530b9), failedMessage=GenericMessage [payload=byte[72], headers={kafka_offset=804, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@24ee2e4c, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=random-topic-2, kafka_receivedTimestamp=1624778145945, kafka_groupId=mypublish-reader-group, target-protocol=kafka}]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2371) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2339) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2300) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2214) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2139) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2021) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1703) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1272) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1264) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@25625f0f]; nested exception is java.lang.ClassCastException: class [B cannot be cast to class com.msg.bt.Grade ([B is in module java.base of loader 'bootstrap'; com.msg.bt.Grade is in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @475530b9)
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.3.8.jar!/:5.3.8]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.3.8.jar!/:5.3.8]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.3.8.jar!/:5.3.8]
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.8.jar!/:5.3.8]
        at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:398) ~[spring-integration-kafka-5.5.1.jar!/:5.5.1]

以下是POM.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.sample</groupId>
    <artifactId>message</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>message</name>
    <description>Spring messaging sample</description>
    <properties>
        <java.version>11</java.version>
        <spring-cloud.version>2020.0.3</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

下面是应用程序

@Bean
public Supplier<Grade> random() {
    return () -> {
        System.out.println("=========In Supplier --------------------");
        return new Grade(UUID.randomUUID().toString(),Math.random()*100);
    };
}

@Bean
public Function<Grade, String> publish() {
    System.out.println("@@@@@@@@@@@@@@In Function user--------------------");
    return user -> "User:: " + user.toString();
}

@Bean
public Consumer<String> log() {
    System.out.println("++++++++++++In log --------------------");
    return s -> {
        System.out.println("Received>> " + s);
    };
}

应用程序.yml

spring:
  application:
    name: kafka-messaging
  json:
    value:
      default:
        type: com.msg.Grade
    trusted:
      packages: com.msg

  cloud:
    function:
      definition: random;publish;log
    stream:
      bindings:
        random-out-0:
          destination: random-topic-2

        publish-in-0:
          destination: random-topic-2
          group: mypublish-reader-group
        publish-out-0:
          destination: message-topic-2

        log-in-0:
          destination: message-topic-2
          group: message-reader-group

      kafka:
        binder:
          brokers: localhost:9092

共有1个答案

司马项明
2023-03-14

最后我发现了问题所在。我觉得非常愚蠢,因为问题是在模型类中没有默认的无参数构造函数。添加了无参数构造函数后,一切都按预期工作。

 类似资料:
  • 我在sts中创建了一个maven项目,并试图为该项目创建一个war文件。我还为我的类配置了querydsl。当我运行maven install时,一个类给我一个错误响应。 错误:创建在文件[C:\ Users \ Vince \ Documents \ workspace-spring-tool-suite-4-4 . 0 . 1 . release \ project \ target \ te

  • 我是jUnit和mockito的新手。对于虚空的嘲弄是如何发挥作用的,我完全感到困惑。在这里,如果名称是“hello”,则函数抛出一个异常。但当我测试它时,它并没有抛出异常... 测试 }

  • 我使用了这个config允许将嵌套对象解析为json: 它似乎确实起到了作用,因为带有嵌套对象的工作正常。然后我尝试从json创建一个类实例https://pub.dev/packages/json_serializable: 我得到一个例外: 抛出的异常:类型'_InternalLinkedHashMap 这是的值: 红色覆盖的值是字符串。当和为时,没有错误。 我认为这是因为是嵌套的JsonSe

  • 你看?我应该编写多少额外的代码来向Api客户端返回正确的响应?此外,我可以忘记捕获一些可以正确报告的异常,它将作为默认的内部服务器异常返回。在controller中,我总是应该查看服务层,并检查哪些异常可以引发服务以正确处理它们。(请不要建议在java中检查异常)。 现在让我们来看看另一种解决方案: 在服务层(模型)中使用HttpStatus引发异常 就这样了。代码更少。现在我不必处理每一个可能引

  • 问题内容: 我试图在Netbeans中重构一个大型程序,但我有点迷茫。我从来没有非常模块化,但是现在通过实际学习如何做到这一点来尝试纠正这种情况,并在将来纠正这种情况。不幸的是,我在将某些教程翻译成我的程序时遇到了麻烦。所以我希望这里有人可以帮忙。目前,我正在尝试分解一部分采用特定格式的文件并制成表格的代码。我知道我需要创建一个类并使用它来创建表对象,但是我不确定如何做。我有一个主文件,用于获取文

  • 我有一个带有Yarn的Flink集群,使用flink-quickstart-java原型构建一个演示项目。在使用'mvn clean package-pbuild-jar'命令构建fat-jar之后,并使用'flink run-m yar-cluster-yn2./flink-Snapshot-1.0.jar'提交程序,程序会抛出以下异常: 下面是我的演示: 和一些版本信息: FLink版本:1.