当前位置: 首页 > 知识库问答 >
问题:

avro类型的createDataFrame中的无限递归

戚阳文
2023-03-14

我从本例中的createDataFrame调用中获得了一个StackOverflow Error。它起源于涉及java类型推理的scala代码,该代码在无限循环中调用自己。

final EventParser parser = new EventParser();
JavaRDD<Event> eventRDD = sc.textFile(path)
        .map(new Function<String, Event>()
{
    public Event call(String line) throws Exception
    {
        Event event = parser.parse(line);
        log.info("event: "+event.toString());
        return event;
    }
});
log.info("eventRDD:" + eventRDD.toDebugString());

DataFrame df = sqlContext.createDataFrame(eventRDD, Event.class);
df.show();

堆栈跟踪的底部如下所示:

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:102)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:104)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:102)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)

这与中报告的错误类似http://apache-spark-developers-list.1001551.n3.nabble.com/Stackoverflow-in-createDataFrame-td11791.html但我使用的是Spark 1.4.1,它比修复这个bug时要晚。

事件类由avro从此avsc生成。它确实包含双精度字段和长字段,据报告这会导致问题,但用字符串替换双精度字段并不会改变症状。

{
    "namespace": "mynamespace", 
    "type": "record", 
    "name": "Event", 
    "fields": [
        { "name": "ts", "type": "double", "doc": "Timestamp"},
        { "name": "uid", "type": "string", "doc": "Unique ID of Connection"},
        { "name": "idorigh", "type": "string", "doc": "Originating endpoint’s IP address (AKA ORIG)"},
        { "name": "idorigp", "type": "int", "doc": "Originating endpoint’s TCP/UDP port (or ICMP code)"},
        { "name": "idresph", "type": "string", "doc": "Responding endpoint’s IP address (AKA RESP)"},
        { "name": "idrespp", "type": "int", "doc": "Responding endpoint’s TCP/UDP port (or ICMP code)"},
        { "name": "proto", "type": "string", "doc": "Transport layer protocol of connection"},
        { "name": "service", "type": "string", "doc": "Dynamically detected application protocol, if any"},
        { "name": "duration", "type": "double", "doc": "Time of last packet seen – time of first packet seen"},
        { "name": "origbytes", "type": "int", "doc": "Originator payload bytes; from sequence numbers if TCP"},
        { "name": "respbytes", "type": "int", "doc": "Responder payload bytes; from sequence numbers if TCP"},
        { "name": "connstate", "type": "string", "doc": "Connection state (see conn.log:conn_state table)"},
        { "name": "localorig", "type": "boolean", "doc": "If conn originated locally T; if remotely F."},
        { "name": "localresp", "type": "boolean", "doc": "empty, always unset"},
        { "name": "missedbytes", "type": "int", "doc": "Number of missing bytes in content gaps"},
        { "name": "history", "type": "string", "doc": "Connection state history (see conn.log:history table)"},
        { "name": "origpkts", "type": [ "int", "null"], "doc": "Number of ORIG packets"},
        { "name": "origipbytes", "type": [ "int", "null"], "doc": "Number of RESP IP bytes (via IP total_length header field)"},
        { "name": "resppkts", "type": [ "int", "null"], "doc": "Number of RESP packets"},
        { "name": "respipbytes", "type": [ "int", "null"], "doc": "Number of RESP IP bytes (via IP total_length header field)"},
        { "name": "tunnelparents", "type": [ "string", "null"], "doc": "If tunneled, connection UID of encapsulating parent (s)"},
        { "name": "origcc", "type": ["string", "null"], "doc": "ORIG GeoIP Country Code"},
        { "name": "respcc", "type": ["string", "null"], "doc": "RESP GeoIP Country Code"}
    ]
}

有人能提供建议吗?谢谢

共有1个答案

羊舌承
2023-03-14

spark avro项目正在开展工作来解决这个问题,请参见:https://github.com/databricks/spark-avro/pull/217和https://github.com/databricks/spark-avro/pull/216

合并后,应该有一个函数将Avro对象的RDD转换为数据集(行数据集相当于数据帧),而不存在生成类中getSchema()函数的循环引用问题。

 类似资料:
  • 查看我在某处(这里是游乐场)找到的这个Typescript4.2片段: 我的头不能绕着它。TS如何处理这件事?它怎么不卡在无限递归里?具体地说,对于和的情况,悬停在变量上显示TS将类型解析为和。这是怎么回事?

  • 问题内容: 我的一个朋友在Java API(https://docs.oracle.com/javase/7/docs/api/java/lang/Enum.html)中找到了这个窍门, 并通过阅读以下文章https://docs.oracle.com/javase/tutorial/java/generics/genTypes.html我可以理解上述行在语法上的含义,但是从给出的示例中我无法弄清

  • 问题内容: 我是Go编程语言的新手,我有一个创建和解释器的任务,但是我遇到了以下问题: 我想将环境定义为: 但是我收到错误“无效的递归类型环境”。根据我的研究,我将父级更改为 Environment。但是现在,当我需要创建一个环境类型为var的新环境时,它会收到错误消息“无法将fun_Val.ds(环境类型)用作 环境值类型”。我正在如下创建环境: 我试图将这篇文章中的代码量限制在一定范围内,但是

  • 我使用Google Endpoint创建了一个应用服务器,它是一个即时消息应用程序的后端。每个用户都有一个好友列表。 当我创建一个新朋友时,我使用下面的方法将用户添加到彼此的朋友列表中。然而,由于循环依赖关系,当我添加朋友时,它给了我以下错误。 我看了贴出的其他问题和解决方案。他们中的大多数人结构不同,他们没有解决我的问题。 这个网站的一个答案是建议添加,但我没有任何字段可以添加。我试图放置,但我

  • 阅读 https://avro.apache.org/docs/current/spec.html 它说架构必须是以下之一: 一个JSON字符串,用于命名定义的类型 一个JSON对象,其形式为:,其中是原始类型名或派生类型名,定义如下。允许将本文档中未定义的属性作为元数据,但不得影响序列化数据的格式 一个JSON数组,表示嵌入类型的联合 我想要一个描述树的模式,使用树的递归定义是: < li >具

  • 我得到了错误下的无限递归。 下面是我的代码 另一个班 我无法理解为什么在UI上获取此值时会出现此错误。