当前位置: 首页 > 工具软件 > Swoft > 使用案例 >

Swoft2.X 使用进程处理Redis队列

岳彬炳
2023-12-01

连接Redis

在bean.php中配置

普通连接池配置

'redis-2' => [
    'class'         => Swoft\Redis\RedisDb::class,
    'host'          => '10.0.0.2',
    'port'          => 6379,
    'database'      => 1,
    'retryInterval' => 10,
    'readTimeout'   => 0,
    'timeout'       => 2,
    'option'        => [
        'prefix'     => 'Swoft',
        'serializer' => Redis::SERIALIZER_PHP
    ]
],
'redis.pool-2' => [
    'class'       => Swoft\Redis\Pool::class,
    'redisDb'     => bean('redis-2'),
    'minActive'   => 10,
    'maxActive'   => 20,
    'maxWait'     => 0,
    'maxWaitTime' => 0,
    'maxIdleTime' => 60,
]

连接池配置项说明:

  • class:连接池驱动类,仅自定义时需指定。默认为 Swoft 连接池驱动
  • redisDb:指定 Redis 配置
  • minActive:最少连接数
  • maxActive:最大连接数
  • maxWait:最大等待连接数,默认为 0 无限制
  • maxWaitTime:连接最大等待时间,默认为 0 秒无限制
  • maxIdleTime:连接最大空闲时间,单位秒

Api

这里我们假设请求api后 , 触发任务, 把数据存储到redis list中

demo:

注解的方式引入redis默认的连接池 , 通过连接池push到list中

<?php

namespace App\Http\Controller;


use App\Model\Entity\GoodModel;
use Swoft\Bean\Annotation\Mapping\Inject;
use Swoft\Http\Server\Annotation\Mapping\Controller;
use Swoft\Http\Server\Annotation\Mapping\RequestMapping;
use Swoft\Redis\Pool;

/**
 * @Controller(prefix="redisjob")
 */
class RedisJob
{
    /**
     * @Inject()
     * @var Pool
     */
    protected $redis;

    /**
     * @RequestMapping("rpush")
     */
    public function redispush()
    {
        return $this->redis->lPush("message",time());
    }
}

任务进程

使用自定义用户进程之前,必须定义用户进程,如下定义一个监控上报信息的用户进程为例:

示例: 自定义用户进程入口。

<?php declare(strict_types=1);

namespace App\Process;

use App\Model\Logic\MonitorLogic;
use Swoft\Bean\Annotation\Mapping\Bean;
use Swoft\Bean\Annotation\Mapping\Inject;
use Swoft\Db\Exception\DbException;
use Swoft\Process\Process;
use Swoft\Process\UserProcess;

/**
 * Class MonitorProcess
 *
 * @since 2.0
 *
 * @Bean()
 */
class MonitorProcess extends UserProcess
{
    /**
     * @Inject()
     *
     * @var MonitorLogic
     */
    private $logic;

    /**
     * @param Process $process
     *
     * @throws DbException
     */
    public function run(Process $process): void
    {
        $this->logic->monitor($process);
    }
}
  • 自定义用户进程必须实现 Swoft\Process\UserProcess 接口
  • 自定义用户进程必须使用 @Bean 标记为一个 bean 对象

示例: 业务处理。

<?php declare(strict_types=1);

namespace App\Model\Logic;

use App\Model\Entity\User;
use Swoft\Bean\Annotation\Mapping\Bean;
use Swoft\Db\Exception\DbException;
use Swoft\Log\Helper\CLog;
use Swoft\Process\Process;
use Swoft\Redis\Redis;
use Swoole\Coroutine;

/**
 * Class MonitorProcessLogic
 *
 * @since 2.0
 *
 * @Bean()
 */
class MonitorLogic
{
    /**
     * @param Process $process
     *
     * @throws DbException
     */
    public function monitor(Process $process): void
    {
        $process->name('swoft-monitor');

        while (true) {
            $connections = context()->getServer()->getSwooleServer()->connections;
            CLog::info('monitor = ' . json_encode($connections));

            // Database
            $user = User::find(1)->toArray();
            CLog::info('user='.json_encode($user));

            // Redis
            Redis::set('test', 'ok');
            CLog::info('test='.Redis::get('test'));

            Coroutine::sleep(3);
        }
    }
}

任务demo:

基于swoole的毫秒级定时器 , 每500毫秒执行一次出队

如果想实时处理可以使用while(true)来实现

<?php

namespace App\Process;

use Swoft\Bean\Annotation\Mapping\Bean;
use Swoft\Bean\Annotation\Mapping\Inject;
use Swoft\Process\UserProcess;
use Swoft\Redis\Pool;
use Swoole\Timer;

/**
 * @Bean()
 */
class RedisQueueProcess extends UserProcess
{

    /**
     * @Inject()
     * @var Pool
     */
    protected $redis;

    /**
     * @param \Swoft\Process\Process $process
     * @return void
     */
    public function run(\Swoft\Process\Process $process):void
    {
        // swoole定时器 , 500毫秒执行一次
        Timer::tick(500,function (){
            $res = $this->redis->lpop("message");
            if ($res)
                echo "队列获取数据:".$res.PHP_EOL;
        });
    }

}

装载任务到http服务同步启动

这里在app\bean.php文件中 , 添加上自定义的process

 'httpServer'         => [
        'class'    => HttpServer::class,
        'port'     => 18333,
        'listener' => [
            // 'rpc' => bean('rpcServer'),
            // 'tcp' => bean('tcpServer'),
        ],
        'process'  => [
//             'monitor' => bean(\App\Process\MonitorProcess::class)
            'redisqueue'=>bean(\App\Process\RedisQueueProcess::class)
            // 'crontab' => bean(CrontabProcess::class)
        ],
     
]

ok , 这样子就完成了, 启动swoft测试一下吧

下一次来用php实现rabbitMq的队列服务

 类似资料: