Question:

How do I process and aggregate a Kafka stream with custom objects?

许波涛
2023-03-14

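So basically I have an Account class, and I have data. I want to send these objects to my topic with a producer, and that part works. Later, I want to aggregate them with Kafka Streams, but I can't, because I think some Serde property in my configuration is wrong. I can't tell where the error is. My producer works fine, but the aggregation does not. Could someone take a look at my Kafka Streams code? My Account class: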

public class Account {

    private long fromId;
    private long amount;
    private long toId;
    private ZonedDateTime time;
}

My Account class has two companion classes, a Serializer and a Deserializer. The serializer:

public class AccountSerializer implements Serializer {

    private static final Charset CHARSET = Charset.forName("UTF-8");
    static private Gson gson = new Gson();


    @Override
    public void configure(Map map, boolean b) {

    }

    @Override
    public byte[] serialize(String s, Object o) {
        String line = gson.toJson(o);
        // Return the bytes from the String 'line'
        return line.getBytes(CHARSET);
    }



    @Override
    public void close() {

    }
}

The deserializer:

public class AccountDeserializer implements Deserializer {
    private static final Charset CHARSET = Charset.forName("UTF-8");
    static private Gson gson;

    static {
        gson = new Gson();
    }

    @Override
    public void configure(Map map, boolean b) {

    }

    @Override
    public Object deserialize(String s, byte[] bytes) {
        try {
            // Transform the bytes to String
            String person = new String(bytes, CHARSET);
            // Return the Person object created from the String 'person'
            return gson.fromJson(person, Account.class);
        } catch (Exception e) {
            throw new IllegalArgumentException("Error reading bytes! Yanlış", e);
        }
    }

    @Override
    public void close() {

    }
}

My AccountSerde class for Kafka Streams:

public class AccountSerde implements Serde<Object> {

    private AccountSerializer accountSerializer;
    private AccountDeserializer accountDeserializer;

    @Override
    public void configure(Map<String, ?> map, boolean b) {


    }

    @Override
    public void close() {
        accountSerializer.close();
        accountDeserializer.close();

    }

    @Override
    public Serializer<Object> serializer() {
        return accountSerializer;
    }

    @Override
    public Deserializer<Object> deserializer() {
        return accountDeserializer;
    }
}

And my Kafka producer:

public static void main(String[] args) {

    DataAccess dataAccess = new DataAccess();
    List<Account> accountList = dataAccess.read();

    final Logger logger = LoggerFactory.getLogger(Producer.class);
    Properties properties = new Properties();

    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AccountSerializer.class.getName());

    KafkaProducer<Long, Account> producer = new KafkaProducer<>(properties);

    for (Account account : accountList) {

        ProducerRecord<Long, Account> record =
                new ProducerRecord<Long, Account>("bank_account", account.getFromId(), account);

        producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e == null) {
                    logger.info("Record sent successfully. \n " + "Topic : " + recordMetadata.topic() + "\n" +
                            "Partition : " + recordMetadata.partition() + "\n" +
                            "Offset : " + recordMetadata.offset() + "\n" +
                            "Timestamp: " + recordMetadata.timestamp() + "\n");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                } else {
                    logger.info("Error sending producer");
                }
            }
        });
    }

    producer.flush();
    producer.close();
}

This is the class where I try to do the aggregation, my Kafka Streams class:

  public static void main(String[] args) {
        System.out.println();

        Properties properties = new Properties();
        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.01:9092");
        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG,"demo-kafka-streams");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,AccountDeserializer.class.getName());
        properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.LongSerde.class.getName());
        properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, AccountSerde.class.getName());
        //create a topology

        StreamsBuilder streamsBuilder = new StreamsBuilder();

        KStream<Long, Account> inputTopic = streamsBuilder.stream("bank_account");

        KTable<Long, Long> aggregate = inputTopic.groupByKey().aggregate(
                () -> 0L,
                (key, current, oldBalance) -> current.getAmount() + oldBalance);

        aggregate.toStream().to("son");

        KafkaStreams streams = new KafkaStreams(streamsBuilder.build(),properties);

        streams.start();

        System.out.println(streams.toString());

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));


    }

I got my producer working and sending objects. However, because of an error I can't even check whether my aggregation code works. It gives me:

[demo-kafka-streams-9e3b0ab8-c021-4707-bf85-174e1356ea89-StreamThread-1] ERROR org.apache.kafka.streams.errors.LogAndFailExceptionHandler - Exception caught during Deserialization, taskId: 0_0, topic: bank_account, partition: 0, offset: 0
java.lang.NullPointerException
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:97)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:638)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:936)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:831)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
[demo-kafka-streams-9e3b0ab8-c021-4707-bf85-174e1356ea89-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [demo-kafka-streams-9e3b0ab8-c021-4707-bf85-174e1356ea89-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
[demo-kafka-streams-9e3b0ab8-c021-4707-bf85-174e1356ea89-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [demo-kafka-streams-9e3b0ab8-c021-4707-bf85-174e1356ea89-StreamThread-1] Shutting down
[demo-kafka-streams-9e3b0ab8-c021-4707-bf85-174e1356ea89-StreamThread-1] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=demo-kafka-streams-9e3b0ab8-c021-4707-bf85-174e1356ea89-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[demo-kafka-streams-9e3b0ab8-c021-4707-bf85-174e1356ea89-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [demo-kafka-streams-9e3b0ab8-c021-4707-bf85-174e1356ea89-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
[demo-kafka-streams-9e3b0ab8-c021-4707-bf85-174e1356ea89-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [demo-kafka-streams-9e3b0ab8-c021-4707-bf85-174e1356ea89] State transition from RUNNING to ERROR
[demo-kafka-streams-9e3b0ab8-c021-4707-bf85-174e1356ea89-StreamThread-1] WARN org.apache.kafka.streams.KafkaStreams - stream-client [demo-kafka-streams-9e3b0ab8-c021-4707-bf85-174e1356ea89] All stream threads have died. The instance will be in error state and should be closed.
[demo-kafka-streams-9e3b0ab8-c021-4707-bf85-174e1356ea89-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [demo-kafka-streams-9e3b0ab8-c021-4707-bf85-174e1356ea89-StreamThread-1] Shutdown complete
Exception in thread "demo-kafka-streams-9e3b0ab8-c021-4707-bf85-174e1356ea89-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:97)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:638)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:936)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:831)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: java.lang.NullPointerException
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    ... 7 more

1 answer

田曜瑞
2023-03-14

You never initialize the fields, so you get an NPE.

You should also change the Serde type to the actual class:

public class AccountSerde implements Serde<Account> {

    // These are both null unless you initialize them 
    private AccountSerializer accountSerializer;
    private AccountDeserializer accountDeserializer;
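
A complete version of that fix might look like the sketch below. It is only an illustration: the nested `Account` is a simplified stand-in for the class in the question (no `time` field), the wrapper class `AccountSerdeSketch` is a made-up name, and a plain comma-separated encoding is swapped in for Gson so that the example needs nothing beyond the Kafka client library. The point is only that both fields are assigned when the Serde is constructed, so `serializer()` and `deserializer()` never return null.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical wrapper so the whole sketch fits in one file.
final class AccountSerdeSketch {

    // Simplified stand-in for the question's Account (assumption: no time field).
    static final class Account {
        final long fromId;
        final long amount;
        final long toId;

        Account(long fromId, long amount, long toId) {
            this.fromId = fromId;
            this.amount = amount;
            this.toId = toId;
        }

        long getAmount() { return amount; }
    }

    // Typed serializer; encodes "fromId,amount,toId" as UTF-8 (swapped in for Gson).
    static final class AccountSerializer implements Serializer<Account> {
        @Override public void configure(Map<String, ?> configs, boolean isKey) { }

        @Override public byte[] serialize(String topic, Account account) {
            if (account == null) return null;
            String line = account.fromId + "," + account.amount + "," + account.toId;
            return line.getBytes(StandardCharsets.UTF_8);
        }

        @Override public void close() { }
    }

    // Typed deserializer; the inverse of AccountSerializer.
    static final class AccountDeserializer implements Deserializer<Account> {
        @Override public void configure(Map<String, ?> configs, boolean isKey) { }

        @Override public Account deserialize(String topic, byte[] bytes) {
            if (bytes == null) return null;
            String[] parts = new String(bytes, StandardCharsets.UTF_8).split(",");
            return new Account(Long.parseLong(parts[0]), Long.parseLong(parts[1]), Long.parseLong(parts[2]));
        }

        @Override public void close() { }
    }

    // The Serde itself: both fields are initialized, and the type is Account, not Object.
    static final class AccountSerde implements Serde<Account> {
        private final AccountSerializer accountSerializer = new AccountSerializer();
        private final AccountDeserializer accountDeserializer = new AccountDeserializer();

        @Override public void configure(Map<String, ?> configs, boolean isKey) {
            accountSerializer.configure(configs, isKey);
            accountDeserializer.configure(configs, isKey);
        }

        @Override public void close() {
            accountSerializer.close();
            accountDeserializer.close();
        }

        @Override public Serializer<Account> serializer() { return accountSerializer; }

        @Override public Deserializer<Account> deserializer() { return accountDeserializer; }
    }
}
```

In a real project `AccountSerde` would normally be a public top-level class with a no-argument constructor, since Kafka Streams instantiates the configured default Serde by class name.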

Also, you need to fix the IP address in this value; it is not a valid IP:

"127.0.01:9092"
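
Putting the corrected address into the Streams configuration could look like the following sketch. It also passes explicit `Long` serdes to the aggregation and to the output topic, on the assumption that the running total is a `Long`: without them those values would fall back to the default value Serde, which is the Account one and is not meant for `Long` values. The class name `StreamsAggregationSketch`, the method names, and the minimal `Account` stand-in are hypothetical.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

// Hypothetical holder class for the sketch.
final class StreamsAggregationSketch {

    // Minimal stand-in for the question's Account; only getAmount() is used here.
    static class Account {
        private final long amount;

        Account(long amount) { this.amount = amount; }

        long getAmount() { return amount; }
    }

    // Streams settings; accountSerdeClass is whatever Serde<Account> implementation is in use.
    static Properties buildConfig(Class<?> accountSerdeClass) {
        Properties props = new Properties();
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // corrected address
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "demo-kafka-streams");
        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, accountSerdeClass.getName());
        return props;
    }

    // Sums the amounts per key and writes the running totals to the "son" topic.
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // The source topic is read with the default key/value serdes from the config.
        KStream<Long, Account> input = builder.stream("bank_account");

        // The aggregate is a Long, so its state store gets explicit Long serdes.
        KTable<Long, Long> totals = input.groupByKey().aggregate(
                () -> 0L,
                (key, account, total) -> total + account.getAmount(),
                Materialized.with(Serdes.Long(), Serdes.Long()));

        // The output records are Long/Long as well.
        totals.toStream().to("son", Produced.with(Serdes.Long(), Serdes.Long()));

        return builder.build();
    }
}
```

The two pieces would then be combined as in the question, for example `new KafkaStreams(buildTopology(), buildConfig(AccountSerde.class))`.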