当前位置: 首页 > 知识库问答 >
问题:

Spark SQL:子查询的交叉联接

顾俊哲
2023-03-14

我尝试在pyspark中(在Spark 1.5.0上)运行以下SQL查询:

SELECT * FROM ( SELECT obj as origProperty1 FROM a LIMIT 10) tab1 CROSS JOIN ( SELECT obj AS origProperty2 FROM b LIMIT 10) tab2
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)

a = sqlCtx.parquetFile("hdfs/path/to/table/a.parquet")
a.registerTempTable("a")

b = sqlCtx.parquetFile("hdfs/path/to/table/b.parquet")
b.registerTempTable("b")

result = sqlCtx.sql("SELECT * FROM ( SELECT obj as origProperty1 FROM a LIMIT 10) tab1 CROSS JOIN ( SELECT obj AS origProperty2 FROM b LIMIT 10) tab2").collect()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/sql/context.py", line 552, in sql
return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 36, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o19.sql.
: java.lang.RuntimeException: [1.67] failure: ``union'' expected but identifier CROSS found

SELECT * FROM ( SELECT obj as origProperty1 FROM a LIMIT 10) tab1 CROSS JOIN ( SELECT obj AS origProperty2 FROM b LIMIT 10) tab2
                                                                  ^
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:36)
at org.apache.spark.sql.catalyst.DefaultParserDialect.parse(ParserDialect.scala:67)
at org.apache.spark.sql.SQLContext$$anonfun$3.apply(SQLContext.scala:175)
at org.apache.spark.sql.SQLContext$$anonfun$3.apply(SQLContext.scala:175)
at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:115)
at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:114)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:172)
at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:172)
at org.apache.spark.sql.execution.datasources.DDLParser.parse(DDLParser.scala:42)
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:195)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)

共有1个答案

夏俊人
2023-03-14

使用HiveContext而不是SQLContext似乎可以解决这个问题。此解决方案运行良好:

from pyspark.sql import HiveContext
sqlCtx = HiveContext(sc)

a = sqlCtx.parquetFile("hdfs/path/to/table/a.parquet")
a.registerTempTable("a")

b = sqlCtx.parquetFile("hdfs/path/to/table/b.parquet")
b.registerTempTable("b")

result = sqlCtx.sql("SELECT * FROM ( SELECT obj as origProperty1 FROM a LIMIT 10) tab1 CROSS JOIN ( SELECT obj AS origProperty2 FROM b LIMIT 10) tab2").collect()

但据我所知,只要支持子查询和交叉联接,这也应该适用于Spark SQL的SQLContext...

 类似资料:
  • jpaQuery.from(tableA,tableb)。(如何编写以下条件)。id=表格b。id() 如何使用查询dsl编写左向外连接?? 这是编写eq连接的示例代码 JPA query query = new JPA query(em); Q表A = Q表A.QTableB 表 B = QTableB 表 B query.from(tableA, tableB). where(tableA.i

  • 让我们以交通摄像头案例为例。假设,我有一个巨大的数据集,其中包含交通摄像头记录,看起来像:plate_no|camera_id|城市|位置|方向|时间戳|etc|etc。 我希望得到满足以下所有条件的结果: 像''%George Street'这样的位置 10月1日 我们现在采用的方法是在SolrCloud中索引数据,然后得到三个结果集,如1 只是想知道什么是这项任务的正确工具,或者有比这更好的方

  • 你好,我被querydsl(hibernate 4.2.3和postgresql)卡住了。一般来说,它工作得很好,但是我有一个谓词是这样构建的: 现在的问题是用户并不总是有客户。Querydsl 生成交叉联接到客户表的 sql,当用户没有客户时,此查询将返回零个条目,即使应返回这些条目。此客户已保留联接(稍微修改了Spring数据以允许联接提取字段),但仍会创建其他交叉联接。任何想法如何写这样的查

  • 我正在运行一个简单的sparkSQL查询,它在2个数据集上进行匹配每个数据集大约500GB。所以整个数据大约是1TB。 作业工作良好,直到数据加载(分配了10K任务)。在行分配了200个任务。失败的地方!我知道我不是在缓存一个巨大的数据,它只是一个数字,为什么它会在这里失败。 以下是错误详细信息:

  • 问题内容: 对于开发人员何时使用联接而不是子查询是否有经验法则还是相同的? 问题答案: 取决于RDBMS。您应该比较两个查询的执行计划。 根据我对Oracle 10和11的经验,执行计划始终是相同的。

  • 问题内容: 我想知道,是否可以使用PostgreSQL将查询结果与自身连接? 问题答案: 您可以使用WITH来做到这一点: 或者通过创建一个包含查询的VIEW并加入该查询: 还是蛮力的方法:两次键入子查询: