Slim Framework RabbitMQ

郑正阳
2023-12-01

关于 illuminate/database 的注册部分参考这个即可

下面我要另外说明的一点是, 在 settings 中对 database 的部分有一些区别,主要是为了

区分正式数据库和测试数据库。

<?php

$remote_addr = empty($_SERVER['REMOTE_ADDR']) ? "" : $_SERVER['REMOTE_ADDR'];
return [
    'settings' => [
        // Database settings
        'database' => [
            'driver'    => getenv('DB_DRIVER'), //mysql
            'host'      => getenv('DB_HOST'), //ip地址
            'database'  => ($remote_addr == getenv('DB_HOST')) ? getenv('DB_NAME') : getenv('DB_NAME_TEST'), // 这里我想用 REMOTE_ADDR 来区分 正式数据库 和 测试数据库,暂时不考虑反向代理的问题
            'username'  => getenv('DB_USER'),//用户名
            'password'  => getenv('DB_PASSWORD'),//密码
            'port'      => getenv('DB_PORT'),//端口
            'charset'   => getenv('DB_CHARSET'),//utf8
            'collation' => getenv('DB_COLLATION'),//utf8_unicode_ci
            'prefix'    => getenv('DB_PREFIX'),//prefix_
        ]
    ],
];

这样我们就简单的实现了正式数据库和测试数据库的区分。

然后在 Slim 框架中注册我们的 RabbitMQ 依赖。这里我采用了一个相对统一的操作消息队列的包来处理消息队列。注册之前我们先

安装相关扩展和包文件。

amqp扩展安装 Communicate with any AMQP compliant server

amqp 包

AMQP transport

安装完成之后,我们就可以来添加依赖了。

在 Slim 框架 DI 中添加依赖。

<?php

use Enqueue\AmqpExt\AmqpConnectionFactory;

$container['amqpfactory'] = function () {
    $factory = new AmqpConnectionFactory([
        'host'      => 'localhost',
        'port'      => 5672,
        'vhost'     => '/',
        'user'      => '你的 rabbitmq 用户',
        'pass'      => '你的 rabbitmq 密码',
        'persisted' => false,
    ]);
    return $factory;
};

然后在基类中声明类成员属性 $amqpfactory; 并用phpdoc var 来指定属性所属类文件 同样这里我们可以采用直接声明赋值属性,这种类型的属性声明要用protected来声明属性。

直接声明加载依赖

<?php

namespace 你的文件命名空间;

use Interop\Container\ContainerInterface;

class BaseController
{
    /** @var \Interop\Container\ContainerInterface */
    protected $container;

    /** @var \Enqueue\AmqpExt\AmqpConnectionFactory */
    protected $amqpfactory;

    /**
     * BaseController constructor.
     *
     * @param \Interop\Container\ContainerInterface $container
     */
    public function __construct(ContainerInterface $container)
    {
        $this->container = $container;
        $this->amqpfactory = $this->container->get('amqpfactory');
    }
}

也可以用

php magic methods

方式来在代码中加载属性 (private) 指向的实例。

<?php

namespace 你的文件命名空间;

use Interop\Container\ContainerInterface;

class BaseController
{
    /** @var \Interop\Container\ContainerInterface */
    protected $container;

    /** @var \Enqueue\AmqpExt\AmqpConnectionFactory */
    private $amqpfactory;

    /**
     * BaseController constructor.
     *
     * @param \Interop\Container\ContainerInterface $container
     */
    public function __construct(ContainerInterface $container)
    {
        $this->container = $container;
    }

    /**
     * @param $name
     * @return mixed
     */
    public function __isset ($name)
    {
        return $this->container->{$name};
    }

    /**
     * @param $name
     * @param $value
     * @return mixed
     */
    public function __set ($name, $value)
    {
        return $this->container->{$name} = $value;
    }

    /**
     * @param $name
     * @return mixed
     */
    public function __get ($name)
    {
        return $this->container->{$name};
    }
}

两种方式调用方式相同

$this->amqpfactory->你的方法……

基础工作基本完成,现在来描述一下我的具体

需求

当用户搜索关键字,这个关键字假定说是一条公司记录,同时我们有一块区域是用于展示热门搜索的,那这个热门搜索的定义我们是来按该公司被用户检索过的记录次数来区分的,被检索的越多次说明该公司热门程度越高。

那需求有了,接下来我们需要分析,

1 对于这个需求有几种合适的解决方案?

2 这些方案分别有什么利各弊?

3 我们会选择哪种方案来满足这个需求?

4 还有哪些能改进的地方?

那思路有了,具体的分析大家可以结全自身来具体分析,这里我就不浪费大家保贵的时间来序述了。 在这里我只描述一点我对于这个需求的解决方案。这里我采用了rabbitmq消息队列来处理。之所以采用这个方案一方面是为了保证前端接收数据的及时,另一方面也是保证数据库负载的有效降压。

假设下面就是你的业务代码,主要分为两部分,一部分用于将查询出来的结果 pulisher 到消息队列中,另一部分是用于 consumer 丛消息队列中取出数据进行处理。

Puliser 部分

<?php

namespace 你的命名空间;

use Interop\Container\ContainerInterface;
use Conduit\Controllers\BaseController;
use Interop\Amqp\AmqpTopic;
use Slim\Http\Request;
use Slim\Http\Response;

class CompanyController extends BaseController
{
    public function __construct(ContainerInterface $container)
    {
        parent::__construct($container);
        // check database is development or produce
    }

    public function homeSearchIndex(Request $request, Response $response, array $args)
    {
        // $company 假设是你查询出来的结果,这里因为我用的 Eloquent ORM 所以查询出来的结果是集合 collection 的形式,用 pluck 处理得到所有的公司 id,在将数据转换为 json 格式

        // +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
        // amqp message body
        $mqbody = $company->pluck('id')->toJson(); // $mqbody 为查询出的所有符合条件的公司 id 的 json 数组

        // pulisher mqbody to rabbitmq
        $psrContext = $this->amqpfactory->createContext();
        // Declare topic
        /** @var \Enqueue\AmqpExt\AmqpContext $psrContext */
        $topic = $psrContext->createTopic('homepagesearch');
        $topic->setType(AmqpTopic::TYPE_FANOUT);
        $psrContext->declareTopic($topic);
        // Send message to topic
        /** @var \Enqueue\AmqpExt\AmqpContext $psrContext */
        /** @var \Interop\Amqp\Impl\AmqpTopic $topic */
        $message = $psrContext->createMessage($mqbody);
        $psrContext->createProducer()->send($topic, $message);
        // Close
        $psrContext->close();
        // +++++++++++++++++++++++++++++++++++++++++++++++++++++++++

        // …… 其他你的后置操作
    }
}

然后我们编写,消息处理部分也就是我们的消费者。这里需要我们先安装一个 php cli 模式下运行的包,这个包主要用于和我们框架的集成,如果不运用框架本身的一些配置,单独做也可以,这里我就集成到了框架中。

adrianfalleiro/slim-cli-runner

Consumer

<?php

namespace 你的命名空间;

use Conduit\Controllers\BaseController;
use Conduit\Models\Company;
use Interop\Container\ContainerInterface;
use Interop\Amqp\AmqpTopic;
use Interop\Amqp\AmqpQueue;
use Interop\Amqp\Impl\AmqpBind;

class HomePageSearchTask extends BaseController
{
    public function __construct(ContainerInterface $container)
    {
        parent::__construct($container);
    }

    public function command()
    {
        // +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
        // consumer
        $psrContext = $this->amqpfactory->createContext();
        // Declare topic
        /** @var \Enqueue\AmqpExt\AmqpContext $psrContext */
        $topic = $psrContext->createTopic('homepagesearch');
        $topic->setType(AmqpTopic::TYPE_FANOUT);
        $psrContext->declareTopic($topic);
        // Declare queue
        /** @var \Enqueue\AmqpExt\AmqpContext $psrContext */
        $queue = $psrContext->createQueue('homepagesearch');
        $queue->addFlag(AmqpQueue::FLAG_DURABLE);
        $psrContext->declareQueue($queue);
        // Bind queue to topic
        /** @var \Enqueue\AmqpExt\AmqpContext $psrContext */
        /** @var \Interop\Amqp\Impl\AmqpQueue $queue */
        /** @var \Interop\Amqp\Impl\AmqpTopic $topic */
        $psrContext->bind(new AmqpBind($topic, $queue));
        // Consume message
        /** @var \Enqueue\AmqpExt\AmqpContext $psrContext */
        /** @var \Interop\Amqp\Impl\AmqpQueue $queue */
        $consumer = $psrContext->createConsumer($queue);
        while (true) {
            if ($message = $consumer->receive()) {
                $bodyarr = json_decode($message->getBody(), true); // 还记得上面我们的 publisher的数据格式么?json的需要解析一下
                // process a message 这里你需要针对你自身需求来定制你的处理方式
                Company::query()->whereIn("id", $bodyarr)->increment("search");
                $consumer->acknowledge($message);
            }
        }
        // Close content
        $psrContext->close();
        // +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    }
}

那到现在,当你运行查询的时候你的 publisher exchange 将会创建。

那 consumer 又该如何运行起来呢?还记得我们安装的 slim-cli-runner 包文件么?现在需要用到它的时候 了。

php 你的框架项目绝对路径/public/index.php HomePageSearchTask > /dev/null 2>&1 &

这样就运行起来我们的 consumer 来处理队列中的消息了。

那还有一点问题就是,当我关掉终端窗口的时候这个 consumer 该如何做为守户进程执行。这里有好多监控进程的方案,像 supervisor,crontab等,因为这块我也没下功夫研究,

采用了 crontab 的方式来监控。

首先我们启动一个进程。

# setsid php 你的框架项目绝对路径/public/index.php HomePageSearchTask  > /dev/null 2>&1 &

此时进程状态返回值为 0

# ps aux|grep HomePageSearchTask | grep -v grep|wc -l

创建我们要执行的 shell 脚本

#!/bin/bash
alive=`ps aux|grep HomePageSearchTask | grep -v grep|wc -l`
if [ $alive -eq 0 ]
then
nohup /usr/local/php/bin/php 你的框架项目绝对路径/public/index.php HomePageSearchTask > /dev/null 2>&1 &
fi

修改 shell 脚本执行权限

# chomod +x HomePageSearchTask.sh

手动执行 sh 可检测书写是否有语法错误

sh HomePageSearchTask .sh

创建定时任务脚本,每 8 分钟执行检测

# crontab -e
0-59/8 * * * * /root/HomePageSearchTask.sh > /dev/null 2>&1 &

等待,返回值为 1,则说明我们的 HomePageSearch 进程在成功运行

# ps aux|grep HomePageSearchTask | grep -v grep|wc -l

这里面还有一个

小插曲

就是我上面不是用 REMOTE_ADDR 来测试执行的是哪个数据库么?正式的还是测试的数据库,

那在本地测试的话 consumer 处理的数据是处理到了测试数据库,这也是我想要的,

但是当我执行的 publiser 是正式数据库的时候,consumer 处理的数据还是处理到了测试数据库中,

几乎把所有能调试的地方我都调试了,没找到问题。

后来我发现在我 publiser 到队列中消息体的时候做了下 toJson()的转换,这个问题竟然解决了,但是我之前也是用 json_encode($mqbody) 来处理的啊,那这个 toJson 和 json_encode() 的效果应该是一样的啊,一直到现在我都不太清楚到底是为什么会造成执行数据库不一致的问题,

初步猜想

1 可能是这个 REMOTE_ADDR 的问题,但是都打印测试查看过的,没有问题

2 发布消息体到队列的时候,消息体内容格式没有做好处理,因为我就是把消息内容主体处理了下就好了

但我想事情肯定没有这么简单,如果有知道的大佬,还请多多指教啊。小弟在这里先行答谢了。

 类似资料:

相关阅读

相关文章

相关问答