在从< code>RDD制作< code >数据帧时,我遇到了一个错误。
from pyspark.ml.fpm import FPGrowth
sogou = sc.textFile("SogouQ.sample.utf8", use_unicode = False)
def parse(line):
value = [ x for x in line.split(",") if x]
return list(set(value))
rdd = sogou.map(parse)
df = sogou.toDF('items')
我收到以下错误:
py spark . SQL . utils . parse exception:u " \ nmis matched input ' '应为{'SELECT ',' FROM ',' ADD ',' AS ',' ALL ',' DISTINCT ',' WHERE ',' GROUP ',' BY ',' GROUPING ',' SETS ',' CUBE ',' ROLLUP ',' ORDER ',' HAVING ',' LIMIT ',' AT ',' OR ',' AND ',' IN ',NOT,' NO ',' EXISTS ',' BETWEEN ',' LIKE ',RLIKE,' IS ',' NULL ',' TRUE ',' FALSE ',' NULL ',' ASC ',' FOR ',' INTERVAL FORMAT ',' LOGICAL ',' CODEGEN ',' COST ',' CAST ',' SHOW ',' TABLES ',' COLUMNS ',' DROP ',' UNION ',' EXCEPT ',' MINUS ',' INTERSECT ',' TO ',' TABLESAMPLE ',' STRATIFY ',' ALTER ',' RENAME ',' ARRAY ',' DATA ',' START ',' TRANSACTION ',' COMMIT ',' ROLLBACK ',' MACRO ',' IGNORE ',' BOTH ',' BOTH ',' LEADING ',' TRAILING ',' IF ',' POSITION ',' DIV ',' percenter 目录','位置','交换','存档','取消存档','文件格式','触摸','压缩','连接','更改','级联','限制','聚集','排序','清除','输入格式','数据库,数据库,' DFS ','截断','分析','计算','列表','统计','分区','外部','定义','撤销','授予','锁定','解锁',' MSCK ','修复','恢复','导出','导入','加载','角色','角色','压缩','委托人','事务','索引','索引','
文本包含中文
。有关系吗?文字是这样的:
360,安全卫士,
123,123,范冰冰,
当我使用< code > py spark . ml lib . FP growth 时,< code>rdd工作正常。怎么转换成dataframe?
这里有两个不同的问题:
> < li>
toDF
调用。< code>RDD.toDF具有以下签名:
Signature: rdd.toDF(schema=None, sampleRatio=None)
架构
应为的位置
参数模式:pyspark.sql.types.结构类型
或列名列表
所以在你的情况下,它应该是:
sogou.toDF(["items"])
解析
方法:
createDataFrame
由df
调用的方法需要一个RDD[tuple]
或等价物,它可以映射到结构
,除非提供了架构。如果您只想使用名称,它应该返回一个tuple
def parse(line):
value = [ x for x in line.split(",") if x]
return list(set(value)),
组合:
>>> def parse(line):
... value = [ x for x in line.split(",") if x]
... return list(set(value)),
...
...
>>> rdd = sc.parallelize(["360,安全卫士,", "123,123,范冰冰,"])
>>> rdd.map(parse).toDF(["items"]).show()
+--------------+
| items|
+--------------+
| [安全卫士, 360]|
|[123,123,范冰冰,]|
+--------------+
另一种选择(保留当前的解析实现)是
>>> from pyspark.sql.types import ArrayType, StringType
>>> def parse(line):
... value = [ x for x in line.split(",") if x]
... return list(set(value))
>>> rdd.map(parse).toDF(ArrayType(StringType())).toDF("items").show()
+--------------+
| items|
+--------------+
| [安全卫士, 360]|
|[123,123,范冰冰,]|
+--------------+
RDD是以数组[数组[字符串]的格式创建的,具有以下值: 我想用模式创建一个数据帧: 接下来的步骤: 给出以下错误:
我尝试使用以下代码获取数据帧的分区数量: 按照我的理解,dataframe通过元数据给rdd增加了一个结构层。那么,为什么在转换成rdd时要花这么多时间呢?
我正在尝试将RDD转换为数据帧,但失败并出现错误: org.apache.spark.SparkException:由于阶段失败而中止作业:阶段2.0中的任务0失败4次,最近一次失败:阶段2.0中丢失任务0.3(TID 11,10.139.64.5,执行器0) 这是我的代码:
我用Avro(序列化器和反序列化器)收到Kafka主题的推文。然后,我创建了一个spark consumer,它在RDD[GenericRecord]的数据流中提取推文。现在,我想将每个rdd转换为数据帧,通过SQL分析这些推文。有什么解决方案可以将RDD[GenericRecord]转换为数据帧吗?
我正在尝试将RDD[String]转换为数据框。字符串是逗号分隔的,所以我希望逗号之间的每个值都有一列。为此,我尝试了以下步骤: 但我明白了: 这不是这篇文章的副本(如何将rdd对象转换为火花中的数据帧),因为我要求RDD[字符串]而不是RDD[行]。 而且它也不是火花加载CSV文件作为DataFrame的副本?因为这个问题不是关于将CSV文件读取为DataFrame。
我需要使用 DataFrame上不可用的方法。所有DataFrame方法都只引用DataFrame结果。那么,如何从数据帧数据中创建RDD呢? 注意:这是对 1.2.0 的更改(在 1.3.0 中)。 更新来自@dpangmao的回答:方法是. rdd。我很想知道(a)它是否是公共的,以及(b)它对性能有何影响。 好吧(a)是和(b)-好吧,您可以在这里看到有显着的perf含义:必须通过调用map