当前位置: 首页 > 知识库问答 >
问题:

无法将RDD转换为DataFrame(RDD有数百万行)

华君浩
2023-03-14

我使用的是Apache Spark 1.6.2

我有一个。csv数据,它包含大约800万行,我想把它转换成DataFrame

映射RDD可以很好地工作,但是当涉及到将RDD转换为DataFrame时,Spark引发了一个错误

Traceback (most recent call last):
  File "C:/Users/Dzaky/Project/TJ-source/source/201512/final1.py", line 38, in <module>
    result_iso = input_iso.map(extract_iso).toDF()
  File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 64, in toDF
  File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 423, in createDataFrame
  File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 310, in _createFromRDD
  File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 254, in _inferSchema
  File "c:\spark\python\lib\pyspark.zip\pyspark\rdd.py", line 1315, in first
  File "c:\spark\python\lib\pyspark.zip\pyspark\rdd.py", line 1297, in take
  File "c:\spark\python\lib\pyspark.zip\pyspark\context.py", line 939, in runJob
  File "c:\spark\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py", line 813, in __call__
  File "c:\spark\python\lib\pyspark.zip\pyspark\sql\utils.py", line 45, in deco
  File "c:\spark\python\lib\py4j-0.9-src.zip\py4j\protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.net.SocketException: Connection reset by peer: socket write error

以下是我的代码:

def extract_iso(line):
    fields = line.split(',')
    return [fields[-2], fields[1]]

input_iso = sc.textFile("data.csv")
result_iso = input_iso.map(extract_iso).toDF()

data.csv有超过800万行,但是当我将这些行减到只有<500行时,程序就可以正常工作了

数据很乱,每行中的总列经常不同,这就是为什么我需要首先映射它。但是,我想要的数据总是位于完全相同的索引[1]和[-2](第二列和最后第二列),这些列之间的总列因行而异

非常感谢你的回答:)

共有1个答案

勾海超
2023-03-14

最有可能的原因是Spark试图识别新创建的DataFrame的模式。尝试第二种将RDD映射到DF-Specific模式的方法,并通过createDataFrame,例如:

>>> from pyspark.sql.types import *
>>> schema = StructType([StructField('a', StringType()),StructField('b', StringType())])
>>> df = sqlContext.createDataFrame(input_iso.map(extract_iso), schema)
 类似资料:
  • 我试图将JDBC的ResultSet转换成Spark RDD,并寻找一种有效的方法来使用Spark的并行特性。 以下是我按照这个https://stackoverflow.com/a/32073423/6064131实现的 现在的主要问题是它需要更多的时间,我知道所有数据集都是通过一根针提取的eye.But有没有更好的方法来实现这一点? 有些人可能想知道为什么我没有使用内置功能sqlContext

  • 我正在尝试将RDD转换为数据帧,但失败并出现错误: org.apache.spark.SparkException:由于阶段失败而中止作业:阶段2.0中的任务0失败4次,最近一次失败:阶段2.0中丢失任务0.3(TID 11,10.139.64.5,执行器0) 这是我的代码:

  • 我对Spark和Scala相对较新。 我从以下数据帧开始(由密集的双倍向量组成的单列): 直接转换为RDD将生成一个org实例。阿帕奇。火花rdd。RDD[org.apache.spark.sql.Row]: 有人知道如何将此DF转换为org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.向量]的实例吗?到目前为止,我的各种尝试都没有成功。

  • 嗨,伙计们,我有下一个问题。我正在使用Java的Apache Spark Streaming v1.6.0来获取来自IBMMQ的一些消息。我为MQ制作了自定义接收器,但我遇到的问题是我需要将RDD从JavaDStream转换为DataFrame。为此,我使用foreachRDD迭代JavaDStream,并定义了DataFrame的模式,但当我运行作业时,第一条消息会引发下一个异常: Java语言

  • 有人能分享一下如何将转换为吗?

  • 问题内容: 我想 在Databricks中将转换为。 有人可以帮忙吗? 背景 (也欢迎一个更好的解决方案):我有一个Kafka流,经过一些步骤后,该流变成了2列数据帧。我想将其放入Redis缓存中,第一列作为键,第二列作为值。 更具体地说 ,输入的类型是:。我尝试放入Redis,如下所示: 错误消息如下所示: 我已经玩过一些想法(例如function ),但是没有一个帮助。 问题答案: 如果要将行