We started out on Spark 2.4.3 and had to upgrade to 3.1.0 for business reasons. The upgrade hit a few pitfalls, which are recorded here for reference.
In Spark 2.4.3 we had been passing the UDF's return type explicitly, and it is precisely this that now fails in Spark 3:
def udfGenPartitionWindow(features: Array[FeatureInfo]): UserDefinedFunction = {
  udf((featureId: String, timestamp: Long) => genPartitionWindow(features, featureId, timestamp), ArrayType(StringType))
}
Error message:
Exception in thread "main" org.apache.spark.sql.AnalysisException: You're using untyped Scala UDF, which does not have the input type information. Spark may blindly pass null to the Scala closure with primitive-type argument, and the closure will see the default value of the Java type for the null argument, e.g. `udf((x: Int) => x, IntegerType)`, the result is 0 for null input. To get rid of this error, you could:
1. use typed Scala UDF APIs(without return type parameter), e.g. `udf((x: Int) => x)`
2. use Java UDF APIs, e.g. `udf(new UDF1[String, Integer] { override def call(s: String): Integer = s.length() }, IntegerType)`, if input types are all non primitive
3. set spark.sql.legacy.allowUntypedScalaUDF to true and use this API with caution;
As the error message makes clear, manually supplying the return type (the untyped Scala UDF API) is no longer recommended; using it now raises an AnalysisException, and three workarounds are offered. For reference, option 2 would look roughly like the sketch below.
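A minimal sketch of option 2, the Java UDF API, called from Scala; the string-length example and value names are illustrative only and not code from this project:
import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.IntegerType

// The return type is still passed explicitly, but all input types are
// non-primitive (String here), so null inputs can be handled explicitly.
val strLen = udf(new UDF1[String, Integer] {
  override def call(s: String): Integer =
    if (s == null) null else Integer.valueOf(s.length)
}, IntegerType)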
We take option 1 and simply delete the return type parameter:
def udfGenPartitionWindow(features: Array[FeatureInfo]): UserDefinedFunction = {
  udf((featureId: String, timestamp: Long) => genPartitionWindow(features, featureId, timestamp))
}
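The factory is applied to columns as before, for example (the DataFrame and column names below are assumptions for illustration):
import org.apache.spark.sql.functions.col

// Hypothetical usage: "feature_id" and "ts" stand in for the real column names.
val withWindows = df.withColumn(
  "partition_windows",
  udfGenPartitionWindow(features)(col("feature_id"), col("ts")))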
But when the job runs, a new error appears:
Caused by: java.io.NotSerializableException: ...
Serialization stack:
- object not serializable (class: ..., value: ...$ConversionUtils$@63ed11dd)
- field (class: ...$ConversionUtils$$anonfun$3, name: $outer, type: class ...$ConversionUtils$)
- object (class ...$ConversionUtils$$anonfun$3, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2, name: func$2, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUdf, name: f, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUdf, scalaUDF(ts_str#2683))
- field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.Alias, scalaUDF(ts_str#2683) AS ts#7146)
- element of array (index: 35)
- array (class [Ljava.lang.Object;, size 36)
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer,.....
The fix here is simply to make the enclosing object extend Serializable:
object UdfUtils extends Serializable {...}
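Putting the two fixes together, the helper object ends up looking roughly like this; it is only a sketch, since FeatureInfo and genPartitionWindow are project-specific and assumed to be defined elsewhere:
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

// Extending Serializable lets Spark serialize the closure that captures this object.
object UdfUtils extends Serializable {
  def udfGenPartitionWindow(features: Array[FeatureInfo]): UserDefinedFunction = {
    // Typed Scala UDF API: the return type is inferred, not passed explicitly.
    udf((featureId: String, timestamp: Long) =>
      genPartitionWindow(features, featureId, timestamp))
  }
}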
With that, the problem is solved. If you really do need the old behaviour, the error message's third option is to enable the legacy flag (use with caution):
sparkSession.sql("set spark.sql.legacy.allowUntypedScalaUDF=true")
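The same flag can also be set when the session is built; a minimal sketch, with a made-up application name:
import org.apache.spark.sql.SparkSession

// Equivalent to the SET statement above; re-enables the untyped udf(f, dataType) calls.
val spark = SparkSession.builder()
  .appName("spark3-upgrade-demo") // hypothetical app name
  .config("spark.sql.legacy.allowUntypedScalaUDF", "true")
  .getOrCreate()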
The second pitfall: for business reasons our table names carry a two-part namespace prefix, but Spark 3 does not support this:
select *
from udw_ns.default.test_table
Error message:
User class threw exception: org.apache.spark.sql.AnalysisException: spark_catalog requires a single-part namespace, but got [udw_ns, default]
Details:
[23-01-31 10:42:44.542][INFO][main][Client][Logging.scala:57]
client token: N/A
diagnostics: User class threw exception: org.apache.spark.sql.AnalysisException: spark_catalog requires a single-part namespace, but got [udw_ns, default]
at org.apache.spark.sql.errors.QueryCompilationErrors$.requiresSinglePartNamespaceError(QueryCompilationErrors.scala:850)
at org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog$TableIdentifierHelper.asTableIdentifier(V2SessionCatalog.scala:192)
at org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.loadTable(V2SessionCatalog.scala:65)
at org.apache.spark.sql.connector.catalog.CatalogV2Util$.loadTable(CatalogV2Util.scala:281)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.loaded$lzycompute$1(Analyzer.scala:1250)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.loaded$1(Analyzer.scala:1250)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.$anonfun$lookupRelation$3(Analyzer.scala:1287)
at scala.Option.orElse(Option.scala:447)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupRelation(Analyzer.scala:1286)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$15.applyOrElse(Analyzer.scala:1204)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$15.applyOrElse(Analyzer.scala:1167)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$2(AnalysisHelper.scala:135)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1122)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1121)
at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:135)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$2(AnalysisHelper.scala:135)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1122)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1121)
at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:135)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:1167)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:1133)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:222)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:218)
at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:167)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:218)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:182)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:203)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:202)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:75)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:183)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:183)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:75)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:65)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
at com.baidu.anti.module.process.impl.Extractor.udwMartLogExtract(Extractor.scala:145)
at com.baidu.anti.module.process.impl.Extractor.getLogData(Extractor.scala:125)
at com.baidu.anti.module.process.impl.Extractor.run(Extractor.scala:68)
at com.baidu.anti.runner.OfflineThemis$.runJob(OfflineThemis.scala:64)
at com.baidu.anti.runner.OfflineThemis$.main(OfflineThemis.scala:32)
at com.baidu.anti.runner.OfflineThemis.main(OfflineThemis.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:737)
For this problem, you can try appending .history after the table name:
select *
from udw_ns.default.test_table.history
You can also try the Iceberg approach; a sketch of the catalog configuration follows.
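A minimal sketch of that approach, assuming the Iceberg Spark runtime is on the classpath and a Hive metastore is used; the catalog name udw_ns simply mirrors the namespace prefix, and all values here are assumptions:
import org.apache.spark.sql.SparkSession

// Register a catalog named "udw_ns" backed by Iceberg so that three-part
// identifiers such as udw_ns.default.test_table resolve through it.
val spark = SparkSession.builder()
  .config("spark.sql.catalog.udw_ns", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.udw_ns.type", "hive")
  .getOrCreate()

spark.sql("select * from udw_ns.default.test_table").show()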