当前位置: 首页 > 知识库问答 >
问题:

使用气流将 mysql 数据加载到 bigquery 的 dag 的“传递了无效参数”错误

栾弘新
2023-03-14

我运行一个DAG来提取MySQL数据,并在airflow中加载到BigQuery。我目前得到以下错误:

/usr/local/lib/python2.7/dist-packages/airflow/models.py:1927:PendingDeprecationWarning:无效参数被传递给MySqlToGoogleCloudStorageOperator。对传递此类参数的支持将在 Airflow 2.0 中删除。无效参数为:

*参数: ()

* * kwargs:{ ' Google _ cloud _ storage _ connn _ id ':' podioGCPConnection ' } category = PendingDeprecationWarning

/usr/local/lib/python2.7/dist-packages/airflow/models.py:1927:PendingDeprecationWarning:无效参数被传递给GoogleCloudStorageToBigQueryOperator。对传递此类参数的支持将在 Airflow 2.0 中删除。无效参数为:

*参数: ()

* * kwargs:{ ' project _ id ':' podio-data ' } category = PendingDeprecationWarning

dag的代码在这里:

my_connections = [
    'podiotestmySQL'
]

my_tables = [
    'logistics_orders',
    'logistics_waybills',
    'logistics_shipping_lines',
    'logistics_info_requests'
]

default_args = {
    'owner' : 'tia',
    'start_date' : datetime(2018, 1, 2),
    'depends_on_past' : False,
    'retries' : 1,
    'retry_delay':timedelta(minutes=5),
}

dag = DAG('etl', default_args=default_args,schedule_interval=timedelta(days=1))

slack_notify = SlackAPIPostOperator (
    task_id = 'slack_notfiy',
    token = 'xxxxxx',
    channel='data-status',
    username = 'airflow',
    text = 'Successfully performed podio ETL operation',
    dag=dag)

for connection in my_connections:
    for table in my_tables: 
        extract = MySqlToGoogleCloudStorageOperator(
           task_id="extract_mysql_%s_%s"%(connection,table),
           mysql_conn_id = connection,
           google_cloud_storage_connn_id = 'podioGCPConnection',
           sql = "SELECT *, '%s' as source FROM podiodb.%s"%(connection,table),
           bucket='podio-reader-storage',
           filename= '%s/%s/%s{}.json'%(connection,table,table),
           schema_filename='%s/schemas/%s.json'%(connection,table),
           dag=dag)

       load =GoogleCloudStorageToBigQueryOperator(
           task_id = "load_bg_%s_%s"%(connection,table),
           bigquery_conn_id = 'podioGCPConnection',
           google_cloud_storage_conn_id = 'podioGCPConnection',
           bucket = 'podio-reader-storage',
           destination_project_dataset_table = "Podio_Data1.%s/%s"%(connection,table),
           source_objects = ["%s/%s/%s*.json"%(connection,table,table)],
           schema_object = "%s/schemas/%s.json"%(connection,table),
           source_format = 'NEWLINE_DELIMITED_JSON',
           create_disposition = 'CREATE_IF_NEEDED',
           write_disposition = 'WRITE_TRUNCATE',
           project_id = 'podio-data',
           dag=dag)

      load.set_upstream(extract)
      slack_notify.set_upstream(load)

共有1个答案

孔礼骞
2023-03-14

在这里阅读源码:https://github . com/Apache/incubator-air flow/blob/master/air flow/contrib/operators/GCS _ to _ bq . py

请从默认参数中删除这些参数:

google_cloud_storage_connn_id = 'podioGCPConnection'
project_id = 'podio-data',

您需要在气流仪表板中创建连接。

 类似资料:
  • 这里变量x(在主)的值是6,这是正确的,但当我写同样的事情在system.out.println== 当我尝试在print语句中放入x时,它工作得很好。但我想知道它给出错误的原因。 错误:

  • 我想把BigQuery表名作为运行时参数传递给我的数据流模板,就像这样简单: 这个问题有一个解决方案:https://issues.apache.org/jira/browse/beam-1440,但到目前为止我还不明白结论。

  • 我发现了几个相关的问题,但谷歌团队对这个问题没有明确的答案: 是一个Cloud DataFlow作业,写入BigQuery,限制为每秒100K行每表(即BigQuery)的配额。BQ流限制)? google数据流写入bigquery表性能 云数据流性能——我们的时代是否值得期待? 编辑:主要动机是找到一种方法来预测各种输入大小的运行时。 我成功地运行了显示

  • 我第一次尝试Kafka,并使用AWS MSK设置Kafka群集。目标是将数据从MySQL服务器流式传输到Postgresql。我使用debezium MySQL连接器作为源,使用Confluent JDBC连接器作为接收器。 MySQL配置: 注册Mysql连接器后,其状态为“正在运行”,并捕获MySQL表中所做的更改,并以以下格式在消费者控制台中显示结果: 我的第一个问题:在表中“金额”列是“十

  • 我正在将数据流式传输到BigQuery 300行,大约每分钟3次。插入一周前停止工作,出现以下错误: 谷歌。API。请求。执行期间遇到RequestError错误。重试可能会解决问题。[503]错误[消息[执行过程中遇到错误。重试可能会解决问题。]位置[-]原因[备份错误]域[全局]] 我的服务在两个月内运行良好,自从我们开始使用它以来,没有任何代码更改。 这是调用BigQuery的代码: 还有其