[Abstract] CarbonData is a high-performance big data storage solution that has been deployed in production at more than 100 enterprises, with the largest single cluster holding several trillion rows of data. To address the storage redundancy caused by today's widely varying analysis scenarios, and the ever-growing demand for flexible, business-driven analysis, CarbonData provides a new converged storage solution: a single copy of data serves multiple application scenarios at trillion-row scale with second-level query response.
Author: Chen Liang, Apache CarbonData PMC Chair and Committer. Source: Huawei Cloud Community
CarbonData 2.0 is currently at the rc2 stage. The community has spent more than half a year building this 2.0 milestone release. All of the new features have evolved from real business requirements, with a particularly large number of optimizations in performance and ecosystem integration.
A preview of the new features in 2.0 (note: the content below is not the official release notes, just an early feature preview):
Use Case: CarbonData loads the min/max index cache into memory on the first query executed against a table, which degrades the performance of that first query. To improve first-time query performance, the user can enable the prepriming feature, which loads the min/max cache into memory during each data load instead.
Usage:
carbon.indexserver.enable.prepriming=true
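A minimal sketch of enabling prepriming programmatically, assuming the distributed index server itself is already configured (the property can also be placed in the carbon.properties file):
import org.apache.carbondata.core.util.CarbonProperties
// warm the index server's min/max cache during every data load instead of on the first query
CarbonProperties.getInstance().addProperty("carbon.indexserver.enable.prepriming", "true")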
Use Case: Due to the tight integration of carbon with the compute engine Spark, carbon previously required a CarbonSession to be created instead of a SparkSession.
To make the integration layer modular, CarbonData now supports the SparkSessionExtensions API, which enables carbon to plug its parser and optimizer into an existing SparkSession.
Example:
val spark = SparkSession
.builder()
.config(conf)
.master("spark://localhost:7077")
.appName("Test")
.enableHiveSupport()
.config("spark.sql.warehouse.dir", "./warehouse")
.config("spark.sql.extensions", "org.apache.spark.sql.CarbonExtensions")
.getOrCreate()
spark.sql("""CREATE TABLE IF NOT EXISTS test_table (
id string,
name string,
city string,
age Int)
STORED AS carbondata""")
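With the extension registered, the table can be written and queried through the plain SparkSession; a hedged example (the inserted row values are illustrative):
spark.sql("INSERT INTO test_table VALUES ('1', 'Alice', 'Shenzhen', 30)")
spark.sql("SELECT * FROM test_table WHERE city = 'Shenzhen'").show()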
Use Case: Analytics data such as application performance monitoring, network data, sensor data, events, clicks, banking data and server metrics has to be aggregated and analyzed or monitored over a period of time for business needs. CarbonData supports pre-computation of aggregations and joins through materialized views, which delivers faster results, and time-series support on top of this is required by many users.
Example:
spark.sql("""CREATE TABLE sales (
order_time timestamp, user_id string, xingbie string, country string, quantity int, price bigint)
STORED AS carbondata""")
spark.sql("""
CREATE MATERIALIZED VIEW agg_sales AS SELECT timeseries(order_time, 'minute'), avg(price) FROM sales
GROUP BY timeseries(order_time, 'minute')
""")
spark.sql("""
SELECT timeseries(order_time,'minute'), avg(price) FROM sales GROUP BY timeseries(order_time,'minute')
""")
Get more about usage: https://github.com/apache/carbondata/blob/master/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
Use Case: For queries that require a filter on a spatial object, such as a region on a 2D map, the query would otherwise be treated as a full-scan query, causing significant performance degradation. To remove this limitation in carbon, a concept called 'spatial indexing', which allows a spatial object to be accessed efficiently, has been implemented. It is a common technique used by spatial databases.
Example:
spark.sql("""create table source_index(id BIGINT, latitude long, longitude long) stored by 'carbondata' TBLPROPERTIES (
'INDEX_HANDLER'='mygeohash',
'INDEX_HANDLER.mygeohash.type'='geohash',
'INDEX_HANDLER.mygeohash.sourcecolumns'='longitude, latitude',
'INDEX_HANDLER.mygeohash.originLatitude'='19.832277',
'INDEX_HANDLER.mygeohash.gridSize'='50',
'INDEX_HANDLER.mygeohash.minLongitude'='1.811865',
'INDEX_HANDLER.mygeohash.maxLongitude'='2.782233',
'INDEX_HANDLER.mygeohash.minLatitude'='19.832277',
'INDEX_HANDLER.mygeohash.maxLatitude'='20.225281',
'INDEX_HANDLER.mygeohash.conversionRatio'='1000000')
""")
spark.sql("""
select * from source_index where IN_POLYGON('16.321011 4.123503,16.137676 5.947911,16.560993 5.935276,16.321011 4.123503')
""")
Get more about usage:
https://github.com/apache/carbondata/blob/master/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
Use Case: When unsorted data is stored in carbon, the pruning process tends to give false positives when comparing min/max values. For example, a blocklet might contain the integer values 3, 5 and 8, so its min=3 and max=8. If the user filters on the value 4, pruning will falsely report that this blocklet may contain the data, and the read flow has to decompress the page and scan its contents. This leads to unnecessary IO and, ultimately, degraded performance.
To improve query performance, the Secondary Index has been designed on top of the existing min/max architecture; it is essentially a reverse index from the unique data values to the blocklets they appear in. This gives the exact location of the data, so false-positive scenarios during pruning are minimized.
Example:
spark.sql("""CREATE TABLE sales (order_time timestamp, user_id string, xingbie string, country string, quantity int, price bigint)
STORED AS carbondata
""")
spark.sql("""CREATE INDEX index_sales ON TABLE sales(user_id) AS 'carbondata'
""")
Get more about usage:
https://github.com/apache/carbondata/blob/master/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexTable.scala
In the current data-warehouse world, slowly changing dimensions (SCD) and change data capture (CDC) are very common scenarios. Legacy systems such as RDBMSs handle them well because they support transactions.
To keep up with existing database technologies, CarbonData now supports CDC and SCD functionality.
Example:
initframe.write.format("carbondata").option("tableName", "order").mode(SaveMode.Overwrite).save()
val dwframe = sqlContext.read.format("carbondata").option("tableName", "order").load()
val dwSelframe = dwframe.as("A")
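// odsframe below is the change-data (delta) DataFrame, aliased as "B"; see the full CDCExample linked below for how it is built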
val updateMap = Map("id" -> "A.id",
"name" -> "B.name",
"c_name" -> "B.c_name",
"quantity" -> "B.quantity",
"price" -> "B.price",
"state" -> "B.state").asInstanceOf[Map[Any, Any]]
dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched(
col("A.state") =!= col("B.state")).updateExpr(updateMap).execute()
Get more about usage:
https://github.com/apache/carbondata/blob/master/examples/spark/src/main/scala/org/apache/carbondata/examples/CDCExample.scala
Use Case: CarbonData needs to be integrated with fault-tolerant streaming dataflow engines like Apache Flink, where users can build a Flink streaming job and use a Flink sink to write data to carbon through the CarbonData SDK. The Flink sink generates table stage files; data from the stage files can then be inserted into the carbon table with the carbon INSERT STAGE command, making it visible for query.
Example:
spark.sql("""
CREATE TABLE test_flink (stringField string, intField int, shortField short) STORED AS carbondata
""")
// create flink streaming environment
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment()
environment.setParallelism(1)
environment.enableCheckpointing(2000L)
environment.setRestartStrategy(RestartStrategies.noRestart())
DataStreamSource<OUT> stream = environment.addSource(...)  // add a streaming source such as Kafka
// create carbon sdk writer factory with LOCAL/S3/OBS builder
CarbonWriterFactory factory = CarbonWriterFactory.builder("Local").build(dbName,tableName,tablePath,
tableProperties,writerProperties,carbonProperties)
// create stream sink and add it to stream
StreamingFileSink<IN> streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory).build()
stream.addSink(streamSink)
// execute the flink streaming job, which generates stage files
environment.execute()
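To complete the flow, the generated stage files can then be made visible for query by running the INSERT STAGE command from Spark, roughly as follows:
spark.sql("INSERT INTO test_flink STAGE")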
Get more about usage:
https://github.com/apache/carbondata/blob/master/examples/flink/src/main/scala/org/apache/carbondata/examples/FlinkExample.scala
Use Case: Many users have already generated data in other formats such as ORC, Parquet, JSON or CSV. If they wanted to migrate to CarbonData for better performance or richer features (SDK), there was previously no mechanism short of converting all the existing data into CarbonData.
To remove this limitation, the ADD SEGMENT command has been introduced, so that a user can simply add segments of different formats to a carbon table and run queries across them.
Example:
alter table test add segment options ('path'='hdfs://usr/oldtable','format'='parquet')
Get more about usage:
https://github.com/apache/carbondata/blob/master/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
Use Case: Hive filter expressions need to be pushed down to carbon so that data is filtered at the carbon layer, which improves query performance.
Usage/Example: When hive.optimize.index.filter is set to true, Hive filter expressions are pushed down to carbon to filter the data.
Use Case: CarbonData now supports write and read through the Hive execution engine. This is helpful for users who want to try carbon without migrating to Spark. Users can also convert their existing parquet/orc tables directly to carbon format for ETL purposes.
Example:
CREATE TABLE hive_carbon_table(shortField SMALLINT , intField INT, bigintField BIGINT ,
doubleField DOUBLE, stringField STRING, timestampField TIMESTAMP, decimalField DECIMAL(18,2),
dateField DATE, charField CHAR(5), floatField FLOAT) stored by 'org.apache.carbondata.hive.CarbonStorageHandler'
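A hedged sketch of writing to and reading from this table through Hive (the source table name is a placeholder for any existing Hive table with a matching schema):
-- existing_hive_table is only a placeholder
INSERT INTO hive_carbon_table SELECT * FROM existing_hive_table;
SELECT * FROM hive_carbon_table;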
Get more about usage: https://github.com/apache/carbondata/blob/master/integration/hive/src/test/java/org/apache/carbondata/hive/HiveCarbonTest.java
Use Case: Presto currently has two communities, PrestoDB and PrestoSQL. To support CarbonData for users of both, carbon now supports prestodb-0.217 and prestosql-316.
Usage: By default the prestosql profile is selected for the maven build. Users can use the prestodb profile to build CarbonData for PrestoDB.
Use Case: Previously, the insert and load commands shared a common code flow, which added many overheads to the insert command because features like bad-record handling are not required for insert.
Now the load and insert flows have been separated, and some additional optimizations have been applied to the insert command, such as:
1. Rearrange projections instead of rows.
2. Use Spark's InternalRow instead of the Row object.
These optimizations were observed to give a 40% insert performance improvement on TPC-H data; the optimized path is exercised by an ordinary insert, as in the sketch below.
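A minimal sketch of an insert that goes through the optimized flow (the staging table name is illustrative):
spark.sql("""INSERT INTO sales SELECT * FROM sales_staging""")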
Use Case: The bucketing feature is used to distribute/organize table or partition data into multiple files so that similar records end up in the same file. Joins between datasets normally cause a large volume of data shuffling and are therefore slow; this shuffle can be avoided when joining on the bucket columns. Bucketed tables have been made consistent with Spark, improving join performance by avoiding the shuffle on the bucket column.
Example:
spark.sql("""CREATE TABLE bucket_table1 (stringField string, intField int, shortField short)
STORED AS carbondata TBLPROPERTIES ('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='stringField') """)
spark.sql("""
CREATE TABLE bucket_table2 (stringField string, intField int, shortField short)
STORED AS carbondata TBLPROPERTIES ('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='stringField') """)
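A join on the bucket column can then run without a shuffle; a hedged example using the two tables above:
spark.sql("""
SELECT t1.intField, t2.shortField FROM bucket_table1 t1 JOIN bucket_table2 t2 ON t1.stringField = t2.stringField
""").show()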
Get more about usage: https://github.com/apache/carbondata/blob/master/integration/spark/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
Use Case: CarbonData now provides a Python API (PyCarbon) for integrating it with AI frameworks like TensorFlow, PyTorch and MXNet. By using PyCarbon, AI frameworks can read training data faster by leveraging CarbonData's indexing and caching abilities. Since CarbonData is a columnar store, AI developers can also apply projection and filtering to efficiently pick the data required for training.
Usage/Example:
Please refer to the link below to use PyCarbon: https://github.com/apache/carbondata/blob/master/python/README.md
Use Case: CarbonData's datamap interface can be used to improve the query performance of other formats such as Parquet and ORC. One implementation of the datamap interface is the MV table, which precomputes aggregation results based on user input. By creating an MV datamap on a parquet/orc table, the user gets the benefit of querying pre-computed data instead of raw data, which yields faster query results.
This is possible because carbon redirects the query to the MV datamap instead of the parquet tables.
Example:
spark.sql("""
create table source(empname String, designation String, deptno int, deptname String, salary int) using parquet
""")
spark.sql("""
create materialized view mv_parquet as select empname, deptname, avg(salary) from source group by empname, deptname
""")