当前位置: 首页 > 知识库问答 >
问题:

Python-如何-大查询异步任务

蔡理
2023-03-14

这可能是个假问题,但我似乎无法异步运行python google clood bigquery。

我的目标是并发运行多个查询,并等待所有查询在asyncio.wait()query gatherer中完成。我正在使用asyncio.create_tast()启动查询。问题是,每个查询在开始之前都要等待前一个查询完成。

这里是我的查询功能(相当简单):

async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:
  job = self.api.query(query, **kwargs)
  return job.result()

因为我不能等待job.result()我应该等待别的东西吗?

共有3个答案

夔波
2023-03-14

事实上,由于使用了asyncio.create\u task()函数,我找到了一种将查询封装在async调用中的方法,非常容易。我只需要将job.result()打包到一个协同程序中;下面是实现。它现在确实是异步运行的。

class BQApi(object):                                                                                                 
    def __init__(self):                                                                                              
        self.api = bigquery.Client.from_service_account_json(BQ_CONFIG["credentials"])                               

    async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:                                       
        job = self.api.query(query, **kwargs)                                                                        
        task = asyncio.create_task(self.coroutine_job(job))                                                          
        return await task                                                                                            

    @staticmethod                                                                                                    
    async def coroutine_job(job):                                                                                    
        return job.result()                                                                                          
丌官寒
2023-03-14

只是为了分享一个不同的解决方案:

import numpy as np
from time import sleep


query1 = """
SELECT
  language.name,
  average(language.bytes)
FROM `bigquery-public-data.github_repos.languages` 
, UNNEST(language) AS language
GROUP BY language.name"""
query2 = 'SELECT 2'


def dummy_callback(future):
    global jobs_done
    jobs_done[future.job_id] = True


jobs = [bq.query(query1), bq.query(query2)]
jobs_done = {job.job_id: False for job in jobs}
[job.add_done_callback(dummy_callback) for job in jobs]

# blocking loop to wait for jobs to finish
while not (np.all(list(jobs_done.values()))):
    print('waiting for jobs to finish ... sleeping for 1s')
    sleep(1)

print('all jobs done, do your stuff')

我宁愿使用bigquery作业本身的内置异步功能,而不是在完成时使用。这也使得我能够将数据管道分解为单独的云函数,而不必在整个管道期间保持主ThreadPoolExecutor活动。顺便说一句,这就是我研究这个问题的原因:我的管道比云函数的最大超时时间9分钟(甚至是云运行的15分钟)还要长。

缺点是我需要跟踪各种函数中的所有作业id,但在配置管道时,通过指定输入和输出,使其形成有向无环图,这相对容易解决。

越伟泽
2023-03-14

如果您在coroutine中工作,并且希望在不阻塞事件\u循环的情况下运行不同的查询,那么您可以使用run\u in\u executor函数,该函数基本上在后台线程中运行查询,而不阻塞循环。这里有一个很好的例子说明如何使用它。

确保这正是你所需要的;为在Python API中运行查询而创建的作业已经是异步的,它们仅在调用job.result()时才会阻塞。这意味着您不需要使用asyncio,除非您在协同程序中。

下面是一个在作业完成后立即检索结果的快速示例:

from concurrent.futures import ThreadPoolExecutor, as_completed
import google.cloud.bigquery as bq


client = bq.Client.from_service_account_json('path/to/key.json')
query1 = 'SELECT 1'
query2 = 'SELECT 2'

threads = []
results = []

executor = ThreadPoolExecutor(5)

for job in [client.query(query1), client.query(query2)]:
    threads.append(executor.submit(job.result))

# Here you can run any code you like. The interpreter is free

for future in as_completed(threads):
    results.append(list(future.result()))

结果将是:

[[Row((2,), {'f0_': 0})], [Row((1,), {'f0_': 0})]]

 类似资料:
  • 异步查询能够在数据库执行查询时避免线程阻塞。这对于避免冻结胖客户端应用程序(thick-client application)的 UI 来说很有用。异步操作还能够提升 Web 应用程序的生产能力,在数据库执行查询时线程可以被空出来为其他请求服务。更多信息请查阅 C#异步编程。 警告 EF Core 不支持在同一个上下文实例上运行多并行操作。应该总是在下一个操作开始之前等待上一个操作的完成。这通常是

  • 问: 如何异步处理繁重的业务,避免主业务被长时间阻塞。例如我要给1000用户发送邮件,这个过程很慢,可能要阻塞数秒,这个过程中因为主流程被阻塞,会影响后续的请求,如何将这样的繁重任务交给其它进程异步处理。 答: 可以在本机或者其它服务器甚至服务器集群预先建立一些任务进程处理繁重的业务,任务进程数可以开多一些,例如cpu的10倍,然后调用方利用AsyncTcpConnection将数据异步发送给这些

  • 问: 如何异步处理繁重的业务,避免主业务被长时间阻塞。例如我要给1000用户发送邮件,这个过程很慢,可能要阻塞数秒,这个过程中因为主流程被阻塞,会影响后续的请求,如何将这样的繁重任务交给其它进程异步处理。 答: 可以在本机或者其它服务器甚至服务器集群预先建立一些任务进程处理繁重的业务,任务进程数可以开多一些,例如cpu的10倍,然后调用方利用AsyncTcpConnection将数据异步发送给这些

  • Spring Data repository中的查询可以异步执行,参考Spring执行异步方法。这意味着方法可以在被调用时立刻返回,而真正的查询执行会被当做一个任务提交到Spring的TaskExecutor。 @Async Future<User> findByFirstname(String firstname); //1 @Async CompletableFu

  • 问题内容: 我正在尝试向表中插入一些行…我正在使用 postgressql-7.2.jar。 我得到以下异常 org.postgresql.util.PSQLException:查询未返回任何结果。 在org.postgresql.jdbc2.AbstractJdbc2Statement.executeQuery(AbstractJdbc2Statement.java:255) 我已经用Googl

  • 主要内容:本节引言:,1.相关概念,2.AsyncTask全解析:,3.AsyncTask使用示例:,本节小结:本节引言: 本节给大家带来的是Android给我们提供的一个轻量级的用于处理异步任务的类:AsyncTask,我们一般是 继承AsyncTask,然后在类中实现异步操作,然后将异步执行的进度,反馈给UI主线程~ 好吧,可能有些概念大家不懂,觉得还是有必要讲解下多线程的概念,那就先解释下一些概念性的东西吧! 1.相关概念 1)什么是多线程: 答:先要了解这几个名称:应用程序,进程,线程,