当前位置: 首页 > 工具软件 > SparkR > 使用案例 >

SparkR

文寒
2023-12-01

概述

SparkR提供了轻量级的方式在R中使用Spark,SparkR实现了分布式的dataframe,支持类似查询,过滤和聚合等,(类似R中data frames : dplyr),这个可以操作大规模的数据集。

创建 SparkContext, SQLContext

SparkR的切入点是SparkContext,它可以连接R和Spark集群,通过SparkR.init()构建SparkContext,如果想用DataFrames,需要创建SQLContext,可以通过SparkContext来构建,如果使用SparkR shell, SparkContext和SQLContext会自动创建好。

sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

使用RStudio交互

可以通过RStudio连接R和Spark集群,确保Spark_HOME设置在环境变量中。这里因为没有从Spark shell启动,需要使用SparkR.init初始化

if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = "/home/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sc <- sparkR.init(master = "local[*]", sparkEnvir = list(spark.driver.memory="2g"))

这里master=local[*]是local模式,如果spark是集群环境,要指定它的master节点的ip和端口,如master = "spark://192.168.175.128:7070",在sparkEnvir参数中还可以设置其他的一些参数选项。

创建 DataFrames

使用SQLContext,可以通过以下方式创建DataFrames,可以是从本地的R data frame或hive table或其他的一些数据源。

1、从R data frame

df <- createDataFrame(sqlContext, faithful)

# Displays the content of the DataFrame to stdout
head(df)
##  eruptions waiting
##1     3.600      79
##2     1.800      54
##3     3.333      74

2、从Data Sources

people <- read.df(sqlContext, "./examples/src/main/resources/people.json", "json")
head(people)
##  age    name
##1  NA Michael
##2  30    Andy
##3  19  Justin

# SparkR automatically infers the schema from the JSON file
printSchema(people)
# root
#  |-- age: integer (nullable = true)
#  |-- name: string (nullable = true)

3、从Hive tables

# sc is an existing SparkContext.
hiveContext <- sparkRHive.init(sc)

sql(hiveContext, "CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql(hiveContext, "LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results <- sql(hiveContext, "FROM src SELECT key, value")

# results is now a DataFrame
head(results)
##  key   value
## 1 238 val_238
## 2  86  val_86
## 3 311 val_311

DEMO

下面是一个运行Gaussian GLM model的一个示例:

if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = "/usr/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sc <- sparkR.init(master = "spark://192.168.175.128:7070")
sqlContext <- sparkRSQL.init(sc)

#data <- read.csv('/home/xufly/Desktop/chicagoCensus.csv',header=TRUE,sep=",",fileEncoding = 'UTF-8')
df <- createDataFrame(sqlContext, iris)
# Fit a gaussian GLM model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")

# Model summary are returned in a similar format to R's native glm().
summary(model)

# Make predictions based on the model.
predictions <- predict(model, newData = df)
head(select(predictions, "Sepal_Length", "prediction"))
 类似资料: