我创建了一个DAG,它从数据库中提取MySQL数据并将其加载到云存储,然后将BigQuery作为json文件加载。
DAG适用于某些表,但不是所有表,因为它不能解码表中的某些字符。这是相当多的数据,所以我不能准确指出错误或无效字符的位置。
我尝试将我的数据库、表和列字符集从utf8更改为utf8mb4。这没有帮助。
我也尝试过调用encoding='utf-8'和'iso-8859-1',但我认为我没有正确调用它们,因为我一直在使用我的连接执行此操作,但仍然会遇到相同的错误。
我正在运行Python 2.7.12和airflow v1.8.0
更新:阅读此文后:https://cwiki.apache.org/confluence/display/AIRFLOW/Common 建议使用定义字符集的连接字符串的陷阱,例如:sql\u alchemy\u conn=mysql://airflow@本地主机:3306/台?字符集=utf8
如何使用云SQL实例实现这一点?
podio_connections = [
'mysql_connection'
]
podio_tables = [
'finance_banking_details',
'finance_goods_invoices',
]
default_args = {
'owner': 'xxxxxx',
'start_date': datetime(2018,1,11),
'depends_on_past': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('mysql_to_bigquery', default_args=default_args, schedule_interval='@daily')
slack_notify = SlackAPIPostOperator(
task_id='slack_notify',
token='xxxxxx',
channel='data-status',
username='airflow',
text='Successfully performed Podio ETL operation',
dag=dag)
for connection in podio_connections:
for table in podio_tables:
extract = MySqlToGoogleCloudStorageOperator(
task_id="extract_mysql_%s_%s"%(connection,table),
mysql_conn_id=connection,
google_cloud_storage_conn_id='gcp_connection',
sql="SELECT *, '%s' as source FROM podiodb.%s"%(connection,table),
bucket='podio-reader-storage',
filename="%s/%s/%s{}.json"%(connection,table,table),
schema_filename="%s/schemas/%s.json"%(connection,table),
dag=dag)
load = GoogleCloudStorageToBigQueryOperator(
task_id="load_bg_%s_%s"%(connection,table),
bigquery_conn_id='gcp_connection',
google_cloud_storage_conn_id='gcp_connection',
bucket='podio-reader-storage',
#destination_project_dataset_table="podio-data.%s.%s"%(connection,table),
destination_project_dataset_table = "podio-data.podio_data1.%s"%(table),
source_objects=["%s/%s/%s*.json"%(connection,table,table)],
schema_object="%s/schemas/%s.json"%(connection,table),
source_format='NEWLINE_DELIMITED_JSON',
write_disposition='WRITE_TRUNCATE',
dag=dag)
load.set_upstream(extract)
slack_notify.set_upstream(load)
[2018-01-12 15:36:10221]{models.py:1417}错误-“utf8”编解码器无法解码第36位的字节0x96:无效的开始字节
Traceback(最近的调用最后):
文件“/usr/local/lib/python2.7/dist packages/afflow/models.py”,第1374行,运行结果=task\u copy。执行(上下文=上下文)
file"/usr/local/lib/python2.7/dist-包/airflow/contrib/操作员/mysql_to_gcs.py",第91行,在执行files_to_upload=自我。_write_local_data_files(游标)
文件“/usr/local/lib/python2.7/dist packages/afflow/contrib/operators/mysql\u to\u gcs.py”,第136行,在json文件中写入本地数据。转储(行目录、tmp文件句柄)
文件“/usr/lib/python2.7/json/init.py”,第189行,位于iterable中块的转储中:
文件"/usr/lib/python2.7/json/encoder.py",第434行,_iterencode_iterencode_dict(o,_current_indent_level)中的块:
文件“/usr/lib/python2.7/json/encoder.py”,第390行,在编码器(值)中
UnicodeDecodeError:“utf8”编解码器无法对位置36中的字节0x96进行解码:起始字节无效
96
是latin1十六进制的en-dash。要么将数据更改为utf8,要么更改到MySQL的连接,表示您正在使用charset latin1。
我有一个Go语言的后端,想知道如何连接到Firebase存储来创建/删除桶和添加/删除文件。我可以使用管理sdk。我在谷歌云存储的Go语言中找到了客户端存储库。我可以用那个吗?Firebase存储和谷歌云存储有什么区别?
我尝试安装storage-resize-images扩展,但收到以下错误: 部署资源时出错:resource_error:/deployments/firebase-ext-storage-resize-images/resources/generateresizedimage{“resourceType”:“gcp-types/cloudfunctions-v1:projects.locatio
在谷歌云存储中,我在名为图像的根桶中有一个名为猫的桶。我正在使用google-api-ruby-Client gem上传文件。我可以将文件上传到根桶“图像”,但上传到“图像/猫”不起作用。我知道谷歌云存储中的存储桶没有斜杠的概念,所以我无法弄清楚如何指定嵌套存储桶的名称。 这给出了nil:NilClass的错误
我打算将包含所有文件和目录的整个目录从一个谷歌云存储桶递归复制到另一个谷歌云存储桶。 从本地到Google云存储桶,以下代码运行良好: 如何在同一个项目中将目录从一个bucket递归复制到另一个bucket?
我在谷歌云存储桶中有文件。 如何将这些文件设置为Cloudflare CDN的原点? (Cloudflare控制面板似乎只想要根域上的网站...?)