当前位置: 首页 > 知识库问答 >
问题:

JSON解码Base64数据流Apache Flink

通京
2023-03-14

需要一些建议,我已经使用scala创建了一个flink作业来消费来自Kafka的消息。但是消息是用base64编码压缩的。我已经试过这个代码了

    val x_stream: DataStream[ObjectNode] = env
      .addSource(
        new FlinkKafkaConsumer010[ObjectNode](parameters.get("kafka.topic.source"),
          new JsonNodeDeserializationSchema(),
          kfk_props
        ).setStartFromEarliest()
      ).name("Topic Test")).rebalance

代码由于它不是有效的Json格式而失败。

然后我尝试使用SimpleStringSchema(),就像下面的代码一样

    val x_stream: DataStream[String] = env
      .addSource(
        new FlinkKafkaConsumer010[String](parameters.get("kafka.topic.source"),
          new SimpleStringSchema(),
          kfk_props
        ).setStartFromEarliest()
      ).name("Topic Test")).rebalance

Kafka的信息完美地消耗了,但是输出如下

.....
    Br?G"p)0?p?AF??g}?Ly?@?
    ??>??j?)??);?E?]<d╚? ?-?@?g?????'2???�?�o???r?z????Q$????p    F╔?7?yx+_'v?2???K&??O??c??D,c0F2??ny[?=??%?/?M1:???bq?yHt"A??5???

如何将此数据解码为有效的JSON?

此致敬意

共有1个答案

敖毅
2023-03-14

您应该实现KafkaDeserializationSchema接口来定制解析逻辑。

首先,解码base64并将其转换为字节流,然后生成一个字符串对象,最后将其解析为json。

 类似资料:
  • 问题内容: 我有一些大型的base64编码数据(存储在hadoop文件系统中的快照文件中)。该数据最初是压缩的文本数据。我需要能够读取此编码数据的大块,对其进行解码,然后将其刷新到GZIPOutputStream。 关于如何执行此操作而不是将整个base64数据加载到数组中并调用Base64.decodeBase64(byte [])的任何想法? 如果我读了直到’\ r \ n’分隔符并逐行解码的

  • 问题 你需要使用Base64格式解码或编码二进制数据。 解决方案 base64 模块中有两个函数 b64encode() and b64decode() 可以帮你解决这个问题。例如; >>> # Some byte data >>> s = b'hello' >>> import base64 >>> # Encode as Base64 >>> a = base64.b64encode(s)

  • 问题内容: 我有一个Base64编码的图像。用Java解码的最佳方法是什么?希望仅使用Sun Java 6附带的库 问题答案: 从v6开始,Java SE随JAXB一起提供。有静态方法可以简化这一过程。请参阅和。

  • 介绍 Base64编码是用64(2的6次方)个ASCII字符来表示256(2的8次方)个ASCII字符,也就是三位二进制数组经过编码后变为四位的ASCII字符显示,长度比原来增加1/3。 使用 String a = "伦家是一个非常长的字符串"; //5Lym5a625piv5LiA5Liq6Z2e5bi46ZW/55qE5a2X56ym5Liy String encode = Base64.en

  • 问题内容: 如何用Java解码Base64数据? 问题答案: 从v6开始,Java SE随JAXB一起提供。javax.xml.bind.DatatypeConverter有静态方法可以简化这一过程。 请参阅parseBase64Binary()和printBase64Binary()。 从Java 8开始,已经有官方支持的用于Base64编码和解码的API。随着时间的流逝,它可能会成为默认选择。

  • 问题内容: 我正在编写脚本以自动生成演示数据,并且需要在JSON中序列化一些数据。该数据的一部分是图像,因此我在base64中对其进行了编码,但是当我尝试运行脚本时,我得到了: 据我所知,无论使用base64编码的任何内容(在这种情况下,都是PNG图像)都只是一个字符串,因此它应该对序列化造成问题。我想念什么? 问题答案: 您必须注意数据类型。 如果读取二进制图像,则会得到字节。如果将这些字节编码