当前位置: 首页 > 知识库问答 >
问题:

来自...错误有效负载的无效状态代码“400”:“需求失败:会话未激活

尤博达
2023-03-14

我正在运行Pyspark脚本,将数据帧写入jupyter笔记本中的csv,如下所示:

df.coalesce(1).write.csv('Data1.csv',header = 'true')

运行一小时后,我得到以下错误。

错误:来自超文本传输协议的无效状态代码://......

我的配置如下:

spark.conf.set("spark.dynamicAllocation.enabled","true")
spark.conf.set("shuffle.service.enabled","true")
spark.conf.set("spark.dynamicAllocation.minExecutors",6)
spark.conf.set("spark.executor.heartbeatInterval","3600s")
spark.conf.set("spark.cores.max", "4")
spark.conf.set("spark.sql.tungsten.enabled", "true")
spark.conf.set("spark.eventLog.enabled", "true")
spark.conf.set("spark.app.id", "Logs")
spark.conf.set("spark.io.compression.codec", "snappy")
spark.conf.set("spark.rdd.compress", "true")
spark.conf.set("spark.executor.instances", "6")
spark.conf.set("spark.executor.memory", '20g')
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
spark.conf.set("spark.driver.allowMultipleContexts", "true")
spark.conf.set("spark.master", "yarn")
spark.conf.set("spark.driver.memory", "20G")
spark.conf.set("spark.executor.instances", "32")
spark.conf.set("spark.executor.memory", "32G")
spark.conf.set("spark.driver.maxResultSize", "40G")
spark.conf.set("spark.executor.cores", "5")

我检查了容器节点,错误是:

ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed:container_e836_1556653519610_3661867_01_000005 on host: ylpd1205.kmdc.att.com. Exit status: 143. Diagnostics: Container killed on request. Exit code is 143

无法找出问题所在。

共有2个答案

东郭淇
2023-03-14

我不太精通pyspark,但在scala解决方案将涉及这样的东西

首先我们需要创建一个创建头文件的方法:

def createHeaderFile(headerFilePath: String, colNames: Array[String]) {

//format header file path
val fileName = "dfheader.csv"
val headerFileFullName = "%s/%s".format(headerFilePath, fileName)

//write file to hdfs one line after another
val hadoopConfig = new Configuration()
val fileSystem = FileSystem.get(hadoopConfig)
val output = fileSystem.create(new Path(headerFileFullName))
val writer = new PrintWriter(output)

for (h <- colNames) {
  writer.write(h + ",")
}
writer.write("\n")
writer.close()

}

您还需要一种调用hadoop的方法来合并将由df编写的部件文件。写入方法:

def mergeOutputFiles(sourcePaths: String, destLocation: String): Unit = {

val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
// in case of array[String] use   for loop to iterate over the muliple source paths  if not use the code below 
//   for (sourcePath <- sourcePaths) {
  //Get the path under destination where the partitioned files are temporarily stored
  val pathText = sourcePaths.split("/")
  val destPath = "%s/%s".format(destLocation, pathText.last)

  //Merge files into 1
  FileUtil.copyMerge(hdfs, new Path(sourcePath), hdfs, new Path(destPath), true, hadoopConfig, null)
 // }
//delete the temp partitioned files post merge complete
val tempfilesPath = "%s%s".format(destLocation, tempOutputFolder)
hdfs.delete(new Path(tempfilesPath), true)
}

这是一个生成输出文件的方法,或者你的df.write方法,你可以将你的巨大DF写到hadoopHDFS:

def generateOutputFiles( processedDf: DataFrame, opPath: String, tempOutputFolder: String,
                       spark: SparkSession): String = {

  import spark.implicits._

  val fileName = "%s%sNameofyourCsvFile.csv".format(opPath, tempOutputFolder)
  //write as csv to output directory and add file path to array to be sent for merging and create header file
  processedDf.write.mode("overwrite").csv(fileName)

  createHeaderFile(fileName, processedDf.columns)
  //create an array of the partitioned file paths

  outputFilePathList = fileName

  // you can use array of string or string only depending on  if the output needs to get divided in multiple file based on some parameter  in that case chagne the signature ot Array[String] as output
  // add below code 
  // outputFilePathList(counter) = fileName
  // just use a loop in the above  and increment it 
  //counter += 1

  return outputFilePathList
}

这里定义的所有方法都是如何实现它们的:

def processyourlogic( your parameters  if any):Dataframe=
{
// your logic to do whatever needs to be done to your data
}

假设上述方法返回一个dataframe,以下是您如何将所有内容放在一起:

val yourbigD f = processyourlogic(your parameters) // returns DF
yourbigDf.cache // caching just in case you need it 
val outputPathFinal = " location where you want your file to be saved"
val tempOutputFolderLocation = "temp/"
val partFiles = generateOutputFiles(yourbigDf, outputPathFinal, tempOutputFolderLocation, spark)
mergeOutputFiles(partFiles, outputPathFinal)

如果你还有其他问题,请告诉我。如果您寻求的答案不同,则应询问原始问题。

毛越
2023-03-14

根据输出判断,如果您的应用程序没有以失败状态完成,这听起来像是Livy超时错误:您的应用程序可能需要比Livy会话定义的超时更长的时间(默认为1h),因此,即使Spark应用程序成功,如果应用程序需要比Livy会话的超时更长的时间,您的笔记本也会收到此错误。

如果是这种情况,以下是解决方法:

  1. 编辑etc/livy/conf/livy。conf文件(在集群的主节点中)
 类似资料:
  • 使用Alamofire 4/Swift 3,您如何区分由于以下原因而失败的请求: 网络连接(主机关闭,无法连接到主机)vs 代码: 我们希望以不同的方式处理每个案件。在后一种情况下,我们要询问回答。(在前一种情况下,我们不知道,因为没有回应)。

  • 问题内容: 我在Azure中托管了一个非常简单的.NET Web API,它有两种非常简单的方法: 我创建了一个简单的插件来调用这些方法。在我的AngularJS代码中,我进行了两个非常简单的$ http调用。GET工作正常。但是,POST始终返回“ 400(错误请求)”,然后在WebStorm控制台中不久显示“ XMLHttpRequestion无法加载…无效的HTTP状态代码400”。 任何和

  • 我正在尝试使用Ajax进行REST调用(POST)。这是我的AJAX代码 最初,我得到了这个错误:XMLHttpRequest无法加载http://localhost:port/service/myservice。对preflight请求的响应未通过访问控制检查:请求的资源上没有“access-control-allow-origin”标头。因此不允许访问源“null”。响应的HTTP状态代码为4

  • 我正在尝试登录一个网站(https://dashboard.ngrok.com/user/login)使用jsoup。我对GET请求没有任何问题,但当我尝试使用凭证执行POST请求时,我收到: HTTP错误获取URL。状态=400 我尝试为请求设置一个更好的头,使用我在连接发出POST请求时发送的相同参数。 我也尝试过这种类型的请求: 结果显示: 线程“main”组织中出现异常。jsoup。Htt

  • 问题内容: 我知道有很多这样的问题,但是我所见的问题都没有解决。我已经使用了至少3个微框架。所有这些都无法执行简单的POST,它应该将数据返回: angularJS客户端: SlimPHP服务器: 我已启用CORS,并且GET请求有效。html使用服务器发送的JSON内容进行更新。但是我得到了 XMLHttpRequest无法加载 http:// localhost:8080 / server.p

  • 需要一些帮助!!我对Drools中的有状态和无状态会话没有清晰的理解。我正在努力理解这一点,所以尝试了一个例子。 我在drools6.5版本上使用有状态和无状态会话测试了下面的drl,在这两种情况下都得到了相同的输出。根据我对无状态会话的理解,它应该只执行第一条规则,当应用程序对象在第一条规则中被修改时,第二条规则不应该被激活(“有效期”)。附加源代码。感谢您在这方面的帮助。