After importing data with a Spark job, the job reported success, but querying the table in DBeaver failed with: Syntax error or analysis exception: Union can only be performed on tables with the compatible column types.
Query execution failed
Cause:
SQL Error [20000] [42000]: (SQLState=42000 Severity=20000) (Server=rf-sd0003.rayfay.io/192.168.106.96[2527] Thread=ThriftProcessor-3982) Syntax error or analysis exception: Union can only be performed on tables with the compatible column types. TimestampType <> DecimalType(38,0) at the first column of the second table;;
'Union
:- Project [id#13361, name#13362, creat_time#13363, upt#13364, money#13365, _upt#13366, _SID#13379, timestamptomill(_UPT#13366) AS __T_UPT_T#13407L]
: +- Project [id#13361, name#13362, creat_time#13363, upt#13364, money#13365, _upt#13366, 0 AS _SID#13379]
: +- SubqueryAlias app_db_tset_t_talbe_test_lq
: +- Relation[id#13361,name#13362,creat_time#13363,upt#13364,money#13365,_upt#13366] RowFormatRelation[app_db_tset.app_db_tset_t_talbe_test_lq]
+- Filter (1 = 1)
+- Project [_upt#13387, money#13388, upt#13389, creat_time#13390, name#13391, id#13392, _SID#13399, timestamptomill(_UPT#13387) AS __T_UPT_T#13416L]
+- Project [_upt#13387, money#13388, upt#13389, creat_time#13390, name#13391, id#13392, 1 AS _SID#13399]
+- Relation[_upt#13387,money#13388,upt#13389,creat_time#13390,name#13391,id#13392] parquet
The job definition submitted to Spark was:
<?xml version="1.0" encoding="UTF-8"?>
<daas-model xmlns="http://daas-model.rayfay.cn/pipline/job/"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://daas-model.rayfay.cn/pipline/job/ http://daas-model.rayfay.cn/pipline/job/daas-model.xsd">
<domain viural="true">
<sources>
<source view="source_DF" provider="csv">
<options>
<option key="path">file:///apps/daas/data/F_T_TALBE_TEST_LQ/7.csv</option>
<option key="header">true</option>
<option key="inferSchema">false</option>
<option key="charset">UTF-8</option>
<option key="delimiter">,</option>
<option key="lastDelimiter">false</option>
</options>
</source>
</sources>
<process>
<dataframe name="t_transform_DF" show="true">
select CAST(CURRENT_TIMESTAMP as TIMESTAMP) _UPT,CAST(MONEY as DECIMAL(10,2)) MONEY,CAST(CURRENT_TIMESTAMP as TIMESTAMP) UPT,CAST(CURRENT_TIMESTAMP as TIMESTAMP) CREAT_TIME,CAST(NAME as VARCHAR(100)) NAME,CAST(ID as DECIMAL(38,0)) ID from source_DF
</dataframe>
</process>
<sinks>
<sink provider="merge" dataframe="t_transform_DF">
<options>
<option key="table">APP_DB_TSET.F_T_TALBE_TEST_LQ</option>
<option key="import_mode">snappydata</option>
</options>
</sink>
</sinks>
</domain>
</daas-model>
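Troubleshooting process:
Initial guess: the columns did not line up, so the table's fields were mixed up and the error was raised.
Wrong approach:
Reordered the file header and the SQL as follows: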
[ID#13439,CREAT_TIME#13440,UPT#13441,MONEY#13442,NAME#13443]
select CAST(ID as DECIMAL(38,0)) ID,CAST(NAME as VARCHAR(100)) NAME,CAST(CURRENT_TIMESTAMP as TIMESTAMP) CREAT_TIME,CAST(CURRENT_TIMESTAMP as TIMESTAMP) UPT,CAST(MONEY as DECIMAL(10,2)) MONEY,CAST(CURRENT_TIMESTAMP as TIMESTAMP) _UPT from source_DF
Running the Spark job then produced the following error:
{
"duration": "1.076 secs",
"classPath": "org.apache.spark.sql.ray.pipline.PiplineStagesSubmitAsXml",
"startTime": "2020-09-03T15:09:25.914+08:00",
"context": "a8bdc2b4-org.apache.spark.sql.ray.pipline.PiplineStagesSubmitAsXml",
"result": {
"message": "org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. DecimalType(38,0) <> TimestampType at the first column of the second table;;\n'Union\n:- Relation[_upt#13562,money#13563,upt#13564,creat_time#13565,name#13566,id#13567] parquet\n+- SubqueryAlias t_transform_df, `t_transform_df`\n +- Project [cast(id#13439 as decimal(38,0)) AS id#13449, cast(name#13443 as string) AS name#13450, cast(current_timestamp() as timestamp) AS creat_time#13451, cast(current_timestamp() as timestamp) AS upt#13452, cast(money#13442 as decimal(10,2)) AS money#13453, cast(current_timestamp() as timestamp) AS _upt#13454]\n +- SubqueryAlias source_df, `source_df`\n +- Relation[ID#13439,CREAT_TIME#13440,UPT#13441,MONEY#13442,NAME#13443] csv\n",
"errorClass": "java.lang.RuntimeException",
"stack": ["org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)", "org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:57)", "org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$16$$anonfun$apply$17.apply(CheckAnalysis.scala:338)", "org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$16$$anonfun$apply$17.apply(CheckAnalysis.scala:335)", "scala.collection.Iterator$class.foreach(Iterator.scala:893)", "scala.collection.AbstractIterator.foreach(Iterator.scala:1336)", "scala.collection.IterableLike$class.foreach(IterableLike.scala:72)", "scala.collection.AbstractIterable.foreach(Iterable.scala:54)", "org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$16.apply(CheckAnalysis.scala:335)", "org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$16.apply(CheckAnalysis.scala:324)", "scala.collection.immutable.List.foreach(List.scala:381)", "org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:324)", "org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:76)", "org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)", "org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:76)", "org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:57)", "org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)", "org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:63)", "org.apache.spark.sql.Dataset.withSetOperator(Dataset.scala:2860)", "org.apache.spark.sql.Dataset.union(Dataset.scala:1635)", "org.apache.spark.sql.ray.merge.SnMergeSink.sink(SnMergeSink.scala:60)", "org.apache.spark.sql.ray.PiplineSink$class.sink(PiplineSink.scala:11)", 
"org.apache.spark.sql.ray.merge.SnMergeSink.sink(SnMergeSink.scala:12)", "org.apache.spark.sql.ray.pipline.PiplineStagesSubmitAsXml$$anonfun$runSnappyJob$6.apply(PiplineStagesSubmitAsXml.scala:108)", "org.apache.spark.sql.ray.pipline.PiplineStagesSubmitAsXml$$anonfun$runSnappyJob$6.apply(PiplineStagesSubmitAsXml.scala:100)", "scala.collection.Iterator$class.foreach(Iterator.scala:893)", "scala.collection.AbstractIterator.foreach(Iterator.scala:1336)", "scala.collection.IterableLike$class.foreach(IterableLike.scala:72)", "scala.collection.AbstractIterable.foreach(Iterable.scala:54)", "org.apache.spark.sql.ray.pipline.PiplineStagesSubmitAsXml$.runSnappyJob(PiplineStagesSubmitAsXml.scala:100)", "org.apache.spark.sql.ray.pipline.PiplineStagesSubmitAsXml$.runSnappyJob(PiplineStagesSubmitAsXml.scala:19)", "org.apache.spark.sql.SnappySQLJob$class.runJob(SnappySessionFactory.scala:128)", "org.apache.spark.sql.ray.pipline.PiplineStagesSubmitAsXml$.runJob(PiplineStagesSubmitAsXml.scala:19)", "spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:351)", "scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)", "scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)", "java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)", "java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)", "java.lang.Thread.run(Thread.java:748)"]
},
"status": "ERROR",
"jobId": "b9f1e969-b14b-444b-8610-571ff46eca73"
}
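The columns still failed to match, so this approach was clearly wrong.
Correct analysis:
With the snappydata import mode, data is loaded into the in-memory table. With the hdfs import mode, data is loaded through the parquet files that were associated with the table when it was created. The parquet folder still contained old files whose column order was [_upt#13387,money#13388,upt#13389,creat_time#13390,name#13391,id#13392], whereas the column order at table creation was [id#13361, name#13362, creat_time#13363, upt#13364, money#13365, _upt#13366, 0 AS _SID#13379]. Spark's Union matches columns by position, not by name, so the first column of one side (id, DecimalType(38,0)) was compared with the first column of the other (_upt, TimestampType), and the query failed.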
INSERT INTO RAYDM.T_RAYDM_TABLE_JOB_META VALUES('APP_DB_TSET.T_TALBE_TEST_IMPORT_LQ','hdfs://192.168.106.31:8020/data/snappydata/test/APP_DB_TSET/F_T_TALBE_TEST_IMPORT_LQ/merged/TOTAL_MERGED',0 );
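The positional matching behind this error can be illustrated with a small sketch. It is plain Python rather than Spark, and the helper name `first_positional_mismatch` is hypothetical. The column types are read off the CAST expressions in the job SQL (the real table types are assumed to match them), and strict type equality is a simplification of Spark's actual compatibility check.

```python
# Column order of the table as created (first Union branch in the DBeaver error).
# Types are an assumption taken from the CAST expressions in the job SQL.
row_table = [
    ("id", "DecimalType(38,0)"),
    ("name", "StringType"),
    ("creat_time", "TimestampType"),
    ("upt", "TimestampType"),
    ("money", "DecimalType(10,2)"),
    ("_upt", "TimestampType"),
]

# Column order found in the old parquet files (second Union branch).
old_parquet = [
    ("_upt", "TimestampType"),
    ("money", "DecimalType(10,2)"),
    ("upt", "TimestampType"),
    ("creat_time", "TimestampType"),
    ("name", "StringType"),
    ("id", "DecimalType(38,0)"),
]


def first_positional_mismatch(first, second):
    """Compare two schemas column by column, ignoring names.

    Returns (position, type_in_second, type_in_first) for the first
    position whose types differ, or None when every position agrees.
    """
    for pos, ((_, t1), (_, t2)) in enumerate(zip(first, second), start=1):
        if t1 != t2:
            return pos, t2, t1
    return None


# Names are identical on both sides, but position 1 pairs id with _upt.
mismatch = first_positional_mismatch(row_table, old_parquet)
print(mismatch)

# Realigning the parquet columns by name removes the mismatch.
by_name = dict(old_parquet)
realigned = [(name, by_name[name]) for name, _ in row_table]
print(first_positional_mismatch(row_table, realigned))
```

Both sides contain the same six column names, which is why changing the order in the SQL or in the CSV header could not help: the conflict was between the table definition and the stale parquet files.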
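Solution:
Drop the old table, recreate it, and use a new parquet path. This resolved the problem.
Note: the column order in the SQL must match the column order of the table definition; the column order in the import file does not need to match.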
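One way to keep the SQL in step with the table definition is to generate the SELECT list from the table's column order instead of typing it by hand. The sketch below is only an illustration: `build_select` is a hypothetical helper, and the column list is assumed to be copied from the CREATE TABLE statement, which is not shown in this note.

```python
# Column order of the target table, assumed to match its CREATE TABLE statement.
TABLE_COLUMNS = ["ID", "NAME", "CREAT_TIME", "UPT", "MONEY", "_UPT"]

# Expression for each column, taken from the job SQL; dict order is irrelevant.
CASTS = {
    "_UPT": "CAST(CURRENT_TIMESTAMP as TIMESTAMP)",
    "MONEY": "CAST(MONEY as DECIMAL(10,2))",
    "UPT": "CAST(CURRENT_TIMESTAMP as TIMESTAMP)",
    "CREAT_TIME": "CAST(CURRENT_TIMESTAMP as TIMESTAMP)",
    "NAME": "CAST(NAME as VARCHAR(100))",
    "ID": "CAST(ID as DECIMAL(38,0))",
}


def build_select(table_columns, casts, source_view):
    """Build a SELECT whose output columns follow table_columns exactly."""
    missing = [c for c in table_columns if c not in casts]
    if missing:
        raise ValueError(f"no expression defined for columns: {missing}")
    select_list = ",".join(f"{casts[c]} {c}" for c in table_columns)
    return f"select {select_list} from {source_view}"


sql = build_select(TABLE_COLUMNS, CASTS, "source_DF")
print(sql)
```

The generated statement can be pasted into the `dataframe` element of the job XML. It only addresses the SQL side; stale parquet files with a different column order still have to be removed as described in the solution.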