当前位置: 首页 > 工具软件 > think-admin > 使用案例 >

think-queue tp自带的消息队列类库

柳韬
2023-12-01

简介

其中主要介绍了关于使用think-queue来实现普通队列和延迟队列的相关内容,think-queue是thinkphp官方提供的一个消息队列服务

基本特性:

消息的发布,获取,执行,删除,重发,失败处理,延迟执行,超时控制等
队列的多队列, 内存限制 ,启动,停止,守护等
消息队列可降级为同步执行

消息队列实现过程

  1. 通过生产者推送消息到消息队列服务中
  2. 消息队列服务将收到的消息存入redis队列中(zset)
  3. 消费者进行监听队列,当监听到队列有新的消息时,获取队列第一条
  4. 处理获取下来的消息调用业务类进行处理相关业务
  5. 业务处理后,需要从队列中删除消息

安装

安装完 think-queue 后会在 config 目录中生成 queue.php,这个文件是队列的配置文件。

composer require topthink/think-queue

配置

配置文件位于 config/queue.php

  return [
       'connector'  => 'Redis',		// Redis 驱动
       'expire'     => 60,		// 任务的过期时间,默认为60秒; 若要禁用,则设置为 null 
       'default'    => 'default',		// 默认的队列名称
       'host'       => '127.0.0.1',	// redis 主机ip
       'port'       => 6379,		// redis 端口
       'password'   => '',		// redis 密码
       'select'     => 0,		// 使用哪一个 db,默认为 db0
       'timeout'    => 0,		// redis连接的超时时间
       'persistent' => false,		// 是否是长连接
     
   //    'connector' => 'Database',   // 数据库驱动
   //    'expire'    => 60,           // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
   //    'default'   => 'default',    // 默认的队列名称
   //    'table'     => 'jobs',       // 存储消息的表名,不带前缀
   //    'dsn'       => [],
 
   //    'connector'   => 'Topthink',	// ThinkPHP内部的队列通知服务平台 ,本文不作介绍
   //    'token'       => '', 
   //    'project_id'  => '',
   //    'protocol'    => 'https',
   //    'host'        => 'qns.topthink.com',
   //    'port'        => 443,
   //    'api_version' => 1,
   //    'max_retries' => 3,
   //    'default'     => 'default',
 
   //    'connector'   => 'Sync',		// Sync 驱动,该驱动的实际作用是取消消息队列,还原为同步执行
   ];

如果使用数据库的话

CREATE TABLE `prefix_jobs` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `queue` varchar(255) NOT NULL,
  `payload` longtext NOT NULL,
  `attempts` tinyint(3) unsigned NOT NULL,
  `reserved` tinyint(3) unsigned NOT NULL,
  `reserved_at` int(10) unsigned DEFAULT NULL,
  `available_at` int(10) unsigned NOT NULL,
  `created_at` int(10) unsigned NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

创建任务类

namespace app\job;

use think\queue\Job;

class Job1{
    
    public function fire(Job $job, $data){
    
            //....这里执行具体的任务 
            
             if ($job->attempts() > 3) {
                  //通过这个方法可以检查这个任务已经重试了几次了
             }
            
            
            //如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法
            $job->delete();
            
            // 也可以重新发布这个任务
            $job->release($delay); //$delay为延迟时间
          
    }
    
    public function failed($data){
    
        // ...任务达到最大重试次数后,失败了
    }

}

执行

  1. think\facade\Queue::push($job任务名(类名) , $data = ‘参数’, $queue = 队列名)
  2. think\facade\Queue::later($delay延时时间, $job任务名(类名@+方法名), $data = ‘参数’, $queue = 队列名)
use think\facade\Queue;
// 普通队列生成调用方式
Queue::push($job, $data, $queueName);
// 例:
Queue::push(Test::class, $data, $queueName);
// 延时队列生成调用方式
Queue::later($delay, $job, $data, $queueName);
// 例如使用延时队列 10 秒后执行:
Queue::later(10 , Test::class, $data, $queueName);

前者是立即执行,后者是在$delay秒后执行

监听任务并执行

queue:work 默认只执行一次队列请求,当请求执行完成后就终止;这里可以看出,要是延迟队列,就不能使用该命令喽
queue:work --daemon 同listen 一样,只要运行着,就能一直接受请求,不一样的地方是在这个运行模式下,当新的请求到来的时候,**不重新加载整个框架 **, 而是直接 fire 动作。所以若是更改的代码,需要停止,然后重新启动。(该命令是在laravel4.2以后才加入的)

&> php think queue:work

queue:listen 监听队列请求,只要运行着,就能一直接受请求,除非手动终止

&> php think queue:listen 
 类似资料: