前文提到的在系统设置Cache组件 Cache::getInstance()的时候,会去调用processManager去创建Cache的进程,然后以管道通信的方式进行设置缓存和获取缓存。
Cache是以单例模式实现的。构造器会进行如下操作
//根据配置创建指定数目的Cache服务进程,然后启动。 $num = intval(Config::getInstance()->getConf("EASY_CACHE.PROCESS_NUM"));//默认配置数目是1,在Config.php里'EASY_CACHE.PROCESS_NUM'=>1 if($num <= 0){ return; } $this->cliTemp = new SplArray();//这个数组以后会给单元测试时候单独使用,正常模式这个数组是不使用的 //若是在主服务创建,而非单元测试调用 if(ServerManager::getInstance()->getServer()){ //创建了一个swoole_table ,表名为__Cache,里面存储data(后面就讲到其实这里存储的是操作Cache的指令)作用是用来做GC(防止Cache被撑爆) TableManager::getInstance()->add(self::EXCHANGE_TABLE_NAME,[ 'data'=>[ 'type'=>Table::TYPE_STRING, 'size'=>10*1024 ], 'microTime'=>[ 'type'=>Table::TYPE_STRING, 'size'=>15 ] ],2048); $this->processNum = $num; for ($i=0;$i < $num;$i++){ ProcessManager::getInstance()->addProcess($this->generateProcessName($i),CacheProcess::class); } }
ProcessManager::getInstance()->addProcess($this->generateProcessName($i),CacheProcess::class)这句话才是Cache的核心逻辑。
ProcessManager::getInstance()这句话主要做了下面的操作
ProcessManager 的__construct构造函数创建了一个swoole_table,表名是process_hash_map
TableManager::getInstance()->add( 'process_hash_map',[ 'pid'=>[ 'type'=>Table::TYPE_INT, 'size'=>10 ] ],256 );
addProcess($this->generateProcessName($i),CacheProcess::class);
$this->generateProcessName($i)这个代码很简单就是根据$i来设置进程名称
addProcess 是在processList存储CacheProcess::class的实例,具体代码如下
$key = md5($processName); if(!isset($this->processList[$key])){ try{ $process = new $processClass($processName,$args,$async); $this->processList[$key] = $process; return true; }catch (\Throwable $throwable){ Trigger::throwable($throwable); return false; } }else{ trigger_error("you can not add the same name process : {$processName}.{$processClass}"); return false; }
那么CacheProcess::class的实例话做了什么操作呢
$this->cacheData = new SplArray();//这里很关键,为什么这么说每个Cache进程实际保存的缓存值都是在这里的,每个Cache进程都有自己的一个cacheData数组
$this->persistentTime = Config::getInstance()->getConf('EASY_CACHE.PERSISTENT_TIME');
parent::__construct($processName, $args);
CacheProcess::class继承于AbstractProcess
AbstractProcess的构造方法
$this->async = $async; $this->args = $args; $this->processName = $processName; $this->swooleProcess = new \swoole_process([$this,'__start'],false,2); ServerManager::getInstance()->getServer()->addProcess($this->swooleProcess);//然后swoole服务会addProcess一个Cache的任务进程。
__start方法主要是给swoole_table,表名为process_hash_map插入当前CacheProcess的进程名为key,进程IDpid为value。并且注册进程退出的事件。
if(PHP_OS != 'Darwin'){ $process->name($this->getProcessName()); } TableManager::getInstance()->get('process_hash_map')->set( md5($this->processName),['pid'=>$this->swooleProcess->pid] ); ProcessManager::getInstance()->setProcess($this->getProcessName(),$this); if (extension_loaded('pcntl')) { pcntl_async_signals(true); } Process::signal(SIGTERM,function ()use($process){ $this->onShutDown(); TableManager::getInstance()->get('process_hash_map')->del(md5($this->processName)); swoole_event_del($process->pipe); $this->swooleProcess->exit(0); }); if($this->async){ swoole_event_add($this->swooleProcess->pipe, function(){ $msg = $this->swooleProcess->read(64 * 1024); $this->onReceive($msg); }); } $this->run($this->swooleProcess);
$this->run($this->swooleProcess)这个函数是CacheProcess如果配置了persistentTime,就会开启一个定时器定时去取$file = Config::getInstance()->getConf('TEMP_DIR')."/{$processName}.data";的数据备份,默认是0也就是不会去做定时数据落地的操作
看到这里才是Cache组件在第一次实例化的时候做的相关事情,总结就是创建了指定数量的Cache进程绑定到swoole服务器上。在全局的process_hash_map表中能找到对应的Cache进程ID。然后Cache进程是可以以管道方式来进行通信。
set缓存方法
public function set($key,$data) { if(!ServerManager::getInstance()->isStart()){ $this->cliTemp->set($key,$data); } if(ServerManager::getInstance()->getServer()){ $num = $this->keyToProcessNum($key); $msg = new Msg(); $msg->setCommand('set'); $msg->setArg('key',$key); $msg->setData($data); ProcessManager::getInstance()->getProcessByName($this->generateProcessName($num))->getProcess()->write(\swoole_serialize::pack($msg));//直接把需要缓存的数据,封装成msg然后write给hash映射到的Cache进程 } }
当进程获取到的时候会回调onReceive方法
public function onReceive(string $str,...$agrs) { // TODO: Implement onReceive() method. $msg = \swoole_serialize::unpack($str); $table = TableManager::getInstance()->get(Cache::EXCHANGE_TABLE_NAME); if(count($table) > 1900){ //接近阈值的时候进行gc检测 //遍历Table 依赖pcre 如果发现无法遍历table,检查机器是否安装pcre-devel //超过0.1s 基本上99.99%为无用数据。 $time = microtime(true); foreach ($table as $key => $item){ if(round($time - $item['microTime']) > 0.1){ $table->del($key); } } } if($msg instanceof Msg){ switch ($msg->getCommand()){ case 'set':{ $this->cacheData->set($msg->getArg('key'),$msg->getData()); break; } case 'get':{ $ret = $this->cacheData->get($msg->getArg('key')); $msg->setData($ret); $table->set($msg->getToken(),[ 'data'=>\swoole_serialize::pack($msg), 'microTime'=>microtime(true) ]); break; } case 'del':{ $this->cacheData->delete($msg->getArg('key')); break; } case 'flush':{ $this->cacheData->flush(); break; } case 'enQueue':{ $que = $this->cacheData->get($msg->getArg('key')); if(!$que instanceof \SplQueue){ $que = new \SplQueue(); $this->cacheData->set($msg->getArg('key'),$que); } $que->enqueue($msg->getData()); break; } case 'deQueue':{ $que = $this->cacheData->get($msg->getArg('key')); if(!$que instanceof \SplQueue){ $que = new \SplQueue(); $this->cacheData->set($msg->getArg('key'),$que); } $ret = null; if(!$que->isEmpty()){ $ret = $que->dequeue(); } $msg->setData($ret); //deQueue 有cli 服务未启动的请求,但无token if(!empty($msg->getToken())){ $table->set($msg->getToken(),[ 'data'=>\swoole_serialize::pack($msg), 'microTime'=>microtime(true) ]); } break; } case 'queueSize':{ $que = $this->cacheData->get($msg->getArg('key')); if(!$que instanceof \SplQueue){ $que = new \SplQueue(); } $msg->setData($que->count()); $table->set($msg->getToken(),[ 'data'=>\swoole_serialize::pack($msg), 'microTime'=>microtime(true) ]); break; } } } }
这里一开始会进行缓存GC确保内存不会撑爆
set方法会直接给$this->cacheData,设置缓存值。
get方法比较特殊,它会去给Cache进程发送get的命令,然后Cache读取到命令会将值写到_Cache,Swoole_table表中。然后再去读取(这个会有一个while循环,类似自旋)出缓存内容。这样的好处,可以确保可以读取到当时的数据缓存,不会因为高并发读取到最新的缓存值内容。而且还能更有效的做gc,防止Cache内存撑爆。
public function get($key,$timeOut = 0.01) { if(!ServerManager::getInstance()->isStart()){ return $this->cliTemp->get($key); } $num = $this->keyToProcessNum($key); $token = Random::randStr(9);//这个是一个凭证,是确保获取到自己此刻想获取的cache数据,和事务类似为了保证可重复读 $process = ProcessManager::getInstance()->getProcessByName($this->generateProcessName($num)); $msg = new Msg(); $msg->setArg('timeOut',$timeOut); $msg->setArg('key',$key); $msg->setCommand('get'); $msg->setToken($token); $process->getProcess()->write(\swoole_serialize::pack($msg)); return $this->read($token,$timeOut); }
$process->getProcess()->write(\swoole_serialize::pack($msg))发这个包给Cache进程,Cache进程会进行下面这些操作
$ret = $this->cacheData->get($msg->getArg('key'));//获取到当前的缓存值 $msg->setData($ret); //将当前的内容设置到_Cache表中,token是请求的时候发过来的凭证原样拼装。这有什么好处呢,就是确保在高并发下,在A时刻获取的缓存,不会拿到后面B时刻更新的值。 $table->set($msg->getToken(),[ 'data'=>\swoole_serialize::pack($msg), 'microTime'=>microtime(true) ]); $this->read($token,$timeOut);
//这里的操作是直接从_Cache表中获取缓存数据,如果缓存存在并且进程调度没有超时,然后在表中将取过数据的内容删除掉返回 private function read($token,$timeOut) { $table = TableManager::getInstance()->get(self::EXCHANGE_TABLE_NAME); $start = microtime(true); $data = null; while(true){ usleep(1); if($table->exist($token)){ $data = $table->get($token)['data']; $data = \swoole_serialize::unpack($data); if(!$data instanceof Msg){ $data = null; } break; } if(round($start - microtime(true),3) > $timeOut){ break; } } $table->del($token); if($data){ return $data->getData(); }else{ return null; } }