当前位置: 首页 > 知识库问答 >
问题:

使用R sparklyr将多个Spark parquet文件加载到Spark表中?

咸昊昊
2023-03-14

我试图使用Rsparklyr将多个parquet文件加载到一个Spark表中。附加的代码显示了我是如何做到这一点的。

spark_load_data <- function(db_conn, test_period)
{
    library(DBI)
    #
    overwrite <- TRUE
    #
    for (ts in seq(as.Date(test_period["START_DATE","VALUE"]),
                   as.Date(test_period["END_DATE","VALUE"]),
                   by="day")) {
        #
        # date to load
        #
        td <- format(as.Date(ts,origin="1970-01-01"), "%Y-%m-%d")
        #
        # load parquet files
        #
        tbl <- "pcidata"
        pq_path <- paste0("s3://<path>/PciData/transaction_date=", td)
        read_in <- spark_read_parquet(db_conn, 
                                      name=tbl,
                                      path=pq_path,
                                      overwrite=overwrite)
        #
        overwrite <- FALSE
    }
}

共有1个答案

宣星光
2023-03-14

read.parquet方法实际上支持提供多个文件路径,这意味着我们可以编写一个简单的包装器:

read_parquet_multiple <- function(sc, paths) {
  spark_session(sc) %>% invoke("read") %>% invoke("parquet", as.list(paths))
}

然后使用它读取多个文件,例如(完整的示例包括连接到本地spark实例并编写2个parquet文件以供加载):

library(sparklyr); library(dplyr)
sc <- spark_connect(master = "local")

# Write 1:10 into 2 separate parquet files
sdf_seq(sc, 1, 3, repartition = NULL) %>% spark_write_parquet("batch_1")
sdf_seq(sc, 4, 6, repartition = NULL) %>% spark_write_parquet("batch_2")

# Read mulitple files
dataset <- sc %>% read_parquet_multiple(paths = c("batch_1", "batch_2"))

# Collect to show the results
dataset %>% collect()

# # A tibble: 6 x 1
# id
# <int>
#   1     2
#   2     3
#   3     5
#   4     6
#   5     1
#   6     4
 类似资料:
  • 问题内容: 我一直试图将多个文件加载到一个表中,以便它们适合同一行。 我可以将它们分别插入,但是问题出在值之内,因此我打算加入该表。如果发生这种情况,我会得到太多的值-无用的数据。 我实际上研究过的另一件事是将文件与 但是,它变成了一个烂摊子。如果第一种方法不起作用,那么我可以使用第二种方法,但是我也需要有关它的建议。 问题答案: 您可以将4个文件加载到4个(临时)表中(每个表都有一个自动编号的字

  • 我刚接触Cassandra Spark,并尝试使用Spark主集群将数据从文件加载到Cassandra表。我遵循以下链接中给出的步骤 http://docs.datastax.com/en/datastax_enterprise/4.7/datastax_enterprise/spark/sparkImportTxtCQL.html 在第8步,数据显示为整数数组,但当我使用相同的命令时,结果显示为

  • 为了完整起见,下面是toolbarview.fxml:

  • 问题内容: 我需要处理分散在各个目录中的多个文件。我想将所有这些加载到单个RDD中,然后在其上执行map / reduce。我看到SparkContext能够使用通配符从单个目录加载多个文件。我不确定如何从多个文件夹加载文件。 以下代码段失败: 这在第三个循环中失败,并显示以下错误消息: 鉴于我仅提供了两个参数,所以这很奇怪。任何指针表示赞赏。 问题答案: 措辞如何呢? 在Scala中,有两种变体

  • 我是Spark的新手,我正在尝试使用Spark从文件中读取CSV数据。以下是我正在做的: 我希望这个调用会给我一个文件前两列的列表,但我遇到了以下错误: 索引器中第1行的文件“”:列表索引超出范围 虽然我的CSV文件不止一列。

  • csv文件中的每一行结构如下: