当前位置: 首页 > 知识库问答 >
问题:

使用自定义分割器在Pyspark中分割数据帧

戈博易
2023-03-14

查找有关在 Pyspark 中使用自定义分区程序的一些信息。我有一个数据帧,其中包含各个国家/地区的国家/地区数据。因此,如果我在国家/地区列上重新分区,它会将我的数据分发到 n 个分区中,并将类似的国家/地区数据保留到特定分区。当我看到使用 glom() 方法时,这是创建一个倾斜分区数据。

一些国家,如美国和中国,在特定的数据帧中拥有大量数据。我想重新分区我的数据帧,这样如果国家是美国和中国,那么它将进一步拆分为大约 10 个分区,否则对其他国家(如印度、泰国、澳大利亚等)保持分区相同。我们可以在 Pyspark 代码中扩展分区程序类吗?

我在下面的链接中读到了这一点,我们可以在scalaSpark应用程序中扩展scalapartitioner类,并可以修改partitionerclass以使用自定义逻辑根据需求重新划分数据。就像我所做的一样,请帮助在Pyspark中实现这个解决方案。请参阅下面的链接。按列分区但保持固定分区计数的有效方法是什么?

我使用的是Spark版本2.3.0.2下面是我的Dataframe结构:

datadf= spark.sql("""
    SELECT    
        ID_NUMBER ,SENDER_NAME ,SENDER_ADDRESS ,REGION_CODE ,COUNTRY_CODE
    from udb.sometable
""");

输入的数据包含六个国家的数据,如AUSINDTHARUS、CHNUSACHNUSA具有歪斜数据。

因此,如果我对COUNTRY_code执行重新分区,两个分区包含大量数据,而其他分区则很好。我使用glom()方法检查了这一点。

newdf = datadf.repartition("COUNTRY_CODE")

from pyspark.sql import SparkSession
from pyspark.sql import  HiveContext, DataFrameWriter, DataFrame

newDF = datadf.repartitionByRange(3,"COUNTRY_CODE","USA")

我正在尝试将数据重新分区为仅针对美国和中国国家/地区的另外 3 个分区,并希望将其他国家/地区的数据保留在单个分区中。

This is what I am expecting 
AUS- one partition
IND- one partition
THA- one partition
RUS- one partition
CHN- three partition
USA- three partition

回溯(最近一次调用):文件“”,第1行,在文件“/usr/hdp/current/spark2 client/python/pysark/sql/dataframe.py”,第1182行,在getattr“%s”对象中没有属性“%s”%(self.class.name,name))AttributeError:“dataframe”对象没有属性“repartitionByRange”

共有3个答案

太叔志文
2023-03-14

没有在PySpark上应用用户定义的分区器的直接方法,捷径是创建一个带有UDF的新列,根据业务逻辑为每个记录分配一个分区ID。并使用新列进行分区,这样数据可以均匀分布。

numPartitions= 3
df = df.withColumn("Hash#", udf_country_hash(df['Country']))
df = df.withColumn("Partition#", df["Hash#"] % numPartitions)
df.repartition(numPartitions, "Partition#")

请查看在线版本的代码@https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/8963851468310921/2231943684776180/5846184720595634/latest.html

根据我的经验,将DataFrame转换为RDD,然后再转换回DataFrame是一项开销很大的操作,最好避免。

端木权
2023-03-14

结构化API中没有自定义分区程序,因此为了使用自定义分区程序,您需要下拉到RDD API。简单的3个步骤如下:

  1. 将结构化 API 转换为 RDD API
dataRDD = dataDF.rdd
import random

# Extract key from Row object
dataRDD = dataRDD.map(lambda r: (r[0], r))

def partitioner(key):
    if key == "CHN":
        return random.randint(1, 10)
    elif key == "USA":
        return random.randint(11, 20)
    else:
        # distinctCountryDict is a dict mapping distinct countries to distinct integers
        # these distinct integers should not overlap with range(1, 20)
        return distinctCountryDict[key]

numPartitions = 100
dataRDD = dataRDD.partitionBy(numPartitions, partitioner)

# Remove key extracted previously
dataRDD = dataRDD.map(lambda r: r[1])
dataDF = dataRDD.toDF()

这样,您可以在结构化API中获得两个世界的最佳状态,Spark类型和优化的物理计划,以及在低级RDD API中的自定义分区器。只有在绝对必要时,我们才使用低级API。

苏君昊
2023-03-14

尝试这样的事情与哈希:

newDf = oldDf.repartition(N, $"col1", $"coln")

或者对于测距方法:

newDF = oldDF.repartitionByRange(N, $"col1", $"coln")

DF还没有自定义分区。

在你的情况下,我会去散列,但没有保证。

但是,如果您的数据有偏差,您可能需要一些额外的工作,例如用于分区的 2 列是最简单的方法

例如,现有或新列 - 在这种情况下,对给定国家/地区应用分组的列,例如 1 ..N 和两个列上的分区。

对于有许多分组的国家,您可以得到N个合成子分区;对于具有低基数的其他,仅具有1个这样的组号。不太难。两个分区都可以使用多于1个列。

在我看来,分区的统一数字填充需要付出很多努力,而且无法真正实现,但是像这里这样的下一个最佳方法就足够了。相当于一定程度的自定义分区。

否则,在 DF 上使用 .withColumn,您可以使用这些规则模拟自定义分区并填充新的 DF 列,然后应用 repartitionByRange。也不难。

 类似资料:
  • 在前几节讨论的目标检测问题中,我们一直使用方形边界框来标注和预测图像中的目标。本节将探讨语义分割(semantic segmentation)问题,它关注如何将图像分割成属于不同语义类别的区域。值得一提的是,这些语义区域的标注和预测都是像素级的。图9.10展示了语义分割中图像有关狗、猫和背景的标签。可以看到,与目标检测相比,语义分割标注的像素级的边框显然更加精细。 图像分割和实例分割 计算机视觉领

  • 可以通过相对位置把一堆节点放置在一起;最简单的相对摆放方式应该是使用auto-split特性。 两个节点之间的|会把节点分成两部分,然后水平放置在一起 同样,使用||(两个竖线)可以把节点分成两部分,然后把第二个节点放置在第一个节点的下一行。 如果|连接的两个节点中有一个是一个空格组成,那么就会生成一个不可见的节点 如果|连接的两个节点中有一个是超过一个空格组成,那么就会生成一个空节点。 开头和结

  • 本文向大家介绍android中RecyclerView自定义分割线实现,包括了android中RecyclerView自定义分割线实现的使用技巧和注意事项,需要的朋友参考一下 最近一直在看RecyclerView,较之ListView它确实是灵活多变,给予开发者更多自定义的空间,比如:需要添加头部和尾部、item的点击事件、自定义的LayoutManager,还有就是下面要说的自定义的分割线。 1

  • 问题内容: 如何用定界符分割字符串,但是如果转义了,则不能分割?例如,我有一个字符串: 定界符为,转义定界符为。此外,我想忽略转义的反斜杠,因此中的仍然是分隔符。 因此,使用上面的字符串,结果应该是: 问题答案: 使用黑魔法: 匹配一个反斜杠,后跟一个字符,将其跳过并匹配您的定界符。

  • 问题内容: 我有一个字符串如下: 我想提取数字:872226816,因此在这种情况下,我假设在第二个逗号开始读取数据之后,随后的逗号结束数据读取。 输出示例: 问题答案: 用于String.split()的 Javadoc

  • 我读过Kafka文档,但当有人谈论数据和分区时,我仍然感到困惑。在文档中,我看到客户机将向分区发送消息。然后将消息分区复制到副本(跨代理)。和使用者从分区读取数据。 我有一个有两个分区的主题。假设我有一个生产者,它向分区#1发送消息。但我有两个消费者,一个从分区1读取,另一个从分区2读取。这是否意味着我的分区1将有50%的消息,分区2将有50%的消息。或者,当客户端将数据发送到分区#1时,分区#1