当前位置: 首页 > 工具软件 > Php-Resque > 使用案例 >

php resque队列常驻,php-resque 队列简单使用

章景同
2023-12-01

一、安装 php-resque

进入项目根目录,composer 安装 php-resque

composer require chrisboulton/php-resque

二、常用方法

1、连接 redis

//setBackend($server, $database = 0)

Resque::setBackend('127.0.0.1:6379');

2、向队列中添加工作

//enqueue($queue, $class, $args = null, $trackStatus = false)

$token = Resque::enqueue('default', 'My_Job', ['name'=>'test'], true);

3、查看工作状态

$status = (new Resque_Job_Status($token))->get();

4、停止(移除)工作

(new Resque_Job_Status($token))->stop();

三、常驻任务处理队列(示例:worker.php)

//处理 default 队列;也可以填 *,代表所有队列

$worker = new Resque_Worker('default');//LOG_NONE 不写日志, LOG_NORMAL 普通,LOG_VERBOSE 详细

$worker->logLevel = Resque_Worker::LOG_VERBOSE;//队列处理时间间隔,单位:秒

$worker->work(5);

注:worker.php 要以命令行的方法执行,并长驻后台,/usr/local/php/bin/php /xxx/xxx/worker.php

四、处理工作的类

classMy_Job

{/**

* 前置操作

* @return void*/

public functionsetUp()

{//... Set up environment for this job

}/**

* 消费队列

* @return void*/

public functionperform()

{//execute a job

}/**

* 后置操作

* @return void*/

public functiontearDown()

{//... Remove environment for this job

}

}

五、完整例子

test.php

* php-resque 队列服务*/

classQueue

{private $jobStatusObject =[];function __construct($server, $database = 0)

{//连接 redis

Resque::setBackend($server);

}private function getJobStatusObject($token)

{if (!isset($this->jobStatusObject[$token])) {$this->jobStatusObject[$token] = new Resque_Job_Status($token);

}return $this->jobStatusObject[$token];

}/**

* 入队并返回 token

* @param array $args 参数

* @return string*/

public function enqueue($args =[])

{//队列名称、指定消费类、参数

return Resque::enqueue('default', 'My_Job', $args, true);

}/**

* 查询 job 的状态

* @param string $token 任务token

* @return array 状态信息*/

public function getJobStatus($token)

{$status = $this->getJobStatusObject($token)->get();$statusOptions =[1 => 'STATUS_WAITING',

2 => 'STATUS_RUNNING',

3 => 'STATUS_FAILED',

4 => 'STATUS_COMPLETE'];return isset($statusOptions[$status])? $statusOptions[$status]:$status;

}/**

* 清除 job

* @param string $token 任务token

* @return boolean 状态信息*/

public function clearJob($token)

{if ($this->getJobStatusObject($token) == 'STATUS_COMPLETE')

{return $this->getJobStatusObject($token)->stop();

}else{error_log("非已完成任务,不可清除。token: {$token}\n", 3, 'logs.txt');return false;

}

}

}empty($_GET) && die('url参数或数据不能为空!!');$token = isset($_GET['token'])? trim($_GET['token']):'';$act = isset($_GET['act'])? trim($_GET['act']):'';$queue = new Queue('127.0.0.1:6379');if (empty($act))

{//任务入队

$token = $queue->enqueue($_GET);error_log("token: {$token}\n", 3, 'logs.txt');echo 'token: ', $token, '


';exit;

}elseif ($act == 'status' && !empty($token))

{//查看任务状态

var_dump($queue->getJobStatus($token));exit;

}elseif ($act == 'del' && !empty($token))

{//移除任务

var_dump($queue->clearJob($token));exit;

}?>

测试url:

http://localhost/redis/queue/test.php?name=test

// token: 37efa5889472ef04108a55ba8cc448e4

http://localhost/redis/queue/test.php?act=status&token=37efa5889472ef04108a55ba8cc448e4

// string(14) "STATUS_WAITING"

http://localhost/redis/queue/test.php?act=del&token=37efa5889472ef04108a55ba8cc448e4

// bool(false)

worker.php

require 'vendor/autoload.php';/**

* 任务*/

classMy_Job

{/**

* 前置操作

* @return void*/

public functionsetUp()

{//... Set up environment for this job

}/**

* 消费队列

* @return void*/

public functionperform()

{//按自己的业务处理 job

error_log(var_export($this->args, true)."\n\n", 3, 'logs.txt');//抛出错误则表示工作处理失败,否则工作状态将更新为完成(STATUS_COMPLETE)

// 注:return false 不管用,一定要 throw Exception 才行

// throw new Exception('Unable to run this job!');

}/**

* 后置操作

* @return void*/

public functiontearDown()

{//... Remove environment for this job

}

}//处理 default 队列;也可以填 *,代表所有队列

$worker = new Resque_Worker('default');//LOG_NONE 不写日志, LOG_NORMAL 普通,LOG_VERBOSE 详细

$worker->logLevel = Resque_Worker::LOG_VERBOSE;//队列处理时间间隔,单位:秒

$worker->work(5);?>

 类似资料: