Question:

NoSuchElementException error in a Spark streaming query, despite a heavy cluster configuration

仲孙飞文
2023-03-14

Can someone help me understand the reason behind this error:

ERROR Query alert [id = d19f51b1-8131-40dd-ab62, runId = 276833a0-235f-4d2e-bd61] terminated with error
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker$.metrics(BasicWriteStatsTracker.scala:180)
at org.apache.spark.sql.execution.streaming.FileStreamSink.basicWriteJobStatsTracker(FileStreamSink.scala:103)
at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:140)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:568)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:566)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:251)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:61)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:565)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:207)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:175)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:175)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:251)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:61)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:175)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:169)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:296)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)

The cluster configuration is:

Databricks Runtime 5.5 LTS

Scala 2.11

Spark 2.4.3

Driver: 64 GB memory, 16 cores, 3 DBU

Workers: 64 GB memory, 16 cores, 3 DBU (2-4 workers, autoscaling)

Three streaming queries run in parallel, with their scheduler pools defined in fairscheduler.xml.

The Spark configuration is:

spark.sql.autoBroadcastJoinThreshold=-1
spark.sql.broadcastTimeout=1200
spark.executor.instances=4
spark.executor.cores=16
spark.executor.memory=29g
spark.sql.shuffle.partitions=32
spark.default.parallelism=32
spark.driver.maxResultSize=25g
spark.scheduler.mode=FAIR
spark.scheduler.allocation.file=/dbfs/config/fairscheduler.xml

The code flow is shown below:

import org.apache.spark.sql.SparkSession

// Applies the given transformation only when the predicate holds
implicit class PipedObject[A](value: A) {
  def conditionalPipe(f: A => A)(pred: Boolean): A =
    if (pred) f(value) else value
}

implicit val spark: SparkSession = SparkSession
  .builder()
  .appName("MyApp")
  .conditionalPipe(sess => sess.master("local[6]"))(false) // local master disabled here
  .getOrCreate()
import spark.implicits._

val cookedData = getCookedStreamingData() // streaming data as input from event hub
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "cook")
cookedData.writeStream
          .option("checkpointLocation", "checkpointLocation1")
          .queryName("queryName1")
          .format("avro")
          .option("path", "dir1")
          .start()

val scoredData = score(cookedData)
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "score")
scoredData.writeStream
          .option("checkpointLocation", "checkpointLocation2")
          .queryName("queryName2")
          .format("avro")
          .option("path", "dir2")
          .start()

val alertData = score(scoredData)
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "alert")
alertData.writeStream
          .option("checkpointLocation", "checkpointLocation3")
          .queryName("queryName3")
          .format("avro")
          .option("path", "dir3")
          .start()

Sample fairscheduler.xml file:

<allocations>
<pool name="default">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>2</minShare>
</pool>
<pool name="cook">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>5</minShare>
</pool>
<pool name="score">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>5</minShare>
</pool>
<pool name="alert">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>5</minShare>
</pool>
</allocations>

1 answer in total

孙凌龙
2023-03-14

java.util.NoSuchElementException: None.get

This is purely a Scala programming error in your code. Without the exact code snippet where it happens, I cannot point to it directly.

If you are working with an Option, you need to check that it is defined before reading the element with get.

Alternatively, you can use Option's getOrElse() function to provide a default value.
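
A minimal sketch of both patterns (the value and names below are hypothetical, not taken from the question):

val maybeThreshold: Option[Int] = None

// Unsafe: maybeThreshold.get throws java.util.NoSuchElementException: None.get
// Safe pattern 1: check isDefined before calling get
val threshold1: Int = if (maybeThreshold.isDefined) maybeThreshold.get else 0

// Safe pattern 2: getOrElse supplies a default when the Option is empty
val threshold2: Int = maybeThreshold.getOrElse(0)

// Idiomatic alternative: pattern match on the Option
val threshold3: Int = maybeThreshold match {
  case Some(t) => t
  case None    => 0
}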

It can also occur if you are using multiple Spark contexts...

Have a look at this: Spark Streaming Exception: java.util.NoSuchElementException: None.get
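
If competing sessions are the suspect, here is a minimal sketch of reusing one session instead of building a second one (a general pattern, not code from the question):

import org.apache.spark.sql.SparkSession

// getOrCreate() returns the already-active session when one exists,
// so repeated calls do not spin up a second SparkContext
val spark: SparkSession = SparkSession.builder().getOrCreate()

// getActiveSession lets you verify a session is set on the current thread
val active: Option[SparkSession] = SparkSession.getActiveSession
assert(active.isDefined, "no active SparkSession on this thread")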
