我有数百万个不同标题的csv文件,我想把它们合并到一个大数据框中。
我的问题是我尝试过的解决方案有效,但太慢了!顺便说一句,我可以访问Sparklyr在我的实验室中处理多节点集群,这个大数据工具会有帮助吗?
文件如下所示:
文件1
校长1,校长3,校长5
a、 b,c
文件2
校长4,校长2
e、 f
文件3
校长2,校长6
a, c
我想把它们合并成:
校长1,校长2,校长3,校长4,校长5,校长6
a、 ,b,c,f,e,a,c
我曾尝试将它们直接与R绑定,但程序在服务器中运行几天后崩溃了。代码如下所示:
library(plyr)
library(dplyr)
library(readr)
csvfiles <- list.files(pattern = "file\\d+.csv")
for (i in 1:length(csvfiles)) {
assign(paste0("files", i),read_csv(csvfiles[[i]]))
}
csvlist <- mget(ls(pattern = "files\\d"))
result <- data.frame()
for (i in 1:length(csvlist)){
my_list <- list(result,csvlist[[i]])
result <- rbindlist(my_list,use.names=TRUE, fill=TRUE)
}
然后我尝试首先使用命令行工具提取标头,例如sed
、awk
和csvtk
。我使用的代码如下所示
for file in $(ls file*.csv); do cat $file | sed "2 d" | csvtk transpose >> name_combined.csv; done
awk '{ lines[$1] = $0 } END { for (l in lines) print lines[l] }' name_combined.csv >> long_head.csv
我得到了名为long_head的csv文件。csv,看起来是这样的(实际上我有3000多列)
校长1,校长2,校长3,校长4,校长5,校长6
然后我在dplyr
中使用bind_rows
。我想首先输出多个具有相同列的csv文件,然后将它们全部组合起来。
library(readr)
library(dplyr)
csvfiles <- list.files(pattern = "file\\d+.csv")
long_head <- read_csv("long_head.csv")
new_file <- paste("new_file",1:length(csvfiles),sep = "")
for (i in 1:length(csvfiles)) {
bind_rows(long_head,read_csv(csvfiles[[i]])) %>%
write_csv (file = paste0(new_file [[i]], ".csv"))
}
代码一天只能输出大约10万个csv文件,这意味着我要等一个月才能得到这些csv文件来合并它们。
我还尝试直接组合它们,而不编写多个csv文件:
library(readr)
library(dplyr)
csvfiles <- list.files(pattern = "file\\d+.csv")
long_head <- read_csv("long_head.csv")
for (i in 1:length(csvfiles)) {
a <- bind_rows(read_csv(csvfiles[[i]]),long_head)
result <- rbind(a,long_head)
}
它跑得更快,但也落后于我的预期。
dir
来选择文件名;readr
解析猜测函数将中止。注意:16个文件测试运行始终会使我的计算机在15MB 771列census.csv
和180MB1.6M行beer_reviews.csv
之间的某个地方崩溃。
library(readr)
library(dplyr)
setwd("/home/username/R/csv_test")
csvfiles <- dir(pattern = "\\.csv$")
csvdata <- tibble(filename=c("Source File"))
for (i in csvfiles) {
tmpfile <- read_csv(i, col_types = cols(.default = "c"))
tmpfile$filename <- i
csvdata <- bind_rows(csvdata, tmpfile)
}
csvdata
# A tibble: 1,622,379 x 874
...
定时10个文件测试运行,共有20k行和100列。在R中:
user system elapsed
0.678 0.008 0.685
以及本页上的Awk脚本:
real 0m2.202s
user 0m2.175s
sys 0m0.025s
下面是一个使用GNU awk的方法,它可以完整地读取文件。它将执行以下操作:
>
如果发现了新的头元素,请将其添加到当前已知元素的末尾。例如,存在以下标题:
file1: A,B,D
file2: A,C,E
file3: A,E,D
输出标题
A,B,D,C,E
这个脚本利用了什么是使用awk高效解析CSV的最健壮的方法?
创建一个文件merge\u csv。awk
包含以下内容:
BEGIN {
OFS=","
FPAT="[^,]*|\042[^\042]+\042"
# keep track of the original argument count
argc_start=ARGC
}
# Read header and process
# header names are stored as array index in the array "header"
# header order is stored in the array header_order
# header_order[field_index] = header_name
(FNR == 1) && (ARGIND < argc_start) {
for(i=1;i<=NF;++i) if (!($i in header)) { header[$i]; header_order[++nf_out]=$i }
# add file to end of argument list to be reprocessed
ARGV[ARGC++] = FILENAME
# process the next file
nextfile
}
# Print headers in output file
(FNR == 1) && (ARGIND == argc_start) {
for(i=1;i<=nf_out;++i) printf header_order[i] (i==nf_out ? ORS : OFS)
}
# Use array h to keep track of the column_name and corresponding field_index
# h[column_name] = field_index
(FNR == 1) { delete h; for(i=1;i<=NF;++i) h[$i]=i; next }
# print record
{
# process all fields
for(i=1;i<=nf_out;++i) {
# get field index using h
j = h[header_order[i]]+0
# if field index is zero, print empty field
printf (j == 0 ? "" : $j) (i==nf_out ? ORS : OFS)
}
}
现在,您可以按以下方式运行脚本:
$ awk -f merge_csv.awk *.csv > output.csv
这将无法处理大量CSV文件。这可以通过以下方式解决。假设您有一个文件文件列表。txt
包含您想要的所有文件(可以通过find
生成),然后将上述脚本添加为:
BEGIN {
OFS=","
FPAT="[^,]*|\042[^\042]+\042"
}
# Read original filelist, and build argument list
(FNR == NR) { ARGV[ARGC++]=$0; argc_start=ARGC; next }
# Read header and process
# header names are stored as array index in the array "header"
# header order is stored in the array header_order
# header_order[field_index] = header_name
(FNR == 1) && (ARGIND < argc_start) {
for(i=1;i<=NF;++i) if (!($i in header)) { header[$i]; header_order[++nf_out]=$i }
# add file to end of argument list to be reprocessed
ARGV[ARGC++] = FILENAME
# process the next file
nextfile
}
# Print headers in output file
(FNR == 1) && (ARGIND == argc_start) {
for(i=1;i<=nf_out;++i) printf header_order[i] (i==nf_out ? ORS : OFS)
}
# Use array h to keep track of the column_name and corresponding field_index
# h[column_name] = field_index
(FNR == 1) { delete h; for(i=1;i<=NF;++i) h[$i]=i; next }
# print record
{
# process all fields
for(i=1;i<=nf_out;++i) {
# get field index using h
j = h[header_order[i]]+0
# if field index is zero, print empty field
printf (j == 0 ? "" : $j) (i==nf_out ? ORS : OFS)
}
}
现在,您可以按以下方式运行代码:
$ awk -f merge_csv.awk filelist.txt
如果您的文件列表真的太大,您可能希望使用拆分
并使用循环来创建各种临时CSV文件,这些文件可以在第二次甚至第三次运行中再次合并。
这是一个具有挑战性的问题,需要考虑速度和内存消耗。
如果我理解正确,OP希望合并数百万个小型csv文件。根据示例数据,每个文件仅由两行组成:第一行的标题和第二行的字符数据。列数和列名可能因文件而异。但是,所有列都具有相同的数据类型字符。
OP的第一次尝试和M.Viking的答案都在迭代地增长结果对象。这是非常低效的,因为它需要反复复制相同的数据。此外,这两个版本都使用了readr
软件包中的read_csv()
,该软件包也不是最快的csv阅读器。
为了避免结果对象反复增长,所有文件都会被读入一个列表,然后使用rbindlist()
一次性组合。最终结果将存储为csv文件:
library(data.table)
file_names <- list.files(pattern = "file\\d+.csv")
result <- rbindlist(lapply(file_names, fread), use.names=TRUE, fill=TRUE)
fwrite(result, "result.csv")
从OP的预期结果来看,这些列似乎应该按列名排序。这可以通过以下方式实现:
library(magrittr)
setcolorder(result, names(result) %>% sort())
它会重新排列数据的列。表对象,即不复制整个对象。
现在,让我们看看处理时间。为了进行基准测试,我创建了100k个文件(见下面的数据部分),这与OP的目标量相去甚远,但可以得出结论。
在我的电脑上,整个处理时间约为5分钟:
bench::workout({
fn <- list.files(pattern = "file\\d+.csv")
tmp_list <- lapply(fn, function(x) fread(file = x, sep =",", header = TRUE, colClasses = "character") )
result <- rbindlist(tmp_list, use.names=TRUE, fill=TRUE)
setcolorder(result, names(result) %>% sort())
fwrite(result, "result.csv")
}, 1:5)
# A tibble: 5 x 3
exprs process real
<bch:expr> <bch:tm> <bch:tm>
1 1 562.5ms 577.19ms
2 2 1.81m 4.52m
3 3 14.05s 15.55s
4 4 15.62ms 175.1ms
5 5 2.2s 7.72s
在这里,我使用了bench
包中的workout()
函数对单个表达式进行计时,以识别花费最多时间的语句。批量读取csv文件。
物体的大小也很重要。综合数据。表result
有100k行和1000列,占用800MB,临时列表仅占11%。这是因为有许多空单元格。
pryr::object_size(result)
800 MB
pryr::object_size(tmp_list)
87.3 MB
顺便说一下,结果文件“result.csv”
的磁盘大小为98 MB。
计算时间似乎不是主要问题,而是存储结果所需的内存。
如果读取100k文件需要大约5分钟我猜读取1M文件可能需要大约50分钟。
但是,对于具有3000列的1M文件,结果data.table可能需要10 * 3 = 30倍的内存这是24 GB。临时列表可能只需要大约900 MBytes。所以,重新考虑结果数据结构可能是值得的。
上述时间显示,超过90%的计算时间用于读取数据文件。因此,对读取CSV文件的不同方法进行基准测试是合适的:
read.csv()
从基Rread_csv()
来自readr
包fread()
来自data.table
包为了方便用户,这三个函数都可以猜测文件的某些特征,例如字段分隔符或数据类型。这可能需要额外的计算时间。因此,这些函数也使用明确说明的文件参数进行基准测试。
对于基准测试,使用了bench
包,因为它还测量分配的内存,这可能是计算时间之外的另一个限制因素。对不同数量的文件重复基准测试,以研究其对内存消耗的影响。
library(data.table)
library(readr)
file_names <- list.files(pattern = "file\\d+.csv")
bm <- press(
n_files = c(1000, 2000, 5000, 10000),
{
fn <- file_names[seq_len(n_files)]
mark(
fread = lapply(fn, fread),
fread_p = lapply(fn, function(x) fread(file = x, sep =",", header = TRUE, colClasses = "character")),
# fread_pp = lapply(fn, fread, sep =",", header = TRUE, colClasses = "character"),
read.csv = lapply(fn, read.csv),
read.csv_p = lapply(fn, read.csv, colClasses = "character"),
read_csv = lapply(fn, read_csv),
read_csv_p = lapply(fn, read_csv, col_types = cols(.default = col_character())),
check = FALSE,
min_time = 10
)
}
)
结果通过以下方式可视化:
library(ggplot2)
ggplot(bm) + aes(n_files, median, color = names(expression)) +
geom_point() + geom_line() + scale_x_log10()
ggsave("median.png")
ggplot(bm) + aes(n_files, mem_alloc, color = names(expression)) +
geom_point() + geom_line() + scale_x_log10()
ggsave("mem_alloc.png")
ggplot(bm) + aes(median, mem_alloc, color = names(expression)) + geom_point() +
facet_wrap(vars(n_files))
ggsave("mem_allov_vs_median.png")
在比较中值执行时间时,我们可以观察到(请注意双对数标度)
..._p
)总是比猜测参数提供性能增益,特别是对于read_csv()
;read.csv()
比fread()
快,而read_csv()
因因素而变慢。当比较分配的内存时,我们可以观察到(再次注意双对数刻度)
如上所述,速度和内存消耗在这里可能至关重要。现在,阅读。csv()
似乎是速度方面的最佳选择,而fread()
则是内存消耗方面的最佳选择,从下面的分面散点图可以看出。
我个人的选择是更喜欢fread()
(更少的内存消耗)而不是阅读。csv()
(更快),因为我电脑上的内存有限,无法轻松扩展。你的里程可能会有所不同。
以下代码用于创建10万个示例文件:
library(magrittr) # piping used to improve readability
n_files <- 10^4L
max_header <- 10^3L
avg_cols <- 4L
headers <- sprintf("header%02i", seq_len(max_header))
set.seed(1L) # to ensure reproducible results
for (i in seq_len(n_files)) {
n_cols <- rpois(1L, avg_cols - 1L) + 1L # exclude 0
header <- sample(headers, n_cols)
file_name <- sprintf("file%i.csv", i)
sample(letters, n_cols, TRUE) %>%
as.list() %>%
as.data.frame() %>%
set_names(header) %>%
data.table::fwrite(file_name)
}
嗨,我正在寻找一个最快的解决方案来处理csv文件的负载。 情况:我在一个文件夹中有多个csv文件,它们的标题不同 我已经对它们进行了预处理,以删除顶部的垃圾行,因此所有这些都有一个标准标头。 我想将一组CSV文件与完全相同的侦听器合并到一个新文件夹中 示例文件-1。csv 示例文件-2。csv 样本文件-3。csv 样本文件-4。csv 样本文件-5。csv 样品File-6.csv 样本文件-7
问题内容: 我有以下数据集。 https://drive.google.com/drive/folders/1NRelNsXQJ7MTNKcm-T69N6r5ZsOyFmTS?usp=sharing 如果列名称与工作表名称相同,则将所有内容合并在一起作为单独的列,以下是代码 运行以上代码后的数据 merged_data 如何合并条件文件? 健康)状况。 以上代码段中的价格1指向带有名称为int 7
问题内容: 我正在使用Python进行一些数据分析。我有两个表,第一个(叫它“ A”)有1000万行和10列,第二个(“ B”)有7300万行和2列。他们有1个具有共同ID的列,我想根据该列将两个表相交。特别是我想要表的内部联接。 我无法将表B作为pandas数据框加载到内存中,以在pandas上使用常规合并功能。我尝试通过读取表B上的文件的块,将每个块与A相交,并将这些交集连接起来(内部联接的输
问题内容: 我有两个表(表A和表B)。 它们具有不同的列数-假设表A具有更多列。 如何合并这两个表,并为表B没有的列获取空值? 问题答案: 为具有较少列的表添加额外的列作为null
问题内容: 我有一些具有相同列标题的CSV文件。例如 文件A 文件B 我想将其合并,以便将数据合并到一个文件中,文件头位于顶部,但其他任何地方都没有文件头。 什么是实现此目标的好方法? 问题答案: 这应该工作。它检查要合并的文件是否具有匹配的头。否则将引发异常。异常处理(关闭流等)已作为练习。
我有一些具有相同列标题的CSV文件。例如 文件A 文件B 我想合并它,以便将数据合并到一个文件中,标题在顶部,但其他地方没有标题。 实现这一目标的好方法是什么?