当前位置: 首页 > 知识库问答 >
问题:

sql SparkSession load()with schema:schema中的非StringType字段使所有值为空

白星渊
2023-03-14
python
Python 3.6.5 (default, Jun 17 2018, 12:13:06) 
[GCC 4.2.1 Compatible Apple LLVM 9.1.0 (clang-902.0.39.2)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyspark
>>> from pyspark import SparkContext
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.types import *
>>> 
>>> data_flnm = 'four_cols.csv'

>>> lines = [ln.rstrip() for  ln in open(data_flnm).readlines()[:3]]
>>> lines
['zzzc7c09:66d7:47d6:9415:87e5010fe282|2019-04-08|0|f', 'zzz304fa:6fc0:4337:91d0:05ef4657a6db|2019-07-08|1|f', 'yy251cf0:aa11:44e9:88f4:f6f9c1899cee|2019-05-13|0|t']


>>> parts = [ln.split("|") for ln in lines]
>>> parts
[['zzzc7c09:66d7:47d6:9415:87e5010fe282', '2019-04-08', '0', 'f'], ['zzz304fa:6fc0:4337:91d0:05ef4657a6db', '2019-07-08', '1', 'f'], ['yy251cf0:aa11:44e9:88f4:f6f9c1899cee', '2019-05-13', '0', 't']]

>>> cols1 = [StructField('u_id', StringType(), True), StructField('week', StringType(), True), StructField('flag_0_1', StringType(), True), StructField('flag_t_f', StringType(), True)]
>>> cols2 = [StructField('u_id', StringType(), True), StructField('week', DateType(), True), StructField('flag_0_1', IntegerType(), True), StructField('flag_t_f', BooleanType(), True)]
>>> sch1 = StructType(cols1)
>>> sch2 = StructType(cols2)
>>> sch1
StructType(List(StructField(u_id,StringType,true),StructField(week,StringType,true),StructField(flag_0_1,StringType,true),StructField(flag_t_f,StringType,true)))
>>> sch2
StructType(List(StructField(u_id,StringType,true),StructField(week,DateType,true),StructField(flag_0_1,IntegerType,true),StructField(flag_t_f,BooleanType,true)))

>>> spark_sess = SparkSession.builder.appName("xyz").getOrCreate()
19/09/10 19:32:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

>>> df1 = spark_sess.read.format('csv').option("nullValue", "null").load([data_flnm], sep='|', schema = sch1)
>>> df2 = spark_sess.read.format('csv').option("nullValue", "null").load([data_flnm], sep='|', schema = sch2)

>>> df1.show(5)
+--------------------+----------+--------+--------+
|                u_id|      week|flag_0_1|flag_t_f|
+--------------------+----------+--------+--------+
|zzzc7c09:66d7:47d...|2019-04-08|       0|       f|
|zzz304fa:6fc0:433...|2019-07-08|       1|       f|
|yy251cf0:aa11:44e...|2019-05-13|       0|       t|
|yy1d2f8e:d8f0:4db...|2019-07-08|       1|       f|
|zzz5ccad:2cf6:44e...|2019-05-20|       1|       f|
+--------------------+----------+--------+--------+
only showing top 5 rows

>>> df2.show(5)
+----+----+--------+--------+
|u_id|week|flag_0_1|flag_t_f|
+----+----+--------+--------+
|null|null|    null|    null|
|null|null|    null|    null|
|null|null|    null|    null|
|null|null|    null|    null|
|null|null|    null|    null|
+----+----+--------+--------+
only showing top 5 rows

>>> 

PS:无法添加标记“structfield”和“structtype”:信誉不够(__。

共有1个答案

梅欣然
2023-03-14

解析时,youu需要将flag_t_f列作为字符串读取。以下模式将起作用:

StructType(List(StructField(u_id,StringType,true),StructField(week,DateType,true),StructField(flag_0_1,IntegerType,true),StructField(flag_t_f,StringType,true)))

之后,如果需要,可以向dataframe添加布尔列:

import pyspark.sql.functions as f
df = df.withColumn("flag_t_f", 
      f.when(f.col("flag_t_f") == 'f', 'False')
      .when(f.col("flag_t_f") == 't', 'True')          
     )

如果有多个布尔列的值为“f”和“not”,则可以通过遍历所有列来转换所有这些列

cols = df.columns
for col in cols:
    df = df.withColumn(col, 
        f.when(f.col(col) == 'f', 'False')
         .when(f.col(col) == 't','True')
         .otherwise(f.col(col))
    )
 类似资料:
  • 我有一个包含以下列的表:id、col1、col2、col3、col4、col5、col6。 约束表示至少有3列被填充(所以最多3个NULs)。(列不按顺序填充,所以可以有col1、col2、col5被填充,col3、col4、col6是NULs) 如何确保当该列不为NULL时,它在此行的其他列中是唯一的?如何确保非空值的组合在所有行中都是唯一的? 我目前添加了以下约束(以确保至少3个非空):

  • 问题内容: 是否可以在每个表的每个字段中搜索Oracle中的特定值? 有几百个表,有些表中有成千上万的行,因此我知道这可能需要很长时间才能查询。但是我唯一知道的是我要查询的字段的值是1/22/2008P09RR8。< 我已经尝试过使用以下语句根据我认为应命名的内容找到合适的列,但未返回任何结果。 这个数据库上绝对没有文档,我也不知道该字段是从哪里提取的。 有什么想法吗? 问题答案: 引用: 我已尝

  • 我的预期结果如下,即“resource”元素下的所有非空字段。 我的当前编码:

  • 问题内容: 我需要将单行的所有而不是空值放入一个字符串中,例如 表: 导致: 重要说明-我不知道字段名称/类型,因此它应该遍历所有字段,并且所有非null值都将添加到列表中。 看起来它可以使用xquery做到这一点,但找不到正确的语法。有什么提示吗? 谢谢! 问题答案: select T2.N.value(‘local-name(.)’, ‘nvarchar(128)’)+’: ‘+ T2.N.v

  • 在Postgres9.5中,我无法从JSONB字段中的属性中选择非空值 我还尝试使用NotNull。 当我运行其中一个时,我收到错误42883。“错误:运算符不存在。JSONB->>布尔提示:没有与给定名称和参数类型匹配的运算符。您可能需要添加显式类型转换。”

  • 我有一个表,它可以被几个列过滤。过滤器很严格。启动数据时,它会显示所有值。然而,在按某列过滤并返回到empty选项以显示所有值之后,它将显示一个空表。如何仅对非空值应用严格筛选器?

  • 我有一个包含json字符串的数据框架df,如下所示, df 架构: 如何将其转换为字符串数组(数组类型(字符串类型())? 结果应该是这样的, 结果模式: 任何帮助都将不胜感激。谢谢你!