<?php
/**
* 埋点服务
* User: shiyf
* Date: 2022/5/30
* Time: 17:17
*/
namespace App\Services;
require_once __DIR__ .'/../../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Log;
/**
 * Tracking-event ("埋点") service.
 *
 * Producer side: write() serialises an event and publishes it to RabbitMQ.
 * Consumer side: consume() reads events from the queue and aggregates them into
 * per-event MongoDB collections named "evt_<event_code>".
 *
 * NOTE(review): exchange and queue are declared durable but also auto-delete.
 * Presumably the broker removes the queue once its last consumer goes away, so
 * the consumer should stay alive across bad messages — confirm against the
 * RabbitMQ setup in use.
 */
class TrackService
{
    /** @var AMQPStreamConnection|null Broker connection opened by initial(). */
    private $connection = null;
    /** @var mixed Channel obtained from the connection in initial(). */
    private $channel = null;
    /** @var string Exchange name, read from services.rabbitmq.track_exch. */
    private $exchange_name = '';
    /** @var string Queue name, read from services.rabbitmq.track_queue. */
    private $queue_name = '';
    /** @var string Routing key, read from services.rabbitmq.track_route. */
    private $route_key = '';

    /**
     * No-op kept for backward compatibility.
     *
     * NOTE(review): this is named "construct", not "__construct", so PHP does not
     * call it on instantiation.
     */
    public function construct()
    {
    }

    /**
     * Publish one tracking event to the message queue.
     *
     * @param mixed $params Event payload; it is JSON-encoded before publishing.
     * @return bool True when the message was handed to the broker, false when
     *              the payload could not be JSON-encoded (nothing is published).
     */
    public function write($params)
    {
        $content = json_encode($params);
        if ($content === false) {
            // Do not publish an empty message that the consumer cannot decode.
            Log::warning('TrackService::write json_encode failed: ' . json_last_error_msg());
            return false;
        }
        $this->send($content);
        return true;
    }

    /**
     * Declare exchange/queue/binding, publish $content, then release the connection.
     *
     * @param string $content Message body.
     */
    private function send($content)
    {
        try {
            $this->initial();
            // Declare the exchange (direct, durable, auto-delete).
            $this->channel->exchange_declare($this->exchange_name, 'direct', false, true, true);
            // Declare the queue (durable, non-exclusive, auto-delete).
            $this->channel->queue_declare($this->queue_name, false, true, false, true);
            // Bind queue and exchange through the routing key.
            $this->channel->queue_bind($this->queue_name, $this->exchange_name, $this->route_key);
            // Publish the message.
            $this->channel->basic_publish(new AMQPMessage($content), $this->exchange_name, $this->route_key);
            // Closing the channel stays inside the try so its errors still reach the caller.
            $this->channel->close();
        } finally {
            // Always release the socket, even when declare/publish threw.
            try {
                if ($this->connection !== null) {
                    $this->connection->close();
                }
            } catch (\Exception $ignored) {
                // Best effort: the primary error (if any) is already propagating.
            }
            $this->channel = null;
            $this->connection = null;
        }
    }

    /**
     * Open the RabbitMQ connection/channel and load exchange, queue and routing
     * key names from config('services.rabbitmq').
     */
    private function initial()
    {
        $config = config('services.rabbitmq');
        $this->connection = new AMQPStreamConnection($config['host'], $config['port'], $config['username'], $config['password']);
        $this->channel = $this->connection->channel();
        $this->exchange_name = $config['track_exch'];
        $this->queue_name = $config['track_queue'];
        $this->route_key = $config['track_route'];
    }

    /**
     * Blocking consumer loop: receive messages and hand them to process().
     *
     * Each message is acknowledged BEFORE it is processed, so a message is never
     * redelivered; payloads that fail processing are stored via processFail().
     * On a wait() exception the channel and connection are closed and the
     * process exits with -1.
     */
    public function consume()
    {
        $this->initial();
        // Declare the queue with the same arguments the producer uses.
        $this->channel->queue_declare($this->queue_name, false, true, false, true);
        echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
        $callback = function ($msg) {
            $now = date('Y-m-d H:i:s');
            if (!($msg instanceof AMQPMessage)) {
                exit(-1);
            }
            // Ack first, process afterwards.
            $msg->ack();
            echo "{$now} Received ", $msg->body, "\n";
            $data = json_decode($msg->body, true);
            if (!is_array($data)) {
                // Malformed or non-object JSON: keep the raw body for later inspection
                // instead of passing null/scalars further down.
                $this->processFail(['raw_body' => $msg->body, 'error' => 'invalid_json']);
                return;
            }
            if (!$this->process($data)) {
                $this->processFail($data);
            }
        };
        $this->channel->basic_consume($this->queue_name, '', false, false, false, false, $callback);
        while (count($this->channel->callbacks)) {
            echo "inner while \n";
            try {
                $this->channel->wait();
            } catch (\Exception $e) {
                echo 'exception:'.$e->getMessage().'|'.$e->getCode()."\n";
                $this->channel->close();
                $this->connection->close();
                exit(-1);
            }
        }
        $this->channel->close();
        $this->connection->close();
    }

    /**
     * Aggregate one decoded event into its MongoDB collection.
     *
     * Creates collection "evt_<event_code>" (with two indexes) on first use, then
     * either inserts the event with visit_times = 1 or increments visit_times of
     * the existing row matching (user_id, uuid, s_date).
     *
     * NOTE(review): the lookup followed by insert/increment is not atomic; with
     * several consumers running, two of them may both insert — confirm whether an
     * upsert is wanted.
     *
     * @param array $params Decoded event; must contain event_code, user_id, uuid, s_date.
     * @return bool True on success, false when validation or storage failed.
     */
    private function process($params)
    {
        try {
            if (!is_array($params)) {
                throw new \InvalidArgumentException('payload is not an array');
            }
            foreach (['event_code', 'user_id', 'uuid', 's_date'] as $required) {
                if (!array_key_exists($required, $params)) {
                    throw new \InvalidArgumentException('missing field: ' . $required);
                }
            }
            // event_code is client-supplied and becomes part of a collection name:
            // reject values that cannot form one.
            $eventCode = $params['event_code'];
            if (!is_scalar($eventCode)
                || (string) $eventCode === ''
                || preg_match('/[$\x00]/', (string) $eventCode)
            ) {
                throw new \InvalidArgumentException('invalid event_code');
            }
            $table = 'evt_' . $eventCode;
            unset($params['event_code']);
            $userId = $params['user_id'];
            $uuid = $params['uuid'];
            $s_date = $params['s_date'];
            $builder = DB::connection('mongodb')->getSchemaBuilder();
            if (!$builder->hasCollection($table)) {
                $builder->create($table, function ($collection) {
                    $collection->index(['user_id', 'uuid', 's_date'], 'uid_date', null, ['background' => true]);
                    $collection->index(['s_date','user_id'], 'idx_date', null, ['background' => true]);
                });
            }
            $row = DB::connection('mongodb')->collection($table)->where('user_id', $userId)
                ->where('uuid', $uuid)->where('s_date', $s_date)
                ->first();
            if (empty($row)) {
                $params['visit_times'] = 1;
                DB::connection('mongodb')->collection($table)->insert($params);
            } else {
                DB::connection('mongodb')->collection($table)
                    ->where('_id', $row['_id'])
                    ->increment('visit_times', 1);
            }
        } catch (\Exception $e) {
            echo 'exception:'.$e->getMessage().'|'.$e->getCode()."\n";
            return false;
        }
        return true;
    }

    /**
     * Store a payload that could not be processed in the track_msg_failed collection.
     *
     * Storage errors are printed instead of thrown so that one bad record does
     * not terminate the consumer loop.
     *
     * @param array $data Payload to keep for later re-processing.
     */
    private function processFail($data)
    {
        try {
            DB::connection('mongodb')->collection('track_msg_failed')->insert($data);
        } catch (\Exception $e) {
            echo 'exception:'.$e->getMessage().'|'.$e->getCode()."\n";
        }
    }
}
针对App的埋点需求,一般特点是:请求量大,高并发
针对这种埋点接口,要做到支持高并发而且性能要好,我这里的解决方案是:
用户发起请求后,将消息扔到消息队列中去,后台开启多个守护进程,共同消费这些消息;
针对消费端,先给出应答,然后再处理数据,处理失败的数据放到处理失败表中,后续可以二次处理。
消费者Command类 app\Console\Commands\ConsumeCommand.php
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use App\Services\TrackService;
class ConsumeCommand extends Command
{
    /**
     * Artisan signature used to invoke the command.
     *
     * @var string
     */
    protected $signature = 'weizhi:msg_consume';

    /**
     * Short description of the command.
     *
     * @var string
     */
    protected $description = 'Consume Task of Queue';

    /**
     * Build the command; defers entirely to the parent constructor.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Run the tracking consumer, then print "ok" once consume() returns.
     *
     * @return mixed
     */
    public function handle()
    {
        (new TrackService())->consume();
        echo 'ok';
    }
}
api接口文件
/**
 * Client-side tracking endpoint.
 *
 * Collects the fixed event fields from the request (query/body parameters and
 * headers), merges in the optional "json" parameter when it decodes to an
 * array, and hands the result to TrackService::write(). No input validation is
 * performed here; the response is always the static "ok" result.
 *
 * NOTE(review): keys inside the "json" payload override the fixed fields of the
 * same name (user_id, s_date, event_code, ...) because they are merged last —
 * confirm this is intended.
 *
 * @param Request $request
 * @return \Illuminate\Contracts\Routing\ResponseFactory|\Illuminate\Http\Response
 */
public function embed(Request $request)
{
    $result = ['code' => 0, 'msg' => 'ok', 'data' => []];
    $data = [];
    $data['user_id'] = $request->get('user_id', 0);
    $data['uuid'] = $request->header('uuid');
    $data['ver_id'] = $request->header('ver');
    $data['device'] = $request->header('device');
    $data['opr_time'] = $request->get('opr_time');
    $data['s_date'] = date('Y-m-d');
    $data['event_code'] = $request->get('event_code', 0);

    // Only accept a JSON string that decodes to an array; anything else
    // (invalid JSON, a JSON scalar, or a non-string parameter) is ignored so
    // array_merge() never receives a non-array argument.
    $extra = [];
    $rawJson = $request->get('json', 0);
    if (!empty($rawJson) && is_string($rawJson)) {
        $decoded = json_decode($rawJson, true);
        if (is_array($decoded)) {
            $extra = $decoded;
        }
    }
    $data = array_merge($data, $extra);

    $service = new TrackService();
    $service->write($data);
    return json_response($result);
}
laravel项目中关于rabbitmq的配置文件
config\services.php
// RabbitMQ settings read by TrackService through config('services.rabbitmq').
// Every value comes from the environment, with the fallback shown as the second argument.
return [
'rabbitmq' => [
// Broker address and credentials passed to AMQPStreamConnection.
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
'port' => env('RABBITMQ_PORT', 5672),
'username' => env('RABBITMQ_USER', 'guest'),
'password' => env('RABBITMQ_PASSWORD', 'guest'),
// Exchange, queue and routing-key names used for tracking messages.
// NOTE(review): the '/' fallbacks look like placeholders — confirm real names are set in .env.
'track_exch' => env('TRACK_EXCH', '/'),
'track_queue' => env('TRACK_QUEUE', '/'),
'track_route' => env('TRACK_ROUTE', '/'),
],
];
supervisor配置文件
/usr/local/supervisor/conf.d/online_mq.ini
; Supervisor program that keeps the tracking-message consumer (artisan weizhi:msg_consume) running.
[program:online_msg_consumer]
process_name=%(program_name)s
command=/usr/bin/php /var/www/html/myproject/artisan weizhi:msg_consume
; NOTE(review): a single consumer process is started here; if several are wanted,
; process_name presumably needs to include %(process_num)s — confirm in the Supervisor docs.
numprocs=1
autostart=true
; Restart only when the exit code is not listed in exitcodes.
; The consumer calls exit(-1) on errors, which is not in the list below.
autorestart=unexpected
exitcodes=0,2
startsecs=1
; NOTE(review): the consumer runs as root — confirm this is required.
user=root
; NOTE(review): with redirect_stderr enabled below, stderr presumably ends up in the stdout log — confirm.
stderr_logfile=/tmp/online_blog_stderr.log
stdout_logfile=/tmp/online_blog_stdout.log
redirect_stderr = true
; Rotate the stdout log at 20MB and keep 20 backups.
stdout_logfile_maxbytes = 20MB
stdout_logfile_backups = 20