I am using Azure Databricks Autoloader to process files from ADLS Gen2 into Delta Lake. I wrote my foreachBatch function (PySpark) as follows:
# Rename incoming dataframe columns
schemadf = transformschema.renameColumns(microBatchDF, fileconfig)
# Apply simple transformation on schemadf using createOrReplaceTempView
modifieddf = applytransform(schemadf, targettable, targetdatabase, fileconfig)
# Add audit cols to modifieddf
transformdf = auditlineage.addauditcols(modifieddf, fileconfig, appid)
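For context, the question does not show how this function is registered on the stream, so the sketch below is a typical Autoloader wiring with made-up placeholders: the processBatch name, the CSV format, and the ADLS/checkpoint/schema paths are all assumptions.

def processBatch(microBatchDF, batchId):
    schemadf = transformschema.renameColumns(microBatchDF, fileconfig)
    modifieddf = applytransform(schemadf, targettable, targetdatabase, fileconfig)
    transformdf = auditlineage.addauditcols(modifieddf, fileconfig, appid)
    # ... write transformdf to the Delta target ...

(spark.readStream
      .format("cloudFiles")                                              # Databricks Autoloader source
      .option("cloudFiles.format", "csv")                                # assumed input format
      .option("cloudFiles.schemaLocation", "/mnt/schemas/md_customer")   # placeholder
      .load("abfss://raw@myaccount.dfs.core.windows.net/md_customer/")   # placeholder ADLS Gen2 path
      .writeStream
      .foreachBatch(processBatch)                                        # each micro-batch runs the steps above
      .option("checkpointLocation", "/mnt/checkpoints/md_customer")      # placeholder
      .start())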
Code for renaming the columns:
def renameColumns(dataframe, schema):
    # 'Schema' is a comma-separated string of target column names
    names = schema['Schema']
    splitstr = list(names.split(','))
    for c, n in zip(dataframe.columns, splitstr):
        dataframe = dataframe.withColumnRenamed(c, n)
    return dataframe
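As an aside, the zip/withColumnRenamed loop can be collapsed into a single toDF call, which renames every column positionally in one pass. This sketch assumes the 'Schema' string lists exactly one name per incoming column, in order:

def renameColumns(dataframe, schema):
    # toDF(*names) replaces all column names in one shot
    new_names = [n.strip() for n in schema['Schema'].split(',')]
    return dataframe.toDF(*new_names)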
Code for applytransform:
def applytransform(inputdf, targettable, targetdatabase, fileconfig):
    logger.info('Inside applytransform for Database/Table {}.{}', targetdatabase, targettable)
    inputdf.createOrReplaceTempView("src_to_transform")
    lspark = inputdf._jdf.sparkSession()
    if 'TransformQuery' in fileconfig and fileconfig['TransformQuery'] is not None:
        vsqlscript = fileconfig['TransformQuery']
        df = lspark.sql(vsqlscript)
        logger.info("Applied Transform")
        return df
    else:
        logger.info("Passed DF")
        return inputdf
Code for addauditcols:
import datetime

from pyspark.sql.functions import input_file_name, lit

def addauditcols(inputdf, fileconfig, app_id):
    now = datetime.datetime.now()
    print(type(inputdf))
    createdby = 'DatabricksJob-' + app_id
    datasource = fileconfig['Datasource']
    recordactiveind = 'Y'
    df = inputdf.withColumn('datasource', lit(datasource))\
        .withColumn('createdtimestamp', lit(now))\
        .withColumn('lastmodifiedtimestamp', lit(now))\
        .withColumn('createduserid', lit(createdby))\
        .withColumn('lastmodifieduserid', lit(createdby))\
        .withColumn('filepath', input_file_name())\
        .withColumn('recordactiveind', lit(recordactiveind))
    return df
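For illustration, here is a self-contained run of addauditcols against a toy DataFrame; the fileconfig dict and app id are made-up stand-ins for the real job values:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
toydf = spark.createDataFrame([("C001", "Acme Corp")],
                              ["customernumber", "customername"])
# input_file_name() returns an empty string here since the toy data is not file-backed
audited = addauditcols(toydf, {"Datasource": "DUMMY-CUST"}, "local-1638760184357")
audited.select("datasource", "createduserid", "recordactiveind").show()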
The applytransform module returns a py4j.java_gateway.JavaObject rather than a regular pyspark.sql.dataframe.DataFrame, so I cannot perform a simple withColumn()-style transformation on modifieddf inside the addauditcols module. Because inputdf._jdf.sparkSession() is the Java SparkSession, lspark.sql() hands back a Java Dataset handle; when withColumn is then dispatched on that JavaObject, py4j attempts to serialize the pyspark Column argument by iterating it, which fails.
The error I get is as follows:
2021-12-05 21:09:57.274 | INFO | __main__:main:73 - modifieddf Type:::
<class 'py4j.java_gateway.JavaObject'>
2021-12-05 21:09:57.421 | ERROR | __main__:main:91 - Operating Failed for md_customer, with Exception Column is not iterable
Traceback (most recent call last):
File "c:/Users/asdsad/integration-app\load2cleansed.py", line 99, in <module>
main()
└ <function main at 0x000001C570C263A0>
> File "c:/Users/asdsad/integration-app\load2cleansed.py", line 76, in main
transformdf = auditlineage.addauditcols(modifieddf,fileconfig,appid)
│ │ │ │ └ 'local-1638760184357'
│ │ │ └ {'Schema': 'customernumber,customername,addrln1,city,statename,statecode,postalcode,countrycode,activeflag,sourcelastmodified...
│ │ └ JavaObject id=o48
│ └ <function addauditcols at 0x000001C570B55CA0>
└ <module 'core.wrapper.auditlineage' from 'c:\\Users\\asdsad\integration-app\\core\\wrapper\\a...
File "c:\Users\1232\Documents\Code\ntegration-app\core\wrapper\auditlineage.py", line 30, in addauditcols
df = inputdf.withColumn('datasource',lit(datasource)).\
│ │ └ 'DUMMY-CUST'
│ └ <function lit at 0x000001C570B79F70>
└ JavaObject id=o48
File "C:\Users\testapp\lib\site-packages\py4j\java_gateway.py", line 1296, in __call__
args_command, temp_args = self._build_args(*args)
│ │ └ ('datasource', Column<'DUMMY-CUST'>)
│ └ <function JavaMember._build_args at 0x000001C5704B9280>
└ <py4j.java_gateway.JavaMember object at 0x000001C570C5B910>
File "C:\Users\testapp\lib\site-packages\py4j\java_gateway.py", line 1260, in _build_args
(new_args, temp_args) = self._get_args(args)
│ │ └ ('datasource', Column<'DUMMY-CUST'>)
│ └ <function JavaMember._get_args at 0x000001C5704B91F0>
└ <py4j.java_gateway.JavaMember object at 0x000001C570C5B910>
File "C:\Users\testapp\lib\site-packages\py4j\java_gateway.py", line 1247, in _get_args
temp_arg = converter.convert(arg, self.gateway_client)
│ │ │ │ └ <py4j.java_gateway.GatewayClient object at 0x000001C5705C89A0>
│ │ │ └ <py4j.java_gateway.JavaMember object at 0x000001C570C5B910>
│ │ └ Column<'DUMMY-CUST'>
│ └ <function ListConverter.convert at 0x000001C5704CE5E0>
└ <py4j.java_collections.ListConverter object at 0x000001C5704C3FD0>
File "C:\Users\testapp\lib\site-packages\py4j\java_collections.py", line 510, in convert
for element in object:
└ Column<'DUMMY-CUST'>
File "C:\Users\testapp\lib\site-packages\pyspark\sql\column.py", line 470, in __iter__
raise TypeError("Column is not iterable")
TypeError: Column is not iterable
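The traceback confirms the mechanism: py4j's ListConverter tries to iterate the Column because withColumn is being invoked on a raw JavaObject. For completeness, the Java handle can in principle be wrapped back into a Python DataFrame with pyspark's internal DataFrame(jdf, sql_ctx) constructor (a private, version-dependent API in the Spark 3.x era); the cleaner fix is in the answer below, but a sketch of the wrap-back would be:

from pyspark.sql import DataFrame

jspark = inputdf._jdf.sparkSession()     # py4j JavaObject: the Java SparkSession
jdf = jspark.sql(vsqlscript)             # py4j JavaObject: a Java Dataset[Row]
pydf = DataFrame(jdf, inputdf.sql_ctx)   # wrap back into a pyspark.sql.DataFrame (internal API)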
Any help is appreciated.
Thanks. Instead of createOrReplaceTempView I switched to createOrReplaceGlobalTempView, updated vsqlscript to read via "select * from global_temp.src_to_transform", and made the following changes in applytransform:
def applytransform(inputdf, targettable, targetdatabase, fileconfig):
    logger.info('Inside applytransform for Database/Table {}.{}', targetdatabase, targettable)
    # Store in global temp Databricks database
    inputdf.createOrReplaceGlobalTempView("src_to_transform")
    lspark = inputdf._jdf.sparkSession()
    if 'TransformQuery' in fileconfig and fileconfig['TransformQuery'] is not None:
        vsqlscript = fileconfig['TransformQuery']
        # df = lspark.sql(vsqlscript)
        df = spark.sql(vsqlscript)
        logger.info("Applied Transform")
        df.show()
        return df
    else:
        logger.info("Passed DF")
        return inputdf
Please remove lspark = inputdf._jdf.sparkSession()
It is meant for running SQL upsert commands against Delta, like MERGE, which do not return a DataFrame.
Just use spark.sql(vsqlscript)
If that doesn't help, please share your vsqlscript code.
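Putting that advice together, a corrected applytransform might look like the sketch below; it assumes spark is the ambient SparkSession that Databricks provides in notebooks and jobs:

def applytransform(inputdf, targettable, targetdatabase, fileconfig):
    logger.info('Inside applytransform for Database/Table {}.{}', targetdatabase, targettable)
    inputdf.createOrReplaceGlobalTempView("src_to_transform")
    if fileconfig.get('TransformQuery') is not None:
        # spark.sql() returns a pyspark.sql.DataFrame, so withColumn works downstream
        df = spark.sql(fileconfig['TransformQuery'])
        logger.info("Applied Transform")
        return df
    logger.info("Passed DF")
    return inputdf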