当前位置: 首页 > 知识库问答 >
问题:

将REST API中的CSV处理成Spark

马阳曦
2023-03-14
val resultCsv = scala.io.Source.fromURL(url).getLines()

共有1个答案

茅鸿宝
2023-03-14

这是可以做到的。

对于Spark 2.2.x

import scala.io.Source._
import org.apache.spark.sql.{Dataset, SparkSession}

var res = fromURL(url).mkString.stripMargin.lines.toList
val csvData: Dataset[String] = spark.sparkContext.parallelize(res).toDS()

val frame = spark.read.option("header", true).option("inferSchema",true).csv(csvData)
frame.printSchema()

对旧版本的Spark使用databricks lib

import scala.io.Source._
import com.databricks.spark.csv.CsvParser

var res = fromURL(url).mkString.stripMargin.lines.toList
val csvData: Dataset[String] = spark.sparkContext.parallelize(res)

val csvParser = new CsvParser()
  .withUseHeader(true)
  .withInferSchema(true)

val frame: DataFrame = csvParser.csvRdd(sqlContext, rdd)
frame.printSchema()
 类似资料:
  • 我是PowerShell的新手。我正在尝试创建一个脚本,它将通过CSV和Active Directory组进行查看。如果用户不在CSV中,我想删除该用户(当前正在使用写输出进行测试)。我必须使用UserPrincipalName。我的CSV只是 排名,编号 某物,3333//此用户是AD GROUP 某物,2222//此用户不在AD组中 目前,我想让它只输出3333,但它两者都输出。 导入模块活动

  • 在下面的代码中,如何避免处理的第一行? file1.csv是: 而file2.csv是: 我试图减去人和他/她的宠物的年龄,如果它出现在两个CSV中,但我得到以下错误,因为它也处理第一行:

  • 作为我正在构建的应用程序的一部分,我正在使用csv-parse读取和操作大型(约5.5GB,800万行)csv文件。我让这个过程运行得相对平稳,但我被困在一个项目上——捕捉由不一致的列数引发的错误。 我之所以使用管道函数,是因为它与应用程序的其余部分配合得很好,但我的问题是,如何将解析器抛出的错误重定向到日志并允许该过程继续? 我认识到,我可以使用选项跳过列数不一致的记录,该选项几乎就足够了。问题

  • 我正在使用Spark,Flink创建流式分析应用程序 我在简单的Scala应用程序中完美地运行Spark/Flink作业,并通过Spark提交此作业 如何整合我的Spark 到目前为止,我尝试了Lagom Microservice,但我发现了很多问题,您可以检查 在Lagom Microservice中摄取流式数据的最佳方法 我认为我没有为流处理微服务应用程序选择正确的方向。正在寻找正确的方向来通

  • 问题内容: 我有一个CSV,第一行包含字段名称。示例数据是… 我需要将数据格式化为键-值对数组,其中键名是列标题。我想第1行会是这样的: 第2行 …以及第3行… 除了静态以外,我不确定如何执行此操作-问题是带有关联数据的列可能从一个文件更改为下一个文件…重新排列,删除或添加了列。 非常感谢您的想法。 问题答案:

  • 开源世界中成功的价值很重。随着你的软件的流行,搜寻信息的人也会急剧增加,而能够提供信息的人则增长的慢很多。此外,即使比率能够保持平衡,在大多数开源项目中处理交流也依然存在基础扩展问题。以邮件列表为例。大多数项目有一个一般用户问题的邮件列表—有时,列表的名称是“users”、“discuss”、“help”或其他。无论名称是什么,列表的目的相同:为人们获取问题的答案提供一个场所,同时其他人可以观看并