
Spark 2.4 Features

鲁单弓
2023-12-01

Barrier Execution Mode

For machine learning and other workloads that do not fit the map-reduce execution model, such as MPI-style jobs: all tasks of a barrier stage are launched together and can synchronize with one another.
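
A minimal sketch of the barrier API, assuming a local master with enough cores (barrier mode needs one free task slot per task, hence local[4]): rdd.barrier() schedules all tasks of the stage together, and BarrierTaskContext.barrier() blocks until every task reaches it, much like an MPI barrier.

import org.apache.spark.BarrierTaskContext
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[4]").getOrCreate()
val rdd = spark.sparkContext.parallelize(1 to 100, 4)

rdd.barrier().mapPartitions { iter =>
  val ctx = BarrierTaskContext.get()
  ctx.barrier()  // block until all 4 tasks in the stage reach this point
  iter
}.collect()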

Built-in Higher-order Functions

Adds built-in higher-order functions for working with complex types such as arrays and maps.

SELECT array_distinct(array(1, 2, 3, null, 3));

Removes duplicate elements from an array.

SELECT array_intersect(array(1, 2, 3), array(1, 3, 5));

Returns the intersection of two arrays.

A Databricks notebook demonstrating these functions:
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/142158605138935/3773509768457258/7497868276316206/latest.html
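
These built-ins also come with a lambda syntax for higher-order functions such as transform and aggregate; a minimal sketch (the literal arrays are made up):

// transform applies the lambda to each element: returns [2, 3, 4]
spark.sql("SELECT transform(array(1, 2, 3), x -> x + 1)").show()
// aggregate folds the array with a start value and a merge function: returns 6
spark.sql("SELECT aggregate(array(1, 2, 3), 0, (acc, x) -> acc + x)").show()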

Built-in Avro Data Source

The Avro data source is now built in, and the new from_avro() and to_avro() functions convert columns to and from Avro binary. Compared with the external package, read throughput roughly doubles and write throughput improves by about 10%.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.read
    .format("avro")  // built-in short name; "com.databricks.spark.avro" still maps here for compatibility
    .load("src/test/resources/episodes.avro")

val spark = SparkSession.builder().master("local").getOrCreate()

// Configure deflate compression for Avro output
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("src/test/resources/episodes.avro")

// Writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

// Writing partitioned output

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.write.partitionBy("year", "month").format("avro").save("/tmp/output")


// Writing a record name and namespace into the Avro schema

val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.read.format("avro").load("src/test/resources/episodes.avro")

val name = "AvroTest"
val namespace = "com.databricks.spark.avro"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")
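
The from_avro() and to_avro() functions mentioned above round-trip a column through Avro binary; a minimal sketch against the df loaded above (the column name and the plain-string schema are illustrative):

import org.apache.spark.sql.avro._
import spark.implicits._

// Encode the title column as Avro binary, then decode it back using a reader schema
val jsonFormatSchema = """{"type": "string"}"""
val encoded = df.select(to_avro($"title").as("title_avro"))
val decoded = encoded.select(from_avro($"title_avro", jsonFormatSchema).as("title"))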

Experimental Scala 2.12 Support

Experimental support for building and running Spark on Scala 2.12.

Pandas UDF Improvement

Improvements to Pandas UDFs, including grouped aggregate Pandas UDFs and Pandas UDFs in window functions.

Image Data Source

Built-in support for loading image files as a DataFrame.

val df = spark.read.format("image").load("...")

Kubernetes Integration Enhancement

Improved Kubernetes support: PySpark and SparkR can run inside the containers, client mode allows interacting with the driver pod from outside the cluster, and the following volume types can be mounted: emptyDir, hostPath, and persistentVolumeClaim.
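
Volume mounts are configured through spark.kubernetes.*.volumes.* properties, normally passed to spark-submit with --conf; a minimal sketch in Scala (the volume name, claim name, and mount path are all hypothetical):

import org.apache.spark.sql.SparkSession

// Mount an existing PersistentVolumeClaim into every executor pod
val spark = SparkSession.builder()
  .config("spark.kubernetes.executor.volumes.persistentVolumeClaim.ckpt.mount.path", "/checkpoints")
  .config("spark.kubernetes.executor.volumes.persistentVolumeClaim.ckpt.options.claimName", "ckpt-pvc")
  .getOrCreate()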


Flexible Streaming Sink

A DataFrame can be written to many external systems, but streaming sinks supported only a few of them. The new foreachBatch sink exposes each micro-batch as a DataFrame, so any batch writer can be reused from a streaming query:

streamingDF.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write       // use the Cassandra batch data source to write the streaming output
      .cassandraFormat(tableName, keyspace)  // provided by the Spark Cassandra Connector
      .option("cluster", clusterName)
      .mode("append")
      .save()
  }
  .start()

Caching the micro-batch avoids recomputing it when writing to multiple destinations:

streamingDF.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist()
    batchDF.write.format(...).save(...)  // location 1
    batchDF.write.format(...).save(...)  // location 2
    batchDF.unpersist()  // Dataset has no uncache(); unpersist() releases the cache
  }
  .start()


SQL adds REPARTITION and COALESCE hints for controlling the number of output partitions:

INSERT ... SELECT /*+ COALESCE(numPartitions) */ ...
INSERT ... SELECT /*+ REPARTITION(numPartitions) */ ...

SQL now supports PIVOT. Given a high_temps table of daily high temperatures:

Date    Temp (°F)
07-22-2018    86
07-23-2018    90
07-24-2018    91
07-25-2018    92
07-26-2018    92
07-27-2018    88
07-28-2018    85
07-29-2018    94
07-30-2018    89

SELECT * FROM (
  SELECT year(date) year, month(date) month, temp
  FROM high_temps
  WHERE date BETWEEN DATE '2015-01-01' AND DATE '2018-08-31'
)
PIVOT (
  CAST(avg(temp) AS DECIMAL(4, 1))
  FOR month IN (
    1 JAN, 2 FEB, 3 MAR, 4 APR, 5 MAY, 6 JUN,
    7 JUL, 8 AUG, 9 SEP, 10 OCT, 11 NOV, 12 DEC
  )
)

year    JAN    FEB    MAR    APR    MAY    JUN    JUL    AUG    SEP    OCT    NOV    DEC
2018    49.7    45.8    54.0    58.6    70.8    71.9    82.8    79.1    NULL    NULL    NULL    NULL
2017    43.7    46.6    51.6    57.3    67.0    72.1    78.3    81.5    73.8    61.1    51.3    45.6
2016    49.1    53.6    56.4    65.9    68.8    73.1    76.0    79.5    69.6    60.6    56.0    41.9
2015    50.3    54.5    57.9    59.9    68.0    78.9    82.6    79.0    68.5    63.6    49.4    47.1

Multiple aggregate expressions:

SELECT * FROM (
  SELECT year(date) year, month(date) month, temp
  FROM high_temps
  WHERE date BETWEEN DATE '2015-01-01' AND DATE '2018-08-31'
)
PIVOT (
  CAST(avg(temp) AS DECIMAL(4, 1)) avg, max(temp) max
  FOR month IN (6 JUN, 7 JUL, 8 AUG, 9 SEP)
)
ORDER BY year DESC

 

year    JUN_avg    JUN_max    JUL_avg    JUL_max    AUG_avg    AUG_max    SEP_avg    SEP_max
2018    71.9    88    82.8    94    79.1    94    NULL    NULL
2017    72.1    96    78.3    87    81.5    94    73.8    90
2016    73.1    93    76.0    89    79.5    95    69.6    78
2015    78.9    92    82.6    95    79.0    92    68.5    81

Average high and low temperatures for June through September, pivoting on multiple columns (assuming a matching low_temps table):

SELECT * FROM (
  SELECT year(date) year, month(date) month, temp, flag
  FROM (
    SELECT date, temp, 'H' as flag
    FROM high_temps
    UNION ALL
    SELECT date, temp, 'L' as flag
    FROM low_temps
  )
  WHERE date BETWEEN DATE '2015-01-01' AND DATE '2018-08-31'
)
PIVOT (
  CAST(avg(temp) AS DECIMAL(4, 1))
  FOR (month, flag) IN (
    (6, 'H') JUN_hi, (6, 'L') JUN_lo,
    (7, 'H') JUL_hi, (7, 'L') JUL_lo,
    (8, 'H') AUG_hi, (8, 'L') AUG_lo,
    (9, 'H') SEP_hi, (9, 'L') SEP_lo
  )
)
ORDER BY year DESC

 

year    JUN_hi    JUN_lo    JUL_hi    JUL_lo    AUG_hi    AUG_lo    SEP_hi    SEP_lo
2018    71.9    53.4    82.8    58.5    79.1    58.5    NULL    NULL
2017    72.1    53.7    78.3    56.3    81.5    59.0    73.8    55.6
2016    73.1    53.9    76.0    57.6    79.5    57.9    69.6    52.9
2015    78.9    56.4    82.6    59.9    79.0    58.5    68.5    52.5

  • Eliminate the 2 GB limit when replicating in-memory data blocks
  • Implement EXCEPT ALL and INTERSECT ALL (see the sketch below)
  • Support non-cascading cache invalidation

When cached plans depend on one another, invalidating one cache must be synchronized with the others, but the two cases need different treatment: dropping a temporary view should invalidate only that view's own cache (non-cascading mode), while dropping a table or permanent view should also invalidate everything built on top of it (regular mode).
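
A minimal sketch of the corresponding Dataset API, exceptAll and intersectAll, with made-up data; unlike EXCEPT and INTERSECT, the ALL variants preserve duplicates:

import spark.implicits._

val left  = Seq(1, 1, 2, 3).toDF("c")
val right = Seq(1, 2).toDF("c")

left.exceptAll(right).show()     // removes one match per row: 1, 3
left.intersectAll(right).show()  // keeps matched duplicates: 1, 2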

  • Support the kafka.isolation.level option, so that only transactionally committed Kafka records are read (see the sketch below)
  • Cap the amount of data the Structured Streaming memory sink keeps, to prevent OOM
  • Stop duplicating the grouping key inside the state value

Streaming aggregation used to store the grouping key redundantly in both the key and the value of the state store; the duplicate copy brought no benefit, so it was removed.
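
For the kafka.isolation.level option from the first bullet, options prefixed with kafka. are passed straight through to the underlying consumer; a minimal sketch (the broker address and topic are placeholders):

// Only read records from committed Kafka transactions
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "events")
  .option("kafka.isolation.level", "read_committed")
  .load()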

  • Improve the mapGroupsWithState API
  • Support choosing the minimum or maximum watermark when a Structured Streaming query has multiple watermarks
  • Support Kafka 2.0

 
