Spark Dataframes有一个方法WithColumn
每次添加一个新列。若要添加多列,需要一个With Column
的链。这是做这件事的最佳实践吗?
df2 = df.rdd.mapPartitions(add_new_cols).toDF()
def add_new_cols(rows):
db = open_db_connection()
new_rows = []
new_row_1 = Row("existing_col_1", "existing_col_2", "new_col_1", "new_col_2")
i = 0
for each_row in rows:
i += 1
# conditionally omit rows
if i % 3 == 0:
continue
db_result = db.get_some_result(each_row.existing_col_2)
new_col_1 = ''.join([db_result, "_NEW"])
new_col_2 = db_result
new_f_row = new_row_1(each_row.existing_col_1, each_row.existing_col_2, new_col_1, new_col_2)
new_rows.append(new_f_row)
db.close()
return iter(new_rows)
第二部分,在WithColumn
和Filter
的链中使用MapPartitions
有哪些权衡?
我在某个地方读到过,在Spark DFs中使用可用的方法总是比推出自己的实现要好。如果我的论点错了,请让我知道。谢谢!欢迎所有的想法。
这种方法是否有任何不可预见的问题?
多重。最严重的影响是:
dataframe
代码相比,内存占用要高几倍,垃圾收集开销也很大。TODF
调用的模式推断成本(如果提供了正确的模式,可以避免)以及可能重新执行前面所有步骤的成本。当您开始使用Executor端Python逻辑时,您已经与Spark SQL不同了。无论您使用UDF
、RDD
还是新添加的向量化UDF都无关紧要。最后,您应该根据代码的整体结构做出决定--如果它主要是直接在数据上执行的Python逻辑,那么使用rdd
或完全跳过Spark可能会更好。
如果它只是逻辑的一小部分,并且不会导致严重的性能问题,不要为此担心。
那么,如何使用PySpark向现有的DataFrame添加一个新列(基于Python vector)呢?
几个小时以来,我一直试图找到一种方法来复制n次列,并为它们添加一个,但运气不佳。请帮帮忙! 当前数据帧: 输出:
我想添加一个新列,并将其设置为带有的MultIndex,但我收到一个错误。 我的代码: 错误: 文件"/库/框架/Python.framework/Versions/3.7/lib/python3.7/site-packages/pandas/core/indexes/base.py",第3078行,get_loc返回自己。_engine.get_loc(键)文件"熊猫/_libs/index.p
问题内容: 我的问题是,将一行添加到DataFrame会更改列的dtype: 我将dtype特别指定为int32(即’i4’),可以看出: 但是,添加一行会将dtype更改为float64: 我尝试指定添加的值的dtype: 但这也不起作用。有没有不使用返回新对象的函数的解决方案? 问题答案: 放大分为两个阶段,首先将a放在该列中,然后将其分配,这就是为什么要强制它的原因。我将其放在错误/增强列表
本文向大家介绍向Pandas中的现有DataFrame添加新列,包括了向Pandas中的现有DataFrame添加新列的使用技巧和注意事项,需要的朋友参考一下 Pandas 数据框是一种二维数据结构,即,数据以表格的形式在行和列中对齐。可以使用python dict,list和series等创建它。在本文中,我们将看到如何在现有数据框中添加新列。因此,首先让我们使用pandas系列创建一个数据框。