Barrier Execution Mode
For machine-learning workloads whose communication pattern does not fit the map-reduce model, such as MPI-style jobs: all tasks in a barrier stage are launched together, and if any task fails the whole stage is retried.
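A minimal sketch of a barrier stage (the RDD contents are placeholders; a real job would run its MPI-style computation between the barriers):

import org.apache.spark.BarrierTaskContext
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[4]").getOrCreate()
val rdd = spark.sparkContext.parallelize(1 to 100, 4)
// rdd.barrier() marks the stage as a barrier stage: all 4 tasks are scheduled
// together (local[4] provides enough slots for all of them at once)
rdd.barrier().mapPartitions { iter =>
  val ctx = BarrierTaskContext.get()
  ctx.barrier() // blocks until every task in the stage reaches this point
  iter          // the MPI-style computation would run here
}.collect()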
Built-in Higher-order Functions
Adds built-in higher-order functions for working with complex types such as arrays and maps.
SELECT array_distinct(array(1, 2, 3, null, 3));
Removes duplicate elements from the array.
SELECT array_intersect(array(1, 2, 3), array(1, 3, 5));
Returns the intersection of the two arrays.
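The headline additions are true higher-order functions that take lambda arguments, e.g. transform and filter (expected results shown as comments):
SELECT transform(array(1, 2, 3), x -> x + 1);
-- [2, 3, 4]
SELECT filter(array(1, 2, 3), x -> x % 2 = 1);
-- [1, 3]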
An introductory Databricks notebook:
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/142158605138935/3773509768457258/7497868276316206/latest.html
Built-in Avro Data Source
Avro becomes a built-in data source, and the from_avro() and to_avro() functions are added for converting columns to and from Avro binary format (a sketch of them follows the file examples below); read throughput roughly doubles and write throughput improves by about 10%.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()
// "com.databricks.spark.avro" is the pre-2.4 external format name;
// with the built-in source, format("avro") also works
val df = spark.read
  .format("com.databricks.spark.avro")
  .load("src/test/resources/episodes.avro")
// The .avro() shorthand below comes from the com.databricks.spark.avro implicits
import com.databricks.spark.avro._

val spark = SparkSession.builder().master("local").getOrCreate()
// Configure deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.avro("src/test/resources/episodes.avro")
// Writes out compressed Avro records
df.write.avro("/tmp/output")
// Write with partitioning
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.write.partitionBy("year", "month").avro("/tmp/output")
// Write the Avro record name and namespace
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.read.avro("src/test/resources/episodes.avro")
val name = "AvroTest"
val namespace = "com.databricks.spark.avro"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).avro("/tmp/output")
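A sketch of the new from_avro()/to_avro() column functions mentioned above (requires the spark-avro module on the classpath; the Avro schema and data here are made up for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.{from_avro, to_avro}
import org.apache.spark.sql.functions.struct

val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._

// JSON-format Avro schema, invented for this example
val jsonFormatSchema =
  """{"type":"record","name":"Episode","fields":[{"name":"title","type":"string"}]}"""

val df = Seq("The Eleventh Hour", "The Doctor's Wife").toDF("title")

// Encode a struct column to Avro binary ...
val avroDF = df.select(to_avro(struct($"title")).as("value"))
// ... and decode it back using the schema
val decoded = avroDF.select(from_avro($"value", jsonFormatSchema).as("episode"))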
Experimental Scala 2.12 Support
Experimental support for Scala 2.12.
Pandas UDF Improvement
Improvements to Pandas UDFs.
Image Data Source
Adds an image data source; each row holds one image as a struct (origin, height, width, nChannels, mode, data).
df = spark.read.format("image").load("...")
Kubernetes Integration Enhancement
Enhanced Kubernetes integration: the container images now ship with PySpark and SparkR so Python and R apps can run in pods, client mode lets an interactive client talk to the pods, and the following volume types can be mounted: emptyDir, hostPath, and persistentVolumeClaim.
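A hypothetical spark-submit sketch of mounting a persistentVolumeClaim into the executors (the image name, volume name "data", and claim name "my-claim" are all made up; <...> values are placeholders):

bin/spark-submit \
  --master k8s://https://<k8s-apiserver-host>:443 \
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.kubernetes.container.image=<spark-image> \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path=/data \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName=my-claim \
  local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar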
Flexible Streaming Sink
A DataFrame can be written to a wide range of external data sources, but streaming sinks supported only a few of them; foreachBatch exposes each micro-batch as a DataFrame so any batch writer can be used in a streaming query:
streamingDF.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write // use the Cassandra batch data source to write the streaming output
      .cassandraFormat(tableName, keyspace) // from the Spark Cassandra Connector
      .option("cluster", clusterName)
      .mode("append")
      .save()
  }
  .start()
To avoid recomputing the stream when writing to multiple destinations, cache the batch, write it to each location, then release it:
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.cache()
  batchDF.write.format(...).save(...) // location 1
  batchDF.write.format(...).save(...) // location 2
  batchDF.unpersist() // Dataset has no uncache(); unpersist() releases the cache
}.start()
COALESCE and REPARTITION hints are added to SQL:
INSERT ... SELECT /*+ COALESCE(numPartitions) */ ...
INSERT ... SELECT /*+ REPARTITION(numPartitions) */ ...
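For example, to compact the output of an insert into five files (the table names here are hypothetical):
INSERT INTO events_archive SELECT /*+ COALESCE(5) */ * FROM events;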
PIVOT support. Sample rows from the high_temps table (daily high temperatures):
Date Temp (°F)
07-22-2018 86
07-23-2018 90
07-24-2018 91
07-25-2018 92
07-26-2018 92
07-27-2018 88
07-28-2018 85
07-29-2018 94
07-30-2018 89
SELECT * FROM (
SELECT year(date) year, month(date) month, temp
FROM high_temps
WHERE date BETWEEN DATE '2015-01-01' AND DATE '2018-08-31'
)
PIVOT (
CAST(avg(temp) AS DECIMAL(4, 1))
FOR month in (
1 JAN, 2 FEB, 3 MAR, 4 APR, 5 MAY, 6 JUN,
7 JUL, 8 AUG, 9 SEP, 10 OCT, 11 NOV, 12 DEC
)
)
year JAN FEB MAR APR MAY JUN JUL AUG SEP OCT NOV DEC
2018 49.7 45.8 54.0 58.6 70.8 71.9 82.8 79.1 NULL NULL NULL NULL
2017 43.7 46.6 51.6 57.3 67.0 72.1 78.3 81.5 73.8 61.1 51.3 45.6
2016 49.1 53.6 56.4 65.9 68.8 73.1 76.0 79.5 69.6 60.6 56.0 41.9
2015 50.3 54.5 57.9 59.9 68.0 78.9 82.6 79.0 68.5 63.6 49.4 47.1
Multiple aggregations in a single PIVOT:
SELECT * FROM (
SELECT year(date) year, month(date) month, temp
FROM high_temps
WHERE date BETWEEN DATE '2015-01-01' AND DATE '2018-08-31'
)
PIVOT (
CAST(avg(temp) AS DECIMAL(4, 1)) avg, max(temp) max
FOR month in (6 JUN, 7 JUL, 8 AUG, 9 SEP)
)
ORDER BY year DESC
year JUN_avg JUN_max JUL_avg JUL_max AUG_avg AUG_max SEP_avg SEP_max
2018 71.9 88 82.8 94 79.1 94 NULL NULL
2017 72.1 96 78.3 87 81.5 94 73.8 90
2016 73.1 93 76.0 89 79.5 95 69.6 78
2015 78.9 92 82.6 95 79.0 92 68.5 81
Querying the average of the daily high and low temperatures for June through September, pivoting on (month, flag):
SELECT * FROM (
SELECT year(date) year, month(date) month, temp, flag
FROM (
SELECT date, temp, 'H' as flag
FROM high_temps
UNION ALL
SELECT date, temp, 'L' as flag
FROM low_temps
)
WHERE date BETWEEN DATE '2015-01-01' AND DATE '2018-08-31'
)
PIVOT (
CAST(avg(temp) AS DECIMAL(4, 1))
FOR (month, flag) in (
(6, 'H') JUN_hi, (6, 'L') JUN_lo,
(7, 'H') JUL_hi, (7, 'L') JUL_lo,
(8, 'H') AUG_hi, (8, 'L') AUG_lo,
(9, 'H') SEP_hi, (9, 'L') SEP_lo
)
)
ORDER BY year DESC
year JUN_hi JUN_lo JUL_hi JUL_lo AUG_hi AUG_lo SEP_hi SEP_lo
2018 71.9 53.4 82.8 58.5 79.1 58.5 NULL NULL
2017 72.1 53.7 78.3 56.3 81.5 59.0 73.8 55.6
2016 73.1 53.9 76.0 57.6 79.5 57.9 69.6 52.9
2015 78.9 56.4 82.6 59.9 79.0 58.5 68.5 52.5
When one cache depends on another, the two must be kept consistent, but different drops call for different treatment: dropping a temporary view should only invalidate the cache of that view itself (non-cascading mode), while dropping a table or a persistent view must also invalidate every cache that depends on it (regular mode).
Streaming aggregation used to store the grouping key a second time inside the state value; since keeping the redundant copy gave no performance benefit, it was removed, shrinking the saved state.