Question:

Airflow GCS-to-BigQuery operator does not recognize the Cloud Storage bucket URI

孙永思
2023-03-14

I have the code below, which basically reads a table from a MySQL database into Google Cloud Storage and then loads it into Google BigQuery.

I have added the connection details, as well as the service accounts for MySQL and GCP, in the application's Admin tab.

extract = MySqlToGoogleCloudStorageOperator(
    task_id='extract_actors',
    mysql_conn_id='factset_test',
    google_cloud_storage_conn_id='gcp_test',
    sql='SELECT * FROM mysql.time_zone',
    bucket='airflow_1',
    filename='actors/actors{}.json',
    schema_filename='schemas/actors.json',
    dag=dag)

load = GoogleCloudStorageToBigQueryOperator(
    task_id="load_bq_time_zone",
    bigquery_conn_id='gcp_test',
    google_cloud_storage_conn_id='gcp_test',
    bucket='airflow_1',
    destination_project_dataset_table="airflow.mysql_time_zone",
    source_objects='actors/actors0.json',
    schema_object='schemas/actors.json',
    source_format='NEWLINE_DELIMITED_JSON',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    dag=dag)

I can see that the table time_zone from the MySQL database is copied to the Cloud Storage bucket airflow_1. But when Airflow tries to copy the data from Cloud Storage to BigQuery, it complains that it cannot find the Cloud Storage URI. Here are the log details:

[2018-03-12 02:16:59,031] {models.py:167} INFO - Filling up the DagBag from /airflow/dags/mysql_to_gcs.py
[2018-03-12 02:16:59,601] {base_task_runner.py:112} INFO - Running: ['bash', '-c', u'airflow run mysql_to_gcs load_bq_time_zone 2018-03-12T02:16:48.974591 --job_id 465 --raw -sd DAGS_FOLDER/mysql_to_gcs.py']
[2018-03-12 02:16:59,822] {base_task_runner.py:95} INFO - Subtask: /usr/local/lib/python2.7/dist-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
[2018-03-12 02:16:59,822] {base_task_runner.py:95} INFO - Subtask:   """)
[2018-03-12 02:16:59,858] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:16:59,857] {__init__.py:57} INFO - Using executor SequentialExecutor
[2018-03-12 02:16:59,949] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:16:59,949] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2018-03-12 02:16:59,973] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:16:59,973] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2018-03-12 02:17:00,157] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,157] {models.py:167} INFO - Filling up the DagBag from /airflow/dags/mysql_to_gcs.py
[2018-03-12 02:17:00,712] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,712] {models.py:1126} INFO - Dependencies all met for <TaskInstance: mysql_to_gcs.load_bq_time_zone 2018-03-12 02:16:48.974591 [queued]>
[2018-03-12 02:17:00,721] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,720] {models.py:1126} INFO - Dependencies all met for <TaskInstance: mysql_to_gcs.load_bq_time_zone 2018-03-12 02:16:48.974591 [queued]>
[2018-03-12 02:17:00,721] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,721] {models.py:1318} INFO - 
[2018-03-12 02:17:00,721] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
[2018-03-12 02:17:00,722] {base_task_runner.py:95} INFO - Subtask: Starting attempt 1 of 2
[2018-03-12 02:17:00,722] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
[2018-03-12 02:17:00,722] {base_task_runner.py:95} INFO - Subtask: 
[2018-03-12 02:17:00,738] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,737] {models.py:1342} INFO - Executing <Task(GoogleCloudStorageToBigQueryOperator): load_bq_time_zone> on 2018-03-12 02:16:48.974591
[2018-03-12 02:17:00,792] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,792] {gcp_api_base_hook.py:81} INFO - Getting connection using a JSON key file.
[2018-03-12 02:17:00,795] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,795] {__init__.py:44} WARNING - file_cache is unavailable when using oauth2client >= 4.0.0
[2018-03-12 02:17:00,795] {base_task_runner.py:95} INFO - Subtask: Traceback (most recent call last):
[2018-03-12 02:17:00,795] {base_task_runner.py:95} INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/googleapiclient/discovery_cache/__init__.py", line 41, in autodetect
[2018-03-12 02:17:00,796] {base_task_runner.py:95} INFO - Subtask:     from . import file_cache
[2018-03-12 02:17:00,796] {base_task_runner.py:95} INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/googleapiclient/discovery_cache/file_cache.py", line 41, in <module>
[2018-03-12 02:17:00,796] {base_task_runner.py:95} INFO - Subtask:     'file_cache is unavailable when using oauth2client >= 4.0.0')
[2018-03-12 02:17:00,796] {base_task_runner.py:95} INFO - Subtask: ImportError: file_cache is unavailable when using oauth2client >= 4.0.0
[2018-03-12 02:17:00,796] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,795] {discovery.py:274} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/storage/v1/rest
[2018-03-12 02:17:00,796] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,795] {transport.py:157} INFO - Attempting refresh to obtain initial access_token
[2018-03-12 02:17:00,838] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,838] {client.py:777} INFO - Refreshing access_token
[2018-03-12 02:17:00,910] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:00,909] {discovery.py:868} INFO - URL being requested: GET https://www.googleapis.com/storage/v1/b/airflow_1/o/%2Fschemas%2Factors.json?alt=media
[2018-03-12 02:17:01,209] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:01,208] {gcp_api_base_hook.py:81} INFO - Getting connection using a JSON key file.
[2018-03-12 02:17:01,210] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:01,210] {__init__.py:44} WARNING - file_cache is unavailable when using oauth2client >= 4.0.0
[2018-03-12 02:17:01,210] {base_task_runner.py:95} INFO - Subtask: Traceback (most recent call last):
[2018-03-12 02:17:01,211] {base_task_runner.py:95} INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/googleapiclient/discovery_cache/__init__.py", line 41, in autodetect
[2018-03-12 02:17:01,211] {base_task_runner.py:95} INFO - Subtask:     from . import file_cache
[2018-03-12 02:17:01,211] {base_task_runner.py:95} INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/googleapiclient/discovery_cache/file_cache.py", line 41, in <module>
[2018-03-12 02:17:01,211] {base_task_runner.py:95} INFO - Subtask:     'file_cache is unavailable when using oauth2client >= 4.0.0')
[2018-03-12 02:17:01,211] {base_task_runner.py:95} INFO - Subtask: ImportError: file_cache is unavailable when using oauth2client >= 4.0.0
[2018-03-12 02:17:01,211] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:01,210] {discovery.py:274} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/bigquery/v2/rest
[2018-03-12 02:17:01,212] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:01,210] {transport.py:157} INFO - Attempting refresh to obtain initial access_token
[2018-03-12 02:17:01,248] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:01,248] {client.py:777} INFO - Refreshing access_token
[2018-03-12 02:17:01,325] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:01,325] {bigquery_hook.py:961} INFO - project not included in destination_project_dataset_table: airflow.mysql_time_zone; using project "bigquery-1210"
[2018-03-12 02:17:01,339] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:01,339] {discovery.py:868} INFO - URL being requested: POST https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs?alt=json
[2018-03-12 02:17:01,888] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:01,887] {discovery.py:868} INFO - URL being requested: GET https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs/job_8m-z9qzE1K6KiY5lTGDreQe9f_0L?alt=json
[2018-03-12 02:17:02,064] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:02,062] {models.py:1417} ERROR - BigQuery job failed. Final error was: {u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}. The job was: {u'status': {u'state': u'DONE', u'errors': [{u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}], u'errorResult': {u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}}, u'kind': u'bigquery#job', u'statistics': {u'endTime': u'1520821021658', u'creationTime': u'1520821021410', u'startTime': u'1520821021658'}, u'jobReference': {u'projectId': u'bigquery-1210', u'jobId': u'job_8m-z9qzE1K6KiY5lTGDreQe9f_0L'}, u'etag': u'"OhENgf8ForUUnKbYWWdbr5aJHYs/FMMuqgSZof0jtlMAniagOYxmgWA"', u'user_email': u'@bigquery-1210.iam.gserviceaccount.com', u'configuration': {u'load': {u'sourceFormat': u'NEWLINE_DELIMITED_JSON', u'destinationTable': {u'projectId': u'bigquery-1210', u'tableId': u'mysql_time_zone', u'datasetId': u'airflow'}, u'writeDisposition': u'WRITE_TRUNCATE', u'sourceUris': [u'gs://airflow_1/p', u'gs://airflow_1/u', u'gs://airflow_1/s', u'gs://airflow_1/h', u'gs://airflow_1/k', u'gs://airflow_1/a', u'gs://airflow_1/r', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1/0', u'gs://airflow_1/.', u'gs://airflow_1/j', u'gs://airflow_1/s', u'gs://airflow_1/o', u'gs://airflow_1/n'], u'createDisposition': u'CREATE_IF_NEEDED', u'schema': {u'fields': [{u'type': u'INTEGER', u'name': u'Time_zone_id', u'mode': u'REQUIRED'}, {u'type': u'STRING', u'name': u'Use_leap_seconds', u'mode': u'REQUIRED'}]}}}, u'id': u'bigquery-1210:job_8m-z9qzE1K6KiY5lTGDreQe9f_0L', u'selfLink': u'https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs/job_8m-z9qzE1K6KiY5lTGDreQe9f_0L'}
[2018-03-12 02:17:02,065] {base_task_runner.py:95} INFO - Subtask: Traceback (most recent call last):
[2018-03-12 02:17:02,065] {base_task_runner.py:95} INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run
[2018-03-12 02:17:02,065] {base_task_runner.py:95} INFO - Subtask:     result = task_copy.execute(context=context)
[2018-03-12 02:17:02,065] {base_task_runner.py:95} INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/gcs_to_bq.py", line 153, in execute
[2018-03-12 02:17:02,065] {base_task_runner.py:95} INFO - Subtask:     schema_update_options=self.schema_update_options)
[2018-03-12 02:17:02,066] {base_task_runner.py:95} INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/bigquery_hook.py", line 476, in run_load
[2018-03-12 02:17:02,066] {base_task_runner.py:95} INFO - Subtask:     return self.run_with_configuration(configuration)
[2018-03-12 02:17:02,066] {base_task_runner.py:95} INFO - Subtask:   File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/hooks/bigquery_hook.py", line 513, in run_with_configuration
[2018-03-12 02:17:02,066] {base_task_runner.py:95} INFO - Subtask:     job['status']['errorResult'], job
[2018-03-12 02:17:02,069] {base_task_runner.py:95} INFO - Subtask: Exception: BigQuery job failed. Final error was: {u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}. The job was: {u'status': {u'state': u'DONE', u'errors': [{u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}], u'errorResult': {u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}}, u'kind': u'bigquery#job', u'statistics': {u'endTime': u'1520821021658', u'creationTime': u'1520821021410', u'startTime': u'1520821021658'}, u'jobReference': {u'projectId': u'bigquery-1210', u'jobId': u'job_8m-z9qzE1K6KiY5lTGDreQe9f_0L'}, u'etag': u'"OhENgf8ForUUnKbYWWdbr5aJHYs/FMMuqgSZof0jtlMAniagOYxmgWA"', u'user_email': u'@bigquery-1210.iam.gserviceaccount.com', u'configuration': {u'load': {u'sourceFormat': u'NEWLINE_DELIMITED_JSON', u'destinationTable': {u'projectId': u'bigquery-1210', u'tableId': u'mysql_time_zone', u'datasetId': u'airflow'}, u'writeDisposition': u'WRITE_TRUNCATE', u'sourceUris': [u'gs://airflow_1/p', u'gs://airflow_1/u', u'gs://airflow_1/s', u'gs://airflow_1/h', u'gs://airflow_1/k', u'gs://airflow_1/a', u'gs://airflow_1/r', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1/0', u'gs://airflow_1/.', u'gs://airflow_1/j', u'gs://airflow_1/s', u'gs://airflow_1/o', u'gs://airflow_1/n'], u'createDisposition': u'CREATE_IF_NEEDED', u'schema': {u'fields': [{u'type': u'INTEGER', u'name': u'Time_zone_id', u'mode': u'REQUIRED'}, {u'type': u'STRING', u'name': u'Use_leap_seconds', u'mode': u'REQUIRED'}]}}}, u'id': u'bigquery-1210:job_8m-z9qzE1K6KiY5lTGDreQe9f_0L', u'selfLink': u'https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs/job_8m-z9qzE1K6KiY5lTGDreQe9f_0L'}
[2018-03-12 02:17:02,069] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:02,068] {models.py:1433} INFO - Marking task as UP_FOR_RETRY
[2018-03-12 02:17:02,087] {base_task_runner.py:95} INFO - Subtask: [2018-03-12 02:17:02,085] {models.py:1462} ERROR - BigQuery job failed. Final error was: {u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}. The job was: {u'status': {u'state': u'DONE', u'errors': [{u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}], u'errorResult': {u'reason': u'notFound', u'message': u'Not found: URI gs://airflow_1/t'}}, u'kind': u'bigquery#job', u'statistics': {u'endTime': u'1520821021658', u'creationTime': u'1520821021410', u'startTime': u'1520821021658'}, u'jobReference': {u'projectId': u'bigquery-1210', u'jobId': u'job_8m-z9qzE1K6KiY5lTGDreQe9f_0L'}, u'etag': u'"OhENgf8ForUUnKbYWWdbr5aJHYs/FMMuqgSZof0jtlMAniagOYxmgWA"', u'user_email': u'@bigquery-1210.iam.gserviceaccount.com', u'configuration': {u'load': {u'sourceFormat': u'NEWLINE_DELIMITED_JSON', u'destinationTable': {u'projectId': u'bigquery-1210', u'tableId': u'mysql_time_zone', u'datasetId': u'airflow'}, u'writeDisposition': u'WRITE_TRUNCATE', u'sourceUris': [u'gs://airflow_1/p', u'gs://airflow_1/u', u'gs://airflow_1/s', u'gs://airflow_1/h', u'gs://airflow_1/k', u'gs://airflow_1/a', u'gs://airflow_1/r', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1//', u'gs://airflow_1/a', u'gs://airflow_1/c', u'gs://airflow_1/t', u'gs://airflow_1/o', u'gs://airflow_1/r', u'gs://airflow_1/s', u'gs://airflow_1/0', u'gs://airflow_1/.', u'gs://airflow_1/j', u'gs://airflow_1/s', u'gs://airflow_1/o', u'gs://airflow_1/n'], u'createDisposition': u'CREATE_IF_NEEDED', u'schema': {u'fields': [{u'type': u'INTEGER', u'name': u'Time_zone_id', u'mode': u'REQUIRED'}, {u'type': u'STRING', u'name': u'Use_leap_seconds', u'mode': u'REQUIRED'}]}}}, u'id': u'bigquery-1210:job_8m-z9qzE1K6KiY5lTGDreQe9f_0L', u'selfLink': u'https://www.googleapis.com/bigquery/v2/projects/bigquery-1210/jobs/job_8m-z9qzE1K6KiY5lTGDreQe9f_0L'}

1 Answer

谢承颜
2023-03-14

The problem is source_objects='actors/actors0.json'

It needs to be source_objects=['actors/actors0.json']
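
For context, this is what the load task from the question looks like with only that change applied (a sketch; every other parameter is kept exactly as in the question):

load = GoogleCloudStorageToBigQueryOperator(
    task_id="load_bq_time_zone",
    bigquery_conn_id='gcp_test',
    google_cloud_storage_conn_id='gcp_test',
    bucket='airflow_1',
    destination_project_dataset_table="airflow.mysql_time_zone",
    # a list of object paths, not a bare string
    source_objects=['actors/actors0.json'],
    schema_object='schemas/actors.json',
    source_format='NEWLINE_DELIMITED_JSON',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    dag=dag)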

From the documentation:

:param source_objects: List of Google cloud storage URIs to load from. (templated)
    If source_format is 'DATASTORE_BACKUP', the list must only contain a single URI.
:type object: list

And here is the operator's code:

source_uris = ['gs://{}/{}'.format(self.bucket, source_object)
                   for source_object in self.source_objects]

So when a bare string is passed instead of a list, the comprehension iterates over the string character by character, and each character becomes its own gs:// URI, which is exactly the pattern of single-character URIs (gs://airflow_1/t and so on) in the error log above.
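
A minimal sketch of what that comprehension produces when source_objects is a bare string, using the bucket and object path from the question; compare the result with the sourceUris in the failed job:

bucket = 'airflow_1'
source_objects = 'actors/actors0.json'   # bare string instead of a list

# Iterating over a string yields one character at a time, so every
# character becomes its own URI: gs://airflow_1/a, gs://airflow_1/c, ...
source_uris = ['gs://{}/{}'.format(bucket, source_object)
               for source_object in source_objects]

# With the list form ['actors/actors0.json'], the same comprehension
# yields the single intended URI gs://airflow_1/actors/actors0.json.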
