当前位置: 首页 > 知识库问答 >
问题:

Flink,如何使用protobuf序列化程序反序列化protobuf?

毕泽宇
2023-03-14

我需要通过flink消费Kafka,不幸的是,Kafka消息是在serde中使用原型,完全不知道如何处理它,这里是来自互联网的代码,但我不能使它工作。

...    
import com.google.protobuf.InvalidProtocolBufferException;
import com.twitter.chill.protobuf.ProtobufSerializer;

public class Protobuf2 {

    public static void main(String[] args) throws Exception {
        String inputTopic = "zz";
        String consumerGroup = "foobar";
        String address = "localhost:9092";
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.getConfig().registerTypeWithKryoSerializer(ExchangeMessage.Order.class, ProtobufSerializer.class);

        FlinkKafkaConsumer<ExchangeMessage.Order> flinkKafkaConsumer = createStringConsumerForTopic(inputTopic, address,
                consumerGroup);
        DataStream<ExchangeMessage.Order> input = environment.addSource(flinkKafkaConsumer);

        input.map(order ->
            {
                System.out.println("foooooooo, " + order.getOid());
                return order;
            });

        environment.execute("kafka02");
    }

    public static FlinkKafkaConsumer<ExchangeMessage.Order> createStringConsumerForTopic(String topic,
            String kafkaAddress, String kafkaGroup) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaAddress);
        props.setProperty("group.id", kafkaGroup);
        FlinkKafkaConsumer<ExchangeMessage.Order> consumer = new FlinkKafkaConsumer<>(topic,
                new OrderSerDeSchema(), props);
        return consumer;
    }


    @SuppressWarnings("serial")
    static class OrderSerDeSchema
            implements DeserializationSchema<ExchangeMessage.Order>, SerializationSchema<ExchangeMessage.Order> {

        @Override
        public ExchangeMessage.Order deserialize(byte[] message) throws IOException {
            ExchangeMessage.Order order = null;
            try {
                order = ExchangeMessage.Order.parseFrom(message);
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
            return order;
        }

        @Override
        public boolean isEndOfStream(ExchangeMessage.Order nextElement) {
            return false;
        }

        @Override
        public TypeInformation<ExchangeMessage.Order> getProducedType() {
            return null;
        }

        @Override
        public byte[] serialize(ExchangeMessage.Order element) {
            return new byte[0];
        }
    }

}

这不起作用,它会让NPE:

Caused by: java.lang.NullPointerException
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at ......

有人知道我做错了什么吗?使用twitter ProtobufSerializer是唯一值得拥有protobuf的方法吗?还是还有别的路要走?

共有1个答案

璩珂
2023-03-14

添加依赖项

<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>chill-protobuf</artifactId>
    <version>0.7.6</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.8.0</version>
</dependency>

并将其添加到代码中

env.getConfig().registerTypeWithKryoSerializer(MockMessageProto.MockMessage.class, ProtobufSerializer.class);

也许你会遇到版本冲突,改变chily-原型版本来修复它

 类似资料:
  • 我试图阅读和打印从Kafka使用Apache Flink的原型消息。 我遵循官方文件,但没有成功:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/third_party_serializers/ Flink消费者代码是: 反序列化器代码是:

  • 试图在Java中使用protobuf反序列化消息,并得到以下异常。 原因:com.google.protobuf.InvalidProtocolBufferException:在解析协议消息时,输入意外地在字段中间结束。这可能意味着输入被截断,或者嵌入的消息错误报告了自己的长度。在com.google.protobuf.InvalidProtocolBufferException.Truncate

  • 我正在尝试使用kryo序列化和反序列化到二进制。我想我已经完成了序列化,但似乎无法反序列化。下面是我正在处理的代码,但最终我想存储一个字节[],然后再次读取它。文档只显示了如何使用文件。

  • 问题内容: 我有一个问题。我想使用JAXB将一个对象转换为另一个对象。就像在中,我有一个class 和另一个class ,它们都有相同的参数,实际上都是相同的(复制粘贴),但是包不同。我想使用进行它们之间的转换。 怎么做,请帮帮我。 问题答案: 您可以执行以下操作。 注意: 不需要利用JAXBSource将数据具体化为XML。 它在对象模型上不需要任何注释。 com.home.Student co

  • 嗨,我有LogEventObject在客户端用于记录事件,我想使用REST API将其发送到服务器。我将LogEventObject转换为json字符串,并通过REST将其作为有效载荷发送。在服务器端,我使用Groovy,当我尝试做对象apper.read值()时,我得到以下错误。 com.fasterxml.jackson.databind.JsonMappingExc0019:找不到非具体的集

  • 问题内容: 我正在尝试使用protobuf序列化结构。经过许多小时试图弄清楚我在做什么错,我决定测试google的示例,但效果不佳 我从Google(https://developers.google.com/protocol- buffers/docs/javatutorial )获得以下协议: 我正在尝试将其序列化: byte []序列化= john.toByteArray(); 我得到“ j