MongoDB Connector for Spark
Spark Connector Scala Guide
spark-shell --jars "mongo-spark-connector_2.11-2.0.0.jar,mongo-hadoop-core-2.0.2.jar,mongo-java-driver-3.4.2.jar"
import org.apache.spark.sql.SparkSession
import com.mongodb.spark._
import com.mongodb.spark.config._
import org.bson.Document
// Build the SparkSession used by every step below; getOrCreate() hands back an
// already-running session when one exists instead of creating a second one.
val spark = {
  val localBuilder = SparkSession.builder().appName("MongoSparkConnector").master("local")
  // NOTE(review): "spark.some.config.option" looks like a placeholder entry —
  // replace it with real settings or drop it.
  localBuilder.config("spark.some.config.option", "some-value").getOrCreate()
}
// MongoDB connection string shared by all of the read/write examples below.
val uri: String = "mongodb://172.1.1.1:27017"
// Select at most 100 rows (uid, name) from test_table, tag each row with the
// current date as `version`, and redistribute the result across 8 partitions.
val userDF = {
  val userQuery = """
select
uid,
name,
current_date() version
from test_table
limit 100
"""
  spark.sql(userQuery).repartition(8)
}
// Write to MongoDB: save userDF into test.test_table through the connector's
// DataFrame source, using "overwrite" save mode and the short option keys.
val writeOptions = Map(
  "uri" -> uri,
  "database" -> "test",
  "collection" -> "test_table")
userDF.write.format("com.mongodb.spark.sql").mode("overwrite").options(writeOptions).save()
// Read from MongoDB: load test.test_table as a DataFrame through the
// connector's DataFrame source.
val df = {
  val readOptions = Map(
    "uri" -> uri,
    "database" -> "test",
    "collection" -> "test_table")
  spark.read.format("com.mongodb.spark.sql").options(readOptions).load()
}
// Other ways to write.
// Variant 1: same DataFrameWriter call as before, but with the fully
// prefixed "spark.mongodb.*" option keys instead of the short ones.
val prefixedWriteOptions = Map(
  "spark.mongodb.input.uri" -> uri,
  "spark.mongodb.output.uri" -> uri,
  "spark.mongodb.output.database" -> "test",
  "spark.mongodb.output.collection" -> "test_table")
userDF.write.format("com.mongodb.spark.sql").mode("overwrite").options(prefixedWriteOptions).save()
// Variant 2: configure a DataFrameWriter with the prefixed "spark.mongodb.*"
// keys and hand it to MongoSpark.save, without calling format(...) here.
val prefixedWriter = userDF.write.mode("overwrite").options(
  Map(
    "spark.mongodb.input.uri" -> uri,
    "spark.mongodb.output.uri" -> uri,
    "spark.mongodb.output.database" -> "test",
    "spark.mongodb.output.collection" -> "test_table"))
MongoSpark.save(prefixedWriter)
// Variant 3: MongoSpark.save again, this time with the short option keys,
// each supplied through its own option(...) call.
MongoSpark.save(
  userDF.write
    .mode("overwrite")
    .option("uri", uri)
    .option("database", "test")
    .option("collection", "test_table"))
// All reads and writes are done; stop the Spark session created above.
spark.stop()