我试图使用Rsparklyr
将多个parquet文件加载到一个Spark表中。附加的代码显示了我是如何做到这一点的。
spark_load_data <- function(db_conn, test_period)
{
library(DBI)
#
overwrite <- TRUE
#
for (ts in seq(as.Date(test_period["START_DATE","VALUE"]),
as.Date(test_period["END_DATE","VALUE"]),
by="day")) {
#
# date to load
#
td <- format(as.Date(ts,origin="1970-01-01"), "%Y-%m-%d")
#
# load parquet files
#
tbl <- "pcidata"
pq_path <- paste0("s3://<path>/PciData/transaction_date=", td)
read_in <- spark_read_parquet(db_conn,
name=tbl,
path=pq_path,
overwrite=overwrite)
#
overwrite <- FALSE
}
}
read.parquet
方法实际上支持提供多个文件路径,这意味着我们可以编写一个简单的包装器:
read_parquet_multiple <- function(sc, paths) {
spark_session(sc) %>% invoke("read") %>% invoke("parquet", as.list(paths))
}
然后使用它读取多个文件,例如(完整的示例包括连接到本地spark实例并编写2个parquet文件以供加载):
library(sparklyr); library(dplyr)
sc <- spark_connect(master = "local")
# Write 1:10 into 2 separate parquet files
sdf_seq(sc, 1, 3, repartition = NULL) %>% spark_write_parquet("batch_1")
sdf_seq(sc, 4, 6, repartition = NULL) %>% spark_write_parquet("batch_2")
# Read mulitple files
dataset <- sc %>% read_parquet_multiple(paths = c("batch_1", "batch_2"))
# Collect to show the results
dataset %>% collect()
# # A tibble: 6 x 1
# id
# <int>
# 1 2
# 2 3
# 3 5
# 4 6
# 5 1
# 6 4
问题内容: 我一直试图将多个文件加载到一个表中,以便它们适合同一行。 我可以将它们分别插入,但是问题出在值之内,因此我打算加入该表。如果发生这种情况,我会得到太多的值-无用的数据。 我实际上研究过的另一件事是将文件与 但是,它变成了一个烂摊子。如果第一种方法不起作用,那么我可以使用第二种方法,但是我也需要有关它的建议。 问题答案: 您可以将4个文件加载到4个(临时)表中(每个表都有一个自动编号的字
我刚接触Cassandra Spark,并尝试使用Spark主集群将数据从文件加载到Cassandra表。我遵循以下链接中给出的步骤 http://docs.datastax.com/en/datastax_enterprise/4.7/datastax_enterprise/spark/sparkImportTxtCQL.html 在第8步,数据显示为整数数组,但当我使用相同的命令时,结果显示为
为了完整起见,下面是toolbarview.fxml:
问题内容: 我需要处理分散在各个目录中的多个文件。我想将所有这些加载到单个RDD中,然后在其上执行map / reduce。我看到SparkContext能够使用通配符从单个目录加载多个文件。我不确定如何从多个文件夹加载文件。 以下代码段失败: 这在第三个循环中失败,并显示以下错误消息: 鉴于我仅提供了两个参数,所以这很奇怪。任何指针表示赞赏。 问题答案: 措辞如何呢? 在Scala中,有两种变体
我是Spark的新手,我正在尝试使用Spark从文件中读取CSV数据。以下是我正在做的: 我希望这个调用会给我一个文件前两列的列表,但我遇到了以下错误: 索引器中第1行的文件“”:列表索引超出范围 虽然我的CSV文件不止一列。
csv文件中的每一行结构如下: