当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream&Kafka:Kafka Producer序列化错误

仇正豪
2023-03-14

错误堆栈:

2019-10-25 16:13:40.762 ERROR 4628 --- [  XNIO-1 task-1] o.z.problem.spring.common.AdviceTraits   : Internal Server Error

org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@ebad77c]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:186)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1095)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
        at com.ll.kafkaservice.greeting.GreetingsService.sendGreeting(GreetingsService.java:30)
        at com.ll.kafkaservice.greeting.GreetingsController.greetings(GreetingsController.java:29)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
		....
		
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String
        at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
        at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:894)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:470)
        at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:407)
        at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:242)
        at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:382)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1095)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
        at com.ll.kafkaservice.greeting.GreetingsService.sendGreeting(GreetingsService.java:30)
        at com.ll.kafkaservice.greeting.GreetingsController.greetings(GreetingsController.java:29)

这里列出的是Spring Cloud Stream设置

  cloud:
    stream:
      bindings:
          greetings-in:
              destination: greetings
              #content-type: application/json
              content-type: text/plain
          greetings-out:
              destination: greetings
              #content-type: application/json
              content-type: text/plain

生产者设置

2019-10-25 16:12:28.346  INFO 4628 --- [  restartedMain] com.ll.kafkaservice.KafkaServiceApp      : Started KafkaServiceApp in 22.248 seconds (JVM running for 23.136)
2019-10-25 16:12:28.366 DEBUG 4628 --- [  restartedMain] c.l.k.aop.logging.LoggingAspect          : Enter: com.ll.kafkaservice.service.KafkaServiceKafkaProducer.init() with argument[s] = []
2019-10-25 16:12:28.377  INFO 4628 --- [  restartedMain] c.l.k.service.KafkaServiceKafkaProducer  : Kafka producer initializing...
2019-10-25 16:12:28.378  INFO 4628 --- [  restartedMain] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
        acks = 1
        batch.size = 16384
        bootstrap.servers = [localhost:9092]
        buffer.memory = 33554432
        client.dns.lookup = default
        client.id =
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 2147483647
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2019-10-25 16:12:28.399  INFO 4628 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.0

这里列出了包含消息的对象

package com.ll.kafkaservice.messaging;

import java.io.Serializable;

public class Greeting  implements Serializable {
	private static final long serialVersionUID = 1L;
	
	private String message;

	
	public Greeting() {
	}

	
	public String getMessage() {
		return message;
	}

	public void setMessage(String message) {
		this.message = message;
	}
	
	public String toString() {
		StringBuffer sbuffer = new StringBuffer();
		
		sbuffer.append("{");
		sbuffer.append("message:");
		sbuffer.append(message);
		sbuffer.append("}");
		
		return sbuffer.toString();
	}
}

定义流

package com.ll.kafkaservice.greeting;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;


public interface GreetingsStreams {
    String INPUT = "greetings-in";
    String OUTPUT = "greetings-out";
    
    @Input(INPUT)
    SubscribableChannel inboundGreetings();
    
    @Output(OUTPUT)
    MessageChannel outboundGreetings();
}

捆住溪流

package com.ll.kafkaservice.config;

import org.springframework.cloud.stream.annotation.EnableBinding;
import com.ll.kafkaservice.greeting.GreetingsStreams;


@EnableBinding(GreetingsStreams.class)
public class StreamsConfiguration {

}
package com.ll.kafkaservice.greeting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;

import com.ll.kafkaservice.messaging.Greeting;


@Service
public class GreetingsService {
    private final Logger log = LoggerFactory.getLogger(GreetingsService.class);
    
    private final GreetingsStreams greetingsStreams;
    
    private MessageChannel messageChannel;
    
    
    public GreetingsService(GreetingsStreams greetingsStreams) {
        this.greetingsStreams = greetingsStreams;
    }
    
    public void sendGreeting(final Greeting greeting) {
        messageChannel = greetingsStreams.outboundGreetings();
        log.info("Before send {}", greeting.toString());
        messageChannel.send(MessageBuilder
        		// Sends a string to payload not the object
                .withPayload(greeting.getMessage())
                // Note:  tried this with and without the header
                //.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
                .build());
    }
}

消费/接收消息

package com.ll.kafkaservice.greeting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import com.ll.kafkaservice.messaging.Greeting;



@Component
public class GreetingsListener {
    private final Logger log = LoggerFactory.getLogger(GreetingsListener.class);

    @StreamListener(GreetingsStreams.INPUT)
    public void handleGreetings(@Payload Greeting greetings) {
        log.info("Received greetings: {}", greetings.getMessage());
    }
    
    //@StreamListener(GreetingsStreams.INPUT)
    //public void handleGreetings(String greetings) {
    //    log.info("Received greetings: {}", greetings);
    //}
}

共有1个答案

别宏盛
2023-03-14

默认情况下,SCSt框架将有效负载转换为byte[]并使用ByteArraySerializers

由于您已将绑定配置为使用自定义序列化器,因此必须将UseNativeEncoding设置为true。请参阅生成器属性。

使用编码

当设置为true时,出站消息由客户端库直接序列化,客户端库必须进行相应的配置(例如,设置适当的Kafka生产者值序列化程序)。使用此配置时,出站消息封送不基于绑定的contentType。当使用本机编码时,使用者有责任使用适当的解码器(例如,Kafka使用者值反序列化器)来反序列化入站消息。此外,当使用本机编码和解码时,HeaderMode=EmbeddedHeaders属性将被忽略,消息中不嵌入头。请参阅consumer属性UseNativeDecoding。

但是,如果要发送POJO,您将需要使用JSONSerializer而不仅仅是字符串序列化器。

是不是有什么原因你不依赖框架来为你做转换?

 类似资料:
  • 问题内容: 我正在尝试使用protobuf序列化结构。经过许多小时试图弄清楚我在做什么错,我决定测试google的示例,但效果不佳 我从Google(https://developers.google.com/protocol- buffers/docs/javatutorial )获得以下协议: 我正在尝试将其序列化: byte []序列化= john.toByteArray(); 我得到“ j

  • 在任何人试图告诉我这是一个重复之前,我已经试过了关于这个的解决方案,以及几乎所有其他我能找到的答案。 我的项目在IntelliJ中构建得很好,通过了我设置的所有测试,它甚至在IntelliJ内部运行得也很好,当我试图运行maven构建的JAR时,除了几个例外,我就被打了个耳光。 代码所指向的类: 根据我看到的例外情况,当通过命令提示符运行时,问题出现在第31行,它将是: 命令提示符中的堆栈跟踪如下

  • ClassCastException:com.google.gson.internal.LinkedTreeMap不能强制转换为java.util.HashMap 提前谢了。

  • 问题内容: 我试图反序列化以DateTime作为修饰符的类: 但是,当我尝试tro反序列化时,却遇到以下异常: 我用它来反序列化: 还有我的jsonData的示例: 问题答案: 期望使用无参数构造函数。的最新版本没有这样的构造函数。 如果您已固定格式,即。应该只是一个时间戳,那么你可以简单地注册与。它将在内部用于字段。您可以摆脱注释。 您需要添加库。

  • (我之前的问题/议题不够具体,因此完全重写了。) 问题:ServiceStack 4.0.16不适用于Redis。 重新创建的步骤: 在VS 2013中创建一个新的ASP. NET Web表单应用程序。 更新Nuget包 安装Nuget包= 容器登记册(c)= > 运行项目,你应该只得到一个空白屏幕在Chrome 导航到/元数据,你会得到一个通用的HTTP错误500.0-内部服务器错误 打开事件查

  • 这是包含列表的POJO JSON具有以下结构: 运行时,web服务的结果运行良好,但反序列化会打印此错误:

  • 我有以下课 以及以下测试: 我收到以下错误: com.fasterxml.jackson.databind.exc.MismatchedInputException:无法构造com.store.domain.model.Cart实例(尽管至少存在一个Creator):无法从[Source:(String)"{"id":"56c7b5f7-115b-4cb9-9658-acb7b849d5d5"}"

  • 我尝试调用一个url谁接受列表。 发送的数据为 “{”时间戳“:1445958336633,”状态“:400,”错误“:”错误请求“,”异常“:”org.springframework.http.converter.httpmessagenotreadableException“,”消息“:”无法读取文档:无法反序列化START_OBJECT令牌之外的java.util.arraylist实例\n