我有一个平面数据帧(df
),结构如下:
root
|-- first_name: string (nullable = true)
|-- middle_name: string (nullable = true)
|-- last_name: string (nullable = true)
|-- title: string (nullable = true)
|-- start_date: string (nullable = true)
|-- end_Date: string (nullable = true)
|-- city: string (nullable = true)
|-- zip_code: string (nullable = true)
|-- state: string (nullable = true)
|-- country: string (nullable = true)
|-- email_name: string (nullable = true)
|-- company: struct (nullable = true)
|-- org_name: string (nullable = true)
|-- company_phone: string (nullable = true)
|-- partition_column: string (nullable = true)
我需要将这个数据帧转换为如下结构(因为我的下一个数据将采用这种格式):
root
|-- firstName: string (nullable = true)
|-- middleName: string (nullable = true)
|-- lastName: string (nullable = true)
|-- currentPosition: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- title: string (nullable = true)
| | |-- startDate: string (nullable = true)
| | |-- endDate: string (nullable = true)
| | |-- address: struct (nullable = true)
| | | |-- city: string (nullable = true)
| | | |-- zipCode: string (nullable = true)
| | | |-- state: string (nullable = true)
| | | |-- country: string (nullable = true)
| | |-- emailName: string (nullable = true)
| | |-- company: struct (nullable = true)
| | | |-- orgName: string (nullable = true)
| | | |-- companyPhone: string (nullable = true)
|-- partitionColumn: string (nullable = true)
到目前为止,我已经实现了:
case class IndividualCompany(orgName: String,
companyPhone: String)
case class IndividualAddress(city: String,
zipCode: String,
state: String,
country: String)
case class IndividualPosition(title: String,
startDate: String,
endDate: String,
address: IndividualAddress,
emailName: String,
company: IndividualCompany)
case class Individual(firstName: String,
middleName: String,
lastName: String,
currentPosition: Seq[IndividualPosition],
partitionColumn: String)
val makeCompany = udf((orgName: String, companyPhone: String) => IndividualCompany(orgName, companyPhone))
val makeAddress = udf((city: String, zipCode: String, state: String, country: String) => IndividualAddress(city, zipCode, state, country))
val makePosition = udf((title: String, startDate: String, endDate: String, address: IndividualAddress, emailName: String, company: IndividualCompany)
=> List(IndividualPosition(title, startDate, endDate, address, emailName, company)))
val selectData = df.select(
col("first_name").as("firstName"),
col("middle_name).as("middleName"),
col("last_name").as("lastName"),
makePosition(col("job_title"),
col("start_date"),
col("end_Date"),
makeAddress(col("city"),
col("zip_code"),
col("state"),
col("country")),
col("email_name"),
makeCompany(col("org_name"),
col("company_phone"))).as("currentPosition"),
col("partition_column").as("partitionColumn")
).as[Individual]
select_data.printSchema()
select_data.show(10)
我可以看到为select_data
生成的正确架构,但它在我尝试获取一些实际数据的最后一行给出了错误。我收到一个错误,说无法执行用户定义的函数。
org.apache.spark.SparkException: Failed to execute user defined function(anonfun$4: (string, string, string, struct<city:string,zipCode:string,state:string,country:string>, string, struct<orgName:string,companyPhone:string>) => array<struct<title:string,startDate:string,endDate:string,address:struct<city:string,zipCode:string,state:string,country:string>,emailName:string,company:struct<orgName:string,companyPhone:string>>>)
有没有更好的方法来实现这一目标?
我也有类似的要求。< br >我所做的是创建一个类型化的用户定义的聚合,它将产生元素的< code>List。
import org.apache.spark.sql.{Encoder, TypedColumn}
import org.apache.spark.sql.expressions.Aggregator
import scala.collection.mutable
object ListAggregator {
private type Buffer[T] = mutable.ListBuffer[T]
/** Returns a column that aggregates all elements of type T in a List. */
def create[T](columnName: String)
(implicit listEncoder: Encoder[List[T]], listBufferEncoder: Encoder[Buffer[T]]): TypedColumn[T, List[T]] =
new Aggregator[T, Buffer[T], List[T]] {
override def zero: Buffer[T] =
mutable.ListBuffer.empty[T]
override def reduce(buffer: Buffer[T], elem: T): Buffer[T] =
buffer += elem
override def merge(b1: Buffer[T], b2: Buffer[T]): Buffer[T] =
if (b1.length >= b2.length) b1 ++= b2 else b2 ++= b1
override def finish(reduction: Buffer[T]): List[T] =
reduction.toList
override def bufferEncoder: Encoder[Buffer[T]] =
listBufferEncoder
override def outputEncoder: Encoder[List[T]] =
listEncoder
}.toColumn.name(columnName)
}
现在你可以像这样使用它。
import org.apache.spark.sql.SparkSession
val spark =
SparkSession
.builder
.master("local[*]")
.getOrCreate()
import spark.implicits._
final case class Flat(id: Int, name: String, age: Int)
final case class Grouped(age: Int, users: List[(Int, String)])
val data =
List(
(1, "Luis", 21),
(2, "Miguel", 21),
(3, "Sebastian", 16)
).toDF("id", "name", "age").as[Flat]
val grouped =
data
.groupByKey(flat => flat.age)
.mapValues(flat => (flat.id, flat.name))
.agg(ListAggregator.create(columnName = "users"))
.map(tuple => Grouped(age = tuple._1, users = tuple._2))
// grouped: org.apache.spark.sql.Dataset[Grouped] = [age: int, users: array<struct<_1:int,_2:string>>]
grouped.show(truncate = false)
// +---+------------------------+
// |age|users |
// +---+------------------------+
// |16 |[[3, Sebastian]] |
// |21 |[[1, Luis], [2, Miguel]]|
// +---+------------------------+
这里的问题是,udf
不能将个人地址
和个人公司
直接作为输入。它们在 Spark 中表示为结构,要在 udf
中使用它们,正确的输入类型是 Row
。这意味着您需要将 makePosition
的声明更改为:
val makePosition = udf((title: String,
startDate: String,
endDate: String,
address: Row,
emailName: String,
company: Row)
在 udf 中,
您现在需要使用例如地址.getAs[字符串](“城市”)
来访问案例类元素,并且要将类作为一个整体使用,您需要再次创建它。
更简单、更好的替代方案是在单个udf
中执行所有操作,如下所示:
val makePosition = udf((title: String,
startDate: String,
endDate: String,
city: String,
zipCode: String,
state: String,
country: String,
emailName: String,
orgName: String,
companyPhone: String) =>
Seq(
IndividualPosition(
title,
startDate,
endDate,
IndividualAddress(city, zipCode, state, country),
emailName,
IndividualCompany(orgName, companyPhone)
)
)
)
问题内容: 我想在Go中解组以下JSON数据: 我知道该怎么做,我这样定义一个结构: 我不知道的是,是否有一种简单的方法可以对此进行专门化处理。我想在拆封后以如下格式获取数据: 这样我可以在像这样解组后稍后使用它: 我真的不知道如何在GO中轻松或习惯地执行此操作,因此我希望有一个不错的解决方案。 问题答案: 您可以通过在结构上实现接口来实现。这样的事情应该做: 这基本上表示应该从2个元素的floa
在我的应用程序我有导航抽屉,所以我有一个主要活动和Rest是碎片。我的应用程序运行良好。当我按back按钮时,它会重定向到以前的片段。它工作得很好。但我想要的是在成功支付后,我会显示成功支付页面,在此页面上,当用户按back按钮时,我想重定向到HomeFragment,但现在它会重定向到Placeorder片段。
我可以使用Jackson ObjectMapper将JSON文件中的数据直接加载到我自己的数据结构中吗?我试图从一个JSON文件中加载一个对象映射,但不知道该怎么做。 例如,给定: 我想在JSON文件中创建/加载视图映射。如果我的JSON是: 我希望能够将所有内容加载到地图中。 我尝试了以下方法,但没有成功。它只是将所有内容加载到地图中: 我猜我的错误是告诉制图员把它当作地图来读。类,但不确定如何
问题内容: 我有一个已删除的文件,但仍保持打开程序的状态。我使用lsof找到了索引节点号。如何创建硬链接回到该索引节点? 任何代码都会有所帮助,但是Perl会很方便。 问题答案: 从/ proc / pid / fd / 文件描述符 复制 __ 使用lsof查找pid和文件描述符。
在Woocommerce中,我需要在添加产品时跳过购物车页面,并直接重定向到仅限订阅产品类型的签出。 我在其他地方找到了下面的代码,它适用于跳过基于产品ID的购物车页面,但是我不能正确地使用产品类型(请参阅下面我尝试的内容)。 我尝试的:(它破坏了整个网站:变成了一个blanc页面) 有没有办法做到这一点?
可以从输入主题的特定偏移量到结束偏移量进行Kafka流处理吗? 我有一个Kafka流应用程序消耗输入主题,但由于某种原因失败了。我修复了问题并再次启动它,但它从输入主题的最新偏移量开始消耗。我知道应用程序已处理的输入主题的偏移量。现在,我如何将输入主题从一个偏移量处理到另一个偏移量。我正在使用合流平台5.1.2。