当前位置: 首页 > 知识库问答 >
问题:

UDF 连接埋在行对象中的未定义大小写类的数组

吕翰飞
2023-03-14

我有一个数据框架,叫做sessions,它的列可能会随着时间而变化。(编辑以澄清:我没有列的case类——只有反射模式。)我将在外部作用域中始终有一个uuid和clientId,其他一些内部和外部作用域列可能会构成跟踪事件,所以...类似于:

root
 |-- runtimestamp: long (nullable = true)
 |-- clientId: long (nullable = true)
 |-- uuid: string (nullable = true)
 |-- oldTrackingEvents: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- timestamp: long (nullable = true)
 |    |    |-- actionid: integer (nullable = true)
 |    |    |-- actiontype: string (nullable = true)
 |    |    |-- <tbd ... maps, arrays and other stuff matches sibling> section
 ...
 |-- newTrackingEvents: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- timestamp: long (nullable = true)
 |    |    |-- actionid: integer (nullable = true)
 |    |    |-- actiontype: string (nullable = true)
 |    |    |-- <tbd ... maps, arrays and other stuff matches sibling>      
 ...

我现在想将oldTrackingEvents和newTrackingEvents与包含这些参数和尚待解决的代码逻辑的UDF合并:

val mergeTEs = udf((oldTEs : Seq[Row], newTEs :  Seq[Row]) =>
        // do some stuff - figure best way 
        //    - to merge both groups of tracking events
        //    - remove duplicates tracker events structures
        //    - limit total tracking events < 500 
        return result // same type as UDF input params
    )

UDF返回结果将是结构的数组,它是连接的两个字段的结果List。

问题:我的问题是如何构造这样的UDF-(1)使用正确的传入参数类型,(2)在UDF中操作这些集合的方法,(3)返回没有编译器错误的值的明确方法。我未成功测试输入/输出的Seq[Row](使用val testUDF=udf((跟踪事件:Seq[Row]))=

sessions.select(mergeTEs('oldTrackingEvents, 'newTrackingEvents).as("cleanTrackingEvents"))

在每一行中,...以内存/速度有效的方式取回单个“跟踪事件”结构数组。

补足的:

看看向我展示的一个问题…如果存在相关性,可能会有一个提示…定义一个UDF,接受Spark数据帧中的对象数组<代码>创建传递给udf的结构函数必须返回产品类型(元组*或案例类),而不是行 也许…这篇文章是相关/有用的。


共有2个答案

阚小云
2023-03-14

或者。。。如果您的目标是将某些随机行结构/模式的序列与某些操作合并,这是一种避免分区讨论的替代通用方法:

从主数据帧中,为每个trackingEvents部分创建数据帧,< code>new和< code>old。选择每个展开的“跟踪事件”部分的列。将这些< code>val dataframe声明保存为< code>newTE和< code>oldTE。

创建另一个dataframe,其中选取的列对于< code>oldTrackingEvents和< code>newTrackingEvents数组中的每个跟踪事件是唯一的,例如每个事件的< code>uuid 、< code>clientId和事件< code >时间戳。您的伪模式应该是:

< code>(uuid: String,clientId : Long,newTE : Seq[Long],oldTE : Seq[Long])

使用UDF连接结构中的两个简单序列,两者都是< code>Seq[Long],这是“未经测试的东西”示例:

val limitEventsUDF = udf { (newTE: Seq[Long], oldTE: Seq[Long], limit: Int, tooOld: Long) => {
    (newTE ++ oldTE).filter(_ > tooOld).sortWith(_ > _).distinct.take(limit) 
}}

UDF将返回清除跟踪事件的数据帧

之后根据需要使用collect_ list进行分组。

仍然...这看起来工作量很大,我不确定是否应该投票给< code >“答案”?

汪臻
2023-03-14
匿名用户

我认为你所链接的问题解释了这一切,所以我重申一下。使用自定义项时

  • STRtType的输入表示形式是弱类型的Row对象。
  • 结构类型的输出类型必须是Scala产品。您不能返回Row对象。

如果这会带来很大的负担,则应使用强类型数据集

val f: T => U 
sessions.as[T].map(f): Dataset[U]

其中< code>T是表示< code >会话模式的代数数据类型,而< code>U是表示结果的代数数据类型。

 类似资料:
  • 颜色选取器 在 1.受保护的无效上创建(捆绑保存实例状态) { 超级.on创建(已保存实例状态); 错误:创建(捆绑包)上的方法未定义对象 类型2.new MyView(指画活动.this); 错误:构造函数指画活动.MyView(指画活动)未定义 3.set内容查看(mv); 错误:他的方法集内容视图(指画活动.MyView)是未定义的类型指画活动 4.公共布尔值在创建选项菜单(菜单菜单) {

  • 我已经编写了以下代码,运行良好。但是我想连接UDF,这样代码可以压缩成几行。请建议我怎么做。下面是我编写的代码。

  • 问题内容: 如果设置最大Java堆大小,那么单个对象可能的最大大小是多少? 假设我的应用程序只有一个类,而我正在创建一个对象。 该对象有大约大小限制吗? 我的课看起来像下面的课: 注意 正如我提到的JVM堆大小一样,我要求定量地回答。 问题答案: 理论上最大的Java对象(如果您有足够大的堆)将是带有2 31-1个元素的Java对象。那是16Gb。 但是,对于给定的堆大小,您将能够创建的最大对象取

  • 我想定义一个带有对象和不同类型的接口,例如 在定义上,没有问题,但在调用like后 这不起作用,并出现以下错误 错误错误:未捕获(promise中):TypeError:无法设置未定义的属性“名称”TypeError:无法设置未定义的属性“名称” 有一些相似的主题,但它们并不完全相关。(如何在类型脚本接口中定义对象或者如何在类型脚本中定义对象变量的类型?) 我很感激你帮助我找到解决办法。

  • 另一个这个程序包帮助你的地方就是提供许多既支持面向对象设计理念又有共通功能的许多对象。 The PeriodicalExecuter object 这个对象提供一定间隔时间上重复调用一个方法的逻辑。 Method Kind Arguments Description [ctor](callback, interval) constructor callback: a parameterless f

  • 问题内容: 在C中,我们可以找到的大小int,char等我想知道如何获得物体的大小就像一个字符串,整数,等在Python。 相关问题:Python列表(元组)中每个元素有多少个字节? 我使用的XML文件包含指定值大小的大小字段。我必须解析此XML并进行编码。当我想更改特定字段的值时,我将检查该值的大小字段。在这里,我想比较输入的新值是否与XML中的值相同。我需要检查新值的大小。如果是字符串,我可以