当前位置: 首页 > 工具软件 > Redis Queue > 使用案例 >

使用workerman/redis-queue做队列(订阅)

邵阳辉
2023-12-01

前言

手上有一个短信群发通知功能,考虑到日后单个时间通知的手机号会有点多,故打算用workerman里的redis队列功能来实现(thinkphp6框架下)。

部署

1. 本地安装redis服务(如在本地调试的话),并在.env写入配置文件

[REDIS]
#AUTH为密码,本地为空
AUTH=
HOST=127.0.0.1
PORT=6379

2. compser安装workerman/redis-queue

 composer require workerman/redis-queue:1.0.10

3. 使用make命令创建command(命令类)

 php think make:command Redis redis

4. 将workman启动参数复制到command文件夹下的Redis里

<?php
declare (strict_types = 1);

namespace app\command;

use common\Service\ImportFileService;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
use Workerman\RedisQueue\Client;
use Workerman\Worker;

class Redis extends Command
{
    //修改命令文件
    protected function configure()
    {
        // 指令配置
        $this->setName('redis')
            ->addArgument('status', Argument::REQUIRED, 'start/stop/reload/status')
            ->addOption('d', null, Option::VALUE_NONE, 'daemon(守护进程)方式启动')
            ->setDescription('start/stop/restart loadItem');
    }

    protected function init(Input $input, Output $output)
    {
        global $argv;

        $argv[1] = $input->getArgument('status') ?: 'start';
        if ($input->hasOption('d')) {
            $argv[2] = '-d';
        } else {
            unset($argv[2]);
        }
    }

    protected function execute(Input $input, Output $output)
    {
        $this->init($input, $output);
        $worker = new Worker();
        $worker->count = 1;
        $worker::$pidFile = rtrim(root_path(), '/') . '/runtime/load.pid';
        $worker::$logFile = rtrim(root_path(), '/') . '/runtime/load.log';
        $worker->name = 'loadItem';
        $worker->onWorkerStart = [$this, 'start'];
        $worker->runAll();
    }
}

5. 在config/console.php声明

    'commands' => [
        "redis" => app\command\Redis::class,
    ],

使用

1. 将待处理的数据写入redis队列(生产),在需要的地方引入

function redisQueueSend($redis, $queue, $data, $delay = 0)
{
    $queue_waiting = '{redis-queue}-waiting';
    $queue_delay = '{redis-queue}-delayed';
    $now = time();
    $package_str = json_encode([
        'id' => rand(),
        'time' => $now,
        'delay' => 0,
        'attempts' => 0,
        'queue' => $queue,
        'data' => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting . $queue, $package_str);
}
public function insertData(){
	//连接redis
	$redis = new \Redis();
	$redis->connect(config('app.redis_host'), intval(config('app.redis_port')));
	$redis->select(15);
	//写入队列
	$mobileList=['15267081542','15267081543'];
	foreach ($mobileList as &$v) {
		$v["mobile"] = $r->id;
		//数据推入redis中
		redisQueueSend($redis, "redis", $v);
	}
}

2. 将数据从redis取出后处理(消费),写在command下的redis文件夹里

    public function start($work)
    {
        $options = [
            'db' => 15,
//            "auth" => config('app.redis_auth')
        ];
        $client = new Client('redis://' . config('app.redis_host') . ":" . config('app.redis_port'), $options);
        $client->subscribe('load', function ($data) {
            ImportFileService::do($data);//data格式为['mobile'=>1]
        });
    }

3. 打开redis客户端

这里的redis就是上面configure里的name(名),start就是Argument(动作),启动worker用的

 php think redis start

参考资料:
https://www.kancloud.cn/manual/thinkphp6_0/1037651
https://www.workerman.net/doc/workerman/components/workerman-redis-queue.html

 类似资料: