当前位置: 首页 > 知识库问答 >
问题:

当我使用SparkStreaming处理Kafka的消息时,我得到了NullPointerException

何安宜
2023-03-14

我正在为Kafka和SparkStreaming编写一些代码,当我将它们放在Yarn-Cluster上时,它报告了NullPointerException

但它在我的电脑上运行良好(独立模式)

那它有什么问题呢?

//这是代码

import java.util.Properties

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.Logger
import org.apache.spark.html" target="_blank">sql.SparkSession
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DealLog extends App {

  val spark=SparkSession.builder().appName(" DealLog").getOrCreate()
  val sc = spark.sparkContext
  val ssc: StreamingContext= new StreamingContext(sc, Seconds(3))

  val log = Logger.getLogger(this.getClass)
  val pro = new Properties()
  val in = Thread.currentThread().getContextClassLoader.getResourceAsStream("config.properties")
  pro.load(in)
  //  ssc.checkpoint("hdfs://192.168.0.240:8022/bigdata/checkpoint2")
  val bootstrap=pro.getProperty("kafka.brokers")
  val kafkaParams = Map[String, Object]("bootstrap.servers" -> bootstrap,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "userlabel",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (true: java.lang.Boolean)
  )
  val topicsSet = Array(pro.getProperty("kafkaconsume.topic"))
  val ds = KafkaUtils.createDirectStream[String,String](
    ssc,
    PreferConsistent,
    Subscribe[String,String](topicsSet,kafkaParams)
  ).map(s=>{(s.value())})

  ds.foreachRDD(p=>{
    log.info("ds.foreachRdd p=="+ p)
    p.foreachPartition(per=>{
      log.info("per-------"+per)
      per.foreach(rdd=> {
        log.info("rdd---------"+ rdd)
        if(rdd.isEmpty){
          log.info("null ")
        }
        else{
          log.info("not null..")
        }
        log.info("complete")

      })
    })
  })
  ssc.start()
  ssc.awaitTermination()
}

这里例外-----------------------------------

19/07/26 18:21:56警告Scheduler.TaskSetManager:在stage 0.0中丢失任务0.0(TID 0,cdh102,执行器2):java.lang.NullPointerException at Recomment.Deallog$$AnonFun$2$$AnonFun$Apply$1.Apply(Deallog.scala:42)at Recomment.Deallog$$AnonFun$2$$AnonFun$Apply$1.Apply(Deallog.scala:41)at org.apache.spark.rdd.rdd$$AnonFun$ForeachPartition在java.lang.Thread.Run(thread.java:745)

19/07/26 18:21:56 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 0.0 (TID 1, cdh102, executor 2, partition 0, PROCESS_LOCAL,

4706字节)19/07/26 18:21:56信息调度器.TaskSetManager:丢失了cdh102上阶段0.0(TID 1)中的任务0.1,executor 2:java.lang.NullPointerException(null)[duplicate 1]19/07/26 18:21:56 INFO Scheduler.TaskSetManager:在stage 0.0(TID 2,cdh102,executor 2,partition 0,PROCESS_LOCAL,4706字节)中启动任务0.2 19/07/26 18:21:56 INFO Scheduler.TaskSetManager:在stage 0.0(TID 2,cdh102,executor 2,partition 0,PROCESS_LOCAL,4706字节)中丢失任务0.2,cdh102,executor 2,executor 2,executor 2,executor正在中止作业19/07/26 18:21:56信息集群。YarnClusterScheduler:已从池19/07/26 18:21:56信息集群中删除任务已全部完成的TaskSet 0.0。YarnClusterScheduler:取消阶段0 19/07/26 18:21:56信息调度器。DagScheduler:由于阶段失败导致作业中止而导致ResultStage 0(foreachPartition在Deallog.Scala:41)在1.092s中失败:阶段0.0中的任务0失败了4次,最近的失败:丢失了阶段0.0中的任务0.3(TID 3,cdh102,executor 2):java.lang.NullPointerException(建议)。71)在org.apache.spark.scheduler.resulttask.runtask(resulttask.scala:87)在org.apache.spark.scheduler.task.run(task.scala:109)在org.apache.spark.executor.executor$taskrunner.run(executor.scala:338)在java.util.concurrent.ThreadPoolExecutor.runworker(ThreadPoolExecutor.java:1142)在java.util.concurrent.ThreadPoolExecutor

共有1个答案

琴刚豪
2023-03-14

我想你的问题可能来自这条线

if(rdd.isEmpty)

因为您编写代码的方式实际上并不是RDD。在调用foreachPartition之后,您将获得该分区的迭代器。然后,当您在该迭代器上调用foreach时,您将访问该分区迭代器上的实际记录。所以在这一行上处理的是来自dStream的记录。因此,您可能会对抛出该异常的空字符串/值调用.isempty

可以将.isempty替换为

if(record == null)

但你不必那么做。您可以只检查RDD本身是否为空。你能试试下面的吗?

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DealLog extends App {

  val spark = SparkSession.builder().appName(" DealLog").getOrCreate()
  val sc = spark.sparkContext
  val ssc: StreamingContext = new StreamingContext(sc, Seconds(3))

  val log = Logger.getLogger(this.getClass)
  val pro = new Properties()
  val in = Thread.currentThread().getContextClassLoader.getResourceAsStream("config.properties")
  pro.load(in)
  //  ssc.checkpoint("hdfs://192.168.0.240:8022/bigdata/checkpoint2")
  val bootstrap = pro.getProperty("kafka.brokers")
  val kafkaParams = Map[String, Object]("bootstrap.servers" -> bootstrap,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "userlabel",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (true: java.lang.Boolean)
  )
  val topicsSet = Array(pro.getProperty("kafkaconsume.topic"))
  val ds = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topicsSet, kafkaParams)
  ).map(s => {
    (s.value())
  })

  ds.foreachRDD(rdd => {
    log.info("ds.foreachRdd p==" + rdd)
    if (!rdd.isEmpty) {
      rdd.foreachPartition(partition => {
        log.info("per-------" + partition)
        partition.foreach(record => {
          log.info("record---------" + record)
        })
      })
    } else log.info("rdd was empty")

    log.info("complete")
  })
  ssc.start()
  ssc.awaitTermination()
  ssc.stop()
}
 类似资料:
  • 问题内容: 我试图用groovy为Swagger页面编写Selenium对象构建器。为了便于讨论,我的问题代码可以简化为以下几种: 调用它非常简单(来自JUnit3): 该构造函数是一个可怕的眼睛痛!我试图用以下等待替换它: 要么: 两者都产生了相同的结果:以开头的行中的闭包中的“ org.openqa.selenium.StaleElementReferenceException:元素不再连接到

  • 我看过很多关于如何使用硒的示例脚本 switch_to.window 这是一个关于我所学的示例脚本,根本不起作用: 我得到了错误消息: InvalidArgumentException:预期“handle”为字符串,但得到了[object Undefined]未定义 很明显,我也厌倦了同样结果的普通网页。 有人有同样的问题吗?

  • 当我运行旧的android项目时,我得到一条错误消息: 应用插件:'com.google.gms.google-services' “警告:配置”编译“已过时,已被”实现“和”API“所取代。将于2018年底删除。有关详细信息,请参阅:http://d.android.com/r/tools/update-dependency-configurations.html受影响的模块:app”

  • 如果每个Kafka消息属于一个特定的会话,如何管理会话关联,以便同一个Spark执行器看到链接到一个会话的所有消息? 如何确保属于会话的消息被Spark executor按照在Kafka中报告的顺序处理?我们能以某种方式实现这一点而不对线程计数施加限制并导致处理开销(如按消息时间戳排序)吗? 何时检查会话状态?在执行器节点崩溃的情况下,如何从最后一个检查点恢复状态?在驱动程序节点崩溃的情况下,如何

  • 我用的是spring-kafka-2.1.10。RELEASE尝试跳过错误数据,但失败, 它落在一个无限消耗的循环中 我有什么要说的吗? 如果你能帮忙,我将不胜感激 简单代码: ========================= ====================================================配置:==============================

  • 我们一直在使用SI Kafka进行一个新项目,并取得了很大成功。在最近的一次切换之前,我们使用KafkaTopicOffsetManager来管理我们的消费者主题偏移量。为了避免每个消费者/主题对都有额外的主题,并使用Burrow或lag监控,我们决定使用最新的KafkaNativeOffsetManager,它使用Kafka提供的本机偏移管理。但在切换之后,我们注意到目标主题的消息消耗持续滞后。