在我们的应用程序中,我们使用Spark
sql获取字段值作为列。我正在尝试弄清楚如何将列值放入嵌套的json对象并推送到Elasticsearch。还有一种方法可以参数化值selectExpr
以传递给正则表达式?
我们目前正在使用Spark Java API。
Dataset<Row> data = rowExtracted.selectExpr("split(value,\"[|]\")[0] as channelId",
"split(value,\"[|]\")[1] as country",
"split(value,\"[|]\")[2] as product",
"split(value,\"[|]\")[3] as sourceId",
"split(value,\"[|]\")[4] as systemId",
"split(value,\"[|]\")[5] as destinationId",
"split(value,\"[|]\")[6] as batchId",
"split(value,\"[|]\")[7] as orgId",
"split(value,\"[|]\")[8] as businessId",
"split(value,\"[|]\")[9] as orgAccountId",
"split(value,\"[|]\")[10] as orgBankCode",
"split(value,\"[|]\")[11] as beneAccountId",
"split(value,\"[|]\")[12] as beneBankId",
"split(value,\"[|]\")[13] as currencyCode",
"split(value,\"[|]\")[14] as amount",
"split(value,\"[|]\")[15] as processingDate",
"split(value,\"[|]\")[16] as status",
"split(value,\"[|]\")[17] as rejectCode",
"split(value,\"[|]\")[18] as stageId",
"split(value,\"[|]\")[19] as stageStatus",
"split(value,\"[|]\")[20] as stageUpdatedTime",
"split(value,\"[|]\")[21] as receivedTime",
"split(value,\"[|]\")[22] as sendTime"
);
StreamingQuery query = data.writeStream()
.outputMode(OutputMode.Append()).format("es").option("checkpointLocation", "C:\\checkpoint")
.start("spark_index/doc")
实际输出:
{
"_index": "spark_index",
"_type": "doc",
"_id": "test123",
"_version": 1,
"_score": 1,
"_source": {
"channelId": "test",
"country": "SG",
"product": "test",
"sourceId": "",
"systemId": "test123",
"destinationId": "",
"batchId": "",
"orgId": "test",
"businessId": "test",
"orgAccountId": "test",
"orgBankCode": "",
"beneAccountId": "test",
"beneBankId": "test",
"currencyCode": "SGD",
"amount": "53.0000",
"processingDate": "",
"status": "Pending",
"rejectCode": "test",
"stageId": "123",
"stageStatus": "Comment",
"stageUpdatedTime": "2019-08-05 18:11:05.999000",
"receivedTime": "2019-08-05 18:10:12.701000",
"sendTime": "2019-08-05 18:11:06.003000"
}
}
我们需要在节点“ txn_summary”下的上述列,例如以下json:
预期产量:
{
"_index": "spark_index",
"_type": "doc",
"_id": "test123",
"_version": 1,
"_score": 1,
"_source": {
"txn_summary": {
"channelId": "test",
"country": "SG",
"product": "test",
"sourceId": "",
"systemId": "test123",
"destinationId": "",
"batchId": "",
"orgId": "test",
"businessId": "test",
"orgAccountId": "test",
"orgBankCode": "",
"beneAccountId": "test",
"beneBankId": "test",
"currencyCode": "SGD",
"amount": "53.0000",
"processingDate": "",
"status": "Pending",
"rejectCode": "test",
"stageId": "123",
"stageStatus": "Comment",
"stageUpdatedTime": "2019-08-05 18:11:05.999000",
"receivedTime": "2019-08-05 18:10:12.701000",
"sendTime": "2019-08-05 18:11:06.003000"
}
}
}
将所有列添加到顶层结构应提供预期的输出。在Scala中:
import org.apache.spark.sql.functions._
data.select(struct(data.columns:_*).as("txn_summary"))
在Java中,我怀疑是这样的:
import org.apache.spark.sql.functions.struct;
data.select(struct(data.columns()).as("txn_summary"));
我需要展平一个数据帧,以便将其与Spark(Scala)中的另一个数据帧连接起来。 基本上,我的2个数据帧有以下模式: 数据流1 DF2 老实说,我不知道如何使DF2变平。最后,我需要连接DF.field4 = DF2.field9上的2个数据帧 我用的是2.1.0 我的第一个想法是使用爆炸,但在Spark 2.1.0中已经被否决了,有人能给我一点提示吗?
我正在努力学习颤振,但我在JSON序列化上被卡住了。我在YouTube和Flitter文档中学习了一些教程,但我在序列化方面遇到了一些困难。你能帮我一点忙吗,这是为了教育目的,所以我更感兴趣的是背后的理论,而不是解决方案本身,然而,即使只有解决方案,我认为我可以尝试理解这个过程。我应该提到,我知道这一点,但在我的例子中,数据中有嵌套对象,这让我感到困惑。 作为响应数据的样本,它基本上是一个商店,每
在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之
如何解析嵌套列表的JSON字符串以在pyspark中触发数据帧? 输入数据帧: 预期产出: 示例代码: 有几个例子,但我不知道如何做到这一点: > < li> 如何在pyspark中解析和转换spark数据帧行中的json字符串 如何从pyspark中的spark数据帧行转换具有多个键的JSON字符串?
问题内容: http://play.golang.org/p/f6ilWnWTjm 我正在尝试解码以下字符串,但仅获取空值。 如何在Go中解码嵌套的JSON结构? 我想将以下内容转换为地图数据结构。 问题答案: 在Go中使用嵌套结构来匹配JSON中的嵌套结构。 这是一个如何处理示例JSON的示例: 游乐场链接 您还可以对内部结构使用匿名类型: 游乐场链接 或外部和内部结构: 游乐场链接 如果您不知
在火花批处理作业中,我通常将JSON数据源写入文件,并可以使用DataFrame阅读器的损坏列功能将损坏的数据写入单独的位置,并使用另一个阅读器从同一作业中写入有效数据。(数据写成拼花) 但是在火花结构流中,我首先通过Kafka作为字符串读取流,然后使用from_json来获取我的数据帧。然后from_json使用JsonToSTRts,它在解析器中使用FailFast模式,并且不会将未解析的字符