当前位置: 首页 > 工具软件 > php-amqplib > 使用案例 >

easyswoole 使用 php-amqplib/php-amqplib 自定义进程消费出现 SQLSTATE[HY000] [2006] MySQL server has gone away

淳于健
2023-12-01

出现场景
使用 php-amqplib/php-amqplib 自定义进程进行消费,当程序运行1天左右样子再消费会出现 SQLSTATE[HY000] [2006] MySQL server has gone away,再次消费会出现SQLSTATE[HY000] [2002] Connection reset by peer or Transport endpoint is not connected

原因分析
easyswoole 使用的是数据库连接池,php-amqplib/php-amqplib 组件是同步阻塞io,无法触发协成切换,导致自定义进程中获取的mysql 链接脱离了连接池的控制,在时间到达8小时后被mysql 主动切断链接

目前的解决方案
在官方给出解决方案之前,目前的解决方案就是当在消费者中捕获到mysql 链接异常后 直接kill掉当前进程,easyswoole 框架会自动重启一个新的进程,因为rabbitmq 使用的是ack 确认机制,当消费者未提交ack确认信息的情况下,重新启动进程消息依然会被消费,从而不用担心消息丢失的问题
上代码

<?php
namespace App\Process\RabbitMqConsumer;
use App\Models\ClientModel;
use App\Service\LogService;
use App\Service\MqXDelayService;
use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\EasySwoole\Logger;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPAbstractCollection;
use PhpAmqpLib\Wire\AMQPTable;
class Test extends AbstractProcess
{
    protected function run($arg)
    {
        try {
            $pid = $this->getPid();
            Logger::getInstance()->info("延时队列消费进程===============pid={$pid}");
            $exchange    = 'delay.myExchange';
            $queue       = 'delay.myQueue';
            $config = \EasySwoole\EasySwoole\Config::getInstance()->getConf("rabbitmq");
            $connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password'],$config['vhost'], false, 'AMQPLAIN', null, 'en_US', 3.0, 120.0, null, true, 60);
            $channel = $connection->channel();
            $channel->exchange_declare($exchange, 'x-delayed-message', false, true, false, false, false, new AMQPTable(['x-delayed-type' => 'direct']));
            $channel->queue_declare($queue, false, true, false, false);
            $callback = function ($msg) use ($pid){
                $headersObject = $msg->get_properties()['application_headers'];
                try {
                    Logger::getInstance()->info("延时队列消费进程,data={$msg->body}");
                    $data = json_decode($msg->body,true);
                    $msgId = $data['msgId'];
                    if (isset($data['listenerKey']) && isset($data['data']) && isset($data['msgId'])){
                        print_r($data['data']);
                        //消费逻辑......
                        ClientModel::create()->get();
                        Logger::getInstance()->info("完成消费");
                        //响应ack、代表本条消息被正常消费完毕
                        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
                    }
                }catch (\Throwable $e){
                    $message = $e->getMessage();
                    Logger::getInstance()->error("延时队列消费异常:".$message);
                    Logger::getInstance()->error("延时队列消费异常:".$e->getTraceAsString());
                    if (strrpos(strtoupper($message),'SQLSTATE') !== false){
                        \Co::sleep(1);
                        Logger::getInstance()->error("数据库异常:".$message);
                        $cmd = "php easyswoole process kill --pid={$pid} -f";
                        Logger::getInstance()->error("已完成进程重启cmd:\r\n".$cmd);
                        shell_exec($cmd);
                    }
                    //todo 邮件通知管理员
                }
            };
            $channel->basic_qos(null, 1, null);
            $channel->basic_consume($queue, '', false, false, false, false, $callback);
            while (count($channel->callbacks)) {
                $channel->wait();
            }
            $channel->close();
            $connection->close();
        }catch (\Throwable $exception){
            Logger::getInstance()->error("延时队列异常:".$exception->getMessage());
            Logger::getInstance()->error("延时队列异常:".$exception->getTraceAsString());
        }
    }
}

 类似资料: