当前位置: 首页 > 工具软件 > php-amqplib > 使用案例 >

laravel6 使用php-amqplib包处理App中的埋点问题

梁季
2023-12-01
<?php
/**
 * 埋点服务
 * User: shiyf
 * Date: 2022/5/30
 * Time: 17:17
 */

namespace App\Services;

require_once __DIR__ .'/../../vendor/autoload.php';


use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Log;


class TrackService
{
    /** @var AMQPStreamConnection|null Open broker connection; null while disconnected. */
    private $connection = null;
    /** @var \PhpAmqpLib\Channel\AMQPChannel|null Channel opened on $connection; null while disconnected. */
    private $channel = null;
    /** @var string Exchange name, loaded from services.rabbitmq.track_exch. */
    private $exchange_name = '';
    /** @var string Queue name, loaded from services.rabbitmq.track_queue. */
    private $queue_name = '';
    /** @var string Routing key, loaded from services.rabbitmq.track_route. */
    private $route_key = '';

    /**
     * No-op method.
     *
     * NOTE(review): this is a regular method, not a constructor (PHP only
     * treats `__construct` as one). It was probably meant to be
     * `__construct()`; it is kept under its original name so that the public
     * interface of the class does not change.
     */
    public function construct()
    {

    }

    /**
     * Publish one tracking event to the RabbitMQ tracking queue.
     *
     * @param mixed $params Event payload; it is JSON-encoded before publishing.
     * @return bool True when the message was handed to the broker, false when
     *              the payload could not be JSON-encoded.
     */
    public function write($params)
    {
        $payload = json_encode($params);
        if ($payload === false) {
            // Publishing `false` would enqueue an empty/garbage body that the
            // consumer cannot decode, so refuse it here instead.
            Log::error('TrackService::write json_encode failed: ' . json_last_error_msg());
            return false;
        }

        $this->send($payload);
        return true;
    }

    /**
     * Declare the exchange/queue/binding and publish a single message.
     *
     * The connection is opened per call and always closed again, even when a
     * declare or publish call throws.
     *
     * @param string $content Message body to publish.
     */
    private function send($content)
    {
        try {
            $this->initial();

            // Declare the direct exchange (durable, auto-delete).
            $this->channel->exchange_declare($this->exchange_name, 'direct', false, true, true);
            // Declare the queue (durable, non-exclusive, auto-delete).
            $this->channel->queue_declare($this->queue_name, false, true, false, true);
            // Bind the queue to the exchange through the routing key.
            $this->channel->queue_bind($this->queue_name, $this->exchange_name, $this->route_key);

            // Publish the message.
            $this->channel->basic_publish(new AMQPMessage($content), $this->exchange_name, $this->route_key);
        } finally {
            $this->disconnect();
        }
    }

    /**
     * Open the RabbitMQ connection and channel and load the exchange, queue
     * and routing-key names from the `services.rabbitmq` configuration.
     */
    private function initial()
    {
        $config = config('services.rabbitmq');
        $this->connection = new AMQPStreamConnection($config['host'], $config['port'], $config['username'], $config['password']);
        $this->channel = $this->connection->channel();
        $this->exchange_name = $config['track_exch'];
        $this->queue_name = $config['track_queue'];
        $this->route_key = $config['track_route'];
    }

    /**
     * Close the channel and the connection if they are open.
     *
     * Both properties are reset first so a later call starts from a clean
     * state, and the connection is closed even if closing the channel throws.
     */
    private function disconnect()
    {
        $channel = $this->channel;
        $connection = $this->connection;
        $this->channel = null;
        $this->connection = null;

        try {
            if ($channel !== null) {
                $channel->close();
            }
        } finally {
            if ($connection !== null) {
                $connection->close();
            }
        }
    }

    /**
     * Consume tracking messages until the channel has no more consumers.
     *
     * Each message is acknowledged before it is processed; payloads that
     * cannot be processed are written to the failure collection instead of
     * being redelivered. Any exception raised while waiting terminates the
     * process with exit code -1.
     */
    public function consume()
    {
        $this->initial();
        // Declare the queue with the same flags the producer uses.
        $this->channel->queue_declare($this->queue_name, false, true, false, true);
        echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

        $callback = function ($msg) {
            if (!$msg instanceof AMQPMessage) {
                exit(-1);
            }

            $now = date('Y-m-d H:i:s');
            // Acknowledge first, process afterwards (at-most-once delivery).
            $msg->ack();
            echo "{$now} Received ", $msg->body, "\n";

            $data = json_decode($msg->body, true);
            if (!is_array($data)) {
                // Not a JSON object/array: keep the raw body so it can be
                // inspected later instead of passing null downstream.
                $this->processFail(['raw_body' => $msg->body]);
                return;
            }

            if (!$this->process($data)) {
                $this->processFail($data);
            }
        };

        $this->channel->basic_consume($this->queue_name, '', false, false, false, false, $callback);
        while (count($this->channel->callbacks)) {
            echo "inner while \n";
            try {
                $this->channel->wait();
            } catch (\Exception $e) {
                echo 'exception:'.$e->getMessage().'|'.$e->getCode()."\n";
                $this->disconnect();
                exit(-1);
            }
        }
        $this->disconnect();
    }


    /**
     * Store one decoded tracking event in MongoDB.
     *
     * The event goes into the collection `evt_<event_code>`, which is created
     * with its two indexes on first use. A row is identified by
     * (user_id, uuid, s_date): the first event inserts it with
     * `visit_times = 1`, later events increment `visit_times`.
     *
     * @param array $params Decoded message; must contain event_code, user_id,
     *                      uuid and s_date.
     * @return bool True on success, false when the payload is invalid or a
     *              database call throws.
     */
    private function process($params)
    {
        foreach (['event_code', 'user_id', 'uuid', 's_date'] as $field) {
            if (!array_key_exists($field, $params)) {
                echo 'exception:missing field ' . $field . "|0\n";
                return false;
            }
        }

        // The event code becomes part of a collection name, so it has to be a
        // non-empty scalar. NOTE(review): it originates from the client
        // request; consider checking it against a list of known event codes.
        $eventCode = $params['event_code'];
        if (!is_scalar($eventCode) || (string) $eventCode === '') {
            echo "exception:invalid event_code|0\n";
            return false;
        }

        try {
            $table = 'evt_' . $eventCode;
            unset($params['event_code']);
            $userId = $params['user_id'];
            $uuid = $params['uuid'];
            $s_date = $params['s_date'];

            $mongo = DB::connection('mongodb');
            $schema = $mongo->getSchemaBuilder();
            if (!$schema->hasCollection($table)) {
                $schema->create($table, function ($collection) {
                    $collection->index(['user_id', 'uuid', 's_date'], 'uid_date', null, ['background' => true]);
                    $collection->index(['s_date','user_id'], 'idx_date', null, ['background' => true]);
                });
            }

            // NOTE(review): lookup followed by insert/increment is not atomic;
            // with several consumer processes two first events for the same
            // key can both insert a row.
            $row = $mongo->collection($table)->where('user_id', $userId)
                ->where('uuid', $uuid)->where('s_date', $s_date)
                ->first();
            if (empty($row)) {
                $params['visit_times'] = 1;
                $mongo->collection($table)->insert($params);
            } else {
                $mongo->collection($table)
                    ->where('_id', $row['_id'])
                    ->increment('visit_times', 1);
            }
        } catch (\Exception $e) {
            echo 'exception:'.$e->getMessage().'|'.$e->getCode()."\n";
            return false;
        }
        return true;
    }

    /**
     * Write a payload that could not be processed to the `track_msg_failed`
     * collection so it can be handled again later.
     *
     * @param array $data Payload to store.
     */
    private function processFail($data)
    {
        DB::connection('mongodb')->collection('track_msg_failed')->insert($data);
    }

}

针对App的埋点需求,一般特点是:请求量大,高并发

针对这种埋点接口,要做到支持高并发而且性能要好,我这里的解决方案是:

用户发起请求后,将消息扔到消息队列中去,后台开启多个守护进程,共同消费这些消息;

针对消费端,先给出应答(ack),然后再处理数据;处理失败的数据会写入失败集合 track_msg_failed,后续可以二次处理。

消费者Command类  app\Console\Commands\ConsumeCommand.php

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use App\Services\TrackService;

class ConsumeCommand extends Command
{
    /**
     * Artisan signature used to invoke this command.
     *
     * @var string
     */
    protected $signature = 'weizhi:msg_consume';

    /**
     * Human-readable description of the command.
     *
     * @var string
     */
    protected $description = 'Consume Task of Queue';

    /**
     * Run the tracking-message consumer and print "ok" once it returns.
     *
     * No constructor is declared: the inherited one is used as-is.
     *
     * @return mixed
     */
    public function handle()
    {
        (new TrackService())->consume();
        echo 'ok';
    }
}

api接口文件 

/**
     * Record a single client-side tracking event.
     *
     * Collects the fixed tracking fields from the request parameters and
     * headers, merges in the optional extra properties sent in the `json`
     * parameter, and hands the result to TrackService::write(). The response
     * is always the same "ok" envelope.
     *
     * @param Request $request
     * @return \Illuminate\Contracts\Routing\ResponseFactory|\Illuminate\Http\Response
     */
    public function embed(Request $request)
    {
        $result = ['code' => 0, 'msg' => 'ok', 'data' => []];

        $data = [
            'user_id' => $request->get('user_id', 0),
            'uuid' => $request->header('uuid'),
            'ver_id' => $request->header('ver'),
            'device' => $request->header('device'),
            'opr_time' => $request->get('opr_time'),
            's_date' => date('Y-m-d'),
            'event_code' => $request->get('event_code', 0),
        ];

        // Extra properties: accept either a JSON string or an already-decoded
        // array, and ignore anything that does not decode to an array so that
        // array_merge() never receives null or a scalar.
        $extra = [];
        $rawJson = $request->get('json', 0);
        if (is_array($rawJson)) {
            $extra = $rawJson;
        } elseif (is_string($rawJson) && $rawJson !== '') {
            $decoded = json_decode($rawJson, true);
            if (is_array($decoded)) {
                $extra = $decoded;
            }
        }
        // Keys in the extra properties override the fixed fields above.
        $data = array_merge($data, $extra);

        $service = new TrackService();
        $service->write($data);

        return json_response($result);
    }

laravel项目中关于rabbitmq的配置文件

config\services.php

return [
// RabbitMQ settings; TrackService::initial() reads this array through
// config('services.rabbitmq').
'rabbitmq' => [
        // Broker endpoint and credentials passed to AMQPStreamConnection.
        'host' => env('RABBITMQ_HOST', '127.0.0.1'),
        'port' => env('RABBITMQ_PORT', 5672),
        'username' => env('RABBITMQ_USER', 'guest'),
        'password' => env('RABBITMQ_PASSWORD', 'guest'),
        // Exchange, queue and routing-key names used for tracking messages.
        // NOTE(review): the '/' defaults look like placeholders; set real
        // names through the TRACK_* environment variables.
        'track_exch' => env('TRACK_EXCH', '/'),
        'track_queue' => env('TRACK_QUEUE', '/'),
        'track_route' => env('TRACK_ROUTE', '/'),
    ],
];

supervisor配置文件

/usr/local/supervisor/conf.d/online_mq.ini

; Supervisor program that keeps the tracking-message consumer
; (artisan weizhi:msg_consume) running.
[program:online_msg_consumer]

; NOTE(review): with numprocs greater than 1, process_name must also contain
; %(process_num)s.
process_name=%(program_name)s
command=/usr/bin/php /var/www/html/myproject/artisan weizhi:msg_consume
numprocs=1

autostart=true

; Restart only when the process exits with a code not listed in exitcodes.
autorestart=unexpected
exitcodes=0,2

startsecs=1
; NOTE(review): the consumer runs as root; confirm that this is required.
user=root
; NOTE(review): redirect_stderr below is true, so stderr is sent to the stdout
; log and stderr_logfile is presumably never written - confirm and drop one.
stderr_logfile=/tmp/online_blog_stderr.log
stdout_logfile=/tmp/online_blog_stdout.log

redirect_stderr = true
stdout_logfile_maxbytes = 20MB
stdout_logfile_backups = 20

 类似资料: