因此,我希望有条件地映射CSV文件的所有行,并将结果导出到另一个CSV文件中,每行的条件如下:
>
如果第4列的值不为null,那么该行第4、5、6和7列的值应存储为名为LastValueSof4to7的数组。(在数据集中,如果第4列中的元素不为null,则第1、2和3列为null,可以忽略)
如果第3列的值不为null,那么第1、2和3列的值以及上述lastValuesOf4to7数组中的四个元素应作为新行导出到另一个名为condensed.CSV的CSV文件中。(在数据集中,如果第3列中的元素不为null,则第4、5、6和7列为null,可以忽略)
import scala.io.Source
object structuringData {
def main(args: Array[String]) {
val data = Source.fromFile("/path/to/file.csv")
var lastValuesOf4to7 = Array("0","0","0","0")
val lines = data.getLines // Get the lines of the file
val splitLine = lines.map(s => s.split(',')).toArray // This gives an out of memory error since the original file is huge.
data.close
}
}
从上面的代码中可以看到,我试图将它移到数组中,但由于无法单独处理每一行,因此无法进一步进行。
我非常肯定,在Scala/Spark上处理csv文件一定有简单明了的解决方案。
使用Spark-csv包,然后使用Sql查询来查询数据,并根据您的用例制作过滤器,然后在最后导出它。
如果您使用的是Spark2.0.0,那么spark-csv将出现在spark-sql中,否则,如果您使用的是旧版本,请相应地添加依赖项。
您可以在这里找到指向spark-csv的链接。
如何在JMeter中将一个csv文件循环到另一个csv文件,其中第一个csv文件包含所有登录数据,另一个csv文件包含交易数据。我应该运行1个出纳员应该处理30笔交易的地方。
问题内容: 我遇到一个有趣的问题: file1.csv有几百行,例如: file2.csv大约有1100万行,例如: 我想做的是编写一个脚本,该脚本接受file1.csv中的每个DTime值,并在file2.csv的DateTime列中找到部分匹配的第一个实例,并输出DateTime,Bid,Ask询问该行。部分匹配位于前16个字符上。 这两个文件都是按照从最早到最新的顺序排序的,因此,如果fil
我想在现有的CSV文件中追加表。我使用下面的代码: 每次下面的代码块运行时,都会在data/outputs.CSV(其中outputs.CSV是文件夹而不是CSV文件)中创建一个新文件。
我对Spark和Scala是新手。我们将广告事件日志文件格式化为CSV,然后使用PKZIP进行压缩。我已经看到了许多关于如何使用Java解压缩压缩文件的示例,但是如何使用Scala for Spark来实现这一点呢?我们最终希望从每个传入文件中获取、提取并加载数据到Hbase目标表中。也许这可以用HadooprDD来完成吗?在这之后,我们将引入Spark streaming来监视这些文件。