嗨,我正在尝试最新的 Kafka Spring云流框架。但是,对于字符串和双精度,它工作正常,但是当我尝试发送Java POJO类时,它会抛出以下异常。
我已经尝试了各种序列化和反序列化配置,但似乎没有任何效果。我能够以json形式从供应商处生成消息,但由于错误,消费者无法处理它。
对这个问题的任何建议将不胜感激。谢谢
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@25625f0f]; nested exception is java.lang.ClassCastException: class [B cannot be cast to class com.msg.bt.Grade ([B is in module java.base of loader 'bootstrap'; com.msg.bt.Grade is in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @475530b9), failedMessage=GenericMessage [payload=byte[72], headers={kafka_offset=804, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@24ee2e4c, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=random-topic-2, kafka_receivedTimestamp=1624778145945, kafka_groupId=mypublish-reader-group, target-protocol=kafka}]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2371) ~[spring-kafka-2.7.3.jar!/:2.7.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2339) ~[spring-kafka-2.7.3.jar!/:2.7.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2300) ~[spring-kafka-2.7.3.jar!/:2.7.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2214) ~[spring-kafka-2.7.3.jar!/:2.7.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2139) ~[spring-kafka-2.7.3.jar!/:2.7.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2021) ~[spring-kafka-2.7.3.jar!/:2.7.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1703) ~[spring-kafka-2.7.3.jar!/:2.7.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1272) ~[spring-kafka-2.7.3.jar!/:2.7.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1264) ~[spring-kafka-2.7.3.jar!/:2.7.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.3.jar!/:2.7.3]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@25625f0f]; nested exception is java.lang.ClassCastException: class [B cannot be cast to class com.msg.bt.Grade ([B is in module java.base of loader 'bootstrap'; com.msg.bt.Grade is in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @475530b9)
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.3.8.jar!/:5.3.8]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.3.8.jar!/:5.3.8]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.3.8.jar!/:5.3.8]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.8.jar!/:5.3.8]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:398) ~[spring-integration-kafka-5.5.1.jar!/:5.5.1]
以下是POM.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.sample</groupId>
<artifactId>message</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>message</name>
<description>Spring messaging sample</description>
<properties>
<java.version>11</java.version>
<spring-cloud.version>2020.0.3</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
下面是应用程序
@Bean
public Supplier<Grade> random() {
return () -> {
System.out.println("=========In Supplier --------------------");
return new Grade(UUID.randomUUID().toString(),Math.random()*100);
};
}
@Bean
public Function<Grade, String> publish() {
System.out.println("@@@@@@@@@@@@@@In Function user--------------------");
return user -> "User:: " + user.toString();
}
@Bean
public Consumer<String> log() {
System.out.println("++++++++++++In log --------------------");
return s -> {
System.out.println("Received>> " + s);
};
}
应用程序.yml
spring:
application:
name: kafka-messaging
json:
value:
default:
type: com.msg.Grade
trusted:
packages: com.msg
cloud:
function:
definition: random;publish;log
stream:
bindings:
random-out-0:
destination: random-topic-2
publish-in-0:
destination: random-topic-2
group: mypublish-reader-group
publish-out-0:
destination: message-topic-2
log-in-0:
destination: message-topic-2
group: message-reader-group
kafka:
binder:
brokers: localhost:9092
最后我发现了问题所在。我觉得非常愚蠢,因为问题是在模型类中没有默认的无参数构造函数。添加了无参数构造函数后,一切都按预期工作。
我在sts中创建了一个maven项目,并试图为该项目创建一个war文件。我还为我的类配置了querydsl。当我运行maven install时,一个类给我一个错误响应。 错误:创建在文件[C:\ Users \ Vince \ Documents \ workspace-spring-tool-suite-4-4 . 0 . 1 . release \ project \ target \ te
我是jUnit和mockito的新手。对于虚空的嘲弄是如何发挥作用的,我完全感到困惑。在这里,如果名称是“hello”,则函数抛出一个异常。但当我测试它时,它并没有抛出异常... 测试 }
我使用了这个config允许将嵌套对象解析为json: 它似乎确实起到了作用,因为带有嵌套对象的工作正常。然后我尝试从json创建一个类实例https://pub.dev/packages/json_serializable: 我得到一个例外: 抛出的异常:类型'_InternalLinkedHashMap 这是的值: 红色覆盖的值是字符串。当和为时,没有错误。 我认为这是因为是嵌套的JsonSe
你看?我应该编写多少额外的代码来向Api客户端返回正确的响应?此外,我可以忘记捕获一些可以正确报告的异常,它将作为默认的内部服务器异常返回。在controller中,我总是应该查看服务层,并检查哪些异常可以引发服务以正确处理它们。(请不要建议在java中检查异常)。 现在让我们来看看另一种解决方案: 在服务层(模型)中使用HttpStatus引发异常 就这样了。代码更少。现在我不必处理每一个可能引
问题内容: 我试图在Netbeans中重构一个大型程序,但我有点迷茫。我从来没有非常模块化,但是现在通过实际学习如何做到这一点来尝试纠正这种情况,并在将来纠正这种情况。不幸的是,我在将某些教程翻译成我的程序时遇到了麻烦。所以我希望这里有人可以帮忙。目前,我正在尝试分解一部分采用特定格式的文件并制成表格的代码。我知道我需要创建一个类并使用它来创建表对象,但是我不确定如何做。我有一个主文件,用于获取文
我有一个带有Yarn的Flink集群,使用flink-quickstart-java原型构建一个演示项目。在使用'mvn clean package-pbuild-jar'命令构建fat-jar之后,并使用'flink run-m yar-cluster-yn2./flink-Snapshot-1.0.jar'提交程序,程序会抛出以下异常: 下面是我的演示: 和一些版本信息: FLink版本:1.