我运行一个DAG来提取MySQL数据,并在airflow中加载到BigQuery。我目前得到以下错误:
/usr/local/lib/python2.7/dist-packages/airflow/models.py:1927:PendingDeprecationWarning:无效参数被传递给MySqlToGoogleCloudStorageOperator。对传递此类参数的支持将在 Airflow 2.0 中删除。无效参数为:
*参数: ()
* * kwargs:{ ' Google _ cloud _ storage _ connn _ id ':' podioGCPConnection ' } category = PendingDeprecationWarning
/usr/local/lib/python2.7/dist-packages/airflow/models.py:1927:PendingDeprecationWarning:无效参数被传递给GoogleCloudStorageToBigQueryOperator。对传递此类参数的支持将在 Airflow 2.0 中删除。无效参数为:
*参数: ()
* * kwargs:{ ' project _ id ':' podio-data ' } category = PendingDeprecationWarning
dag的代码在这里:
my_connections = [
'podiotestmySQL'
]
my_tables = [
'logistics_orders',
'logistics_waybills',
'logistics_shipping_lines',
'logistics_info_requests'
]
default_args = {
'owner' : 'tia',
'start_date' : datetime(2018, 1, 2),
'depends_on_past' : False,
'retries' : 1,
'retry_delay':timedelta(minutes=5),
}
dag = DAG('etl', default_args=default_args,schedule_interval=timedelta(days=1))
slack_notify = SlackAPIPostOperator (
task_id = 'slack_notfiy',
token = 'xxxxxx',
channel='data-status',
username = 'airflow',
text = 'Successfully performed podio ETL operation',
dag=dag)
for connection in my_connections:
for table in my_tables:
extract = MySqlToGoogleCloudStorageOperator(
task_id="extract_mysql_%s_%s"%(connection,table),
mysql_conn_id = connection,
google_cloud_storage_connn_id = 'podioGCPConnection',
sql = "SELECT *, '%s' as source FROM podiodb.%s"%(connection,table),
bucket='podio-reader-storage',
filename= '%s/%s/%s{}.json'%(connection,table,table),
schema_filename='%s/schemas/%s.json'%(connection,table),
dag=dag)
load =GoogleCloudStorageToBigQueryOperator(
task_id = "load_bg_%s_%s"%(connection,table),
bigquery_conn_id = 'podioGCPConnection',
google_cloud_storage_conn_id = 'podioGCPConnection',
bucket = 'podio-reader-storage',
destination_project_dataset_table = "Podio_Data1.%s/%s"%(connection,table),
source_objects = ["%s/%s/%s*.json"%(connection,table,table)],
schema_object = "%s/schemas/%s.json"%(connection,table),
source_format = 'NEWLINE_DELIMITED_JSON',
create_disposition = 'CREATE_IF_NEEDED',
write_disposition = 'WRITE_TRUNCATE',
project_id = 'podio-data',
dag=dag)
load.set_upstream(extract)
slack_notify.set_upstream(load)
在这里阅读源码:https://github . com/Apache/incubator-air flow/blob/master/air flow/contrib/operators/GCS _ to _ bq . py
请从默认参数中删除这些参数:
google_cloud_storage_connn_id = 'podioGCPConnection'
project_id = 'podio-data',
您需要在气流仪表板中创建连接。
这里变量x(在主)的值是6,这是正确的,但当我写同样的事情在system.out.println== 当我尝试在print语句中放入x时,它工作得很好。但我想知道它给出错误的原因。 错误:
我想把BigQuery表名作为运行时参数传递给我的数据流模板,就像这样简单: 这个问题有一个解决方案:https://issues.apache.org/jira/browse/beam-1440,但到目前为止我还不明白结论。
我发现了几个相关的问题,但谷歌团队对这个问题没有明确的答案: 是一个Cloud DataFlow作业,写入BigQuery,限制为每秒100K行每表(即BigQuery)的配额。BQ流限制)? google数据流写入bigquery表性能 云数据流性能——我们的时代是否值得期待? 编辑:主要动机是找到一种方法来预测各种输入大小的运行时。 我成功地运行了显示
我第一次尝试Kafka,并使用AWS MSK设置Kafka群集。目标是将数据从MySQL服务器流式传输到Postgresql。我使用debezium MySQL连接器作为源,使用Confluent JDBC连接器作为接收器。 MySQL配置: 注册Mysql连接器后,其状态为“正在运行”,并捕获MySQL表中所做的更改,并以以下格式在消费者控制台中显示结果: 我的第一个问题:在表中“金额”列是“十
我正在将数据流式传输到BigQuery 300行,大约每分钟3次。插入一周前停止工作,出现以下错误: 谷歌。API。请求。执行期间遇到RequestError错误。重试可能会解决问题。[503]错误[消息[执行过程中遇到错误。重试可能会解决问题。]位置[-]原因[备份错误]域[全局]] 我的服务在两个月内运行良好,自从我们开始使用它以来,没有任何代码更改。 这是调用BigQuery的代码: 还有其