当前位置: 首页 > 面试题库 >

如何为Kafka 2.2实现FlinkKafkaProducer序列化程序

向苗宣
2023-03-14
问题内容

我一直在努力更新从Kafka读取然后写入Kafka的Flink处理器(Flink 1.9版)。我们已经将此处理器编写为可以朝着Kafka
0.10.2集群运行,现在我们已经部署了一个运行2.2版的新Kafka集群。因此,我着手更新处理器以使用最新的FlinkKafkaConsumer和FlinkKafkaProducer(由Flink文档建议)。但是我遇到了卡夫卡制片人的一些问题。我无法使用不赞成使用的构造函数来将其序列化数据(不足为奇),并且我无法在线找到有关如何实现序列化程序的任何实现或示例(所有示例都使用较旧的Kafka连接器)

当前的实现(对于Kafka 0.10.2)如下

FlinkKafkaProducer010<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer010<String>(
                "playerSessions",
                new SimpleStringSchema(),
                producerProps,
                (FlinkKafkaPartitioner) null
        );

尝试实施以下FlinkKafkaProducer时

FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
                "playerSessions",
                new SimpleStringSchema(),
                producerProps,
                null
        );

我收到以下错误:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:525)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:483)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:357)
    at com.ebs.flink.sessionprocessor.SessionProcessor.main(SessionProcessor.java:122)

而且我还无法弄清楚为什么。FlinkKafkaProducer的构造函数也已被弃用,当我尝试实现未弃用的构造函数时,我不知道如何序列化数据。以下是它的外观:

FlinkKafkaProducer<String> eventBatchFlinkKafkaProducer = new FlinkKafkaProducer<String>(
                "playerSessions",
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {
                        return null;
                    }
                },
                producerProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );

但是我不明白如何实现KafkaSerializationSchema,并且在网上或Flink文档中都找不到此示例。

是否有人对实现此目标有任何经验,或者对为什么FlinkProducer在步骤中获得NullPointerException有任何提示?


问题答案:

如果您只是将String发送给Kafka:

public class ProducerStringSerializationSchema implements KafkaSerializationSchema<String>{

    private String topic;

    public ProducerStringSerializationSchema(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        return new ProducerRecord<byte[], byte[]>(topic, element.getBytes(StandardCharsets.UTF_8));
    }

}

发送Java对象:

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;


    public class ObjSerializationSchema implements KafkaSerializationSchema<MyPojo>{

        private String topic;   
        private ObjectMapper mapper;

        public ObjSerializationSchema(String topic) {
            super();
            this.topic = topic;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(MyPojo obj, Long timestamp) {
            byte[] b = null;
            if (mapper == null) {
                mapper = new ObjectMapper();
            }
             try {
                b= mapper.writeValueAsBytes(obj);
            } catch (JsonProcessingException e) {
                // TODO 
            }
            return new ProducerRecord<byte[], byte[]>(topic, b);
        }

    }

在你的代码中

.addSink(new FlinkKafkaProducer<>(producerTopic, new ObjSerializationSchema(producerTopic), 
                        params.getProperties(), FlinkKafkaProducer.Semantic.EXACTLY_ONCE));


 类似资料:
  • 问题内容: 我在网上阅读,可以通过将派生对象声明为瞬时对象来省略它们的序列化。但是,在链接列表的情况下,链接是对象之间的内存引用。那么,我应该将其转换为数组并存储数组表示形式吗? 问题答案: 这是Java序列化的方式:它获取所有元素并将它们与大小一起写入。当然要声明条目 请参阅的和方法:

  • 问题内容: 仅 子类具有已实现的接口。 我在这里注意到的一件事是,父类未序列化。然后,为什么它没有抛出却确实显示了以下内容 输出量 同样,输出不同于和。我只知道,这是因为父类尚未实现。但是,如果有人向我解释,在对象序列化和反序列化期间会发生什么。它如何改变价值?我不知道,我在程序中使用了注释。因此,如果我在任何时候错了,请告诉我。 问题答案: 根据可序列化的javadoc 反序列化期间,将使用该类

  • 本文向大家介绍请问什么是java序列化?以及如何实现java序列化?相关面试题,主要包含被问及请问什么是java序列化?以及如何实现java序列化?时的应答技巧和注意事项,需要的朋友参考一下 考察点:序列化 序列化就是一种用来处理对象流的机制,所谓对象流也就是将对象的内容进行流化。可以对流化后的对象进行读写操作,也可将流化后的对象传输于网络之间。序列化是为了解决在对对象流进行读写操作时所引发的问题

  • 如果一封邮件被发送到我的收件箱,我会收到一条消息,并将内容插入数据库。我有一个组织。springframework。整合。果心信息如下: 现在,如果出现故障,我希望有故障安全恢复机制,我想的是将消息对象序列化到一个文件中,然后反序列化并更新到DB。 问题1。在这种情况下,如何序列化消息对象?2。除了序列化,还可以使用其他机制吗? 编辑我以前没有做过序列化,我听说类应该实现Serializable,

  • 本文向大家介绍.net实现序列化与反序列化实例解析,包括了.net实现序列化与反序列化实例解析的使用技巧和注意事项,需要的朋友参考一下 序列化与反序列化是.net程序设计中常见的应用,本文即以实例展示了.net实现序列化与反序列化的方法。具体如下: 一般来说,.net中的序列化其实就是将一个对象的所有相关的数据保存为一个二进制文件(注意:是一个对象) 而且与这个对象相关的所有类型都必须是可序列化的

  • 我将来自Kafka主题的消息存储在KeyValueStore中,以便以后可以查询它们。我创建了一个KTable,如下所示: 我在application.yml中配置了使用者,如下所示: 但是,当我从KeyValueStore读取时,键会正确地作为字符串返回,但返回的值是字节数组,而不是MyMessage。由于某种原因,我的自定义反序列化程序未被使用。我尝试自己反序列化消息,但我的反序列化器发生了异