我正在尝试优化我的火花应用工作。
我试图理解这个问题的要点:如何在唯一键上连接数据帧时避免混乱?
>
我已经确保必须发生加入操作的键分布在同一分区中(使用我的自定义分区程序)。
我也不能做广播加入,因为我的数据可能会根据情况变大。
在上面提到的问题的答案中,重新分区只优化了连接,但我需要的是无需切换即可连接。在分区内的键的帮助下,我对连接操作很满意。
有可能吗?如果不存在类似的功能,我想实现像joinperpartition这样的东西。
只是对之前好答案的补充。如果在整个pyspark应用程序中多次连接一个大的数据帧,那么将该表保存为分桶表,并在pyspark中作为数据帧读回。这样,您可以避免在连接过程中多次洗牌,因为数据已经被预先洗牌和排序。
因此,当Spark在两个大型数据帧上选择排序合并连接时,它会在连接操作中跳过排序和洗牌阶段。(可以边看wholecodegen边在spark UI确认)
df_data_1.coalesce(1).write.format('orc').bucketBy(20, 'joincolumn').sortBy("sortcolumn").mode("overwrite").saveAsTable('bucketed_table1')
df_data_2.coalesce(1).write.format('orc').bucketBy(20, 'joincolumn').sortBy("sortcolumn").mode("overwrite").saveAsTable('bucketed_table2')
df_bucket_table_1 = spark.table("bucketed_table1");
df_bucket_table_2 = spark.table("bucketed_table2");
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.join.preferSortMergeJoin","true")
#creating alias for the dataframes:
from pyspark.sql.functions import *
df1 = df_bucket_table_1.alias('df1')
df2 = df_bucket_table_2.alias('df2')
DfInnerJoin = df1.join(df2, df1.joincolumn == df2.joincolumn,'inner').select('df1.*')
上述联接不会有随机排列,但仅当您必须在整个应用程序中多次联接同一数据帧时,这才有用。
重新分区只能优化连接,但我需要的是无需洗牌的连接
这不是真的。重新分区不仅“优化”了连接。Repartition将< code>Partitioner绑定到您的RDD,这是映射端连接的关键组件。
我已经确保必须进行连接操作的键分布在同一个分区中
Spark必须知道这一点。使用适当的 API 构建数据帧,以便它们具有相同的分区程序
,Spark 将负责其余的工作。
我有一个 Spark 流式处理作业,它读取 Cosmos 更改源数据,如下所示,在具有 DBR 8.2 的数据砖集群中运行。 虽然作业正常工作,但偶尔,流会突然停止,并且在log4j输出中出现以下循环。重新启动作业将处理“待办事项”中的所有数据。以前有人经历过这样的事情吗?我不确定是什么原因造成的。有什么想法吗?
我有一张小桌子(2k)的记录和一张大桌子(5 mil)的记录。我需要从小表中获取所有数据,只从大表中获取匹配数据,因此我在下面执行了查询
我有一个Spark Spark集群,其中主节点也是工作节点。我无法从驱动程序代码节点到达主程序,并得到错误: driver-code节点中的SparkContext配置为: 我可以成功地,但不能成功地。意味着机器可以到达,但端口不能到达。 会有什么问题?我已经为主节点和驱动程序代码运行的节点(客户端)禁用了Ubuntu的防火墙。
我在aws s3和emr上使用Spark 2.4进行项目,我有一个左连接,有两个巨大的数据部分。火花执行不稳定,它经常因内存问题而失败。 集群有10台m3.2xlarge类型的机器,每台机器有16个vCore、30 GiB内存、160个SSD GB存储。 我有这样的配置: 左侧连接发生在 150GB 的左侧和大约 30GB 的右侧之间,因此有很多随机播放。我的解决方案是将右侧切得足够小,例如 1G
我有一个c#应用程序,可以创建拼花地板文件并将其上载到远程HDFS。如果我使用scp将文件复制到安装了HDFS客户端的目标计算机上,然后将文件“HDFS放入”HDFS中,spark可以正确读取文件。 如果我使用curl针对webhdf服务从客户端应用程序直接将文件上传到HDFS,则在尝试读取拼花文件时会从Spark收到以下错误: df=sqlContext。阅读parquet(“/tmp/test
我们需要在Kafka主题上实现连接,同时考虑延迟数据或“不在连接中”,这意味着流中延迟或不在连接中的数据不会被丢弃/丢失,但会被标记为超时, 连接的结果被产生以输出Kafka主题(如果发生超时字段)。 (独立部署中的火花2.1.1,Kafka 10) Kafka在主题:X,Y,...输出主题结果将如下所示: 我发现三个解决方案写在这里,1和2从火花流官方留档,但与我们不相关(数据不在加入Dtsre