
A Detailed Look at How php-resque Works

戚俊健
2023-12-01

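Roles in php-resque:

  • Job: a task to be processed in the background. php-resque models each task as a PHP class, and every job is an instance of such a class.
  • Queue: the message queue. php-resque stores jobs as a List in Redis.
  • Worker: the message consumer. php-resque workers run in the background as daemons.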
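
The three roles can be illustrated with a small in-memory model. This is only a sketch in Python, not php-resque's code: the `queues` dictionary stands in for the Redis lists, and `EchoJob`, `JOB_CLASSES`, `push` and `work_one` are hypothetical names chosen for the example. The point it shows is that the queue holds the job's class name and arguments rather than a live object, and the worker rebuilds the object when it pops the entry.

```python
import json
from collections import defaultdict, deque

# Hypothetical in-memory stand-in for the Redis lists that hold queued jobs.
queues = defaultdict(deque)


class EchoJob:
    """Job: a class whose instances do the background work in perform()."""

    def __init__(self, args):
        self.args = args

    def perform(self):
        return "hello " + self.args["name"]


# Hypothetical registry used to map a stored class name back to a class.
JOB_CLASSES = {"EchoJob": EchoJob}


def push(queue, class_name, args):
    # Queue: store the class name and arguments, serialized as JSON.
    queues[queue].append(json.dumps({"class": class_name, "args": [args]}))


def work_one(queue):
    # Worker: take the oldest entry, rebuild the job object, and run it.
    if not queues[queue]:
        return None
    payload = json.loads(queues[queue].popleft())
    job = JOB_CLASSES[payload["class"]](payload["args"][0])
    return job.perform()
```

Calling `push("default", "EchoJob", {"name": "resque"})` and then `work_one("default")` runs the job once; a second `work_one("default")` finds the queue empty and returns `None`.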

The php-resque workflow:

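  1. Enqueue a job: the Job class name and its arguments are added to a queue.
  2. Run php queue.php to start a Worker and specify which jobs the Worker should handle.
  3. The Worker runs in the background as a daemon and continuously watches for jobs that need processing.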

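enqueue: inserting a Job into a queue

When Resque::enqueue() is called, php-resque does the following:

  1. Resque::enqueue() calls Resque_Job::create() internally, passing along the arguments it received.
  2. Resque_Job::create() checks that $args (the third argument to Resque::enqueue()) is either empty or an array.
  3. Resque_Job::create() generates a job ID (most articles call it a "token").
  4. Resque_Job::create() pushes the job onto the specified queue (the first argument to Resque::enqueue()).
  5. If Resque::enqueue() was asked to monitor the job's status (the fourth argument), Resque_Job::create() calls Resque_Job_Status::create() with the job ID as its argument.
  6. Resque_Job_Status::create() creates a key in Redis whose name contains the job ID and whose value holds the current status and a pair of timestamps, then returns control to Resque_Job::create().
  7. Resque_Job::create() returns control, along with the job ID, to Resque::enqueue().
  8. Resque::enqueue() triggers the afterEnqueue event, then returns control to the application along with the job ID.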
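
The eight steps above can be traced in a short Python model. It is a sketch under stated assumptions, not the library's implementation: `store` is a hypothetical in-memory substitute for Redis, the key names (`queue:<name>`, `job:<id>:status`) and the timestamp field names are illustrative, and `listeners` is a stand-in for the event system that delivers afterEnqueue.

```python
import json
import time
import uuid

# Hypothetical in-memory stand-in for Redis: lists for queues, keys for statuses.
store = {"lists": {}, "keys": {}}
# Hypothetical callbacks that play the role of afterEnqueue listeners.
listeners = []


def create_status(job_id):
    # Step 6: one key per job, holding the status and a pair of timestamps.
    now = int(time.time())
    store["keys"]["job:" + job_id + ":status"] = json.dumps(
        {"status": "WAITING", "updated": now, "started": now}
    )


def create_job(queue, class_name, args=None, monitor=False):
    # Step 2: args must be empty or an array (a dict in this sketch).
    if args is not None and not isinstance(args, dict):
        raise ValueError("args must be None or a dict")
    # Step 3: generate the job ID (the "token").
    job_id = uuid.uuid4().hex
    # Step 4: push the serialized job onto the named queue.
    payload = {"class": class_name, "args": [args], "id": job_id}
    store["lists"].setdefault("queue:" + queue, []).append(json.dumps(payload))
    # Step 5: create the status key only when monitoring was requested.
    if monitor:
        create_status(job_id)
    # Step 7: hand the ID back to the caller.
    return job_id


def enqueue(queue, class_name, args=None, track_status=False):
    # Step 1: delegate to create_job with the received arguments.
    job_id = create_job(queue, class_name, args, track_status)
    # Step 8: fire the afterEnqueue callbacks, then return the ID.
    for callback in listeners:
        callback(class_name, args, queue, job_id)
    return job_id
```

With this model, `enqueue("default", "My_Job", {"name": "x"}, track_status=True)` returns the new ID, leaves one entry in the `queue:default` list, and creates a status key for that ID; without `track_status` no status key is written.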

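How the Worker works

How does a Worker process the queues?

  1. Resque_Worker::work(), the worker's main process, calls Resque_Worker->reserve() to check whether a job is waiting to be processed
  2. Resque_Worker->reserve() checks whether to use blocking pops on the queues, then acts accordingly:
    • Blocking Pop
      • Resque_Worker->reserve() calls Resque_Job::reserveBlocking() with the entire queue list and the timeout as arguments
      • Resque_Job::reserveBlocking() calls Resque::blpop(), which in turn calls Redis’ blpop command after preparing the queue list for the call, then processes the response for consistency with other aspects of the library, before finally returning control (along with the queue and the retrieved content) to Resque_Job::reserveBlocking()
      • Resque_Job::reserveBlocking() checks whether the job content is an array (it should contain the job’s type [class], payload [args], and ID), and aborts processing if not
      • Resque_Job::reserveBlocking() creates a new Resque_Job object, initialized with the queue and the content as constructor arguments, and returns it to Resque_Worker->reserve()
    • Queue Polling
      • Resque_Worker->reserve() iterates through the queue list, calling Resque_Job::reserve() with the current queue’s name as the sole argument on each pass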
      • Resque_Job::reserve() passes the queue name on to Resque::pop(), which in turn calls Redis’ lpop with the same argument, then returns control (and the job content, if any) to Resque_Job::reserve()
      • Resque_Job::reserve() checks whether the job content is an array (as before, it should contain the job’s type [class], payload [args], and ID), and aborts processing if not
      • Resque_Job::reserve() creates a new Resque_Job object in the same manner as above, and also returns this object (along with control of the process) to Resque_Worker->reserve()
  3. In either case, Resque_Worker->reserve() returns the new Resque_Job object, along with control, up to Resque_Worker::work(); if no job is found, it simply returns FALSE
    • No Jobs
      • If blocking mode is not enabled, Resque_Worker::work() sleeps for INTERVAL seconds; it calls usleep() for this, so fractional seconds are supported
    • Job Reserved
      • Resque_Worker::work() triggers a beforeFork event
      • Resque_Worker::work() calls Resque_Worker->workingOn() with the new Resque_Job object as its argument
      • Resque_Worker->workingOn() does some reference assignments to help keep track of the worker/job relationship, then updates the job status from WAITING to RUNNING
      • Resque_Worker->workingOn() stores the new Resque_Job object’s payload in a Redis key associated to the worker itself (this is to prevent the job from being lost indefinitely, but does rely on that PID never being allocated on that host to a different worker process), then returns control to Resque_Worker::work()
      • Resque_Worker::work() forks a child process to run the actual perform()
      • The next steps differ between the worker and the child, now running in separate processes:
        • Worker
          1. The worker waits for the job process to complete
          2. If the exit status is not 0, the worker calls Resque_Job->fail() with a Resque_Job_DirtyExitException as its only argument
          3. Resque_Job->fail() triggers an onFailure event
          4. Resque_Job->fail() updates the job status from RUNNING to FAILED
          5. Resque_Job->fail() calls Resque_Failure::create() with the job payload, the Resque_Job_DirtyExitException, the internal ID of the worker, and the queue name as arguments
          6. Resque_Failure::create() creates a new object of whatever type has been set as the Resque_Failure “backend” handler; by default, this is a Resque_Failure_Redis object, whose constructor simply collects the data passed into Resque_Failure::create() and pushes it into Redis in the failed queue
          7. Resque_Job->fail() increments two failure counters in Redis: one for a total count, and one for the worker
          8. Resque_Job->fail() returns control to the worker (still in Resque_Worker::work()) without a value
        • Job
          1. The job calls Resque_Worker->perform() with the Resque_Job as its only argument.
          2. Resque_Worker->perform() sets up a try…catch block so it can properly handle exceptions by marking jobs as failed (by calling Resque_Job->fail(), as above)
          3. Inside the try…catch, Resque_Worker->perform() triggers an afterFork event
          4. Still inside the try…catch, Resque_Worker->perform() calls Resque_Job->perform() with no arguments
          5. Resque_Job->perform() calls Resque_Job->getInstance() with no arguments
          6. If Resque_Job->getInstance() has already been called, it returns the existing instance; otherwise:
          7. Resque_Job->getInstance() checks that the job’s class (type) exists and has a perform() method; if not, in either case, it throws an exception which will be caught by Resque_Worker->perform()
          8. Resque_Job->getInstance() creates an instance of the job’s class, and initializes it with a reference to the Resque_Job itself, the job’s arguments (which it gets by calling Resque_Job->getArguments(), which in turn simply returns the value of args[0], or an empty array if no arguments were passed), and the queue name
          9. Resque_Job->getInstance() returns control, along with the job class instance, to Resque_Job->perform()
          10. Resque_Job->perform() sets up its own try…catch block to handle Resque_Job_DontPerform exceptions; any other exceptions are passed up to Resque_Worker->perform()
          11. Resque_Job->perform() triggers a beforePerform event
          12. Resque_Job->perform() calls setUp() on the instance, if it exists
          13. Resque_Job->perform() calls perform() on the instance
          14. Resque_Job->perform() calls tearDown() on the instance, if it exists
          15. Resque_Job->perform() triggers an afterPerform event
          16. The try…catch block ends, suppressing Resque_Job_DontPerform exceptions by returning control, and the value FALSE, to Resque_Worker->perform(); any other situation returns the value TRUE along with control, instead
          17. The try…catch block in Resque_Worker->perform() ends
          18. Resque_Worker->perform() updates the job status from RUNNING to COMPLETE, then returns control, with no value, to the worker (again still in Resque_Worker::work())
          19. Resque_Worker::work() calls exit(0) to terminate the job process cleanly
        • SPECIAL CASE: Non-forking OS (Windows)
          1. Same as the job above, except it doesn’t call exit(0) when done
      • Resque_Worker::work() calls Resque_Worker->doneWorking() with no arguments
      • Resque_Worker->doneWorking() increments two processed counters in Redis: one for a total count, and one for the worker
      • Resque_Worker->doneWorking() deletes the Redis key set in Resque_Worker->workingOn(), then returns control, with no value, to Resque_Worker::work()
  4. Resque_Worker::work() returns control to the beginning of the main loop, where it will wait for the next job to become available, and start this process all over again
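
One pass of this loop can be sketched in Python as follows. This is a simplified model under several assumptions, not the library's code: it follows the queue-polling path and the non-forking special case (the job runs in the same process), events are omitted, and `queues`, `status`, `stats`, `failed`, `ran`, `GoodJob`, `BadJob`, `JOB_CLASSES` and `work_once` are all hypothetical names standing in for Redis data and application job classes.

```python
import json


class DontPerform(Exception):
    """Stand-in for Resque_Job_DontPerform: skip the job without failing it."""


queues = {"high": [], "low": []}       # hypothetical in-memory queues
status = {}                            # job ID -> status string
stats = {"processed": 0, "failed": 0}  # stand-ins for the Redis counters
failed = []                            # stand-in for the failed queue
ran = []                               # records which job hooks were called


class GoodJob:
    def __init__(self, args):
        self.args = args

    def setUp(self):
        ran.append("setUp")

    def perform(self):
        ran.append("perform:" + self.args["n"])

    def tearDown(self):
        ran.append("tearDown")


class BadJob:
    def __init__(self, args):
        self.args = args

    def perform(self):
        raise RuntimeError("boom")


JOB_CLASSES = {"GoodJob": GoodJob, "BadJob": BadJob}


def reserve(queue_names):
    # Queue polling: try each queue in order and pop its oldest entry.
    for name in queue_names:
        if queues[name]:
            payload = json.loads(queues[name].pop(0))
            if isinstance(payload, dict):  # ignore content that is not a job
                return name, payload
    return None


def perform(payload):
    # Look up the job class, build the instance, then run its hooks in order.
    cls = JOB_CLASSES.get(payload["class"])
    if cls is None or not hasattr(cls, "perform"):
        raise LookupError("no usable job class: " + payload["class"])
    instance = cls(payload["args"][0])
    try:
        if hasattr(instance, "setUp"):
            instance.setUp()
        instance.perform()
        if hasattr(instance, "tearDown"):
            instance.tearDown()
    except DontPerform:
        return False
    return True


def work_once(queue_names):
    reserved = reserve(queue_names)
    if reserved is None:
        return False  # a real worker would sleep for INTERVAL seconds here
    queue, payload = reserved
    status[payload["id"]] = "RUNNING"  # the workingOn() step
    try:
        perform(payload)
        status[payload["id"]] = "COMPLETE"
    except Exception as exc:  # the fail() step
        status[payload["id"]] = "FAILED"
        failed.append({"payload": payload, "queue": queue, "error": str(exc)})
        stats["failed"] += 1
    stats["processed"] += 1  # the doneWorking() step
    return True
```

A forking worker would run `perform()` in a child process and inspect its exit status instead of catching the exception directly; the sketch keeps everything in one process so the order of status changes and counter updates is easy to follow.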