我正在尝试将两个PySpark数据帧与仅位于其中一个上的列连接起来:
from pyspark.sql.functions import randn, rand
df_1 = sqlContext.range(0, 10)
+--+
|id|
+--+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+--+
df_2 = sqlContext.range(11, 20)
+--+
|id|
+--+
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+--+
df_1 = df_1.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal"))
df_2 = df_2.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal_2"))
现在我想生成第三个数据帧。我想要像熊猫这样的东西:
df_1.show()
+---+--------------------+--------------------+
| id| uniform| normal|
+---+--------------------+--------------------+
| 0| 0.8122802274304282| 1.2423430583597714|
| 1| 0.8642043127063618| 0.3900018344856156|
| 2| 0.8292577771850476| 1.8077401259195247|
| 3| 0.198558705368724| -0.4270585782850261|
| 4|0.012661361966674889| 0.702634599720141|
| 5| 0.8535692890157796|-0.42355804115129153|
| 6| 0.3723296190171911| 1.3789648582622995|
| 7| 0.9529794127670571| 0.16238718777444605|
| 8| 0.9746632635918108| 0.02448061333761742|
| 9| 0.513622008243935| 0.7626741803250845|
+---+--------------------+--------------------+
df_2.show()
+---+--------------------+--------------------+
| id| uniform| normal_2|
+---+--------------------+--------------------+
| 11| 0.3221262660507942| 1.0269298899109824|
| 12| 0.4030672316912547| 1.285648175568798|
| 13| 0.9690555459609131|-0.22986601831364423|
| 14|0.011913836266515876| -0.678915153834693|
| 15| 0.9359607054250594|-0.16557488664743034|
| 16| 0.45680471157575453| -0.3885563551710555|
| 17| 0.6411908952297819| 0.9161177183227823|
| 18| 0.5669232696934479| 0.7270125277020573|
| 19| 0.513622008243935| 0.7626741803250845|
+---+--------------------+--------------------+
#do some concatenation here, how?
df_concat.show()
| id| uniform| normal| normal_2 |
+---+--------------------+--------------------+------------+
| 0| 0.8122802274304282| 1.2423430583597714| None |
| 1| 0.8642043127063618| 0.3900018344856156| None |
| 2| 0.8292577771850476| 1.8077401259195247| None |
| 3| 0.198558705368724| -0.4270585782850261| None |
| 4|0.012661361966674889| 0.702634599720141| None |
| 5| 0.8535692890157796|-0.42355804115129153| None |
| 6| 0.3723296190171911| 1.3789648582622995| None |
| 7| 0.9529794127670571| 0.16238718777444605| None |
| 8| 0.9746632635918108| 0.02448061333761742| None |
| 9| 0.513622008243935| 0.7626741803250845| None |
| 11| 0.3221262660507942| None | 0.123 |
| 12| 0.4030672316912547| None |0.12323 |
| 13| 0.9690555459609131| None |0.123 |
| 14|0.011913836266515876| None |0.18923 |
| 15| 0.9359607054250594| None |0.99123 |
| 16| 0.45680471157575453| None |0.123 |
| 17| 0.6411908952297819| None |1.123 |
| 18| 0.5669232696934479| None |0.10023 |
| 19| 0.513622008243935| None |0.916332123 |
+---+--------------------+--------------------+------------+
这可能吗?
unionByName是spark中的内置选项,从spark 2.3.0开始提供。
在spark 3 . 1 . 0版中,有一个allowMissingColumns选项,它的默认值设置为False,以处理缺少的列。即使两个数据帧没有相同的列集,该函数也能工作,将结果数据帧中缺少的列值设置为null。
df_1.unionByName(df_2, allowMissingColumns=True).show()
+---+--------------------+--------------------+--------------------+
| id| uniform| normal| normal_2|
+---+--------------------+--------------------+--------------------+
| 0| 0.8122802274304282| 1.2423430583597714| null|
| 1| 0.8642043127063618| 0.3900018344856156| null|
| 2| 0.8292577771850476| 1.8077401259195247| null|
| 3| 0.198558705368724| -0.4270585782850261| null|
| 4|0.012661361966674889| 0.702634599720141| null|
| 5| 0.8535692890157796|-0.42355804115129153| null|
| 6| 0.3723296190171911| 1.3789648582622995| null|
| 7| 0.9529794127670571| 0.16238718777444605| null|
| 8| 0.9746632635918108| 0.02448061333761742| null|
| 9| 0.513622008243935| 0.7626741803250845| null|
| 11| 0.3221262660507942| null| 1.0269298899109824|
| 12| 0.4030672316912547| null| 1.285648175568798|
| 13| 0.9690555459609131| null|-0.22986601831364423|
| 14|0.011913836266515876| null| -0.678915153834693|
| 15| 0.9359607054250594| null|-0.16557488664743034|
| 16| 0.45680471157575453| null| -0.3885563551710555|
| 17| 0.6411908952297819| null| 0.9161177183227823|
| 18| 0.5669232696934479| null| 0.7270125277020573|
| 19| 0.513622008243935| null| 0.7626741803250845|
+---+--------------------+--------------------+--------------------+
df_concat = df_1.union(df_2)
数据帧可能需要具有相同的列,在这种情况下,您可以使用 withColumn()
创建normal_1
和normal_2
也许您可以尝试创建不存在的列并调用Union
(对于Spark 1.6或更低版本的unionAll
):
from pyspark.sql.functions import lit
cols = ['id', 'uniform', 'normal', 'normal_2']
df_1_new = df_1.withColumn("normal_2", lit(None)).select(cols)
df_2_new = df_2.withColumn("normal", lit(None)).select(cols)
result = df_1_new.union(df_2_new)
我正在编写一个用户定义的函数,它将接受数据帧中除第一列之外的所有列,并进行求和(或任何其他操作)。现在,数据帧有时可以有3列或4列或更多。会有所不同。 我知道我可以在UDF中硬编码4个列名作为传递,但在这种情况下它会有所不同,所以我想知道如何完成它? 这里有两个示例,第一个示例中我们有两列要添加,第二个示例中有三列要添加。
我有一个用斯卡拉写的UDF,我希望能够通过Pyspark会话调用它。UDF 采用两个参数:字符串列值和第二个字符串参数。我已经能够成功地调用UDF,如果它只需要一个参数(列值)。如果需要多个参数,我很难调用UDF。以下是到目前为止我在斯卡拉和Pyspark中能够做的事情: Scala UDF: 在Scala中使用它时,我已经能够注册和使用这个UDF: Scala主类: 以上工作成功。下面是Pysp
假设我有两个数据帧,具有不同级别的信息,如下所示: 我想加入df1和df2,并将“值”信息传递给df2:一天中的每一小时都将获得“日”值。 预期产出:
我在Python中有一个有效的lambda函数,它计算dataset1中的每个字符串和dataset2中的字符串之间的最高相似度。在迭代过程中,它将字符串、最佳匹配和相似性以及一些其他信息写入bigquery。没有返回值,因为该函数的目的是向bigquery数据集中插入一行。这个过程需要相当长的时间,这就是为什么我想使用Pyspark和Dataproc来加速这个过程。 将熊猫的数据帧转换成spar
我正在开发一个基于SAAS的站点,我必须将两个DBs中的两个表连接起来,比如说DB1中的table1和DB2中的table2。我必须使用cakephp中的join从表1和表2获取匹配记录,但它会抛出如下错误: 错误:SQLSTATE[42000]:语法错误或访问冲突:1142 SELECT命令拒绝用户'dbname'@'localhost'访问表'table_name'。 有谁能解释一下如何使用c
在计算附加信息时发生内部错误。org.eclipse.jdt.internal.core.SearchableEnvironment.(Lorg/eclipse/jdt/内部/核心/JavaProject; Lorg/eclipse/jdt/核心/WorkingCopyOwner;)