我有以下问题,因为我是pyspark的新手。基于来自两列的值的条件,我想填充第三列。条件是:
这就是数据帧的样子。
我创建了下面的udf,它给出了错误“不能将列转换为布尔值:请使用”
import pyspark.sql.functions as F
def split_by_semicolon_if_exists(col1,col2):
if (col1.contains(';') == True):
if col2 == 1:
result = F.substring(col1, 0, (F.length(col1) - F.expr('locate('';'', col1) - 1')))
if col2 == 2:
result = F.substring(col1, F.expr('locate('';'', col1) - 1'), (F.length(col1) - F.expr('locate('';'', col1) - 1')))
return result
else:
return col1
df = df.withColumn('col3',
split_by_semicolon_if_exists(df['col1'],
df['col2']))
我通过谷歌搜索各种功能构建了这个udf,所以它可能有多个问题。你能帮我为这个案例构建一个udf吗?
您可以在此处使用 expr
,而不必使用 udf。由于 Python 索引从 0 开始,因此您需要从 col1 中减去 1:
from pyspark.sql import functions as F
df.withColumn("Result",F.expr("""split(col2,';')[int(col1)-1]""")).show()
+----+---------+-----+------+
|col1| col2| col3|Result|
+----+---------+-----+------+
| 1|24.9;34.9| 24.9| 24.9|
| 2|24.9;34.9| 34.9| 34.9|
| 1|80.8;90.9| 80.8| 80.8|
| 2|80.8;90.9| 90.9| 90.9|
| 1| 777|777.0| 777|
+----+---------+-----+------+
新列Result
与您在col3
中的输出相同
您可以使用以下代码:
import pyspark.sql.functions as F
import pyspark.sql.types as T
df =spark.createDataFrame(
data = [(1, "24.9;34.9"),
(2,"24.9;34.9"),
(1,"80.8;90.9"),
(2,"80.8;90.9"),
(1,"777")],
schema=["col1","col2"])
df.show()
def split_by_semicolon_if_exists(col1,col2):
if ';' in col2 :
if col1 == 1:
result = col2.split(';')[0]
if col1 == 2:
result = col2.split(';')[1]
return result
else:
return col2
split_by_semicolon_if_exists_udf =F.udf(split_by_semicolon_if_exists , T.StringType())
df = df.withColumn('col3', split_by_semicolon_if_exists_udf(df['col1'], df['col2']))
df.show()
要在数据帧的列上使用函数,必须使用F.udf(函数,返回参数类型)将它们声明为udf函数
可以查看这个文档https://sparkbyexamples . com/py spark/py spark-UDF-user-defined-function/
另外,python有更简单的函数来管理字符串,比如
if 'string' in stringVariable :
(如果主字符串中存在子字符串,则为True/False)
您还可以使用来分割特定字符中的字符串
string.split(';')
(返回分离部件的数组)
看看拆分
函数。
使用自动进稿器:
spark = SparkSession.builder.getOrCreate()
data = [
{"col1": 1, "col2": "24.9;34.9"},
{"col1": 2, "col2": "24.9;34.9"},
{"col1": 1, "col2": "80.8;90.9"},
{"col1": 1, "col2": "777"},
]
df = spark.createDataFrame(data)
def get_value(item, value):
if ";" in value:
return value.split(";")[item - 1]
return value
df = df.withColumn("col3", F.udf(get_value, StringType())(F.col("col1"), F.col("col2")))
没有UDF:
df = df.withColumn(
"col3",
F.when(
F.col("col2").contains(";"), F.split("col2", ";").getItem(F.col("col1") - 1)
).otherwise(F.col("col2")),
)
结果:
root
|-- col1: long (nullable = true)
|-- col2: string (nullable = true)
|-- col3: string (nullable = true)
+----+---------+----+
|col1|col2 |col3|
+----+---------+----+
|1 |24.9;34.9|24.9|
|2 |24.9;34.9|34.9|
|1 |80.8;90.9|80.8|
|1 |777 |777 |
+----+---------+----+
根据和的公共列在中使用填充列的最佳方法是什么? 得到: 编辑:我还想将列名更改为,这样预期的输出如下所示: 尝试过: 它发现了错误: 更新2: : :
问题内容: 我目前有一张看起来像这样的表: 我需要做的是获得“ 费率”列的信息,但每个名称仅获得一次。例如,我有三行John Doe,每行的比率为8。我需要将这些行的比率设为8,而不是24,因此它为每组名称都对比率进行一次计数。 当然是行不通的,因为我试图对比率列而不是名称求和。我知道在对单个记录进行计数时,我可以使用,这就是我试图从中得到的行为类型。 我怎样才能为每个名字获得一个比率? 提前致谢
我正在使用pyspark下面是我的数据
我想在Pandas数据集中创建一个新列,基于另外两个列的值。 现在,应该如下所示: 有什么帮助吗?
试图在熊猫中复制一个简单的Excel函数,但没有成功。还没有尝试np.where(),因为我想学习lambda函数,尽可能少依赖导入。 复制的函数: Lambda我测试和工作: 不起作用的熊猫的λ: 错误: 我猜它试图计算整个列,而不是逐行计算,我该如何解决这个问题?