基础进阶 - 自定义进程

优质
小牛编辑
132浏览
2023-12-01

EasySwoole中支持添加用户自定义的swoole process。

抽象父类

任何的自定义进程,都应该继承自EasySwoole\Core\Swoole\Process\AbstractProcess,
AbstractProcess实现代码如下:

  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: yf
  5. * Date: 2018/1/17
  6. * Time: 上午11:28
  7. */
  8. namespace EasySwoole\Core\Swoole\Process;
  9. use EasySwoole\Core\Swoole\Memory\TableManager;
  10. use EasySwoole\Core\Swoole\ServerManager;
  11. use EasySwoole\Core\Swoole\Time\Timer;
  12. use Swoole\Process;
  13. abstract class AbstractProcess
  14. {
  15. private $swooleProcess;
  16. private $processName;
  17. private $async = null;
  18. private $args = [];
  19. function __construct(string $processName,$async = true,array $args)
  20. {
  21. $this->async = $async;
  22. $this->args = $args;
  23. $this->processName = $processName;
  24. $this->swooleProcess = new \swoole_process([$this,'__start'],false,2);
  25. ServerManager::getInstance()->getServer()->addProcess($this->swooleProcess);
  26. }
  27. public function getProcess():Process
  28. {
  29. return $this->swooleProcess;
  30. }
  31. /*
  32. * 仅仅为了提示:在自定义进程中依旧可以使用定时器
  33. */
  34. public function addTick($ms,callable $call):?int
  35. {
  36. return Timer::loop(
  37. $ms,$call
  38. );
  39. }
  40. public function clearTick(int $timerId)
  41. {
  42. Timer::clear($timerId);
  43. }
  44. public function delay($ms,callable $call):?int
  45. {
  46. return Timer::delay(
  47. $ms,$call
  48. );
  49. }
  50. /*
  51. * 服务启动后才能获得到pid
  52. */
  53. public function getPid():?int
  54. {
  55. if(isset($this->swooleProcess->pid)){
  56. return $this->swooleProcess->pid;
  57. }else{
  58. $key = md5($this->processName);
  59. $pid = TableManager::getInstance()->get('process_hash_map')->get($key);
  60. if($pid){
  61. return $pid['pid'];
  62. }else{
  63. return null;
  64. }
  65. }
  66. }
  67. function __start(Process $process)
  68. {
  69. if(PHP_OS != 'Darwin'){
  70. $process->name($this->getProcessName());
  71. }
  72. TableManager::getInstance()->get('process_hash_map')->set(
  73. md5($this->processName),['pid'=>$this->swooleProcess->pid]
  74. );
  75. ProcessManager::getInstance()->setProcess($this->getProcessName(),$this);
  76. if (extension_loaded('pcntl')) {
  77. pcntl_async_signals(true);
  78. }
  79. Process::signal(SIGTERM,function ()use($process){
  80. $this->onShutDown();
  81. TableManager::getInstance()->get('process_hash_map')->del(md5($this->processName));
  82. swoole_event_del($process->pipe);
  83. $this->swooleProcess->exit(0);
  84. });
  85. if($this->async){
  86. swoole_event_add($this->swooleProcess->pipe, function(){
  87. $msg = $this->swooleProcess->read(64 * 1024);
  88. $this->onReceive($msg);
  89. });
  90. }
  91. $this->run($this->swooleProcess);
  92. }
  93. public function getArgs():array
  94. {
  95. return $this->args;
  96. }
  97. public function getProcessName()
  98. {
  99. return $this->processName;
  100. }
  101. public abstract function run(Process $process);
  102. public abstract function onShutDown();
  103. public abstract function onReceive(string $str,...$args);
  104. }

进程管理器

ProcessManager,实现代码如下:

  1. namespace EasySwoole\Core\Swoole\Process;
  2. use EasySwoole\Core\AbstractInterface\Singleton;
  3. use EasySwoole\Core\Swoole\Memory\TableManager;
  4. use EasySwoole\Core\Swoole\ServerManager;
  5. use Swoole\Table;
  6. class ProcessManager
  7. {
  8. use Singleton;
  9. private $processList = [];
  10. function __construct()
  11. {
  12. TableManager::getInstance()->add(
  13. 'process_hash_map',[
  14. 'pid'=>[
  15. 'type'=>Table::TYPE_INT,
  16. 'size'=>10
  17. ]
  18. ],256
  19. );
  20. }
  21. public function addProcess(string $processName,string $processClass,$async = true,array $args = []):bool
  22. {
  23. if(ServerManager::getInstance()->isStart()){
  24. trigger_error('you can not add a process after server start');
  25. return false;
  26. }
  27. $key = md5($processName);
  28. if(!isset($this->processList[$key])){
  29. try{
  30. $process = new $processClass($processName,$async,$args);
  31. $this->processList[$key] = $process;
  32. return true;
  33. }catch (\Throwable $throwable){
  34. trigger_error($throwable->getMessage().$throwable->getTraceAsString());
  35. return false;
  36. }
  37. }else{
  38. trigger_error('you can not add the same name process : '.$processName);
  39. return false;
  40. }
  41. }
  42. public function getProcessByName(string $processName):?AbstractProcess
  43. {
  44. $key = md5($processName);
  45. if(isset($this->processList[$key])){
  46. return $this->processList[$key];
  47. }else{
  48. return null;
  49. }
  50. }
  51. public function getProcessByPid(int $pid):?AbstractProcess
  52. {
  53. $table = TableManager::getInstance()->get('process_hash_map');
  54. foreach ($table as $key => $item){
  55. if($item['pid'] == $pid){
  56. return $this->processList[$key];
  57. }
  58. }
  59. return null;
  60. }
  61. public function setProcess(string $processName,AbstractProcess $process)
  62. {
  63. $key = md5($processName);
  64. $this->processList[$key] = $process;
  65. }
  66. public function reboot(string $processName):bool
  67. {
  68. $p = $this->getProcessByName($processName);
  69. if($p){
  70. \swoole_process::kill($p->getPid(),SIGTERM);
  71. return true;
  72. }else{
  73. return false;
  74. }
  75. }
  76. public function writeByProcessName(string $name,string $data):bool
  77. {
  78. $process = $this->getProcessByName($name);
  79. if($process){
  80. return (bool)$process->getProcess()->write($data);
  81. }else{
  82. return false;
  83. }
  84. }
  85. public function readByProcessName(string $name,float $timeOut = 0.1):?string
  86. {
  87. $process = $this->getProcessByName($name);
  88. if($process){
  89. $process = $process->getProcess();
  90. $read = array($process);
  91. $write = [];
  92. $error = [];
  93. $ret = swoole_select($read, $write,$error, $timeOut);
  94. if($ret){
  95. return $process->read(64 * 1024);
  96. }else{
  97. return null;
  98. }
  99. }else{
  100. return null;
  101. }
  102. }
  103. }

实例

我们以demo中的自定义进程例子来说明:

  1. namespace App\Process;
  2. use EasySwoole\Core\Swoole\Process\AbstractProcess;
  3. use Swoole\Process;
  4. class Test extends AbstractProcess
  5. {
  6. //进程start的时候会执行的事件
  7. public function run(Process $process)
  8. {
  9. // TODO: Implement run() method.
  10. //添加进程内定时器
  11. $this->addTick(2000,function (){
  12. var_dump('this is '.$this->getProcessName().' process tick');
  13. });
  14. }
  15. //当进程关闭的时候会执行该事件
  16. public function onShutDown()
  17. {
  18. // TODO: Implement onShutDown() method.
  19. }
  20. //当有信息发给该进程的时候,会执行此进程
  21. public function onReceive(string $str, ...$args)
  22. {
  23. // TODO: Implement onReceive() method.
  24. var_dump('process rec'.$str);
  25. }
  26. }

以上代码直达连接
至于如何使用(测试),请见demo中的EasySwooleEvent.php