SparkR提供了轻量级的方式在R中使用Spark,SparkR实现了分布式的dataframe,支持类似查询,过滤和聚合等,(类似R中data frames : dplyr),这个可以操作大规模的数据集。
SparkR的切入点是SparkContext,它可以连接R和Spark集群,通过SparkR.init()构建SparkContext,如果想用DataFrames,需要创建SQLContext,可以通过SparkContext来构建,如果使用SparkR shell, SparkContext和SQLContext会自动创建好。
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
可以通过RStudio连接R和Spark集群,确保Spark_HOME设置在环境变量中。这里因为没有从Spark shell启动,需要使用SparkR.init初始化
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
Sys.setenv(SPARK_HOME = "/home/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sc <- sparkR.init(master = "local[*]", sparkEnvir = list(spark.driver.memory="2g"))
这里master=local[*]
是local模式,如果spark是集群环境,要指定它的master节点的ip和端口,如master = "spark://192.168.175.128:7070"
,在sparkEnvir参数中还可以设置其他的一些参数选项。
使用SQLContext,可以通过以下方式创建DataFrames,可以是从本地的R data frame或hive table或其他的一些数据源。
df <- createDataFrame(sqlContext, faithful)
# Displays the content of the DataFrame to stdout
head(df)
## eruptions waiting
##1 3.600 79
##2 1.800 54
##3 3.333 74
people <- read.df(sqlContext, "./examples/src/main/resources/people.json", "json")
head(people)
## age name
##1 NA Michael
##2 30 Andy
##3 19 Justin
# SparkR automatically infers the schema from the JSON file
printSchema(people)
# root
# |-- age: integer (nullable = true)
# |-- name: string (nullable = true)
# sc is an existing SparkContext.
hiveContext <- sparkRHive.init(sc)
sql(hiveContext, "CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql(hiveContext, "LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries can be expressed in HiveQL.
results <- sql(hiveContext, "FROM src SELECT key, value")
# results is now a DataFrame
head(results)
## key value
## 1 238 val_238
## 2 86 val_86
## 3 311 val_311
下面是一个运行Gaussian GLM model的一个示例:
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
Sys.setenv(SPARK_HOME = "/usr/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sc <- sparkR.init(master = "spark://192.168.175.128:7070")
sqlContext <- sparkRSQL.init(sc)
#data <- read.csv('/home/xufly/Desktop/chicagoCensus.csv',header=TRUE,sep=",",fileEncoding = 'UTF-8')
df <- createDataFrame(sqlContext, iris)
# Fit a gaussian GLM model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")
# Model summary are returned in a similar format to R's native glm().
summary(model)
# Make predictions based on the model.
predictions <- predict(model, newData = df)
head(select(predictions, "Sepal_Length", "prediction"))