EasySwoole 搭建消息推送服务07 - Crontab/TaskManager

季嘉良
2023-12-01

完成了基本功能开发以后,一个更加急迫需要解决的问题摆在面前:消息的推送、存储和准确性、失败重跑,En。

  • DB 使用异步处理性能会变得更快
  • list -> Mysql 用来检测DB处理失败的情况,已达到最终一致性
  • 对于消息推送失败的场景下,定时检测,重新推送

1.DB异步,异步处理是一个提高性能常用的方式和方法。

namespace App\WebSocketController\V1;

use EasySwoole\EasySwoole\Task\TaskManager;
use App\Models\PushMsgModel;

class PcMessage extends Base implements iMessageController
{

   public function login()
   {
    //异步写入MySql
    TaskManager::getInstance()->async(function () use ( $pushMsg ){
        $model = PushMsgModel::create($pushMsg);
        $model->save();
    });
   }
}

TaskManager采用的是闭包的方式和方法,传递参数使用use,需要注意的是不能传递对象,无法执行序列化的操作,产生"Class 'App\Utility\Task\ServerManager' not found"的错误信息。

2.Crontab,检测失败的数据,已达到最终一致性。

1.mainServerCreate,注册Crontab组件,文件位置:/Users/stark/Code/www/cpmessage/EasySwooleEvent.php

Crontab::getInstance()->addTask(\App\Crontab\PushMsgSyncData::class);

2.在注册的指定位置,写具体处理数据的逻辑

<?php
/**
 * 对失败的数据,重跑
 */
namespace App\Crontab;

use EasySwoole\EasySwoole\Crontab\AbstractCronTask;
use App\Models\PushMsgModel;

class PushMsgSyncData extends AbstractCronTask
{

    public static function getRule(): string
    {
        return '*/1 * * * *';
    }

    public static function getTaskName(): string
    {
        return  'PushMsgSyncData';
    }

    function run(int $taskId, int $workerIndex)
    {

        echo 'run 方法里处理具体的代码处理逻辑'.PHP_EOL;
    }

    function onException(\Throwable $throwable, int $taskId, int $workerIndex)
    {
        echo $throwable->getMessage();
    }
}

添加后,重启服务,执行命令 php easyswoole server -mode=websocket start,其他也是类似处理,就不做过多的赘述了。

3.关于任务循环的开发技巧

Crontab执行任务的痛点在于一次循环周星内,是否能处理完成对逻辑数据任务的处理,这是接下来要思考的问题。

在本次分享的笔记里是关于循环的结束,假设每次循环周日执行的时间正好,在循环体里一定会设置一个停止的条件,再说一句题外话,如果执行一个事务过程中,如果不符合条件的需要及时rollback,释放资源。

$server = ServerManager::getInstance()->getSwooleServer();
$start_fd = 0;
while (true) {
    $conn_list = $server->getClientList($start_fd, 10);
    if ($conn_list === false or count($conn_list) === 0) {
        echo "finish\n";
        break;
    }
    $start_fd = end($conn_list);
    echo '链接返回值:'.json_encode($conn_list).PHP_EOL;
    $noce_ack = $this->getNoceAck();

    $pushMsg = [
        "uid" => 0,
        "msg_type" => 6,
        "code" => 200,
        "msg" => 'SUCCESS',
        "body" => $msgInfo,
        'noce_ack' => $noce_ack
    ];

    foreach ($conn_list as $fd) {
        $server->push($fd, json_encode($pushMsg));
        echo 'fd的值:'.$fd.'推送信息为:'.json_encode($conn_list).PHP_EOL;
    }
}

接下来要优化的点:

  • 消息存储按Uid对128取模,对全量消息数据进行单独处理
  • 怎么对ws进行压测
  • 怎么在Nginx上进行部署,实现高可用
  • 怎么把Uid/Fd关系进行持久化、容灾、降级和断融处理
 类似资料: