其他 - 如何实现队列消费/自定义进程

优质
小牛编辑
142浏览
2023-12-01

可能我们会经常遇见需要不断消费队列内内容的场景,我们以EasySwoole中自定义进程的方式,来实现这一功能。

实现代码

定义消费进程逻辑

  1. namespace App;
  2. use EasySwoole\Core\Swoole\Process\AbstractProcess;
  3. use Swoole\Process;
  4. class Consumer extends AbstractProcess
  5. {
  6. private $isRun = false;
  7. public function run(Process $process)
  8. {
  9. // TODO: Implement run() method.
  10. /*
  11. * 举例,消费redis中的队列数据
  12. * 定时500ms检测有没有任务,有的话就while死循环执行
  13. */
  14. $this->addTick(500,function (){
  15. if(!$this->isRun){
  16. $this->isRun = true;
  17. $redis = new \redis();//此处为伪代码,请自己建立连接或者维护redis连接
  18. while (true){
  19. try{
  20. $task = $redis->lPop('task_list');
  21. if($task){
  22. // do you task
  23. }else{
  24. break;
  25. }
  26. }catch (\Throwable $throwable){
  27. break;
  28. }
  29. }
  30. $this->isRun = false;
  31. }
  32. var_dump($this->getProcessName().' task run check');
  33. });
  34. }
  35. public function onShutDown()
  36. {
  37. // TODO: Implement onShutDown() method.
  38. }
  39. public function onReceive(string $str, ...$args)
  40. {
  41. // TODO: Implement onReceive() method.
  42. }
  43. }

注册消费进程

在EasySwoole的全局事件中,注册消费进程。

  1. use App\Consumer;
  2. use EasySwoole\Core\Swoole\Process\ProcessManager;
  3. use \EasySwoole\Core\Swoole\ServerManager;
  4. public static function mainServerCreate(ServerManager $server,EventRegister $register): void
  5. {
  6. // TODO: Implement mainServerCreate() method.
  7. $allNum = 3;
  8. for ($i = 0 ;$i < $allNum;$i++){
  9. ProcessManager::getInstance()->addProcess("consumer_{$i}",Consumer::class);
  10. }
  11. }

爬虫例子:https://github.com/easy-swoole/spider