基础
require "vendor/autoload.php"; //引入组件
use Pheanstalk\Pheanstalk;//引入组件
$ph = new Pheanstalk('127.0.0.1',11301);//创建链接
维护类
1.查看目前pheanStalkd状态信息
$ph->stats()
2.显示目前存在的管道
$ph->listTubes()
3.查看NewUsers管道的信息
$ph->useTube('NewUsers')->put('test');
$ph->useTube('NewUsers')->put('up'); //向NewUsers管道添加一个up任务
print_r($ph->statsTube('NewUsers'));//查看NewUsers管道的信息
6.查看指定管道中某一个任务的情况
$job = $ph->watch('NewUsers')->reserve(); //从管道中取出任务(消费)
print_r($ph->statsJob($job)); //查看指定管道中某一个任务的情况
7.查看任务id为1的任务详情
$job = $ph->peek(1);//直接取出任务id为1的任务 [注:beanstalkd中所有任务的id都具有唯一性]
print_r($ph->statsJob($job));//查看任务id为1的任务详情
生产类
第一种 put()
$tube = $ph->useTube('NewUsers');//连接NewUsers管道
print_r($tube->put('four'));//向NewUsers管道添加任务four,并返回结果
注: put()方法还有3个可选参数(依次为: 优先级priority,延迟时间delay,任务超时重发ttr)
第二种 putInTube() [注: putInTube()就是对useTube()和put()的封装]
$res = $ph->putInTube('NewUsers','three');//向NewUsers管道添加任务three
注: putInTube()方法还有3个可选参数(依次为: 优先级priority,延迟时间delay,任务超时重发ttr)
print_r($res);//返回任务id
print_r($ph->statsTube('NewUsers'));//查看NewUsers管道的详细情况
消费类
1.watch 监听NewUsers管道 [ 注: watch()同样可以监听多个管道 ]
$tube = $ph->watch('NewUsers');
print_r($ph->listTubesWatched());//打印已经监听的管道
2.watch 监听多个管道
$tube = $ph->watch('NewUsers')->watch('default');
print_r($ph->listTubesWatched());//打印已经监听的管道
3.ignore 监听NewUsers管道,忽略default管道
$tube = $ph->watch('NewUsers')->ignore('default');
print_r($ph->listTubesWatched());//打印已经监听的管道
4.reserve 监听NewUsers管道,并且取出任务
$job = $ph->watch('NewUsers')->reserve();
$job = $ph->reserveWithTimeout (3);//过期时间 秒
var_dump($job);//打印已经取出的任务
$ph->delete($job);//删除已经取出的任务
注:reserve()有1个参数,阻塞的时间,过了阻塞时间,不管有没有东西,直接返回。4.0版本以后基本废弃使用,改为reserveWithTimeout (3)
5.putInTube/put 向NewUsers管道写入任务 [ 注:此为生产者方法,放到此处是为了方便理解 ]
$ph->putInTube('NewUsers','number_1',5);
$ph->putInTube('NewUsers','number_2',3);
$ph->putInTube('NewUsers','number_3',0);
$ph->putInTube('NewUsers','number_4',4);
print_r($ph->statsTube('NewUsers'));//查看NewUsers管道详细信息
6.release 将取出的任务放回ready状态,还有2个参数(优先级和延迟)
$job = $ph->watch('NewUsers')->reserve();//监听NewUsers管道,并取出任务
if (true) {
sleep(30);
$ph->release($job);//将任务取出之后,停留30秒,然后将任务状态重新变为ready
} else {
$ph->delete($job);
}
7.bury (预留) 将任务取出之后,发现后面执行的逻辑不成熟(比如发邮件,突然发现邮件服务器挂掉了),或者说还不能执行后面的逻辑,需要把任务先封存起来,等待时机成熟了,再拿出这个任务进行消费
$job = $ph->watch('NewUsers')->reserve();//取出任务
$ph->bury($job);//取出任务后,将任务放到一边(预留)
8.peekBuried() 将处在bury状态的任务读取出来
$job = $ph->peekBuried('NewUsers');//将NewUsers管道中处在bury状态的任务读取出来
var_dump($ph->statsJob($job));//打印任务状态(此时任务状态应该是bury)
9.kickJob() 将处在bury任务状态的任务转化为ready状态
$job = $ph->peekBuried('NewUsers');//将NewUsers管道中处在bury状态的任务读取出来
$ph->kickJob($job);
10.kick() 将处在bury任务状态的任务转化为ready状态,有第二个参数int, 批量将任务id小于此数值的任务转化为ready
$ph->useTube('NewUsers')->kick(65);//把NewUsers管道中任务id小于65,并且任务状态处于bury的任务全部转化为ready
11.peekReady() 将管道中处于ready状态的任务读出来
$job = $ph->peekReady('NewUser');//将NewUser管道中处于ready状态的任务读取出来
var_dump($job);
$ph->delete($job);
12.peekDelay() 将管道中所有处于delay状态的任务读取出来
$job = $ph->peekDelayed('NewUser');
var_dump($job);
$ph->delete($job);
13.pauseTube() 对整个管道进行延迟设置,让管道处于延迟状态
$ph->pauseTube('NewUser',10);//设置管道NewUser延迟时间为10s
$job = $ph->watch('NewUser')->reserve();//监听NewUser管道,并取出任务
var_dump($job);
14.resumeTube() 恢复管道,让管道处于不延迟状态,立即被消费
$ph->resumeTube('NewUser');//取消管道NewUser的延迟状态,变为立即读取
$job = $ph->watch('NewUser')->reserve();//监听NewUser管道,并取出任务
var_dump($job);
15.touch() 让任务重新计算任务超时重发ttr时间,相当于给任务延长寿命