当前位置: 首页 > 知识库问答 >
问题:

使用pyspark的rdd中的子字符串

葛奇
2023-03-14

我不是RDD方面的专家,正在寻找一些答案,我试图在pyspark RDD上执行一些操作,但无法实现,特别是子串。我知道我可以通过将RDD转换为DF来做到这一点,但想知道在DF时代之前是如何做到这一点的?公司仍然更喜欢在RDD或数据帧中工作吗?

我的代码:

rdd= sc.textFile("Sales.txt")
##Taking only required columns and changing the data types
rdd_map = rdd.map(lambda line: (int((line.split("|")[0])),int((line.split("|")[1])),line.split("|")[4]))
##Filtering the data
rdd_filter = rdd_map.filter(lambda x: (x[0] > 43668) & ('-' in x[2]))
## Trying to perform substring
rdd_clean = rdd_filter.map(lambda x: x.substr(x[2],1,3))

数据样本:

43665|63|OLD ORDER|Sport-100 Helmet, Re|HL-U509-R
43668|87|OLD ORDER|Sport-100 Helmet, Re|HL-U509-R

完整的错误消息:

org.apache.spark.SparkException:由于阶段失败而中止作业:阶段50.0中的任务0失败1次,最近的失败:在阶段50.0中丢失任务0.0(TID 152,localhost,执行程序驱动程序):org.apache.spark.api.python.PythonException:Traceback(最近调用最后一次):

共有1个答案

濮阳宁
2023-03-14

我认为您可以使用flatMap()和列表理解来简化一些转换步骤:

>>> rdd = sc.parallelize([
      '43665|63|OLD ORDER|Sport-100 Helmet, Re|HL-U509-R'
    , '43668|87|OLD ORDER|Sport-100 Helmet, Re|HL-U509-R'
])

>>> rdd_clean = rdd.flatMap(lambda x: [ (int(e[0]), int(e[1]), e[4][:3]) for e in [x.split('|')] if ('-' in e[4]) & (int(e[0]) > 43665) ])

>>> rdd_clean.collect()
[(43668, 87, 'HL-')]    

其中,我使用flatMap()设置三项元组,并移动filter(),将x[2]的子字符串带入列表理解。如果您坚持原来的方法,只需执行以下操作:

rdd_clean = rdd_filter.map(lambda x: (x[0], x[1], x[2][:3]))
 类似资料:
  • 我必须将Scala代码转换为python。 scala代码将string的RDD转换为case类的RDD。代码如下: 可以在PySpark中实现吗?我尝试使用以下代码,但出现错误 错误Py4JJavaError:调用z:org时出错。阿帕奇。火花应用程序编程接口。蟒蛇蟒蛇。收集和服务:组织。阿帕奇。火花SparkException:作业因阶段失败而中止:阶段21.0中的任务0失败1次,最近的失败:

  • 这篇文章给出了一个关于如何更改列名的很好的概述。如何在PySpark中更改dataframe列名? 不过,我需要一些更多的/稍微调整,我没有能力做。有人能帮我删除所有colnames中的空格吗?它需要例如连接命令,系统方法减少了处理30列的工作量。我认为regex和UDF的组合最好。 示例:root--客户机:string(nullable=true)--分支号:string(nullable=t

  • null 有人能帮我弄清楚为什么上面的代码给出了错误的答案吗?

  • 我习惯于在JavaScript中这样做: 斯威夫特没有这个功能,怎么做类似的东西?

  • 我想转换一个组织。阿帕奇。火花sql。数据框到组织。阿帕奇。火花rdd。RDD[(字符串,字符串)]在数据块中。有人能帮忙吗? 背景(也欢迎使用更好的解决方案):我有一个Kafka流,它(经过一些步骤)变成了2列数据帧。我想将其放入Redis缓存,第一列作为键,第二列作为值。 更具体地说,输入的类型是:。我尝试将以下内容放入Redis: 错误消息如下所示: 我已经尝试过一些想法(比如函数、rdd)

  • 场景是:EventHub- 文件格式:CSV(带引号、管道分隔和自定义架构) 我正在尝试读取来自eventhub的CSV字符串。Spark成功地使用正确的模式创建了数据框,但在每条消息之后,数据框最终都是空的。 我设法在流媒体环境之外做了一些测试,当从文件中获取数据时,一切都很顺利,但当数据来自字符串时,一切都失败了。 所以我找到了一些链接来帮助我,但没有一个工作: can-i-read-a-cs