当前位置: 首页 > 知识库问答 >
问题:

scala中的Spark SQL执行

殳智志
2023-03-14

我有一个以下数据(alldata),它有SQL查询和视图名称。

Select_Query|viewname
select v1,v2 from conditions|cond
select w1,w2 from locations|loca

我已经拆分并正确地将其分配给诱惑(alldata)

val Select_Querydf = spark.sql("select Select_Query,ViewName from alldata")

当我尝试执行查询并从中注册tempview或表时,它显示空指针错误。但是当我注释掉spark时,PRINTLN显示了表中的所有值。sql语句。

 Select_Querydf.foreach{row => 
          val Selectstmt = row(0).toString()
          val viewname = row(1).toString()
          println(Selectstmt+"-->"+viewname)
      spark.sql(Selectstmt).registerTempTable(viewname)//.createOrReplaceTempView(viewname)
       }
output:
select v1,v2 from conditions-->cond
select w1,w2 from locations-->loca

但是当我用spark.sql执行它时,它会显示以下错误,请帮助我出错的地方。

19/12/09 02:43:12错误执行器:在阶段4.0任务0.0中的异常(TID 4)org.apache.spark.rdd.NullPointerException在28.applySparkSorg.apache.spark.rdd.状态$lzycompute(SparkS28.apply)在org.apache.spark.SparkSession.session状态(SparkSession.scala:126)在org.apache.spark.sql.SparkSession.sql(SparkSession.scala:623)在Sparkcalacode1. SQLQueryPertewith头$$anonfun$main$1.apply(SQLQueryexecutewithheader.scala:36)在Sparkcalacode1. SQLQueryPertewith头$$anonfun$main$1.apply(SQLQueryexecutewithheader.scala:32)在scala.collection.迭代器$class.foreach(Iterator.scala:891)在scala.collection.AbstractIterator.foreach(Iterator.scala:1334)在java.lang.RDD$anonfun$foreach1美元$anonfun$申请$org.apache.spark.sql.(RDD. scala: 918)在ession.sessionRDD$anonfun$foreach1美元$anonfun$申请$ession.scala:128(RDD. scala: 918)在org.apache.spark.sql.SparkContext$$anonfunjava. util. concam上的ThreadPoolExecitor. runWorker(未知源)。java. lang上的ThreadPoolExecitor$Worker. run(未知源)。线程. run(未知源)19/12/09 02:43:12 ERROR TaskSetManager:阶段4.0中的任务0失败1次;中止线程“main”org. apache. sql中的作业异常。SparkException:由于阶段失败而中止的作业:阶段4.0中的任务0失败1次,最近一次失败:阶段4.0中丢失的任务0.0(TID 4、localhost、执行器驱动程序):java. lang。org. apache. sql上的NullPointerException。org. apache. sql上的SparkSession. sesionState$lzycompute(SparkSession. scala:128)。org. apachesion. sql上的SparkSession. sesionState(SparkSessionRDD$anonfun1$foreach1美元$anonfun28美元应用程序(RDD. scala: 918)在org. apache. spark. rdd。RDD$anonfun1$foreach1美元$anonfun28美元应用程序(RDD. scala: 918)在org. apache. spark。SparkContext$anonfun$runJob5美元。应用程序(SparkContext. scala: 2062)在org. apache. spark。SparkContext$anonfun$runJob5美元。应用程序(SparkContext. scala: 2062)在org. apache. spark.调度器。org. apache. spark.调度器上的ResultTask. runTask(Resultask. scala: 87)。在org. apache. spark.执行器上运行任务(Task. scala: 108)。执行器$TaskRunner. run(Exec

共有1个答案

楚墨一
2023-03-14

这里是火花。sql是SparkSession,不能用于数据帧的foreach<代码>Sparksession在驱动程序中创建,foreach在worker中执行,而不是序列化。

我希望你有一个Select_Querydf的小列表,如果是这样,你可以收集为一个列表并如下所示使用它。

Select_Querydf.collect().foreach { row =>
  val Selectstmt = row.getString(0)
  val viewname = row.getString(1)
  println(Selectstmt + "-->" + viewname)
  spark.sql(Selectstmt).createOrReplaceTempView(viewname)
}

希望这有帮助!

 类似资料:
  • <code>Spark</code>版本为1.3.0。 来自< code > sqlcontext . Scala (https://github . com/Apache/spark/blob/master/SQL/core/src/main/Scala/org/Apache/spark/SQL/sqlcontext . Scala)的源代码: 我真的不能理解上面的代码。 是如何工作的? (_)

  • 我正在尝试使用一个继承的Scala函数(stuctType.diff())并获得一个NoSuchMethodError。 有人有什么想法吗?我使用的是Spark 1.6.2和Scala 2.10

  • 我正在运行一个简单的sparkSQL查询,它在2个数据集上进行匹配每个数据集大约500GB。所以整个数据大约是1TB。 作业工作良好,直到数据加载(分配了10K任务)。在行分配了200个任务。失败的地方!我知道我不是在缓存一个巨大的数据,它只是一个数字,为什么它会在这里失败。 以下是错误详细信息:

  • ConsumptionExecutor: 然而,我想使用Akka流/Akka Actor,在这里我不需要给出固定的线程池大小,Akka负责所有的事情。我对Akka和流媒体和演员的概念很陌生。有人能给我任何线索,以示例代码的形式,以适合我的用例?提前道谢!

  • 假设我有以下一组代码,可以在将来做一些事情: 假设我为这段代码提供了默认的ExecutionContext,我知道在后台会发生什么,但我想知道的是如何处理未来?我的意思是,应该有一些线程或一组线程可能会等待未来完成?这些线程被阻塞了吗?从某种意义上说,他们是在等待未来的结束? 现在在以下场景中: 假设x有一个超时,我可以这样调用: 我真的在阻挡吗?有没有更好的异步超时方法? 编辑:下面的超时比我上

  • 我对Spark和Scala非常陌生(比如两个小时的新体验),我正在尝试玩CSV数据文件,但我无法做到,因为我不确定如何处理“标题行”,我在互联网上搜索了加载或跳过它的方法,但我真的不知道怎么做。我正在粘贴我正在使用的代码,请帮助我。