错误堆栈:
2019-10-25 16:13:40.762 ERROR 4628 --- [ XNIO-1 task-1] o.z.problem.spring.common.AdviceTraits : Internal Server Error
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@ebad77c]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:186)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1095)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at com.ll.kafkaservice.greeting.GreetingsService.sendGreeting(GreetingsService.java:30)
at com.ll.kafkaservice.greeting.GreetingsController.greetings(GreetingsController.java:29)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
....
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:894)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:470)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:407)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:242)
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:382)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1095)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at com.ll.kafkaservice.greeting.GreetingsService.sendGreeting(GreetingsService.java:30)
at com.ll.kafkaservice.greeting.GreetingsController.greetings(GreetingsController.java:29)
这里列出的是Spring Cloud Stream设置
cloud:
stream:
bindings:
greetings-in:
destination: greetings
#content-type: application/json
content-type: text/plain
greetings-out:
destination: greetings
#content-type: application/json
content-type: text/plain
生产者设置
2019-10-25 16:12:28.346 INFO 4628 --- [ restartedMain] com.ll.kafkaservice.KafkaServiceApp : Started KafkaServiceApp in 22.248 seconds (JVM running for 23.136)
2019-10-25 16:12:28.366 DEBUG 4628 --- [ restartedMain] c.l.k.aop.logging.LoggingAspect : Enter: com.ll.kafkaservice.service.KafkaServiceKafkaProducer.init() with argument[s] = []
2019-10-25 16:12:28.377 INFO 4628 --- [ restartedMain] c.l.k.service.KafkaServiceKafkaProducer : Kafka producer initializing...
2019-10-25 16:12:28.378 INFO 4628 --- [ restartedMain] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id =
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2019-10-25 16:12:28.399 INFO 4628 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.0
这里列出了包含消息的对象
package com.ll.kafkaservice.messaging;
import java.io.Serializable;
public class Greeting implements Serializable {
private static final long serialVersionUID = 1L;
private String message;
public Greeting() {
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public String toString() {
StringBuffer sbuffer = new StringBuffer();
sbuffer.append("{");
sbuffer.append("message:");
sbuffer.append(message);
sbuffer.append("}");
return sbuffer.toString();
}
}
定义流
package com.ll.kafkaservice.greeting;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface GreetingsStreams {
String INPUT = "greetings-in";
String OUTPUT = "greetings-out";
@Input(INPUT)
SubscribableChannel inboundGreetings();
@Output(OUTPUT)
MessageChannel outboundGreetings();
}
捆住溪流
package com.ll.kafkaservice.config;
import org.springframework.cloud.stream.annotation.EnableBinding;
import com.ll.kafkaservice.greeting.GreetingsStreams;
@EnableBinding(GreetingsStreams.class)
public class StreamsConfiguration {
}
package com.ll.kafkaservice.greeting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;
import com.ll.kafkaservice.messaging.Greeting;
@Service
public class GreetingsService {
private final Logger log = LoggerFactory.getLogger(GreetingsService.class);
private final GreetingsStreams greetingsStreams;
private MessageChannel messageChannel;
public GreetingsService(GreetingsStreams greetingsStreams) {
this.greetingsStreams = greetingsStreams;
}
public void sendGreeting(final Greeting greeting) {
messageChannel = greetingsStreams.outboundGreetings();
log.info("Before send {}", greeting.toString());
messageChannel.send(MessageBuilder
// Sends a string to payload not the object
.withPayload(greeting.getMessage())
// Note: tried this with and without the header
//.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.build());
}
}
消费/接收消息
package com.ll.kafkaservice.greeting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import com.ll.kafkaservice.messaging.Greeting;
@Component
public class GreetingsListener {
private final Logger log = LoggerFactory.getLogger(GreetingsListener.class);
@StreamListener(GreetingsStreams.INPUT)
public void handleGreetings(@Payload Greeting greetings) {
log.info("Received greetings: {}", greetings.getMessage());
}
//@StreamListener(GreetingsStreams.INPUT)
//public void handleGreetings(String greetings) {
// log.info("Received greetings: {}", greetings);
//}
}
默认情况下,SCSt框架将有效负载转换为byte[]
并使用ByteArraySerializers
。
由于您已将绑定配置为使用自定义序列化器,因此必须将UseNativeEncoding
设置为true
。请参阅生成器属性。
使用编码
当设置为true时,出站消息由客户端库直接序列化,客户端库必须进行相应的配置(例如,设置适当的Kafka生产者值序列化程序)。使用此配置时,出站消息封送不基于绑定的contentType。当使用本机编码时,使用者有责任使用适当的解码器(例如,Kafka使用者值反序列化器)来反序列化入站消息。此外,当使用本机编码和解码时,HeaderMode=EmbeddedHeaders属性将被忽略,消息中不嵌入头。请参阅consumer属性UseNativeDecoding。
但是,如果要发送POJO,您将需要使用JSONSerializer
而不仅仅是字符串序列化器。
是不是有什么原因你不依赖框架来为你做转换?
问题内容: 我正在尝试使用protobuf序列化结构。经过许多小时试图弄清楚我在做什么错,我决定测试google的示例,但效果不佳 我从Google(https://developers.google.com/protocol- buffers/docs/javatutorial )获得以下协议: 我正在尝试将其序列化: byte []序列化= john.toByteArray(); 我得到“ j
在任何人试图告诉我这是一个重复之前,我已经试过了关于这个的解决方案,以及几乎所有其他我能找到的答案。 我的项目在IntelliJ中构建得很好,通过了我设置的所有测试,它甚至在IntelliJ内部运行得也很好,当我试图运行maven构建的JAR时,除了几个例外,我就被打了个耳光。 代码所指向的类: 根据我看到的例外情况,当通过命令提示符运行时,问题出现在第31行,它将是: 命令提示符中的堆栈跟踪如下
ClassCastException:com.google.gson.internal.LinkedTreeMap不能强制转换为java.util.HashMap 提前谢了。
问题内容: 我试图反序列化以DateTime作为修饰符的类: 但是,当我尝试tro反序列化时,却遇到以下异常: 我用它来反序列化: 还有我的jsonData的示例: 问题答案: 期望使用无参数构造函数。的最新版本没有这样的构造函数。 如果您已固定格式,即。应该只是一个时间戳,那么你可以简单地注册与。它将在内部用于字段。您可以摆脱注释。 您需要添加库。
(我之前的问题/议题不够具体,因此完全重写了。) 问题:ServiceStack 4.0.16不适用于Redis。 重新创建的步骤: 在VS 2013中创建一个新的ASP. NET Web表单应用程序。 更新Nuget包 安装Nuget包= 容器登记册(c)= > 运行项目,你应该只得到一个空白屏幕在Chrome 导航到/元数据,你会得到一个通用的HTTP错误500.0-内部服务器错误 打开事件查
这是包含列表的POJO JSON具有以下结构: 运行时,web服务的结果运行良好,但反序列化会打印此错误:
我有以下课 以及以下测试: 我收到以下错误: com.fasterxml.jackson.databind.exc.MismatchedInputException:无法构造com.store.domain.model.Cart实例(尽管至少存在一个Creator):无法从[Source:(String)"{"id":"56c7b5f7-115b-4cb9-9658-acb7b849d5d5"}"
我尝试调用一个url谁接受列表。 发送的数据为 “{”时间戳“:1445958336633,”状态“:400,”错误“:”错误请求“,”异常“:”org.springframework.http.converter.httpmessagenotreadableException“,”消息“:”无法读取文档:无法反序列化START_OBJECT令牌之外的java.util.arraylist实例\n