当前位置: 首页 > 面试题库 >

PySpark DataFrames-枚举而不转换为熊猫的方法?

诸葛苏燕
2023-03-14
问题内容

我有一个很大的 pyspark.sql.dataframe.DataFrame 名为df。我需要某种枚举记录的方式-
因此,能够访问具有特定索引的记录。(或选择具有索引范围的记录组)

在大熊猫中,我可以

indexes=[2,3,6,7] 
df[indexes]

在这里我想要类似的东西 (并且不将数据框转换为熊猫)

我最接近的是:

  • 通过以下方式枚举原始数据框中的所有对象:
        indexes=np.arange(df.count())
    df_indexed=df.withColumn('index', indexes)
* 使用where()函数搜索所需的值。

问题:

  1. 为什么它不起作用以及如何使其起作用?如何在数据框中添加一行?
  2. 以后可以做类似的事情吗:
         indexes=[2,3,6,7] 
     df1.where("index in indexes").collect()
  1. 有没有更快,更简单的处理方法?

问题答案:

它不起作用,因为:

  1. 的第二个参数withColumn应该Column不是一个集合。np.array在这里不会工作
  2. 当您将"index in indexes"SQL表达式传递给时where indexes超出范围,并且不能将其解析为有效标识符

PySpark > = 1.4.0

您可以使用相应的窗口函数添加行号,并使用 Column.isin方法或格式正确的查询字符串进行查询:

    from pyspark.sql.functions import col, rowNumber
    from pyspark.sql.window import Window

    w = Window.orderBy()
    indexed = df.withColumn("index", rowNumber().over(w))

    # Using DSL
    indexed.where(col("index").isin(set(indexes)))

    # Using SQL expression
    indexed.where("index in ({0})".format(",".join(str(x) for x in indexes)))

看起来调用无PARTITION BY子句的窗口函数会将所有数据移动到单个分区,因此上述毕竟不是最佳解决方案。

有没有更快,更简单的处理方法?

并不是的。Spark DataFrames不支持随机行访问。

PairedRDD``lookup如果使用进行分区,则可以使用相对较快的方法进行访问HashPartitioner。还有一个index-
rdd
项目,它支持有效的查找。

编辑

与PySpark版本无关,您可以尝试执行以下操作:

    from pyspark.sql import Row
    from pyspark.sql.types import StructType, StructField, LongType

    row = Row("char")
    row_with_index = Row("char", "index")

    df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF()
    df.show(5)

    ## +----+
    ## |char|
    ## +----+
    ## |   a|
    ## |   b|
    ## |   c|
    ## |   d|
    ## |   e|
    ## +----+
    ## only showing top 5 rows

    # This part is not tested but should work and save some work later
    schema  = StructType(
        df.schema.fields[:] + [StructField("index", LongType(), False)])

    indexed = (df.rdd # Extract rdd
        .zipWithIndex() # Add index
        .map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])) # Map to rows
        .toDF(schema)) # It will work without schema but will be more expensive

    # inSet in Spark < 1.3
    indexed.where(col("index").isin(indexes))


 类似资料:
  • 我有一个非常大的pyspark.sql.dataframe.dataframe名为df。我需要一些枚举记录的方法--因此,能够访问具有特定索引的记录。(或选择具有索引范围的记录组) 在熊猫身上,我可以 null > 为什么不起作用,如何使其起作用?如何向DataFrame添加行? 以后制作类似: 有没有更快更简单的处理方法?

  • 今天,我尝试迁移到新的(谷歌)Firebase。 不仅API不同,数据库中存储数据的方式也不同。例如,旧Firebase最初以这种方式将数据写入数据库时,数据库中保存为“7.5”的数字(双精度)没有正确解析(标识为字符串)。必须手动重写双打(“7.5”到7.5)、布尔型(“true”到true)等数据库。 现在我遇到了一个新问题。尝试将枚举写入数据库时出错: 致命异常:主进程:com.aayaff

  • 包中的collections.list()方法返回而不是有充分的理由吗?

  • 问题内容: 我已经读过一个对Pandas的SQL查询,并且值以dtype’object’的形式出现,尽管它们是字符串,日期和整数。我能够将日期“ object”转换为Pandas datetime dtype,但是在尝试转换字符串和整数时遇到错误。 这是一个例子: 将转换为日期时间可以: 但是尝试将转换为整数时出现错误: 注意:我尝试时遇到类似的错误 当尝试转换为字符串时,似乎什么也没有发生。 问

  • 我有一个字段在熊猫DataFrame被导入为字符串格式。它应该是日期时间变量。如何将其转换为日期时间列,然后根据日期进行筛选。 示例: 数据帧名称:原始数据

  • 问题内容: 给定以下枚举,将Int强制转换为Java中的枚举的正确方法是什么? 问题答案: 尝试在必须为or的地方,即该枚举的有效序数。 请注意,在Java中,枚举实际上是类(因此,枚举值是对象),因此您不能将an 甚至转换为枚举。