框架进阶 - 异步进程

优质
小牛编辑
131浏览
2023-12-01

EasySwoole支持在定时器、控制器处理中等多处位置使用异步进程。
CoreSwooleAsyncTaskManager是对Swoole Task的封装实现。

AbstractAsyncTask

CoreAbstractInterfaceAbstractAsyncTask 定义了异步任务的接口实现,一个异步任务对象都应当基础AbstractAsyncTask。

  1. class Task extends AbstractAsyncTask{
  2. function handler(swoole_server $server, $taskId, $fromId)
  3. {
  4. // TODO: Implement handler() method.
  5. for($i=0;$i<=5;$i++){
  6. sleep(1);
  7. }
  8. Logger::getInstance()->log('task finish');
  9. }
  10. function finishCallBack(swoole_server $server, $task_id, $resultData)
  11. {
  12. // TODO: Implement finishCallBack() method.
  13. }
  14. }

添加一个异步任务

  1. //CoreSwooleAsyncTaskManager
  2. AsyncTaskManager::getInstance()->add(function (){
  3. sleep(1);
  4. Logger::getInstance()->console("async task run");
  5. }
  6. );
  7. //或者
  8. AsyncTaskManager::getInstance()->add(Task::class);
  9. //执行回调
  10. AsyncTaskManager::getInstance()->add(function (){
  11. Logger::getInstance()->console("task");
  12. return 'ret';//注意return
  13. },-1,function (){
  14. Logger::getInstance()->console("task finish");
  15. }
  16. );

多个任务并发执行

CoreComponentBarrier 是对SWoole taskWaitMulti的封装实现。

  1. $barrier = new Barrier();
  2. $barrier->add("a",function (){
  3. usleep(50000);
  4. return time();
  5. }
  6. );
  7. $barrier->add("b",function (){
  8. sleep(2);
  9. return time();
  10. }
  11. );
  12. $barrier->add("c",function (){
  13. usleep(50000);
  14. return time();
  15. }
  16. );
  17. $result = $barrier->run(1);

注意:Barrier为阻塞等待执行,所有的任务会被分发到不同Task进程同步执行,
直到所有的任务执行结束或超时才返回全部结果。以上代码中,限制了三个任务的最大执行时间为1秒,
仅有a、b两个任务能够被执行并返回结果。

使用示例

在控制器中使用

  1. class Index extends AbstractController
  2. {
  3. function index()
  4. {
  5. // TODO: Implement index() method.
  6. AsyncTaskManager::getInstance()->add(function (){
  7. sleep(3);
  8. Logger::getInstance()->console("task finish");
  9. });
  10. $this->response()->writeJson(Status::CODE_OK,null,'任务已经提交');
  11. }
  12. function onRequest($actionName)
  13. {
  14. // TODO: Implement onRequest() method.
  15. }
  16. function actionNotFound($actionName = null, $arguments = null)
  17. {
  18. // TODO: Implement actionNotFound() method.'
  19. $this->response()->withStatus(Status::CODE_NOT_FOUND);
  20. }
  21. function afterAction()
  22. {
  23. // TODO: Implement afterAction() method.
  24. }
  25. function auth(){
  26. }
  27. }

Event.php workerStart的定时器中使用

  1. function onWorkerStart(swoole_server $server, $workerId)
  2. {
  3. // TODO: Implement onWorkerStart() method.
  4. if($workerId == 0){
  5. Timer::loop(110000,function (){
  6. AsyncTaskManager::getInstance()->add(function (){
  7. sleep(3);
  8. Logger::getInstance()->console("task finish");
  9. });
  10. });
  11. }
  12. }