当前位置: 首页 > 工具软件 > swoole-jobs > 使用案例 >

php+swoole+RabbitMQ实现的异步生产者与消费者

钱志义
2023-12-01

需求背景

PHP RabbitMQ扩展不支持连接池

连接池的作用主要是节省连接的时间。连接池机制预先打开N个连接,把它们缓存起来,当需要使用连接的时候就直接使用这些已经打开的连接,从而节省了时间。但PHP的RabbitMQ扩展为了兼容PHP-FPM模式,因为在PHP-FPM模式下是不能做连接池的,脚本在解释执行完毕后会释放所有的内存资源,没法进行内存常驻。

RabbitMQ配置信息与业务强耦合

RabbitMQ配置信息散落在各个业务上,任意配置的改动,都需要各个业务组进行配合修改,并且要在切换的过程中保证线上的服务运行正常,且没法做到统一管理,给维护上带来一定的难度与成本。

PHP-FPM缺乏异步消息投递机制

PHP-FPM为同步编程模型,并不支持异步的编程方式,所有代码都是逐行顺序执行的。非常符合程序员的习惯,不管是编码还是调试。但在IO密集型的场景下,就会逐渐显得有点乏力,因为该场景下的CPU利用率将会大大降低,发挥不出该有的性能。

性能低

PHP-FPM模型下的PHP,将会有着非常多的初始化。比如配置、连接、文件等等。这一系列的操作,对性能的影响将是毫秒级的。

技术要求

吞吐量

吞吐量(Throughput)指在单位时间内成功传输的数据量。作为消息队列中间件,必须有着非常高的吞吐量,不能因此成为业务的瓶颈。

可用性

可用性(Availability)指在一段连续时间考察内,系统正常运行的概率。在保证吞吐量的基础上,还必须保证系统能够7*24小时正常运转。作为消息中转平台,如果发生故障,将会导致业务不同情况受损。

可维护性

可维护性(Maintainability)是衡量一个系统的可修复(恢复)性和可改进性的难易程度。除了编码上的可维护性,还需对故障的检测、诊断、修复提供快速的响应能力。

跨平台

跨平台(Cross Platform)指不应受到硬件或者环境影响而导致程序的不可运行。在设计中,必须考虑到多种编程语言(JAVA、PHP、PYTHON等等)的接入场景,要基于通用的协议进行开发。

业务透明

业务透明(Service Transparency)指对所有环境提供固定的接入方式。消息队列的配置不对业务开放,实现统一管理。无论是阶段性的服务迁移或者是参数调整,对业务来说都是无感知的,简化维护流程。

技术选型简介

PHP简介

PHP(外文名称:PHP:Hypertext Preprocessor,中文名称:超文本预处理器)是一种通用的开源脚本语言。该语法吸收了C语言、Java和Perl的特点,有利于学习和广泛使用,主要适用于Web开发领域。PHP的独特语法是C、Java、Perl和PHP自己的语法的混合。它可以比CGI或Perl更快地执行动态web页面。与其他编程语言相比,PHP将程序嵌入到HTML(一种标准通用标记语言的应用程序)文档中执行。执行效率比完全生成HTML标记的CGI要高得多。PHP还可以执行编译后的代码,这将对代码进行加密和优化,以使其运行得更快。

Swoole简介

Swoole允许PHP开发人员编写高性能、高并发的服务,如TCP、UDP、Unix套接字、HTTP和WebSocket,使PHP超越Web。Swoole4的成熟将PHP带到了一个前所未有的阶段,为性能改进提供了独一无二的可能性。Swoole可广泛应用于互联网、移动通信、云计算、在线游戏、物联网、汽车联网、智能家居等领域。使用PHP+Swoole可以使企业IT研发团队更有效率,更专注于开发创新产品。

RabbitMQ简介

RabbitMQ,全名为Rabbit Message Queue,是一种应用程序到应用程序的通信方法。应用程序通过从队列(应用程序的数据)读写消息进行通信,而不需要专用连接来链接它们。消息传递指的是程序之间通过发送消息中的数据进行通信,而不是通过相互直接调用进行通信,后者通常用于远程过程调用等技术。排队意味着应用程序通过队列进行通信。使用队列消除了同时接收和发送应用程序的需要。

生产者服务设计

架构设计图

待补充

进程关系简介

Reactor线程

  • Reactor线程是在Master进程中创建的线程
  • 负责维护客户端TCP连接、处理网络IO、处理协议、收发数据
  • 不执行任何PHP代码
  • 将TCP客户端发来的数据缓冲、拼接、拆分成完整的一个请求数据包

Worker进程

  • 接受由Reactor线程投递的请求数据包,并执行PHP回调函数处理数据
  • 生成响应数据并发给Reactor线程,由Reactor线程发送给TCP客户端
  • 可以是异步非阻塞模式,也可以是同步阻塞模式
  • Worker以多进程的方式运行

TaskWorker进程

  • 接受由Worker进程投递的任务
  • 处理任务,并将结果数据返回给Task Worker进程
  • 完全是同步阻塞模式
  • Task Worker以多进程的方式运行

Manager进程

  • 负责创建/回收Worker/Task进程

入口设计

基于HTTP协议

HTTP(超文本传输协议)是Internet上使用最广泛的网络传输协议之一。所有的WWW文件必须遵守这个标准,并且可以作为所有编程语言的通用协议使用。

Swoole的http服务启动

$http = new Swoole\Http\Server('0.0.0.0', 9501);
$http->on('request', function ($request, $response) {
    ...调用入口
});
$http->start();

URL路由设计TaskWorker进程工作原理

HTTP服务与TaskWorker进程关系

HTTP接收到请求,完成基本的解析与校验之后,通过TaskWorker进程,进行消息投递并结束HTTP请求。然后由TaskWorker进程进行异步的RabbitMQ消息投递,能够满足高吞吐量的场景。

消息体构建

public static function buildMessage(string $content): array
{
    $content = json_decode($content, true);
    $queue = [];
    foreach($content as $item) {
        $obj = new self();
        foreach([
                'exchange',
                'queue',
                'route',
                'payload'
            ] as $k) {
             $obj->{$k} = $item[$k];
        }
        $queue[] = $obj;
    }
    return $queue;
}

名词解释

  • exchange:交换机名称,用于数据的分发
  • queue:队列名称,数据的承载体
  • route:路由名称,用于寻找数据承载体的路径
  • payload:将要发送RabbitMQ的实体数据(各业务需要的数据)

异步投递RabbitMQ消息

基于TaskWorker,程序立即返回继续向下执行代码。onTask回调函数Task进程池内被异步执行。执行完成后调用 $serv->finish() 返回结果

public function OnTask(swoole_server $serv, int $task_id, int $src_worker_id, $data)
{
      ...TaskWorker进程执行的逻辑代码
}
public static function sendMessageToTask(\swoole_server $process, array $messages)
{
   
    foreach($messages as $message) {
        $process->task($message);
    }
}

RabbitMQ连接池管理

class RabbitMqPool
{

    private static $pool = null;

    public static function getConnectPool()
    {
    }

    public static function release(){
    }
}

方法解释

  • RabbitMQPool类:为静态常驻类,保证RabbitMQ连接不会被垃圾回收机制释放
  • getConnectPool方法:从RabbitMQ连接池中获取一个可用的连接
  • release方法:当RabbitMQ连接使用完之后,需要通过此方法及时归还

消息投递状态统计

$status = [
    'start'         => 0,
    'pid'           => 0,
    'send_success'  => 0,
    'send_error'    => 0,
];

状态解释

  • start:消息进程启动的时间
  • pid:消息进程ID,用于调试追踪
  • req_success:统计消息发送成功的记录数
  • req_error:统计消息发送失败的记录数

服务管理

public function getStatus(){
}
public function shutdown(){
}
public function registerSignal(){
}

方法解释

  • getStatus方法:获取消息投递状态统计信息,便于及时观察系统状况
  • shutdown方法:用于关闭服务
  • registerSignal方法:检测系统信号,用于平滑重启机制

客户端调用示例

public static function urlNotity(array $urlList, string $uniqueKey = null, bool $isSlowQueue = false,string $topic = ''): bool
{
   
    try {
        $config = self::getConfig();
        if ($isSlowQueue) {
            $config['topic'] = "LowQueue";
        }elseif (!empty($topic)){
            $config['topic'] = $topic;
        }
        $config = new ConfigRpc($config['url'], $config['topic'], $config['secretKey']);
        $producerObj = new Producer($config);

        if ($uniqueKey === null || $uniqueKey !== "") {
            $producerObj->setVars(ConstVars::RD_KAFKA_PARTITION_UA, 0, $uniqueKey);
        }
        foreach ($urlList as $url) {
            $producerObj->send([
                'requests' => [
                    //这里不使用并发请求
                    ['url' => $url],
                ],
            ]);
        }
        $producerObj->waitFinish();

        return true;
    } catch (\Exception $exception) {
        return false;
    }
}

小结

生产者服务是基于事件的异步任务系统。当一个HTTP请求到来,将由Worker进程负责任务分发的工作,利用TaskWorker进程进行异步化,最后由TaskWorker进程完成消息投递到RabbitMQ的过程。而这一流程设计,将会极大提高Reactor线程的吞吐量

整个系统中主要分为三大部分

  1. 事件生产者,即产生消息事件的一方。
  2. 任务调度器(Worker),负责注册事件并调度任务。
  3. 任务进程(TaskWorker),负责信息传输

消费者服务设计

架构设计图

待补充

架构设计图消费模式

http callback模式

基于GuzzleHttp框架实现的http callback模式,该模式下主要优缺点:

  • 优点:
    1. 客户端只需要传递回调的url,即可实现异步调用,操作简单
    2. 适合通用服务,不用为每个业务单独开发消费组件,由业务方实现
    3. 超时重试机制
    4. 适合平滑重启机制
  • 缺点:
    1. http模式下,不适合进行长时间处理的任务,例如批处理
public function consumeTask(array $task, array $options = []): bool
{
    
    $noticeObject = $task['msg-payload'];
    $reqNum = count($noticeObject['requests']);
    [$promises, $urlMap] = RequestParse::parse($noticeObject['requests'], $this->config);
    $control = ControlParse::parse(isset($noticeObject['control']) ? $noticeObject['control'] : [], $this->config);
    try {
        $allowRetry = $reqNum == 1 && $control['retryTimes'] > 0;
        $retryCode = $control['retryCode'];
        $retryTimes = (int) $control['retryTimes'];
        $try = -1;
        do {
            $try++;//第几次尝试
            $retryFlag = false;
            try {
                //该行代码会抛出 ConnectException,需要捕获做重试处理
                $results = Promise\unwrap($promises);

                $successMap = [];
                $errorMap = [];

                /**
                 * @var ResponseInterface $result
                 */
                foreach ($results as $index => $result) {
                    $statusCode = (int) $result->getStatusCode();
                    $headers = $result->getHeaders();

                    if ($allowRetry && in_array($statusCode, $retryCode)) {
                        $retryFlag = true;
                    }

                    if (in_array($statusCode, $control['successCode'])) {
                        $successMap[$index] = [
                            'code'   => $statusCode,
                            'header' => $headers,
                        ];
                    } else {
                        $errorMap[$index] = [
                            'code'     => $statusCode,
                            'header'   => $headers,
                            'response' => $result->getBody()->getContents(),
                        ];
                    }
                }
            } catch (ConnectException $connectException) {
                $retryFlag = true;
            }

        } while ($retryFlag && $retryTimes-- > 0);


        return ! $retryFlag;
    } catch (\Exception $ex) {
        return false;
    }
}

cli模式

该模式下,需要针对业务需求进行具体定制,获取到RabbitMQ消息后,进行一些列无超时的逻辑处理,由于保密协议,代码逻辑不方便展示。该模式下主要优缺点:

  • 优点:
    1. 适合长时间运行的任务
  • 缺点:
    1. 需要业务方自行维护代码逻辑,容易造成代码分散
    2. 不适合平滑重启机制,因为长时间运行的任务,重启容易造成数据丢失或失去幂等性
    3. 需要开发者对内存管理有良好的认知,否则容易造成内存泄漏
    4. 容易造成队列堵塞
    //执行结果回调函数
$callback = function ($topic, $taskid, $taskname, $params, $timeuse, $message){
       ...
};
$worker = new Worker($callback);
$worker->setQueue('demo');
$worker->run();

基于上面分析,如无特殊要求,一般建议使用http模式。

消费流程

Rabbitmq原始数据获取

private function consume()
{
    $this->channel = $this->mq->channel();
    $this->channel->basic_qos(null, 1, null);
    $customerId = $this->getCustomerId($this->jobConfig->getKafkaGroupId());
    $queueName = $this->getQueueName();
    $this->channel->basic_consume($queueName, $customerId, false, ! $this->needAck, false, false,
        [$this, 'processMessage']);
    while ($this->channel->is_consuming()) {
        $this->channel->wait();
    }
}

执行解释

  • 获取RabbitMQ连接
  • 基于basic_qos模式,保证信息能够正确被消费
  • 把消息发送到processMessage,由此方法执行任务
  • $this->channel->wait()监控消息消费,等待执行完毕

消息体解析

public static function parse(array $request,JobConfig $jobConfig): array
{
    $attCfg = $jobConfig->getAttachConfig();
    $extOptions = [];
    if(isset($attCfg['curlTimeout']) && $attCfg['curlTimeout'] > 0) {
        $extOptions['defaultTimeout'] = (int)$attCfg['curlTimeout'];
    }
    $result = [];
    $urlMap = [];
    $client = new Client();
    foreach($request as $index => $item) {
        if(empty($item['method'])) {
            $item['method'] = "GET";
        }
        if(empty($item['options'])) {
            $item['options'] = [];
        }
        $options = self::parseOptions($item['options'], $extOptions);
        $req = $client->requestAsync($item['method'], $item['url'], $options);
        $result[$index] = $req;
        $urlMap[$index] = $item['url'];
    }
    return [$result, $urlMap];
}

执行解释

  • 通过getAttachConfig获取附加配置,一般为系统配置
  • 获取timeout参数,用于http请求时间限制
  • 通过$client->requestAsync生成GuzzleHttp对象,用于服务调用

消息校验

消息校验是为了避免因为执行错误的数据而导致系统崩溃或业务错乱。业务错乱是最大的原因,因为这有可能导致显式的经济损失,比如价格错误

if(empty($noticeObject['requests']) || !is_array($noticeObject['requests'])) {
            throw new JobException("传递了一个空的请求对象");
}
if(!is_array($item['options'])) {
            throw new JobException("无法解析的头部选项");
        }
if(!is_string($item['method']) || !is_string($item['url'])) {
    throw new JobException("无法处理的");
}

消息执行

用于执行客户端的回调url,核心代码逻辑。

$results = Promise\unwrap($promises);
foreach($results as $index => $result) {
    $statusCode = (int)$result->getStatusCode();
    $headers = $result->getHeaders();
    $url = $urlMap[$index];
    if($allowRetry && in_array($statusCode, $retryCode)) {
        $retryFlag = true;
    }
    
    if(in_array($statusCode, $control['successCode'])) {
        $successMap[$index] = [
            'code'   => $statusCode,
            'header' => $headers,
        ];
    } else {
        $errorMap[$index] = [
            'code'     => $statusCode,
            'header'   => $headers,
            'response' => $result->getBody()->getContents()
        ];
    }
}

消息确认

消息执行完毕后,需要进行消息确认。

$channel->basic_ack($message->delivery_info['delivery_tag']);

消费机制

超时机制

超时机制(timeout)是保护系统的常用手段。一个应用中,常常包含对多方应用的网络调用,每个应用的稳定性都不一样。如果没有超时机制,可能会因为某个应用的调用阻塞,导致当前应用系统发生大量阻塞,其余的任务无法执行的现象。

$defaultTimeout = 30;
if(isset($extOptions['defaultTimeout']) && $extOptions['defaultTimeout'] > 0) {
    $defaultTimeout = (int)$extOptions['defaultTimeout'];
}

$result = [
    "connect_timeout" => $defaultTimeout,
    "read_timeout"    => $defaultTimeout,
    "timeout"         => $defaultTimeout,
];

重试机制

由于业务场景特殊,比如系统繁忙暂时无法服务,需要进行多次重试。该模式下允许业务方传递重试次数,但接口幂等性须自行维护。

$allowRetry = $reqNum == 1 && $control['retryTimes'] > 0;
$retryCode = $control['retryCode'];
$retryTimes = (int)$control['retryTimes'];
$try = -1;        å

确认机制

用于确认消费者是否成功消费了队列中的消息,如果没有确认机制,系统一旦崩溃,可能就会出现数据丢失等问题。

if ($this->needAck && $this->firstAck) {
    $channel->basic_ack($message->delivery_info['delivery_tag']);
}

小结

该消费者设计提供多种消费场景(HTTP与Cli),能够满足大部分业务场景。消费者与生产者高度解耦,拆分为两种应用,生产者负责消息的传递,消费者负责消息的消费,基础与业务隔离。应用可以独自部署,具有非常优秀的横向扩展性,同时极大降低日常维护工作的负担

配置管理

命令行参数

命令行参数指的是在命令行中执行程序时传递的参数,常用于同一个程序执行不同的作用域,比如运行在多个端口上。

$shortOpts = ["port:", "host:", "config::"];
$options = getopt("", $shortOpts);

if(empty($options['port']) || $options['port'] < 1) {
    fwrite(STDERR, "错误的端口号".PHP_EOL);
    return -1;
}
if(empty($options['host'])) {
    fwrite(STDERR, "错误的监听域".PHP_EOL);
    return -1;
}
if(empty($options['config'])) {
    $options['config'] = "config";
}

环境配置

环境变量会影响多个实用程序,函数和应用程序的操作,方便配置集中式维护。

$host = env('host');
$port = env['port'];
$port = env('config');

RabbitMQ配置

基于AMQP 0-9-1(高级消息队列协议),该协议是一种消息传递协议,使一致的客户端应用程序可以与一致的消息传递中间件代理进行通信。

[
    'HOST'    => '',
    'PORT'    => '',
    'USER'    => '',
    'PASS'    => '',
    'VHOST'   => '',
    'options' => [
        'login_method'       => 'AMQPLAIN',
        'locale'             => 'en_US',
        'connection_timeout' => 20.0,
        'read_write_timeout' => 120.0,
        'keepalive'          => false,
        'heartbeat'          => 30
    ],
]        

配置解释

  • HOST:连接地址
  • PORT:连接端口
  • USER:登陆用户
  • PASS:登陆密码
  • VHOST:虚拟主机,用于逻辑分组与资源分离
  • options.login_method:SASL认证机制的可插拔支持
  • options.connection_timeout:连接超时(秒)
  • options.read_write_timeout:读取超时(秒)
  • options.keepalive:像集群里的其他子节点发送存活消息的间隔(毫秒)
  • options.heartbeat:心跳检查,通常用来检测通信的对端是否存活

监控管理

监控的意义

  • 对系统不间断实时监控:实际上是对系统不间断的实时监控
  • 实时反馈系统当前状态:我们监控某个硬件、或者某个系统,都是需要能实时看到当前系统的状态,是正常、异常、或者故障
  • 保证服务可靠性安全性:我们监控的目的就是要保证系统、服务、业务正常运行
  • 保证业务持续稳定运行:如果我们的监控做得很完善,即使出现故障,能第一时间接收到故障报警,在第一时间处理解决,从而保证业务持续性的稳定运行

监控日志

基于Monolog实现的一套日志系统。

switch($config['type']) {
    case "file";
        $this->logger = new FileLogger($config['path'], $level);
        break;
    case "std":
        $this->logger = new StdLogger($level);
        break;
    case "es":
        $this->logger = new EsLogger($level);
        break;
}

Std模式

基于cli实现,该模式下只要输出debug级别的日志,由于debug级别日志非常庞大,不适合常规存储。

File模式

基于文件实现,该模式下适合单机运行的服务,不适合分布下的存储,所以接下来主要介绍Es模式。

Es+Kibana模式

Es(全名ElasticSearch),是一个构建在Apache Lucene之上的开源分布式搜索引擎,主要特点:

  • 基于 JSON 的分布式搜索和分析引擎,专为实现水平扩展、高可靠性和管理便捷性而设计
  • 是一个分布式的 RESTful 风格的搜索和数据分析引擎,能够解决不断涌现出的各种用例。作为 Elastic Stack的核心,它集中存储您的数据,帮助您发现意料之中以及意料之外的情况
  • 允许执行和合并多种类型的搜索 ( 结构化、非结构化、地理位置、度量指标 )搜索方式随心而变
    Kibana,和Es是个非常好的搭档,主要特点:
  • 能够以图表的形式呈现数据,并且具有可扩展的用户界面,供您全方位配置和管理
  • 核心搭载了一批经典功能:柱状图、线状图、饼图、旭日图,等等。它们充分利用了 Elasticsearch 的聚合功能

异常告警

报警后的故障一般如何处理?首先,我们可以通过报警升级机制自动处理故障。例如,如果服务宕机,我们可以设置警报升级自动启动。
但是,如果在一般业务中出现严重故障,我们通常会根据故障的级别和故障的业务分配不同的运维人员进行处理。

企业微信机器人

企业微信机器人的功能是指机器人成员可以添加到内部组中,机器人可以在一定的时间或按照一定的规则向组中发送消息,如@group members。企业微信是一款高频办公工具。随着机器人功能的引入,我们可以充分利用微信的强提醒功能,有效的利用机器人来提高工作效率和执行率。此外,它还可以应用到其他一些场景,帮助企业的运营。

邮箱

由于企业微信机器人有2048个字节限制,一般大文本告警信息,需要使用邮箱进行通知。邮箱更多优势在于:

  1. 延时互动
  2. 回复/对话
  3. 存档/回顾

服务部署

基于command模式

方便简单,但运维成本高,常用于本地开发。

php server.php  --host="0.0.0.0" --port="8001" --config="config.dev"

基于Supervisor服务模式

Supervisor是用Python开发的一个通用进程管理器,它可以将一个普通的命令行进程转换为守护进程,监视它的状态,并在它退出时自动重启它。它通过fork/exec来启动作为管理器子进程的被管理进程,这样它就可以简单地将被管理进程的可执行文件的路径写入到管理器配置文件中。它还意识到,当子进程被挂起时,父进程可以准确地获取被挂起子进程的信息,并可以自行选择是否启动报警。所以线上服务我们采用的是此方式进行部署。

[program:server]
command=php server.php  --host="0.0.0.0" --port="8001" --config="config.dev"   
autostart=true        
startsecs=10         
autorestart=true     
startretries=3       
user=tomcat          
priority=999         
redirect_stderr=true 
stdout_logfile_maxbytes=20MB 
stdout_logfile_backups = 20   
stdout_logfile=/opt/logs/catalina.out
stopasgroup=false     
killasgroup=false     

配置解释

  • command:程序启动命令
  • autostart:在supervisord启动的时候也自动启动
  • startsecs:启动10秒后没有异常退出,就表示进程正常启动了,默认为1秒
  • autorestart:程序退出后自动重启,可选值:[unexpected,true,false],默认为unexpected,表示进程意外杀死后才重启
  • startretries:启动失败自动重试次数,默认是3
  • user:用哪个用户启动进程,默认是root
  • priority:进程启动优先级,默认999,值小的优先启动
  • redirect_stderr:把stderr重定向到stdout,默认false
  • stdout_logfile_maxbytes:stdout 日志文件大小,默认50MB
  • stdout_logfile_backups:stdout 日志文件备份数,默认是10
  • stdout_logfile:输出日志路径
  • stopasgroup:默认为false,进程被杀死时,是否向这个进程组发送stop信号,包括子进程
  • killasgroup:默认为false,向进程组发送kill信号,包括子进程

总结

与传统架构的对比

  • 拥有连接池管理
  • 多进程
  • 业务透明化
  • 提高7倍以上性能

现架构存在的缺点

  • 平滑重启发生在WorKer进程中,而config配置文件读取在Master进程中,配置一旦发生变更,需要硬重启服务,造成服务不稳定。

未来的改进计划

  • 容易发生变化的config配置文件读取,放到Worker进程中处理。
 类似资料: