当前位置: 首页 > 知识库问答 >
问题:

合并数百万个具有不同标题的csv文件

易昌翰
2023-03-14

我有数百万个不同标题的csv文件,我想把它们合并到一个大数据框中。

我的问题是我尝试过的解决方案有效,但太慢了!顺便说一句,我可以访问Sparklyr在我的实验室中处理多节点集群,这个大数据工具会有帮助吗?

文件如下所示:

文件1

校长1,校长3,校长5

a、 b,c

文件2

校长4,校长2

e、 f

文件3

校长2,校长6

a, c

我想把它们合并成:

校长1,校长2,校长3,校长4,校长5,校长6

a、 ,b,c,f,e,a,c

我曾尝试将它们直接与R绑定,但程序在服务器中运行几天后崩溃了。代码如下所示:

library(plyr)
library(dplyr)
library(readr)


csvfiles <- list.files(pattern = "file\\d+.csv") 

for (i in 1:length(csvfiles)) {
  assign(paste0("files", i),read_csv(csvfiles[[i]]))
}

csvlist <- mget(ls(pattern = "files\\d"))

result <- data.frame()

for (i in 1:length(csvlist)){
  my_list <- list(result,csvlist[[i]])
  result <- rbindlist(my_list,use.names=TRUE, fill=TRUE)
 }

然后我尝试首先使用命令行工具提取标头,例如sedawkcsvtk。我使用的代码如下所示

for file in $(ls file*.csv); do cat $file | sed "2 d" | csvtk transpose   >> name_combined.csv; done

awk  '{ lines[$1] = $0 } END { for (l in lines) print lines[l] }' name_combined.csv >> long_head.csv

我得到了名为long_head的csv文件。csv,看起来是这样的(实际上我有3000多列)

校长1,校长2,校长3,校长4,校长5,校长6

然后我在dplyr中使用bind_rows。我想首先输出多个具有相同列的csv文件,然后将它们全部组合起来。

library(readr)
library(dplyr)

csvfiles <- list.files(pattern = "file\\d+.csv")
long_head <- read_csv("long_head.csv")

new_file <- paste("new_file",1:length(csvfiles),sep = "")

for (i in 1:length(csvfiles)) {
         bind_rows(long_head,read_csv(csvfiles[[i]]))  %>% 
            write_csv (file = paste0(new_file [[i]], ".csv"))
}

代码一天只能输出大约10万个csv文件,这意味着我要等一个月才能得到这些csv文件来合并它们。

我还尝试直接组合它们,而不编写多个csv文件:

library(readr)
library(dplyr)

csvfiles <- list.files(pattern = "file\\d+.csv")

long_head <- read_csv("long_head.csv")


for (i in 1:length(csvfiles)) {
  a <- bind_rows(read_csv(csvfiles[[i]]),long_head)
  result <- rbind(a,long_head)
}

它跑得更快,但也落后于我的预期。

共有3个答案

蓝恩
2023-03-14
  • 使用带有模式的dir来选择文件名;
  • 添加源文件列,以后会很有用;
  • 循环调用更简单;
  • 强制所有列为字符,读取多个文件时最安全的选项,如果遇到字段不匹配,readr解析猜测函数将中止。

注意:16个文件测试运行始终会使我的计算机在15MB 771列census.csv和180MB1.6M行beer_reviews.csv之间的某个地方崩溃。

library(readr)
library(dplyr)

setwd("/home/username/R/csv_test")

csvfiles <- dir(pattern = "\\.csv$")

csvdata  <- tibble(filename=c("Source File"))

for (i in csvfiles) {
  tmpfile <- read_csv(i, col_types = cols(.default = "c"))
  tmpfile$filename <- i
  csvdata <- bind_rows(csvdata, tmpfile)
}
csvdata
# A tibble: 1,622,379 x 874

...

定时10个文件测试运行,共有20k行和100列。在R中:

 user  system elapsed 
0.678   0.008   0.685 

以及本页上的Awk脚本:

real    0m2.202s
user    0m2.175s
sys     0m0.025s
吴炎彬
2023-03-14

下面是一个使用GNU awk的方法,它可以完整地读取文件。它将执行以下操作:

>

  • 读取每个文件的标题,然后关闭文件
  • 如果发现了新的头元素,请将其添加到当前已知元素的末尾。例如,存在以下标题:

    file1: A,B,D
    file2: A,C,E
    file3: A,E,D
    

    输出标题

    A,B,D,C,E
    

    这个脚本利用了什么是使用awk高效解析CSV的最健壮的方法

    创建一个文件merge\u csv。awk包含以下内容:

    BEGIN {
       OFS=","
       FPAT="[^,]*|\042[^\042]+\042"
       # keep track of the original argument count
       argc_start=ARGC
    }
    
    # Read header and process
    # header names are stored as array index in the array "header"
    # header order is stored in the array header_order
    #    header_order[field_index] = header_name
    (FNR == 1) && (ARGIND < argc_start) {
        for(i=1;i<=NF;++i) if (!($i in header)) { header[$i]; header_order[++nf_out]=$i } 
        # add file to end of argument list to be reprocessed
        ARGV[ARGC++] = FILENAME
        # process the next file
        nextfile
    }
    
    # Print headers in output file
    (FNR == 1) && (ARGIND == argc_start) {
        for(i=1;i<=nf_out;++i) printf header_order[i] (i==nf_out ? ORS : OFS)
    }
    
    # Use array h to keep track of the column_name and corresponding field_index
    # h[column_name] = field_index
    (FNR == 1) { delete h; for(i=1;i<=NF;++i) h[$i]=i; next }
    
    # print record
    {
        # process all fields
        for(i=1;i<=nf_out;++i) {
            # get field index using h
            j = h[header_order[i]]+0
            # if field index is zero, print empty field
            printf (j == 0 ? "" : $j) (i==nf_out ? ORS : OFS)
        }
    }
    

    现在,您可以按以下方式运行脚本:

    $ awk -f merge_csv.awk *.csv > output.csv
    

    这将无法处理大量CSV文件。这可以通过以下方式解决。假设您有一个文件文件列表。txt包含您想要的所有文件(可以通过find生成),然后将上述脚本添加为:

    BEGIN {
       OFS=","
       FPAT="[^,]*|\042[^\042]+\042"
    }
    
    # Read original filelist, and build argument list
    (FNR == NR) { ARGV[ARGC++]=$0; argc_start=ARGC; next }
    
    # Read header and process
    # header names are stored as array index in the array "header"
    # header order is stored in the array header_order
    #    header_order[field_index] = header_name
    (FNR == 1) && (ARGIND < argc_start) {
        for(i=1;i<=NF;++i) if (!($i in header)) { header[$i]; header_order[++nf_out]=$i } 
        # add file to end of argument list to be reprocessed
        ARGV[ARGC++] = FILENAME
        # process the next file
        nextfile
    }
    
    # Print headers in output file
    (FNR == 1) && (ARGIND == argc_start) {
        for(i=1;i<=nf_out;++i) printf header_order[i] (i==nf_out ? ORS : OFS)
    }
    
    # Use array h to keep track of the column_name and corresponding field_index
    # h[column_name] = field_index
    (FNR == 1) { delete h; for(i=1;i<=NF;++i) h[$i]=i; next }
    
    # print record
    {
        # process all fields
        for(i=1;i<=nf_out;++i) {
            # get field index using h
            j = h[header_order[i]]+0
            # if field index is zero, print empty field
            printf (j == 0 ? "" : $j) (i==nf_out ? ORS : OFS)
        }
    }
    

    现在,您可以按以下方式运行代码:

    $ awk -f merge_csv.awk filelist.txt
    

    如果您的文件列表真的太大,您可能希望使用拆分并使用循环来创建各种临时CSV文件,这些文件可以在第二次甚至第三次运行中再次合并。

  • 仲孙华奥
    2023-03-14

    这是一个具有挑战性的问题,需要考虑速度和内存消耗。

    如果我理解正确,OP希望合并数百万个小型csv文件。根据示例数据,每个文件仅由两行组成:第一行的标题和第二行的字符数据。列数和列名可能因文件而异。但是,所有列都具有相同的数据类型字符。

    OP的第一次尝试和M.Viking的答案都在迭代地增长结果对象。这是非常低效的,因为它需要反复复制相同的数据。此外,这两个版本都使用了readr软件包中的read_csv(),该软件包也不是最快的csv阅读器。

    为了避免结果对象反复增长,所有文件都会被读入一个列表,然后使用rbindlist()一次性组合。最终结果将存储为csv文件:

    library(data.table)
    file_names <- list.files(pattern = "file\\d+.csv")
    result <- rbindlist(lapply(file_names, fread), use.names=TRUE, fill=TRUE)
    fwrite(result, "result.csv")
    

    从OP的预期结果来看,这些列似乎应该按列名排序。这可以通过以下方式实现:

    library(magrittr)
    setcolorder(result, names(result) %>% sort())
    

    它会重新排列数据的列。表对象,即不复制整个对象。

    现在,让我们看看处理时间。为了进行基准测试,我创建了100k个文件(见下面的数据部分),这与OP的目标量相去甚远,但可以得出结论。

    在我的电脑上,整个处理时间约为5分钟:

    bench::workout({
      fn <- list.files(pattern = "file\\d+.csv")
      tmp_list <- lapply(fn, function(x) fread(file = x, sep =",", header = TRUE, colClasses = "character") )
      result <- rbindlist(tmp_list, use.names=TRUE, fill=TRUE)
      setcolorder(result, names(result) %>% sort())
      fwrite(result, "result.csv")
    }, 1:5)
    
    # A tibble: 5 x 3                                                                                                                    
      exprs       process     real
      <bch:expr> <bch:tm> <bch:tm>
    1 1           562.5ms 577.19ms
    2 2             1.81m    4.52m
    3 3            14.05s   15.55s
    4 4           15.62ms  175.1ms
    5 5              2.2s    7.72s
    

    在这里,我使用了bench包中的workout()函数对单个表达式进行计时,以识别花费最多时间的语句。批量读取csv文件。

    物体的大小也很重要。综合数据。表result有100k行和1000列,占用800MB,临时列表仅占11%。这是因为有许多空单元格。

    pryr::object_size(result)
    
    800 MB
    
    pryr::object_size(tmp_list)
    
    87.3 MB
    

    顺便说一下,结果文件“result.csv”的磁盘大小为98 MB。

    计算时间似乎不是主要问题,而是存储结果所需的内存。

    如果读取100k文件需要大约5分钟我猜读取1M文件可能需要大约50分钟。

    但是,对于具有3000列的1M文件,结果data.table可能需要10 * 3 = 30倍的内存这是24 GB。临时列表可能只需要大约900 MBytes。所以,重新考虑结果数据结构可能是值得的。

    上述时间显示,超过90%的计算时间用于读取数据文件。因此,对读取CSV文件的不同方法进行基准测试是合适的:

    • read.csv()从基R
    • read_csv()来自readr
    • fread()来自data.table

    为了方便用户,这三个函数都可以猜测文件的某些特征,例如字段分隔符或数据类型。这可能需要额外的计算时间。因此,这些函数也使用明确说明的文件参数进行基准测试。

    对于基准测试,使用了bench包,因为它还测量分配的内存,这可能是计算时间之外的另一个限制因素。对不同数量的文件重复基准测试,以研究其对内存消耗的影响。

    library(data.table)
    library(readr)
    file_names <- list.files(pattern = "file\\d+.csv")
    bm <- press(
      n_files = c(1000, 2000, 5000, 10000),
      {
        fn <- file_names[seq_len(n_files)] 
        mark(
          fread = lapply(fn, fread),
          fread_p = lapply(fn, function(x) fread(file = x, sep =",", header = TRUE, colClasses = "character")),
          # fread_pp = lapply(fn, fread, sep =",", header = TRUE, colClasses = "character"),
          read.csv = lapply(fn, read.csv),
          read.csv_p = lapply(fn, read.csv, colClasses = "character"),
          read_csv = lapply(fn, read_csv),
          read_csv_p = lapply(fn, read_csv, col_types = cols(.default = col_character())),
          check = FALSE,
          min_time = 10
        )
      }
    )
    

    结果通过以下方式可视化:

    library(ggplot2)
    ggplot(bm) + aes(n_files, median, color = names(expression)) + 
      geom_point() + geom_line() + scale_x_log10()
    ggsave("median.png")
    ggplot(bm) + aes(n_files, mem_alloc, color = names(expression)) + 
      geom_point() + geom_line() + scale_x_log10()
    ggsave("mem_alloc.png")
    ggplot(bm) + aes(median, mem_alloc, color = names(expression)) + geom_point() + 
      facet_wrap(vars(n_files))
    ggsave("mem_allov_vs_median.png")
    

    在比较中值执行时间时,我们可以观察到(请注意双对数标度)

    • 计算时间几乎与文件数量呈线性增长;
    • 显式传递文件参数(定时命名为..._p)总是比猜测参数提供性能增益,特别是对于read_csv()
    • 用于读取许多小文件read.csv()fread()快,而read_csv()因因素而变慢。

    当比较分配的内存时,我们可以观察到(再次注意双对数刻度)

    • 内存消耗几乎与文件数呈线性增长

    如上所述,速度和内存消耗在这里可能至关重要。现在,阅读。csv()似乎是速度方面的最佳选择,而fread()则是内存消耗方面的最佳选择,从下面的分面散点图可以看出。

    我个人的选择是更喜欢fread()(更少的内存消耗)而不是阅读。csv()(更快),因为我电脑上的内存有限,无法轻松扩展。你的里程可能会有所不同。

    以下代码用于创建10万个示例文件:

    library(magrittr)   # piping used to improve readability
    n_files <- 10^4L
    max_header <- 10^3L
    avg_cols <- 4L
    headers <- sprintf("header%02i", seq_len(max_header))
    set.seed(1L)   # to ensure reproducible results
    for (i in seq_len(n_files)) {
      n_cols <- rpois(1L, avg_cols - 1L) + 1L # exclude 0
      header <- sample(headers, n_cols)
      file_name <- sprintf("file%i.csv", i)
      sample(letters, n_cols, TRUE) %>% 
        as.list() %>% 
        as.data.frame() %>%
        set_names(header) %>% 
        data.table::fwrite(file_name)
    }
    
     类似资料:
    • 嗨,我正在寻找一个最快的解决方案来处理csv文件的负载。 情况:我在一个文件夹中有多个csv文件,它们的标题不同 我已经对它们进行了预处理,以删除顶部的垃圾行,因此所有这些都有一个标准标头。 我想将一组CSV文件与完全相同的侦听器合并到一个新文件夹中 示例文件-1。csv 示例文件-2。csv 样本文件-3。csv 样本文件-4。csv 样本文件-5。csv 样品File-6.csv 样本文件-7

    • 问题内容: 我有以下数据集。 https://drive.google.com/drive/folders/1NRelNsXQJ7MTNKcm-T69N6r5ZsOyFmTS?usp=sharing 如果列名称与工作表名称相同,则将所有内容合并在一起作为单独的列,以下是代码 运行以上代码后的数据 merged_data 如何合并条件文件? 健康)状况。 以上代码段中的价格1指向带有名称为int 7

    • 问题内容: 我正在使用Python进行一些数据分析。我有两个表,第一个(叫它“ A”)有1000万行和10列,第二个(“ B”)有7300万行和2列。他们有1个具有共同ID的列,我想根据该列将两个表相交。特别是我想要表的内部联接。 我无法将表B作为pandas数据框加载到内存中,以在pandas上使用常规合并功能。我尝试通过读取表B上的文件的块,将每个块与A相交,并将这些交集连接起来(内部联接的输

    • 问题内容: 我有两个表(表A和表B)。 它们具有不同的列数-假设表A具有更多列。 如何合并这两个表,并为表B没有的列获取空值? 问题答案: 为具有较少列的表添加额外的列作为null

    • 问题内容: 我有一些具有相同列标题的CSV文件。例如 文件A 文件B 我想将其合并,以便将数据合并到一个文件中,文件头位于顶部,但其他任何地方都没有文件头。 什么是实现此目标的好方法? 问题答案: 这应该工作。它检查要合并的文件是否具有匹配的头。否则将引发异常。异常处理(关闭流等)已作为练习。

    • 我有一些具有相同列标题的CSV文件。例如 文件A 文件B 我想合并它,以便将数据合并到一个文件中,标题在顶部,但其他地方没有标题。 实现这一目标的好方法是什么?