当前位置: 首页 > 知识库问答 >
问题:

在pyspark中以分布式方式高效生成大型DataFrame(无需pyspark.sql.行)

嵇出野
2023-03-14

问题归结为以下几点:我想在pyspark中生成一个数据帧,使用现有的并行化输入集合和一个给定一个输入可以生成相对大批量行的函数。在下面的示例中,我想使用1000个执行器生成10^12行数据帧:

def generate_data(one_integer):
  import numpy as np
  from pyspark.sql import Row
  M = 10000000 # number of values to generate per seed, e.g. 10M
  np.random.seed(one_integer)
  np_array = np.random.random_sample(M) # generates an array of M random values
  row_type = Row("seed", "n", "x")
  return [row_type(one_integer, i, float(np_array[i])) for i in range(M)]

N = 100000 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = spark.sparkContext.parallelize(list_of_integers)
row_rdd = list_of_integers_rdd.flatMap(list_of_integers_rdd)
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType
my_schema = StructType([
       StructField("seed", IntegerType()),
       StructField("n", IntegerType()),
       StructField("x", FloatType())])
df = spark.createDataFrame(row_rdd, schema=my_schema)

(我真的不想研究给定种子的随机数分布——这只是我能够想出的一个例子,来说明大型数据帧不是从仓库加载的,而是由代码生成的情况)

上面的代码几乎完全符合我的要求。问题是,它以一种非常低效的方式来实现这一点——代价是为每一行创建一个python行对象,然后将python行对象转换为内部Spark列表示。

是否有一种方法可以通过让spark知道这些是一批值的列来转换已采用列表示法的一批行(例如,如上所述的一个或几个numpy数组np_数组)?

例如,我可以编写代码来生成python集合RDD,其中每个元素都是pyarrow。记录批或大熊猫。DataFrame,但我找不到一种方法,在不创建pyspark Row对象的RDD的情况下,将其中任何一个转换为Spark DataFrame。

至少有十几篇文章举例说明如何使用pyarrow pandas将本地(到驱动程序)pandas数据帧转换为Spark数据帧,但这不是我的选择,因为我需要在执行器上以分布式方式实际生成数据,而不是在驱动程序上生成一个数据帧并将其发送给执行器。

UPD。我发现了一种避免创建行对象的方法——使用Python元组的RDD。正如预期的那样,它仍然太慢,但仍然比使用行对象快一点。尽管如此,这并不是我真正要找的(这是一种从python向Spark传递柱状数据的非常有效的方法)。

还测量了在机器上进行某些操作的时间(粗略的方式在测量时间上有相当多的变化,但在我看来仍然具有代表性):所讨论的数据集是10M行,3列(一列是常数整数,另一列是整数范围0到10M-1,第三个是使用np.random.random_sample生成的浮点值:

  • 本地生成数据帧(10M行):~440-450ms

仅使用1个执行器和1个初始种子值生成Spark数据帧:

  • 使用spark.createDataFrame(row_rdd,模式=my_schema):~70-80
  • 使用spark.createDataFrame(tuple_rdd,模式=my_schema):~40-45s
  • (非分布式创建)使用spark.createDataFrame(pandas_df,模式=my_schema):~0.4-0.5s(没有熊猫df生成本身,这需要大约相同的时间)-将spark.sql.execution.arrow.enabled设置为true。

本地到驱动程序的熊猫数据帧在〜1秒内转换为10M行的火花数据帧的例子让我有理由相信在执行程序中生成的数据帧也应该是可能的。然而,我现在可以实现的最快速度是使用Python元组的RDD10M行的〜40秒。

所以问题仍然存在——在pyspark中有没有一种方法可以以分布式方式高效地生成大型Spark数据帧?

共有3个答案

彭建业
2023-03-14

以下是不使用仅基于RDD的行的问题解决方案。我认为这可能是最有效的方法,因为它使用map来计算函数输出,并使用platMap来组合这些输出——这两个操作都是在RDD上执行的,所以一切都应该是分布式的。

import numpy as np
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('abc').getOrCreate()
sc = spark.sparkContext

def generate_data(one_integer):
  M = 2 # number of values to generate per seed, e.g. 10M
  np.random.seed(one_integer)
  np_array = np.random.random_sample(M) # generates an array of M random values
  return [(one_integer, i, float(np_array[i])) for i in range(M)]

N = 30 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = sc.parallelize(list_of_integers)
generated_data_rdd = list_of_integers_rdd.map(lambda x: generate_data(x))
solved_rdd = generated_data_rdd.flatMap(lambda list: list)

df = spark.createDataFrame(solved_rdd).toDF("seed", "n", "x")
df.show()
皮景龙
2023-03-14

这里有一个解决方案,它不使用RDD或创建行,而只使用dataframe操作:
代码在scala中,但在python中执行同样的操作应该很简单)

val N = 100000

//for seed return array of index and random_value
def generate_data(i: Int): Array[(Int, Double)] = ???
val generate_data_udf = udf (generate_data _)

spark
  .range(N)
  .toDF("seed")
  .withColumn("arr", generate_data_udf($"seed"))
  .select(
    $"seed",
    explode($"arr") as "exp"
  )
  .select(
    $"seed",
    $"exp._1" as "n",
    $"exp._2" as "x"
  )
吕琪
2023-03-14

听起来瓶颈是从RDD转换-

  1. 由于并行创建df很容易,因此使用df编写生成的df,而不是从执行器返回df。到拼花地板,即:
def generate_data(seed):
    M = 10
    np.random.seed(seed)
    np_array = np.random.random_sample(M) # generates an array of M random values
    df = pd.DataFrame(np_array, columns=["x"])
    df["seed"] = seed
    df.reset_index().to_parquet(f"s3://bucket/part-{str(seed).zfill(5)}.parquet"

在生成的拼花文件中读取火花应该是微不足道的。然后瓶颈变成了IO限制,这应该比spark转换元组/行类型更快。

import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

N = 10

df = spark.createDataFrame(
    [(i,) for i in range(N)], ["seed"]
)

def generate_data(seed):
    M = 10
    np.random.seed(seed)
    np_array = np.random.random_sample(M) # generates an array of M random values
    df = pd.DataFrame(np_array, columns=["x"])
    df["seed"] = seed
    return df.reset_index()

@pandas_udf("index long, x double, seed long", PandasUDFType.GROUPED_MAP)
def generate_data_udf(pdf):
    output = []
    for idx, row in pdf.iterrows():
        output.append(generate_data(row["seed"]))
    return pd.concat(output)


df.groupby("seed").apply(generate_data_udf).show()

较慢的部分将是groupby,您可能能够根据如何将种子批处理到生成数据\u udf,从而加快速度,即:

@udf(returnType=IntegerType())
def batch_seed(seed):
    return seed // 10

df.withColumn("batch_seed", batch_seed(col("seed"))). \
groupBy("batch_seed").apply(generate_data_udf).show()
 类似资料:
  • 主要内容:1.UUID,2.数据库自增Id,3.基于数据库集群模式,4.基于数据库的号段模式,5.Redis,6.Snowflake,7.百度(uid-generator),8.Leaf,9.TinyId生成方式: 1.UUID 2.数据库自增ID 3.数据库多主模式 4.号段模式 5.Redis 6.雪花算法(SnowFlake) 7.滴滴出品(TinyID) 8.百度 (Uidgenerator) 9.美团(Leaf) 1.UUID UUID的生成简单到只有一行代码,输出结果 c2b8c2b

  • 有时我们需要能够生成类似MySQL自增ID这样不断增大,同时又不会重复的id。以支持业务中的高并发场景。比较典型的,电商促销时,短时间内会有大量的订单涌入到系统,比如每秒10w+。明星出轨时,会有大量热情的粉丝发微博以表心意,同样会在短时间内产生大量的消息。 在插入数据库之前,我们需要给这些消息、订单先打上一个ID,然后再插入到我们的数据库。对这个id的要求是希望其中能带有一些时间信息,这样即使我

  • 问题内容: 我的文件中有1亿条记录,需要一种有效且最快的方法来从中的文件读取数组数组。 文件看起来像: 我想逐行读取此文件为: 首先阅读: 然后: 依此类推:’ 我如何读取这样的文件,我知道它看起来并不完全像文件,但是我需要以另存为JSON的这种格式读取该文件 问题答案: 您可以使用JSON Processing API(JSR 353) 来以流方式处理数据:

  • 如何自定义生成固定长度的字符串ID,8-12个字符 格式:业务标记_xxxxxxxxxx 如:user_Nuxq23s24dxa1ScSx 要求:1ms生成100W个 或有什么现成的库可以使用,麻烦老大们贴下代码

  • 我正在尝试实现一个涉及几何级数(拆分)的计算。有什么有效的方法吗。数据集有数百万行。我需要“交易数量”栏 turtle=2(用户定义) base_quantity=1(用户自定义) 对于第0行,Traded_quantity应该为零(因为标记为零) 对于第一行,Traded_quantity应该是(1x1)(1x2)=3(标记2将被分成1和1,第一个1将与base_quantity相乘 对于第二行

  • 面试题 一般实现分布式锁都有哪些方式?使用 redis 如何设计分布式锁?使用 zk 来设计分布式锁可以吗?这两种分布式锁的实现方式哪种效率比较高? 面试官心理分析 其实一般问问题,都是这么问的,先问问你 zk,然后其实是要过渡到 zk 相关的一些问题里去,比如分布式锁。因为在分布式系统开发中,分布式锁的使用场景还是很常见的。 面试题剖析 redis 分布式锁 官方叫做 RedLock 算法,是