安装thinkphp-queue
composer install topthink/think-queue
win10安装redis
redis下载地址:点我去下载
找到相应的msi文件下载安装即可
php安装redis扩展
redis扩展下载地址:点我去下载
需要注意选择相应的VC扩展包,同时php.ini开启redis扩展
app\index\controller\Demo.php
namespace app\index\controller;
use think\facade\Queue;
class Demo{
public function index()
{
//当前任务将由哪个类来负责处理。
//当轮到该任务时,系统将生成一个该类的实例,并默认调用其 fire 方法
$jobHandlerClassName = 'app\task\job\Order';
//当前任务归属的队列名称,如果为新队列,会自动创建
//php think queue:work --queue orderJobQueue
//php think queue:work --queue orderJobQueue --daemon
$jobQueueName = "orderJobQueue";
//数组数据
$orderData = [
'id' => uniqid(),
'time' => time(),
];
//将该任务推送到消息队列,等待对应的消费者去执行
//这里只是负责将数据添加到相应的队列名称的队列里,消费者与生产者并无联系
$isPushed = Queue::push($jobHandlerClassName , $orderData, $jobQueueName);
if( $isPushed !== false ){
echo date('Y-m-d H:i:s') . " 队列添加成功";
}else{
echo '队列添加失败';
}
}
}
app\task\job\Order.php
namespace app\task\job;
use think\queue\Job;
use think\facade\Log;
/**
* @Title: app\task\job$Order
* @Package package_name
* @Description: todo(测试订单消费者)
* @author 苏晓信 <654108442@qq.com>
* @date 2020年11月20日
*/
class Order{
/**
* @Title: fire
* @Description: todo(fire方法是消息队列默认调用的方法)
* @param Job $job
* @param array $data
* @author 苏晓信 <654108442@qq.com>
* @date 2020年11月20日
* @throws
*/
public function fire(Job $job, $data)
{
//有些消息在到达消费者时,可能已经不再需要执行了
$isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
if(!$isJobStillNeedToBeDone){
$job->delete();
return;
}
$isJobDone = $this->orders($data);
if ($isJobDone) {
//如果任务执行成功,记得删除任务
$job->delete();
}else{
//通过这个方法可以检查这个任务已经重试了几次了
if ($job->attempts() > 3){
Log::error('试了3次了');
$job->delete();
//也可以重新发布这个任务
//print("Hello Job will be availabe again after 2s."."\n");
//$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行
}
}
}
/**
* @Title: checkDatabaseToSeeIfJobNeedToBeDone
* @Description: todo(有些消息在到达消费者时,可能已经不再需要执行了)
* @param array $data
* @return boolean
* @author 苏晓信 <654108442@qq.com>
* @date 2020年11月20日
* @throws
*/
private function checkDatabaseToSeeIfJobNeedToBeDone($data){
return true;
}
/**
* @Title: orders
* @Description: todo(数据处理)
* @param array $data
* @author 苏晓信 <654108442@qq.com>
* @date 2020年11月20日
* @throws
*/
public function orders($data)
{
//对订单进行数据库操作或其他等等
Log::info(date('Y-m-d H:i:s').' - data:'.json_encode($data));
return true;
}
}
监听任务并执行
php think queue:work --queue orderJobQueue
多任务
app\index\controller\Demo.php
namespace app\index\controller;
use think\Exception;
use think\facade\Queue;
class Demo{
public function index()
{
$taskType = $_GET['taskType'];
switch ($taskType) {
// 域名地址/index/demo/index?taskType=taskA
case 'taskA':
$jobHandlerClassName = 'app\task\job\Order@taskA';
$jobDataArr = ['a' => '1'];
//php think queue:work --queue orderAJobQueue
$jobQueueName = "orderAJobQueue";
break;
// 域名地址/index/demo/index?taskType=taskB
case 'taskB':
$jobHandlerClassName = 'app\task\job\Order@taskB';
$jobDataArr = ['b' => '2'];
//php think queue:work --queue orderBJobQueue
$jobQueueName = "orderBJobQueue";
break;
default:
break;
}
$isPushed = Queue::push($jobHandlerClassName, $jobDataArr, $jobQueueName);
if ($isPushed !== false) {
echo("$taskType 添加至 ".$jobQueueName ."
");
}else{
throw new Exception("push a new $taskType of MultiTask Job Failed!");
}
}
}
app\task\job\Order.php
namespace app\task\job;
use think\queue\Job;
use think\facade\Log;
class Order{
public function taskA(Job $job, $data){
$isJobDone = $this->_doTaskA($data);
if ($isJobDone) {
$job->delete();
}else{
if ($job->attempts() > 3) {
$job->delete();
}
}
}
public function taskB(Job $job, $data){
$isJobDone = $this->_doTaskB($data);
if ($isJobDone) {
$job->delete();
}else{
if ($job->attempts() > 2) {
$job->release();
}
}
}
private function _doTaskA($data) {
Log::info(date('Y-m-d H:i:s').' - TaskA - data : '.json_encode($data));
return true;
}
private function _doTaskB($data) {
Log::info(date('Y-m-d H:i:s').' - TaskB - data : '.json_encode($data));
return true;
}
}
监听任务并执行
php think queue:work --queue orderAJobQueue
php think queue:work --queue orderBJobQueue