当前位置: 首页 > 面试题库 >

如何在python-rq中的预定作业和排队作业之间创建``depends_on''关系

鲁华皓
2023-03-14
问题内容

我有一个Web服务(Python 3.7,Flask 1.0.2),其工作流程包括3个步骤:

  • 步骤1:将远程计算作业提交到商业排队系统(IBM的LSF)
  • 步骤2:每61秒轮询一次远程计算作业状态(由于缓存了作业状态结果,所以每61秒轮询一次)
  • 步骤3:如果步骤2返回了远程计算作业状态==“ DONE”,则进行数据后处理

远程计算作业的长度是任意的(介于秒和天之间),并且每个步骤都取决于上一个步骤的完成:

with Connection(redis.from_url(current_app.config['REDIS_URL'])):
    q = Queue()
    job1 = q.enqueue(step1)
    job2 = q.enqueue(step2, depends_on=job1)
    job3 = q.enqueue(step3, depends_on=job2)

但是,最终所有工作人员(4个工作人员)将进行轮询(4个客户请求中的第2步),而他们应继续执行其他传入请求的第1步以及已成功通过第2步的那些工作流的第3步。

每次民意调查后都应释放工人。他们应该定期返回步骤2进行下一次轮询(每个作业最多每61秒一次),如果远程计算作业轮询未返回“
DONE”,则重新排队该轮询作业。

在这一点上,我开始使用rq-scheduler(因为间隔和重新排队功能听起来很有希望):

with Connection(redis.from_url(current_app.config['REDIS_URL'])):
    q = Queue()
    s = Scheduler('default')

    job1 = q.enqueue(step1, REQ_ID)

    job2 = Job.create(step2, (REQ_ID,), depends_on=job1)
    job2.meta['interval'] = 61
    job2.origin = 'default'
    job2.save()
    s.enqueue_job(job2)

    job3 = q.enqueue(step3, REQ_ID, depends_on=job2)

Job2已正确创建(包括与depends_onjob1
的关系,但s.enqueue_job()立即执行它,而忽略了其与job1的关系。(q.enqueue_job()函数doc-
string实际上说它是立即执行的…) 。

depends_on
当将job2放在调度程序中而不是队列中时,如何创建job1,job2和job3之间的关系?(或者,如何在不立即执行job2并等待job1完成的情况下将job2交给调度程序?)

为了进行测试,步骤如下所示:

def step1():
    print(f'*** --> [{datetime.utcnow()}] JOB [ 1 ] STARTED...', flush=True)
    time.sleep(20)
    print(f'    <-- [{datetime.utcnow()}] JOB [ 1 ] FINISHED', flush=True)
    return True

def step2():
    print(f'    --> [{datetime.utcnow()}] POLL JOB [ 2 ] STARTED...', flush=True)
    time.sleep(10)
    print(f'    <-- [{datetime.utcnow()}] POLL JOB [ 2 ] FINISHED', flush=True)
    return True

def step3():
    print(f'    --> [{datetime.utcnow()}] JOB [ 3 ] STARTED...', flush=True)
    time.sleep(10)
    print(f'*** <-- [{datetime.utcnow()}] JOB [ 3 ] FINISHED', flush=True)
    return True

我收到的输出是这样的:

worker_1     | 14:44:57 default: project.server.main.tasks.step1(1) (d40256a2-904f-4ce3-98da-6e49b5d370c9)
worker_2     | 14:44:57 default: project.server.main.tasks.step2(1) (3736909c-f05d-4160-9a76-01bb1b18db58)
worker_2     |     --> [2019-11-04 14:44:57.341133] POLL JOB [ 2 ] STARTED...
worker_1     | *** --> [2019-11-04 14:44:57.342142] JOB [ 1 ] STARTED...
...

job2不等待job1完成…

#requirements.txt
Flask==1.0.2
Flask-Bootstrap==3.3.7.1
Flask-Testing==0.7.1
Flask-WTF==0.14.2
redis==3.3.11
rq==0.13
rq_scheduler==0.9.1

问题答案:

我对此问题的解决方案rq仅使用了(并且不再使用rq_scheduler):

  1. 升级到最新的python-rq软件包:

    # requirements.txt
    


    rq==1.1.0

  2. 为轮询作业创建专用队列,并相应地使作业入队(具有depends_on关系):

    with Connection(redis.from_url(current_app.config['REDIS_URL'])):
    q = Queue('default')
    p = Queue('pqueue')
    job1 = q.enqueue(step1)
    job2 = p.enqueue(step2, depends_on=job1)  # step2 enqueued in polling queue
    job3 = q.enqueue(step3, depends_on=job2)
    
  3. 派遣专职工作人员进行轮询队列。它继承自标准Worker类:

    class PWorker(rq.worker.Worker):
    def execute_job(self, *args, **kwargs):
        seconds_between_polls = 65
        job = args[0]
        if 'lastpoll' in job.meta:
            job_timedelta = (datetime.utcnow() - job.meta["lastpoll"]).total_seconds()
            if job_timedelta < seconds_between_polls:
                sleep_period = seconds_between_polls - job_timedelta
                time.sleep(sleep_period)
        job.meta['lastpoll'] = datetime.utcnow()
        job.save_meta()
    
        super().execute_job(*args, **kwargs)
    

PWorker execute_job通过向作业的元数据添加时间戳来扩展该方法'lastpoll'

如果有lastpoll时间戳记的轮询作业进入,工作人员将检查此后的时间间隔lastpoll是否大于65秒。如果是,它将当前时间写入
'lastpoll'并执行轮询。如果没有,它将一直hibernate直到65s结束,然后将当前时间写入'lastpoll'并执行轮询。没有lastpoll时间戳的进来的作业是第一次轮询,而工作人员创建时间戳并执行轮询。

  1. 创建一个专用异常(由task函数抛出)和一个异常处理程序来处理它:
        # exceptions.py

    class PACError(Exception):
        pass

    class PACJobRun(PACError):
        pass

    class PACJobExit(PACError):
        pass


        # exception_handlers.py

    def poll_exc_handler(job, exc_type, exc_value, traceback):
        if exc_type is PACJobRun:
            requeue_job(job.get_id(), connection=job.connection)
            return False  # no further exception handling
        else:
            return True  # further exception handling


        # tasks.py

    def step2():
        # GET request to remote compute job portal API for status
        # if response == "RUN":
        raise PACJobRun
        return True

当定制异常处理程序捕获到定制异常(这意味着远程计算作业仍在运行)时,它将在轮询队列中重新排队该作业。

  1. 将定制异常处理程序放入异常处理层次结构中:
        # manage.py

    @cli.command('run_pworker')
    def run_pworker():
        redis_url = app.config['REDIS_URL']
        redis_connection = redis.from_url(redis_url)
        with rq.connections.Connection(redis_connection):
            pworker = PWorker(app.config['PQUEUE'], exception_handlers=[poll_exc_handler])
            pworker.work()

该解决方案的优点在于,它仅用几行额外的代码即可扩展python-rq的标准功能。另一方面,额外的队列和工作程序增加了复杂性……



 类似资料:
  • 我需要在Java中创建一个计划作业或Cron作业,以便每天早上6点运行。有没有一个简单的方法。我曾尝试将Timer类与scheduledAtFixedRate方法一起使用,但只需要一个延迟。 有一种方法可以将一天作为第一次,然后是第二次 如何创建启动计划程序的特定日期和时间? 我的类需要运行作业已经扩展了TimerWork。

  • 我只是从GitHub操作开始,并尝试正确配置作业。现在我有了一个建立python并安装依赖项的作业构建,我还有一个需要运行依赖项的behave test作业。当我在一个工作中进行测试和构建时,一切正常。但我想在不同的工作中进行构建和测试。但是当我在这个配置中运行它们时,我得到了错误。我在requirementx中安装了Behave。txt文件。我做错了什么?这种配置通常可能吗?

  • 当我通过命令行运行Beam管道时,使用direct runner或dataflow runner,它工作得很好。。。 例子: 但是当我尝试使用空气流时,我有两个选项,bash操作符或python操作符。 使用bash操作符会成功,但会限制我使用气流功能的能力。 但是我想做的是作为python操作员运行它。所以我将模块导入到airflow dg文件中,然后作为python操作符运行它。 如果我使用本

  • 问题内容: 您是否知道/使用任何针对python的分布式作业队列?您可以共享链接或工具吗 问题答案: 如果您使用的是Django,那么除了进行多处理之外,还有Celery项目。

  • 在job.yaml下面用于创建作业。未创建初始化容器。 [root@app]#kubectl版本客户端版本:version.info{Major:“1”,Minor:“15”,GitVersion:“v1.15.5”,GitCommit:“”,GitTreeState:“Clean”,BuildDate:“2019-10-15T19:16:51Z”,GoVersion:“Go1.12.10”,编译