什么是MQTT?
MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件 。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。 -(引用自百度百科)
mqtt特点:
MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:
1、使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
2、对负载内容屏蔽的消息传输;
3、使用 TCP/IP 提供网络连接;
4、有三种消息发布服务质量:
5、小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;
6、使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。
项目需求对接mqtt数据,启用workderman/mqtt包
文档
https://www.workerman.net/doc/workerman/components/workerman-mqtt.html
安装
composer require workerman/mqtt
注意点:
判断扩展是否加载,安装PHP>=5.4,并安装了pcntl、posix扩展
一直运行,php可能会有内存泄漏风险,利用Timer定时器类,判断运行1小时退出程序
退出程序利用supervisor机制,保持进程常驻
项目中只支持mqtt协议,市面上包括原生都只支持mqtt协议,java->mqtt框架可支持wss协议,需要提供mqtt协议
//重要引入
require_once dirname(dirname(dirname(__FILE__))) . '/vendor/autoload.php';
use Workerman\Worker;
use Workerman\Timer;
//检查扩展是否加载
check_extension();
$worker = new Worker();
$worker->onWorkerStart = function () {
//获取脚本执行时间
$startRunTime = time();
// 定时,每1秒一次
Timer::add(1, function () use ($startRunTime) {
$pid = posix_getpid();
if (3600 < time() - $startRunTime) {
$logInfo = array(
'msg' => 'check run time exit',
'pid' => $pid
);
Logger::getLogger($logName)->info(json_encode($logInfo));
exit();
}
});
$appKey = '';
$appSecret = '';
$timestamp = currentTimeMillis();
$clientId = 'MqttClient_' . rand(10000000000, 99999999999);
$username = $appKey . "_" . $timestamp;
$password = doSign($appKey, $timestamp, $appSecret, $clientId);
$connectTimeout = 10;
$keepalive = 20;
$cleanSession = true;//中断清除会话
$options = array(
'keepalive' => $keepalive,//客户端向服务端发送心跳的时间间隔,默认50秒,设置成0代表不启用心跳
'client_id' => $clientId,//客户端id,如果没设置默认是 "workerman-mqtt-client-".mt_rand()
'clean_session' => $cleanSession,
'connect_timeout' => $connectTimeout,
'username' => $username,
'password' => $password,
'debug' => false//是否调试模式
);
$mqtt = new Workerman\Mqtt\Client('mqtt://', $options);
$mqtt->onConnect = function ($mqtt) {
$mqtt->subscribe('', array('qos' => 1));
};
$mqtt->onMessage = function ($topic, $content) {
//检查进程是否存在
proc_check_exit();
proc_parent_check_exit();
//$topic=订阅地址,$content=接收数据内容
};
$mqtt->onError = function (\Exception $exception) {
Logger::getLogger($logName)->info(json_encode($exception));
exit();
};
$mqtt->onClose = function ($mqtt) {
Logger::getLogger($logName)->info('重新连接');
$mqtt->connect();
};
$mqtt->connect();
};
Worker::runAll();
//检查扩展是否加载
function check_extension()
{
$ext_list = array(
'posix',
'pcntl',
);
foreach ($ext_list as $ext) {
if (!extension_loaded($ext)) {
echo $ext . " not loaded" . "\n";
exit();
}
}
}
//检查pid是否存在
function check_pid($pid)
{
$oldErrorLevel = error_reporting(0);
error_reporting($oldErrorLevel & ~E_WARNING);
$res = pcntl_getpriority($pid);
error_reporting($oldErrorLevel);
$ret = !($res === false);
return $ret;
}
/*
* 查找进程是否重复
*/
function proc_check_exit($check_title = true)
{
$run_title = cli_get_process_title();
$proc_title = "master process";
if ($check_title && strpos($run_title, $proc_title) === false) {
return;
}
if (file_exists(Worker::$pidFile)) {
$file_pid = file_get_contents(Worker::$pidFile);
$proc_pid = getmypid();
if (check_pid($file_pid) && $file_pid != $proc_pid) {
echo sprintf("file pid: %s proc pid: %s\n", $file_pid, $proc_pid);
exit();
}
}
}
/*
* 查找父进程是否为日志进程
*/
function proc_parent_check_exit($check_title = true)
{
$run_title = cli_get_process_title();
$proc_title = "worker process";
if ($check_title && strpos($run_title, $proc_title) === false) {
return;
}
if (file_exists(Worker::$pidFile)) {
$file_pid = file_get_contents(Worker::$pidFile);
$file_pid = trim($file_pid);
$file_pid = intval($file_pid);
$proc_pid = getmypid();
$proc_ppid = posix_getppid();
if (/* check_pid($file_pid) && */ $file_pid != $proc_ppid) {
$msg = sprintf(
"exit ppid: file pid: %s proc pid: %s proc ppid: %s",
$file_pid,
$proc_pid,
$proc_ppid
);
echo $msg . "\n";
Logger::getLogger('today.reward')->warn($msg);
posix_kill($proc_ppid, SIGKILL);
sleep(5);
exit();
}
}
}
//获取默认毫秒时间戳方法
function currentTimeMillis()
{
list($microsecond, $time) = explode(' ', microtime()); //' '中间是一个空格
$res = (float)sprintf('%.0f', (floatval($microsecond) + floatval($time)) * 1000);
return $res;
}
//还原JAVAHmacSHA256加密
function doSign($appKey, $timestamp, $appSecret, $clientId)
{
try {
$query = "appKey=" . $appKey . "clientId=" . $clientId . "timestamp=" . $timestamp;
// 使用HmacSHA256加密
$midata = hash_hmac('sha256', $query, $appSecret, false);
$midata = strtoupper($midata);
return $midata;
} catch (Exception $e) {
$info = array(
'msg' => "获取签名失败: appKey:{$appKey}, timestamp: {$timestamp}, appSecret:{$appSecret}"
);
Logger::getLogger('today.reward')->info($info);
return null;
}
}