+----------------------------------+
|amount |
+----------------------------------+
|[1197, 8797, 6146, 253, 4521, 955]|
|[1197, 8797, 6146, 253, 4521, 955]|
+----------------------------------+
object JsonSpark {
def main(args: Array[String]): Unit = {
val spark:SparkSession=SparkSession.builder()
.master("local[1]")
.appName(("Spark"))
.getOrCreate()
val df=spark.read.option("multiline",true).json("C:\\Users\\Admin\\Desktop\\jsonfile.json")
val df2=df.select("transactions")
val df2extract=df2.select("transactions.amount")
df2extract.selectExpr(
"aggregate(amount, 0, (x, y) -> x + y) as details_sum"
).show()
}
}
我得到的错误是
Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException:
extraneous input '>' expecting {'(', 'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT', 'WHERE', 'GROUP', 'BY', 'GROUPING', 'SETS', 'CUBE', 'ROLLUP', 'ORDER', 'HAVING', 'LIMIT', 'AT', 'OR', 'AND', 'IN', NOT, 'NO', 'EXISTS', 'BETWEEN', 'LIKE', RLIKE, 'IS', 'NULL', 'TRUE', 'FALSE', 'NULLS', 'ASC', 'DESC', 'FOR', 'INTERVAL', 'CASE', 'WHEN', 'THEN', 'ELSE', 'END', 'JOIN', 'CROSS', 'OUTER', 'INNER', 'LEFT', 'SEMI', 'RIGHT', 'FULL', 'NATURAL', 'ON', 'LATERAL', 'WINDOW', 'OVER', 'PARTITION', 'RANGE', 'ROWS', 'UNBOUNDED', 'PRECEDING', 'FOLLOWING', 'CURRENT', 'FIRST', 'AFTER', 'LAST', 'ROW', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'VIEW', 'REPLACE', 'INSERT', 'DELETE', 'INTO', 'DESCRIBE', 'EXPLAIN', 'FORMAT', 'LOGICAL', 'CODEGEN', 'COST', 'CAST', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'USE', 'PARTITIONS', 'FUNCTIONS', 'DROP', 'UNION', 'EXCEPT', 'MINUS', 'INTERSECT', 'TO', 'TABLESAMPLE', 'STRATIFY', 'ALTER', 'RENAME', 'ARRAY', 'MAP', 'STRUCT', 'COMMENT', 'SET', 'RESET', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'MACRO', 'IGNORE', 'IF', '+', '-', '*', 'DIV', '~', 'PERCENT', 'BUCKET', 'OUT', 'OF', 'SORT', 'CLUSTER', 'DISTRIBUTE', 'OVERWRITE', 'TRANSFORM', 'REDUCE', 'USING', 'SERDE', 'SERDEPROPERTIES', 'RECORDREADER', 'RECORDWRITER', 'DELIMITED', 'FIELDS', 'TERMINATED', 'COLLECTION', 'ITEMS', 'KEYS', 'ESCAPED', 'LINES', 'SEPARATED', 'FUNCTION', 'EXTENDED', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'LAZY', 'FORMATTED', 'GLOBAL', TEMPORARY, 'OPTIONS', 'UNSET', 'TBLPROPERTIES', 'DBPROPERTIES', 'BUCKETS', 'SKEWED', 'STORED', 'DIRECTORIES', 'LOCATION', 'EXCHANGE', 'ARCHIVE', 'UNARCHIVE', 'FILEFORMAT', 'TOUCH', 'COMPACT', 'CONCATENATE', 'CHANGE', 'CASCADE', 'RESTRICT', 'CLUSTERED', 'SORTED', 'PURGE', 'INPUTFORMAT', 'OUTPUTFORMAT', DATABASE, DATABASES, 'DFS', 'TRUNCATE', 'ANALYZE', 'COMPUTE', 'LIST', 'STATISTICS', 'PARTITIONED', 'EXTERNAL', 'DEFINED', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'REPAIR', 'RECOVER', 'EXPORT', 'IMPORT', 'LOAD', 'ROLE', 'ROLES', 'COMPACTIONS', 'PRINCIPALS', 'TRANSACTIONS', 'INDEX', 'INDEXES', 'LOCKS', 'OPTION', 'ANTI', 'LOCAL', 'INPATH', 'CURRENT_DATE', 'CURRENT_TIMESTAMP', STRING, BIGINT_LITERAL, SMALLINT_LITERAL, TINYINT_LITERAL, INTEGER_VALUE, DECIMAL_VALUE, DOUBLE_LITERAL, BIGDECIMAL_LITERAL, IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 29)
== SQL ==
aggregate(amount, 0, (x, y) -> x + y) as details_sum
-----------------------------^^^
请帮助我解决这个错误。非常感谢!
假设你试图对一个数组求和-
spark>=2.4.0
val df = spark.sql(
"""
|select amount
|from values
|(array(1197, 8797, 6146, 253, 4521, 955)),
|(array(1197, 8797, 6146, 253, 4521, 955))
|T(amount)
""".stripMargin)
df.show(false)
df.printSchema()
/**
* +----------------------------------+
* |amount |
* +----------------------------------+
* |[1197, 8797, 6146, 253, 4521, 955]|
* |[1197, 8797, 6146, 253, 4521, 955]|
* +----------------------------------+
*
* root
* |-- amount: array (nullable = false)
* | |-- element: integer (containsNull = false)
*/
df.selectExpr(
"aggregate(amount, 0, (x, y) -> x + y) as details_sum"
).show()
/**
* +-----------+
* |details_sum|
* +-----------+
* | 21869|
* | 21869|
* +-----------+
*/
火花<2.4
或者-
//spark < 2.4
val intArraySum = udf((arrayInt: mutable.WrappedArray[Int]) => arrayInt.sum)
df.select(
intArraySum($"amount").as("details_sum")
).show()
/**
* +-----------+
* |details_sum|
* +-----------+
* | 21869|
* | 21869|
* +-----------+
*/
val intSeqSum = udf((arrayInt: Seq[Int]) => arrayInt.sum)
df.select(
intSeqSum($"amount").as("details_sum")
).show()
/**
* +-----------+
* |details_sum|
* +-----------+
* | 21869|
* | 21869|
* +-----------+
*/
df.withColumn("details_sum", intSeqSum(function.col("amount")))
.show()
当我尝试使用Hive执行非聚合命令时,查询似乎可以正常工作,如下所示: 从Airlines_Analysis.Airline中选择*;从Airlines_Analysis.Airline中选择Airlines.Month; org.apache.hive.service.cli.hivesqlexception:处理语句时出错:失败:执行错误,从org.apache.hive.service.cl
聚合功能让你可以汇总或更改数据的粒度。 点击字段框中的向下箭头。 选择“聚合”,然后选择一个聚合函数。 函数 描述 数字 总计 返回所有值的总和。Null 值将被忽略。 平均 返回所有值的平均值。Null 值将被忽略。 计数 返回项目数量。Null 值不计算在内。 计数(非重复) 返回不同项目的数量。Null 值不计算在内。 最小 返回所有记录的最小值。Null 值将被忽略。 最大 返回所有记录的
聚合功能让你可以汇总或更改数据的粒度。 点击字段框中的向下箭头。 选择“聚合”,然后选择一个聚合函数。 函数 描述 数字 总计 返回所有值的总和。Null 值将被忽略。 平均 返回所有值的平均值。Null 值将被忽略。 计数 返回项目数量。Null 值不计算在内。 计数(非重复) 返回不同项目的数量。Null 值不计算在内。 最小 返回所有记录的最小值。Null 值将被忽略。 最大 返回所有记录的
聚合功能让你可以汇总或更改数据的粒度。 点击字段框中的向下箭头。 选择“聚合”,然后选择一个聚合函数。 函数 描述 数字 总计 返回所有值的总和。Null 值将被忽略。 平均 返回所有值的平均值。Null 值将被忽略。 计数 返回项目数量。Null 值不计算在内。 计数(非重复) 返回不同项目的数量。Null 值不计算在内。 最小 返回所有记录的最小值。Null 值将被忽略。 最大 返回所有记录的
主要内容:1.COUNT函数,2. SUM函数,3. AVG函数,4. MAX函数,5. MIN函数SQL聚合函数用于对表的单个列的多行执行计算,它只返回一个值。它还用于汇总数据。 SQL聚合函数的类型,如下图所示 - 接下来,我们一个个地讲解。 1.COUNT函数 函数用于计算数据库表中的行数,它可以在数字和非数字数据类型上工作。 函数使用返回指定表中所有行的计数。 包函重复值和值。 语法 假设有一个表,它的结构和数据如下所示 - PRODUCT COMPANY QTY RATE COST I
问题内容: 在解释CTE的一些概念时,有人问了一个可爱的问题..我们可以找到行的乘法吗,而我们总是从新手开始集中精力。那给了我一个想法!仅使用SQL是否有可能。我还考虑了我们甚至可以支持的最大精度,因为该产品可能非常庞大。 话虽如此,我们不能编写自己的聚合函数。(可以吗?)我在想仅使用SQL就有可能。 我想到的就像是自己添加2,3次。.但是当集合很大时..由于繁琐,我无法实现。 另一个可能是和,为