示例代码 - 多进程爬虫

优质
小牛编辑
143浏览
2023-12-01

本示例演示如何在 EasySwoole 中利用 Redis 队列 + 定时器 + task 进程实现一个多进程爬虫,下面直接给出代码。

添加Redis配置信息

修改配置文件,添加Redis配置

  1. "REDIS"=>array( // connection settings, read through Config::getInstance()->getConf("REDIS")
  2. "HOST"=>'', // Redis server host; left empty in this sample, fill in your own address
  3. "PORT"=>6379, // Redis server port
  4. "AUTH"=>"" // password that the Redis wrapper passes to auth()
  5. )

封装Redis

  namespace App\Utility\Db;

  use Conf\Config;

  /**
   * Process-wide singleton wrapper around the phpredis client.
   *
   * Connection settings come from the "REDIS" config entry. List operations
   * reconnect and retry when the underlying client throws.
   *
   * NOTE(review): a successful ping() resets the attempt counter, so an
   * operation that keeps throwing on an otherwise healthy connection would be
   * retried without bound — confirm this is acceptable for your workload.
   */
  class Redis
  {
      /** @var \Redis underlying phpredis client */
      private $con;

      /** @var Redis|null cached singleton instance of this wrapper */
      protected static $instance;

      /** @var int consecutive connection attempts since the last successful ping */
      protected $tryConnectTimes = 0;

      /** @var int threshold compared against $tryConnectTimes before giving up */
      protected $maxTryConnectTimes = 3;

      public function __construct()
      {
          $this->connect();
      }

      /**
       * Open a fresh connection, retrying until the attempt counter exceeds
       * $maxTryConnectTimes; raises a PHP notice via trigger_error() when all
       * attempts fail.
       *
       * @return null
       */
      public function connect()
      {
          $this->tryConnectTimes++;
          $conf = Config::getInstance()->getConf("REDIS");

          // Must be the global phpredis class: an unqualified "Redis" inside this
          // namespace is this wrapper itself and would recurse through __construct().
          $this->con = new \Redis();

          try {
              $this->con->connect($conf['HOST'], $conf['PORT'], 2);
              // Only authenticate when a password is actually configured.
              if (isset($conf['AUTH']) && $conf['AUTH'] !== '') {
                  $this->con->auth($conf['AUTH']);
              }
          } catch (\Exception $e) {
              // Deliberately not rethrown: the ping() below reports the failure
              // and drives the retry / give-up logic.
          }

          if (!$this->ping()) {
              if ($this->tryConnectTimes <= $this->maxTryConnectTimes) {
                  return $this->connect();
              }
              trigger_error("redis connect fail");
              return null;
          }

          // Store values PHP-serialized so objects pushed to a list come back as objects.
          $this->con->setOption(\Redis::OPT_SERIALIZER, \Redis::SERIALIZER_PHP);
      }

      /**
       * Return the shared wrapper instance, creating it on first use.
       *
       * @return Redis
       */
      public static function getInstance()
      {
          if (!is_object(self::$instance)) {
              self::$instance = new self();
          }
          return self::$instance;
      }

      /**
       * Append a value to the tail of a list.
       *
       * @param string $key list key
       * @param mixed  $val value to push
       * @return mixed result of the phpredis call, or false when reconnecting failed
       */
      public function rPush($key, $val)
      {
          try {
              return $this->con->rpush($key, $val);
          } catch (\Exception $e) {
              $this->connect();
              if ($this->tryConnectTimes <= $this->maxTryConnectTimes) {
                  return $this->rPush($key, $val);
              }
              return false;
          }
      }

      /**
       * Remove and return the head element of a list.
       *
       * @param string $key list key
       * @return mixed result of the phpredis call, or false when reconnecting failed
       */
      public function lPop($key)
      {
          try {
              return $this->con->lPop($key);
          } catch (\Exception $e) {
              $this->connect();
              if ($this->tryConnectTimes <= $this->maxTryConnectTimes) {
                  return $this->lPop($key);
              }
              return false;
          }
      }

      /**
       * Return the length of a list.
       *
       * @param string $key list key
       * @return mixed result of the phpredis call, or false when reconnecting failed
       */
      public function lSize($key)
      {
          try {
              // lLen is the canonical phpredis method; lSize is its deprecated alias.
              return $this->con->lLen($key);
          } catch (\Exception $e) {
              $this->connect();
              if ($this->tryConnectTimes <= $this->maxTryConnectTimes) {
                  return $this->lSize($key);
              }
              return false;
          }
      }

      /**
       * Expose the raw phpredis client for commands this wrapper does not cover.
       *
       * @return \Redis
       */
      public function getRedisConnect()
      {
          return $this->con;
      }

      /**
       * Check whether the connection is alive; a successful ping resets the
       * connection-attempt counter.
       *
       * @return bool
       */
      public function ping()
      {
          try {
              $ret = $this->con->ping();
              if (empty($ret)) {
                  return false;
              }
              $this->tryConnectTimes = 0;
              return true;
          } catch (\Exception $e) {
              return false;
          }
      }
  }

定义SysConst

  namespace App\Utility;

  /**
   * Application constants, extending the framework's SysConst.
   */
  class SysConst extends \Core\Component\SysConst
  {
      /** Shared-memory key holding the number of crawler tasks currently running. */
      const TASK_RUNNING_NUM = 'TASK_RUNNING_NUM';
  }

封装队列

  namespace App\Model;

  use App\Utility\Db\Redis;

  /**
   * Crawl-task queue backed by a single Redis list.
   *
   * Tasks are pushed at the tail and popped from the head.
   */
  class Queue
  {
      /** Redis list key that stores pending tasks. */
      const QUEUE_NAME = 'task_list';

      /**
       * Enqueue a task.
       *
       * @param TaskBean $taskBean task to store
       * @return mixed whatever the Redis wrapper's rPush() returns (false on failure)
       */
      public static function set(TaskBean $taskBean)
      {
          return Redis::getInstance()->rPush(self::QUEUE_NAME, $taskBean);
      }

      /**
       * Dequeue the oldest task.
       *
       * @return mixed whatever the Redis wrapper's lPop() returns; callers
       *               check it with instanceof TaskBean
       */
      public static function pop()
      {
          return Redis::getInstance()->lPop(self::QUEUE_NAME);
      }
  }

封装TaskBean

  namespace App\Model;

  use Core\Component\Spl\SplBean;

  /**
   * Describes one crawl task.
   *
   * Sample only: it carries just the target URL; add your own cURL options
   * as fields if you need them.
   */
  class TaskBean extends SplBean
  {
      /** @var mixed URL to fetch */
      protected $url;

      /**
       * @return mixed the URL to fetch
       */
      public function getUrl()
      {
          return $this->url;
      }

      /**
       * @param mixed $url the URL to fetch
       */
      public function setUrl($url)
      {
          $this->url = $url;
      }

      /**
       * Initialisation hook; intentionally empty in this sample.
       */
      protected function initialize()
      {
      }
  }

封装异步执行模型

  namespace App\Model;

  use App\Utility\SysConst;
  use Core\AbstractInterface\AbstractAsyncTask;
  use Core\Component\Logger;
  use Core\Component\ShareMemory;
  use Core\Utility\Curl\Request;

  /**
   * Async task that drains the crawl queue: it keeps popping TaskBean entries
   * and fetching their URLs until the queue yields something that is not a
   * TaskBean.
   */
  class Runner extends AbstractAsyncTask
  {
      /**
       * Task entry point.
       *
       * @param \swoole_server $server
       * @param mixed          $taskId
       * @param mixed          $fromId
       */
      public function handler(\swoole_server $server, $taskId, $fromId)
      {
          // Publish that one more runner is active.
          $this->adjustRunningCount(1);

          try {
              // A bare while loop is a risky construct: it keeps control of the
              // process until the queue is drained.
              while (true) {
                  $task = Queue::pop();
                  if (!($task instanceof TaskBean)) {
                      break;
                  }
                  $req = new Request($task->getUrl());
                  // The response body is fetched but not used in this sample.
                  $req->exec()->getBody();
                  Logger::getInstance("curl")->console("finish url:".$task->getUrl());
              }
          } finally {
              // Always give the slot back, even when a fetch throws; otherwise the
              // counter stays inflated and the dispatcher stops starting runners.
              $this->adjustRunningCount(-1);
          }
      }

      /**
       * Completion callback; nothing to do in this sample.
       *
       * @param \swoole_server $server
       * @param mixed          $task_id
       * @param mixed          $resultData
       */
      public function finishCallBack(\swoole_server $server, $task_id, $resultData)
      {
      }

      /**
       * Add $delta to the running-task counter inside a ShareMemory transaction.
       *
       * @param int $delta +1 when a runner starts, -1 when it ends
       */
      private function adjustRunningCount($delta)
      {
          $share = ShareMemory::getInstance();
          $share->startTransaction();
          $share->set(SysConst::TASK_RUNNING_NUM, $share->get(SysConst::TASK_RUNNING_NUM) + $delta);
          $share->commit();
      }
  }

注册事件

在Conf/Event.php中

  • 在启动前先清理共享内存信息。
    function frameInitialized()
    {
        // Clear the shared memory before the server starts, so values left
        // over from a previous run are not carried into this one.
        $sharedMemory = ShareMemory::getInstance();
        $sharedMemory->clear();
    }
  • 注册定时器,定时去获取任务
    /**
     * Worker start hook: installs the timer that periodically starts crawler
     * tasks.
     *
     * @param \swoole_server $server
     * @param mixed          $workerId
     */
    function onWorkerStart(\swoole_server $server, $workerId)
    {
        // Only the first worker owns the dispatch timer.
        if ($workerId != 0) {
            return;
        }

        // NOTE(review): 1000 is presumably milliseconds — confirm against Timer::loop.
        Timer::loop(1000, function () {
            $share = ShareMemory::getInstance();
            // Never let every worker become busy (dangerous): start another
            // Runner only while fewer than 2 are reported as running.
            if ($share->get(SysConst::TASK_RUNNING_NUM) < 2) {
                AsyncTaskManager::getInstance()->add(Runner::class);
            }
        });
    }

    任务投递控制器

    任意建立一个控制器,添加以下两个方法。
    /**
     * Queue a crawl task for the "url" request parameter, falling back to a
     * default URL when the parameter is empty, then reply with a JSON message.
     */
    function addTask()
    {
        $targetUrl = $this->request()->getRequestParam("url") ?: 'http://wiki.swoole.com/';

        $taskBean = new TaskBean();
        $taskBean->setUrl($targetUrl);

        // Hand the enqueue to the async task manager rather than doing it inline.
        $enqueue = function () use ($taskBean) {
            Queue::set($taskBean);
        };
        AsyncTaskManager::getInstance()->add($enqueue);

        $this->response()->writeJson(200, null, "任务投递成功");
    }
    /**
     * Report, as JSON, the running-task count read from shared memory.
     */
    function status()
    {
        $runningCount = ShareMemory::getInstance()->get(SysConst::TASK_RUNNING_NUM);
        $payload = array("taskRuningNum" => $runningCount);
        $this->response()->writeJson(200, $payload);
    }

执行

启动 EasySwoole,访问控制器的 addTask 方法向 Redis 队列中添加任务,等待定时器唤起 task 进程执行;执行期间可访问 status 方法查看当前正在运行的 task 数量。

本例子仅供参考,未做详尽错误处理。