当前位置: 首页 > 知识库问答 >
问题:

在flink映射中动态解析json

岳正阳
2023-03-14

我使用flink动态分析json类型的数据,对keyby和给定的列求和,在我的mapFunction中,我将json转换为case类,但结果流没有在keyby函数中得到编译器,在线程“main”org.apache.flink.api.common.InvalidProgramException中得到错误异常:此类型(GenericType )不能用作Key. 。我的代码如下所示

//conf.properties
columns=a:String,b:Int,c:String,d:Long
declusteringColumns=a,c
statsColumns=b
//main function
stream.map(new MapFunc)
      .keyBy(declusteringColumns(0), declusteringColumns.drop(0).toSeq: _*)
      .sum(statsColumns)
class MapFunc extends RichMapFunction[String,Any]{
var clazz:Class[_]=_
override def open(parameters: Configuration): Unit = {
import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox
val tb = universe.runtimeMirror(universe.getClass.getClassLoader).mkToolBox() 
clazz = tb.compile(tb.parse(
"""|case class Test(a:String,b:Int,c:String,d:Long){}
   |scala.reflect.classTag[Test].runtimeClass"""
.stripMargin)).apply.asInstanceOf[Class[_]] 
}

override def map(value: String) {
val tmp = JSON.parseObject(value)
val values = Utils.loadProperties("columns").split(",").map(y => {
val name = y.substring(0, y.indexOf(":"))
val tpe = y.substring(y.indexOf(":") + 1)
tpe.toLowerCase match {
case "string" => tmp.getString(name)
case "int" => tmp.getInteger(name)
case "long" => tmp.getLong(name)
case _ => null}}).toSeq
clazz.getConstructors()(0).newInstance(values: _*) 
}}

我如何将json转换为case类或tuple?

共有1个答案

陶俊晤
2023-03-14

实际上,似乎例外情况

org.apache.flink.api.common.InvalidProgramException: 
This type (GenericType<Test>) cannot be used as key 

即使对于普通case类也保持不变(不通过反射生成)

case class Test(a: String, b: Int, c: String, d: Long)

第一个问题是这个case类不是POJO

所有字段要么是公共的,要么必须通过getter和setter函数访问。对于名为foo的字段,getter和setter方法必须命名为getFoo()和setFoo()。

已注册的序列化程序必须支持字段的类型。

所以您应该替换

case class Test(a: String, b: Int, c: String, d: Long)
import scala.beans.BeanProperty

case class Test(
                 @BeanProperty var a: String,
                 @BeanProperty var b: Int,
                 @BeanProperty var c: String,
                 @BeanProperty var d: Long) {
  def this() = {
    this(null, 0, null, 0)
  }
}
    null

下面是工具箱生成的代码的反编译版本

public final class __wrapper$1$a077cb72a4ee423291aac7dfb47454b9$ {

   public Object wrapper() {
      new LazyRef();

      class Test$1 implements Product, Serializable {
         private String a;
         private int b;
         private String c;
         private long d;

         ...
      }

      return scala.reflect.package..MODULE$.classTag(scala.reflect.ClassTag..MODULE$.apply(Test$1.class)).runtimeClass();
   }

   ...
}

完整的反编译代码:

https://gist.github.com/dmytromitin/f1554AD833EA1BB9EB97947AE872D220

在运行时动态编译多个Scala类

Scala反射中的张量流

但是带有手动生成的类的代码

https://gist.github.com/dmytromitin/a23e45a546790630e838e60c7206adcd

使用反射,我们只能返回any。

现在我正在我生成的代码中创建TypeInformation[Test],这似乎修复了这个类型(GenericType )不能用作key 但现在我已经

org.apache.flink.api.common.InvalidProgramException: UTF-8 is not serializable. 
The object probably contains or references non serializable fields.
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import scala.reflect.runtime
import scala.reflect.runtime.universe._
import scala.tools.reflect.ToolBox

object App {
  val toolbox = ToolBox(runtime.currentMirror).mkToolBox()

  class MapFunc extends RichMapFunction[String, Any] {
    var typeInfo: TypeInformation[_] = _
    @transient var classSymbol: ClassSymbol = _

    override def open(parameters: Configuration): Unit = {
      val code =
        """|case class Test(
           |                 @scala.beans.BeanProperty var a: String,
           |                 @scala.beans.BeanProperty var b: Int,
           |                 @scala.beans.BeanProperty var c: String,
           |                 @scala.beans.BeanProperty var d: Long) {
           |  def this() = {
           |    this(null, 0, null, 0)
           |  }
           |}""".stripMargin

      val tree = toolbox.parse(code)
      classSymbol = toolbox.define(tree.asInstanceOf[ImplDef]).asClass
      typeInfo = toolbox.eval(
        q"org.apache.flink.api.common.typeinfo.TypeInformation.of(classOf[${classSymbol.toType}])"
      ).asInstanceOf[TypeInformation[_]]
    }

    override def map(value: String): Any = {
      val values = Seq("aaa", 1, "ccc", 2L) //hardcoded for now
      createClassInstance(classSymbol, values: _*)
    }
  }


  def main(args: Array[String]): Unit = {
    val func = new MapFunc
    func.open(new Configuration)
    val classInstance = func.map("""{a: "aaa", b: 1, c: "ccc", d: 2}""")
    println(classInstance) //Test(aaa,1,ccc,2)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.socketTextStream("localhost", 9999)
    val typeInfo = func.typeInfo.asInstanceOf[TypeInformation[Any]]
    println(typeInfo)//PojoType<__wrapper$1$75434c8e32f541f7a87513a2ad2aa0ce.Test, fields = [a: String, b: Integer, c: String, d: Long]>
    val res = stream.map(func)(typeInfo).keyBy("a", "c").sum("b")
    println(res)//org.apache.flink.streaming.api.scala.DataStream@5927f904
  }

  def createClassInstance(classSymbol: ClassSymbol, args: Any*): Any = {
    val runtimeMirror = toolbox.mirror
    val classType = classSymbol.typeSignature
    val constructorSymbol = classType.decl(termNames.CONSTRUCTOR).alternatives.head.asMethod
    val classMirror = runtimeMirror.reflectClass(classSymbol)
    val constructorMirror = classMirror.reflectConstructor(constructorSymbol)
    constructorMirror(args: _*)
  }
}
 类似资料:
  • 因此,通过对Stackoverflow的研究,特别是本主题,我已经确定在一个正常工作的getter/setter设置中嵌入了一个JSON对象字典。当我调用API时,属性的传统getter/setter工作得很好,但这是因为我知道属性的名称。如何使其适用于名为的对象的未知属性,以及该对象的格式是什么()? API响应: 我的POJO: 如何设置getter/setter和正确的字典格式(看起来可能是

  • 问题内容: 我有一个json对象是这样的: 我试图这样解析: 但是我不知道如何访问动态名称。我们如何解析这样的JSON -注意-Ya的 所有值都带有引号,例如:“ Yg&R_” 问题答案: 试试这个动态的JSON解析器

  • 问题内容: 我在以下架构中具有json对象: 在这里,该字段包含一个嵌入的json对象,并且该对象的模式是动态的,并且每次都不同。 的对象是不同的API服务,以及不同的API服务的不同方法获得的原始输出。不可能将其映射到所有可能的值。 是否可能有如下所示的java类: 或者类似的东西,以便我可以接收基本架构并对其进行处理,然后将其发送到相关类,该类将转换为适当的预期类? 问题答案: 使用 您可以从

  • 我有一个json响应从API如下 我试图解析它并在列表视图中显示它。但是我在映射数据时遇到了如下错误,错误是“\u TypeError(type'String'不是type'map的子类型

  • 我遵循Microsoft的指导,通过Azure ADB2C启用Dynamics Portal访问。基础设施已经启动并运行良好。我使用的是"登录"政策。但是,我在声明映射功能上遇到了麻烦。 我已经根据下面链接的文档中的说明配置了以下站点设置。 身份验证/OpenIdConnect/B2C/RegistrationClaimsMapping身份验证/OpenIdConnect/B2C/LoginCla