Question:

org.apache.kafka.common.KafkaException: Failed to construct kafka producer with a custom object Serializer

计光赫
2023-03-14

I have written a Spring Kafka package, using Spring Boot, that sends messages to a Kafka topic with the key as a String and an ArrayList of custom objects as the value. The custom object is a class with the attributes item id, item name, and item ordered count.
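In outline, the model class looks roughly like this (a minimal sketch; the exact field names and types are approximations, since the class itself is not shown here):

package io.springbootlearn.orders.customerorders.models;

// Sketch of the model class described above; the field names and
// types are assumptions based on the description in the question.
public class UpdateItemCount {

    private Integer itemId;
    private String itemName;
    private Integer itemOrderedCount;

    // Getters and setters omitted for brevity.
}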

The Kafka producer log is shown below.

2021-10-29 00:09:34.147  INFO 16496 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [172.26.77.192:9092]
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
    client.id = producer-1
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    internal.auto.downgrade.txn.commit = true
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 127000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class io.springbootlearn.orders.customerorders.util.kafkaProducer.KafkaArrayListSerializer

2021-10-29 00:09:34.180  INFO 16496 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 0 ms.
2021-10-29 00:09:34.180  INFO 16496 --- [           main] org.apache.kafka.common.metrics.Metrics  : Metrics scheduler closed
2021-10-29 00:09:34.180  INFO 16496 --- [           main] org.apache.kafka.common.metrics.Metrics  : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2021-10-29 00:09:34.181  INFO 16496 --- [           main] org.apache.kafka.common.metrics.Metrics  : Metrics reporters closed
2021-10-29 00:09:34.182  INFO 16496 --- [           main] o.a.kafka.common.utils.AppInfoParser     : App info kafka.producer for producer-1 unregistered
2021-10-29 00:09:34.182 DEBUG 16496 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Kafka producer has been closed
ProcessOrders: Exception found.....org.apache.kafka.common.KafkaException: Failed to construct kafka producer

I wrote a custom serializer, shown below.

package io.springbootlearn.orders.customerorders.util.kafkaProducer;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import io.springbootlearn.orders.customerorders.models.UpdateItemCount;

public class KafkaArrayListSerializer<UpdateItemCount> implements Serializer<ArrayList<UpdateItemCount>>{
    
    private final Serializer<UpdateItemCount> arrayListSerializer;
    
    public KafkaArrayListSerializer(final Serializer<UpdateItemCount> paramArrayListSerializer) {
        System.out.println("KafkaArrayListSerializer: Inside constructor.......");
        this.arrayListSerializer = paramArrayListSerializer;
    }

    @Override
    public byte[] serialize(String topic, ArrayList<UpdateItemCount> customerOrderData) {
        System.out.println("KafkaArrayListSerializer: Inside serialize.......");
        System.out.println("Topic name......."+topic);
        
        int dataSize = customerOrderData.size();
        System.out.println("dataSize......."+dataSize);
        
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        Iterator<UpdateItemCount> arrayListIter = customerOrderData.iterator();
        try {
            out.writeInt(dataSize);
            while (arrayListIter.hasNext()) {
                final byte[] customerOrderDataToBytes =  arrayListSerializer.serialize(topic, arrayListIter.next());
                out.writeInt(customerOrderDataToBytes.length);
                out.write(customerOrderDataToBytes);
            }
            out.close();
        } catch(IOException e) {
            throw new RuntimeException("unable to serialize ArrayList", e);
        }
        
        return baos.toByteArray();
    }
}

The Serde class for the ArrayList is shown below.

package io.springbootlearn.orders.customerorders.util.kafkaProducer;

import java.util.ArrayList;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

public class ArrayListSerde<T> implements Serde<ArrayList<T>>{
    
    private final Serde<ArrayList<T>> arrayListSerdeObj;
    
    public ArrayListSerde(final Serde<T> SerdeObj) {
        this.arrayListSerdeObj = Serdes.serdeFrom(new KafkaArrayListSerializer<>(SerdeObj.serializer())
                                                 , new KafkaArrayListDeserializer<>(SerdeObj.deserializer()));
    }

    @Override
    public Serializer<ArrayList<T>> serializer() {
        System.out.println("ArrayListSerde: Inside Serializer.......");
        return arrayListSerdeObj.serializer();
    }

    @Override
    public Deserializer<ArrayList<T>> deserializer() {
        return null;
    }
    
    @Override
      public void close() {
        arrayListSerdeObj.serializer().close();
      }

}

The Kafka producer service code is shown below:

package io.springbootlearn.orders.customerorders.util.kafkaProducer;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.logging.log4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import io.springbootlearn.orders.customerorders.models.UpdateItemCount;


@Service
@ConfigurationProperties
public class KafkaProducerService {
    
    
    private final KafkaTemplate<String, ArrayList<UpdateItemCount>> kafkaTemplateObj;
    private String KAFKA_TOPIC_NAME = "ItemsOrdered";
    private ListenableFuture<SendResult<String,ArrayList<UpdateItemCount>>> asyncCall;
    private ProducerListener<String, ArrayList<UpdateItemCount>> producerListener;
    
    public KafkaProducerService(KafkaTemplate<String,ArrayList<UpdateItemCount>> kafkaTemplateParam) {
        this.kafkaTemplateObj = kafkaTemplateParam;     
    }
    
    public KafkaTemplate<String, ArrayList<UpdateItemCount>> producerListener(ProducerListener<String, ArrayList<UpdateItemCount>> producerListener) {
        kafkaTemplateObj.setProducerListener(producerListener);
        return kafkaTemplateObj;
    }
    
    public void sendMessage(Integer PartitionId, String key, ArrayList<UpdateItemCount> UpdateItemCountArr) {
        
            try {

                System.out.println("KafkaProducerService:sending message");
                
                final ProducerRecord<String, ArrayList<UpdateItemCount>> record = new ProducerRecord<String, ArrayList<UpdateItemCount>>(KAFKA_TOPIC_NAME, PartitionId, key,UpdateItemCountArr);

                System.out.println("KafkaProducerService: sending async call...");
                
                ListenableFuture<SendResult<String,ArrayList<UpdateItemCount>>> future = kafkaTemplateObj.send(record);
                 
                kafkaTemplateObj.flush();
                
            } catch(KafkaProducerException ex) {
                System.out.println("Exception...." + ex);
                System.out.println("Exception msg...." + ex.getMessage());
                System.out.println("Exception root cause..." + ex.getRootCause());
                System.out.println("Exception stack trace..." + ex.getStackTrace());
            }
     }
    
}

The Kafka configuration is shown below.

spring.kafka.producer.bootstrap-servers = 172.26.77.192:9092
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer = io.springbootlearn.orders.customerorders.util.kafkaProducer.KafkaArrayListSerializer
logging.level.org.apache.kafka=debug
logging.level.org.apache.kafka.clients=debug

I have confirmed that my Kafka setup itself works by writing another program that sends messages to a different Kafka topic, with both the key and the value as Strings. A consumer is able to fetch messages from that topic.

Can anyone help?

Many thanks.

1 Answer

翟泰
2023-03-14

Kafka can only create a serializer through its no-argument constructor, and your KafkaArrayListSerializer only declares a constructor that takes another serializer, so the reflective construction fails. For more complex serializers like this, you must construct the instance yourself and pass it to the DefaultKafkaProducerFactory, either through its constructor or via a setter.
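A minimal sketch of that approach, assuming a dedicated @Configuration class (the JsonSerializer wrapped here for the list elements is an illustrative stand-in for whatever Serializer<UpdateItemCount> you actually intend to use):

package io.springbootlearn.orders.customerorders.util.kafkaProducer;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import io.springbootlearn.orders.customerorders.models.UpdateItemCount;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, ArrayList<UpdateItemCount>> producerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.26.77.192:9092");

        // Build the custom serializer by hand, supplying the element
        // serializer it wraps (JsonSerializer is one option; any
        // Serializer<UpdateItemCount> would do).
        KafkaArrayListSerializer<UpdateItemCount> valueSerializer =
                new KafkaArrayListSerializer<>(new JsonSerializer<UpdateItemCount>());

        // This constructor accepts ready-made serializer instances, so the
        // client never tries to instantiate them reflectively through a
        // no-argument constructor.
        return new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), valueSerializer);
    }

    @Bean
    public KafkaTemplate<String, ArrayList<UpdateItemCount>> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

With the factory defined this way, drop the spring.kafka.producer.value-serializer line from the properties file; that property is exactly what makes the client try to instantiate KafkaArrayListSerializer reflectively, and the attempt fails because the class has no no-argument constructor.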

Similar resources:
  • I am manually starting ZooKeeper, then the Kafka server, and finally the Kafka REST server, each with its respective properties file. Next I deploy my Spring Boot application on Tomcat. In the Tomcat log trace I get the error org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.inte

  • I am building a simple Kafka example project with Spring Boot, and I am running into an error where the producer is not created even though everything else works fine. The error I get seems to throw an exception because the producer was not created, but it does not explain why, and I cannot tell either: Here is my Kafka configuration: And here is the controller, with the endpoint "/api/kafka":

  • I have multiple events in a topic that I am trying to process in these steps: filter events based on a header value; apply a deserializer; group by key; aggregate to produce a new KTable; stream the new KTable back to the same topic as new events with a new header. I can access the headers using transformValues, but I am not sure how to inject the new header value when calling toStream. Note: I am new to KStreams.

  • I have a Kafka Streams application, or rather a class that distributes messages to different partitions even when the same key is used, on Kafka 2.4. RoundRobinPartitioner has the following implementation: My partitioner consists of exactly the same code, but with a different implementation of the partition method; my code block is: When I configure it this way, messages are distributed to different partitions under both implementations, but certain partitions are never used. I have 50 partitions, and partitions 14 and 34 never receive a message. My partitioner is not without value

  • I want to set up a basic producer-consumer on Kafka with Flink, but I am having trouble producing data to an existing consumer through Java. CLI solution: > I set up an instance using the zip from and I created a topic named transactions1 using. Now I can use the producer and consumer on the command line to see that the topic has been created and works. Setting up the consumer, I run. Now if any producer sends data to the topic, I will see it in the consumer console. By running and in the CLI the produ

  • I created a Kafka topic with three partitions. Using the ProducerFactory from Spring Kafka, I can create one producer instance. However, I want to create three producer instances, since I have three partitions. Similarly, I want three instances of the consumer. How do I do this? Please help.