连接池的作用主要是节省连接的时间。连接池机制预先打开N个连接,把它们缓存起来,当需要使用连接的时候就直接使用这些已经打开的连接,从而节省了时间。但PHP的RabbitMQ扩展为了兼容PHP-FPM模式,因为在PHP-FPM模式下是不能做连接池的,脚本在解释执行完毕后会释放所有的内存资源,没法进行内存常驻。
RabbitMQ配置信息散落在各个业务上,任意配置的改动,都需要各个业务组进行配合修改,并且要在切换的过程中保证线上的服务运行正常,且没法做到统一管理,给维护上带来一定的难度与成本。
PHP-FPM为同步编程模型,并不支持异步的编程方式,所有代码都是逐行顺序执行的。非常符合程序员的习惯,不管是编码还是调试。但在IO密集型的场景下,就会逐渐显得有点乏力,因为该场景下的CPU利用率将会大大降低,发挥不出该有的性能。
PHP-FPM模型下的PHP,将会有着非常多的初始化。比如配置、连接、文件等等。这一系列的操作,对性能的影响将是毫秒级的。
吞吐量(Throughput)指在单位时间内成功传输的数据量。作为消息队列中间件,必须有着非常高的吞吐量,不能因此成为业务的瓶颈。
可用性(Availability)指在一段连续时间考察内,系统正常运行的概率。在保证吞吐量的基础上,还必须保证系统能够7*24小时正常运转。作为消息中转平台,如果发生故障,将会导致业务不同情况受损。
可维护性(Maintainability)是衡量一个系统的可修复(恢复)性和可改进性的难易程度。除了编码上的可维护性,还需对故障的检测、诊断、修复提供快速的响应能力。
跨平台(Cross Platform)指不应受到硬件或者环境影响而导致程序的不可运行。在设计中,必须考虑到多种编程语言(JAVA、PHP、PYTHON等等)的接入场景,要基于通用的协议进行开发。
业务透明(Service Transparency)指对所有环境提供固定的接入方式。消息队列的配置不对业务开放,实现统一管理。无论是阶段性的服务迁移或者是参数调整,对业务来说都是无感知的,简化维护流程。
PHP(外文名称:PHP:Hypertext Preprocessor,中文名称:超文本预处理器)是一种通用的开源脚本语言。该语法吸收了C语言、Java和Perl的特点,有利于学习和广泛使用,主要适用于Web开发领域。PHP的独特语法是C、Java、Perl和PHP自己的语法的混合。它可以比CGI或Perl更快地执行动态web页面。与其他编程语言相比,PHP将程序嵌入到HTML(一种标准通用标记语言的应用程序)文档中执行。执行效率比完全生成HTML标记的CGI要高得多。PHP还可以执行编译后的代码,这将对代码进行加密和优化,以使其运行得更快。
Swoole允许PHP开发人员编写高性能、高并发的服务,如TCP、UDP、Unix套接字、HTTP和WebSocket,使PHP超越Web。Swoole4的成熟将PHP带到了一个前所未有的阶段,为性能改进提供了独一无二的可能性。Swoole可广泛应用于互联网、移动通信、云计算、在线游戏、物联网、汽车联网、智能家居等领域。使用PHP+Swoole可以使企业IT研发团队更有效率,更专注于开发创新产品。
RabbitMQ,全名为Rabbit Message Queue,是一种应用程序到应用程序的通信方法。应用程序通过从队列(应用程序的数据)读写消息进行通信,而不需要专用连接来链接它们。消息传递指的是程序之间通过发送消息中的数据进行通信,而不是通过相互直接调用进行通信,后者通常用于远程过程调用等技术。排队意味着应用程序通过队列进行通信。使用队列消除了同时接收和发送应用程序的需要。
待补充
- Reactor线程是在Master进程中创建的线程
- 负责维护客户端TCP连接、处理网络IO、处理协议、收发数据
- 不执行任何PHP代码
- 将TCP客户端发来的数据缓冲、拼接、拆分成完整的一个请求数据包
- 接受由Reactor线程投递的请求数据包,并执行PHP回调函数处理数据
- 生成响应数据并发给Reactor线程,由Reactor线程发送给TCP客户端
- 可以是异步非阻塞模式,也可以是同步阻塞模式
- Worker以多进程的方式运行
- 接受由Worker进程投递的任务
- 处理任务,并将结果数据返回给Task Worker进程
- 完全是同步阻塞模式
- Task Worker以多进程的方式运行
- 负责创建/回收Worker/Task进程
HTTP(超文本传输协议)是Internet上使用最广泛的网络传输协议之一。所有的WWW文件必须遵守这个标准,并且可以作为所有编程语言的通用协议使用。
$http = new Swoole\Http\Server('0.0.0.0', 9501);
$http->on('request', function ($request, $response) {
...调用入口
});
$http->start();
HTTP接收到请求,完成基本的解析与校验之后,通过TaskWorker进程,进行消息投递并结束HTTP请求。然后由TaskWorker进程进行异步的RabbitMQ消息投递,能够满足高吞吐量的场景。
public static function buildMessage(string $content): array
{
$content = json_decode($content, true);
$queue = [];
foreach($content as $item) {
$obj = new self();
foreach([
'exchange',
'queue',
'route',
'payload'
] as $k) {
$obj->{$k} = $item[$k];
}
$queue[] = $obj;
}
return $queue;
}
基于TaskWorker,程序立即返回继续向下执行代码。onTask回调函数Task进程池内被异步执行。执行完成后调用 $serv->finish() 返回结果
public function OnTask(swoole_server $serv, int $task_id, int $src_worker_id, $data)
{
...TaskWorker进程执行的逻辑代码
}
public static function sendMessageToTask(\swoole_server $process, array $messages)
{
foreach($messages as $message) {
$process->task($message);
}
}
class RabbitMqPool
{
private static $pool = null;
public static function getConnectPool()
{
}
public static function release(){
}
}
$status = [
'start' => 0,
'pid' => 0,
'send_success' => 0,
'send_error' => 0,
];
public function getStatus(){
}
public function shutdown(){
}
public function registerSignal(){
}
public static function urlNotity(array $urlList, string $uniqueKey = null, bool $isSlowQueue = false,string $topic = ''): bool
{
try {
$config = self::getConfig();
if ($isSlowQueue) {
$config['topic'] = "LowQueue";
}elseif (!empty($topic)){
$config['topic'] = $topic;
}
$config = new ConfigRpc($config['url'], $config['topic'], $config['secretKey']);
$producerObj = new Producer($config);
if ($uniqueKey === null || $uniqueKey !== "") {
$producerObj->setVars(ConstVars::RD_KAFKA_PARTITION_UA, 0, $uniqueKey);
}
foreach ($urlList as $url) {
$producerObj->send([
'requests' => [
//这里不使用并发请求
['url' => $url],
],
]);
}
$producerObj->waitFinish();
return true;
} catch (\Exception $exception) {
return false;
}
}
生产者服务是基于事件的异步任务系统。当一个HTTP请求到来,将由Worker进程负责任务分发的工作,利用TaskWorker进程进行异步化,最后由TaskWorker进程完成消息投递到RabbitMQ的过程。而这一流程设计,将会极大提高Reactor线程的吞吐量
待补充
基于GuzzleHttp框架实现的http callback模式,该模式下主要优缺点:
public function consumeTask(array $task, array $options = []): bool
{
$noticeObject = $task['msg-payload'];
$reqNum = count($noticeObject['requests']);
[$promises, $urlMap] = RequestParse::parse($noticeObject['requests'], $this->config);
$control = ControlParse::parse(isset($noticeObject['control']) ? $noticeObject['control'] : [], $this->config);
try {
$allowRetry = $reqNum == 1 && $control['retryTimes'] > 0;
$retryCode = $control['retryCode'];
$retryTimes = (int) $control['retryTimes'];
$try = -1;
do {
$try++;//第几次尝试
$retryFlag = false;
try {
//该行代码会抛出 ConnectException,需要捕获做重试处理
$results = Promise\unwrap($promises);
$successMap = [];
$errorMap = [];
/**
* @var ResponseInterface $result
*/
foreach ($results as $index => $result) {
$statusCode = (int) $result->getStatusCode();
$headers = $result->getHeaders();
if ($allowRetry && in_array($statusCode, $retryCode)) {
$retryFlag = true;
}
if (in_array($statusCode, $control['successCode'])) {
$successMap[$index] = [
'code' => $statusCode,
'header' => $headers,
];
} else {
$errorMap[$index] = [
'code' => $statusCode,
'header' => $headers,
'response' => $result->getBody()->getContents(),
];
}
}
} catch (ConnectException $connectException) {
$retryFlag = true;
}
} while ($retryFlag && $retryTimes-- > 0);
return ! $retryFlag;
} catch (\Exception $ex) {
return false;
}
}
该模式下,需要针对业务需求进行具体定制,获取到RabbitMQ消息后,进行一些列无超时的逻辑处理,由于保密协议,代码逻辑不方便展示。该模式下主要优缺点:
//执行结果回调函数
$callback = function ($topic, $taskid, $taskname, $params, $timeuse, $message){
...
};
$worker = new Worker($callback);
$worker->setQueue('demo');
$worker->run();
基于上面分析,如无特殊要求,一般建议使用http模式。
private function consume()
{
$this->channel = $this->mq->channel();
$this->channel->basic_qos(null, 1, null);
$customerId = $this->getCustomerId($this->jobConfig->getKafkaGroupId());
$queueName = $this->getQueueName();
$this->channel->basic_consume($queueName, $customerId, false, ! $this->needAck, false, false,
[$this, 'processMessage']);
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
public static function parse(array $request,JobConfig $jobConfig): array
{
$attCfg = $jobConfig->getAttachConfig();
$extOptions = [];
if(isset($attCfg['curlTimeout']) && $attCfg['curlTimeout'] > 0) {
$extOptions['defaultTimeout'] = (int)$attCfg['curlTimeout'];
}
$result = [];
$urlMap = [];
$client = new Client();
foreach($request as $index => $item) {
if(empty($item['method'])) {
$item['method'] = "GET";
}
if(empty($item['options'])) {
$item['options'] = [];
}
$options = self::parseOptions($item['options'], $extOptions);
$req = $client->requestAsync($item['method'], $item['url'], $options);
$result[$index] = $req;
$urlMap[$index] = $item['url'];
}
return [$result, $urlMap];
}
消息校验是为了避免因为执行错误的数据而导致系统崩溃或业务错乱。业务错乱是最大的原因,因为这有可能导致显式的经济损失,比如价格错误
if(empty($noticeObject['requests']) || !is_array($noticeObject['requests'])) {
throw new JobException("传递了一个空的请求对象");
}
if(!is_array($item['options'])) {
throw new JobException("无法解析的头部选项");
}
if(!is_string($item['method']) || !is_string($item['url'])) {
throw new JobException("无法处理的");
}
用于执行客户端的回调url,核心代码逻辑。
$results = Promise\unwrap($promises);
foreach($results as $index => $result) {
$statusCode = (int)$result->getStatusCode();
$headers = $result->getHeaders();
$url = $urlMap[$index];
if($allowRetry && in_array($statusCode, $retryCode)) {
$retryFlag = true;
}
if(in_array($statusCode, $control['successCode'])) {
$successMap[$index] = [
'code' => $statusCode,
'header' => $headers,
];
} else {
$errorMap[$index] = [
'code' => $statusCode,
'header' => $headers,
'response' => $result->getBody()->getContents()
];
}
}
消息执行完毕后,需要进行消息确认。
$channel->basic_ack($message->delivery_info['delivery_tag']);
超时机制(timeout)是保护系统的常用手段。一个应用中,常常包含对多方应用的网络调用,每个应用的稳定性都不一样。如果没有超时机制,可能会因为某个应用的调用阻塞,导致当前应用系统发生大量阻塞,其余的任务无法执行的现象。
$defaultTimeout = 30;
if(isset($extOptions['defaultTimeout']) && $extOptions['defaultTimeout'] > 0) {
$defaultTimeout = (int)$extOptions['defaultTimeout'];
}
$result = [
"connect_timeout" => $defaultTimeout,
"read_timeout" => $defaultTimeout,
"timeout" => $defaultTimeout,
];
由于业务场景特殊,比如系统繁忙暂时无法服务,需要进行多次重试。该模式下允许业务方传递重试次数,但接口幂等性须自行维护。
$allowRetry = $reqNum == 1 && $control['retryTimes'] > 0;
$retryCode = $control['retryCode'];
$retryTimes = (int)$control['retryTimes'];
$try = -1; å
用于确认消费者是否成功消费了队列中的消息,如果没有确认机制,系统一旦崩溃,可能就会出现数据丢失等问题。
if ($this->needAck && $this->firstAck) {
$channel->basic_ack($message->delivery_info['delivery_tag']);
}
该消费者设计提供多种消费场景(HTTP与Cli),能够满足大部分业务场景。消费者与生产者高度解耦,拆分为两种应用,生产者负责消息的传递,消费者负责消息的消费,基础与业务隔离。应用可以独自部署,具有非常优秀的横向扩展性,同时极大降低日常维护工作的负担
命令行参数指的是在命令行中执行程序时传递的参数,常用于同一个程序执行不同的作用域,比如运行在多个端口上。
$shortOpts = ["port:", "host:", "config::"];
$options = getopt("", $shortOpts);
if(empty($options['port']) || $options['port'] < 1) {
fwrite(STDERR, "错误的端口号".PHP_EOL);
return -1;
}
if(empty($options['host'])) {
fwrite(STDERR, "错误的监听域".PHP_EOL);
return -1;
}
if(empty($options['config'])) {
$options['config'] = "config";
}
环境变量会影响多个实用程序,函数和应用程序的操作,方便配置集中式维护。
$host = env('host');
$port = env['port'];
$port = env('config');
基于AMQP 0-9-1(高级消息队列协议),该协议是一种消息传递协议,使一致的客户端应用程序可以与一致的消息传递中间件代理进行通信。
[
'HOST' => '',
'PORT' => '',
'USER' => '',
'PASS' => '',
'VHOST' => '',
'options' => [
'login_method' => 'AMQPLAIN',
'locale' => 'en_US',
'connection_timeout' => 20.0,
'read_write_timeout' => 120.0,
'keepalive' => false,
'heartbeat' => 30
],
]
- 对系统不间断实时监控:实际上是对系统不间断的实时监控
- 实时反馈系统当前状态:我们监控某个硬件、或者某个系统,都是需要能实时看到当前系统的状态,是正常、异常、或者故障
- 保证服务可靠性安全性:我们监控的目的就是要保证系统、服务、业务正常运行
- 保证业务持续稳定运行:如果我们的监控做得很完善,即使出现故障,能第一时间接收到故障报警,在第一时间处理解决,从而保证业务持续性的稳定运行
基于Monolog实现的一套日志系统。
switch($config['type']) {
case "file";
$this->logger = new FileLogger($config['path'], $level);
break;
case "std":
$this->logger = new StdLogger($level);
break;
case "es":
$this->logger = new EsLogger($level);
break;
}
基于cli实现,该模式下只要输出debug级别的日志,由于debug级别日志非常庞大,不适合常规存储。
基于文件实现,该模式下适合单机运行的服务,不适合分布下的存储,所以接下来主要介绍Es模式。
Es(全名ElasticSearch),是一个构建在Apache Lucene之上的开源分布式搜索引擎,主要特点:
- 基于 JSON 的分布式搜索和分析引擎,专为实现水平扩展、高可靠性和管理便捷性而设计
- 是一个分布式的 RESTful 风格的搜索和数据分析引擎,能够解决不断涌现出的各种用例。作为 Elastic Stack的核心,它集中存储您的数据,帮助您发现意料之中以及意料之外的情况
- 允许执行和合并多种类型的搜索 ( 结构化、非结构化、地理位置、度量指标 )搜索方式随心而变
Kibana,和Es是个非常好的搭档,主要特点:- 能够以图表的形式呈现数据,并且具有可扩展的用户界面,供您全方位配置和管理
- 核心搭载了一批经典功能:柱状图、线状图、饼图、旭日图,等等。它们充分利用了 Elasticsearch 的聚合功能
报警后的故障一般如何处理?首先,我们可以通过报警升级机制自动处理故障。例如,如果服务宕机,我们可以设置警报升级自动启动。
但是,如果在一般业务中出现严重故障,我们通常会根据故障的级别和故障的业务分配不同的运维人员进行处理。
企业微信机器人的功能是指机器人成员可以添加到内部组中,机器人可以在一定的时间或按照一定的规则向组中发送消息,如@group members。企业微信是一款高频办公工具。随着机器人功能的引入,我们可以充分利用微信的强提醒功能,有效的利用机器人来提高工作效率和执行率。此外,它还可以应用到其他一些场景,帮助企业的运营。
由于企业微信机器人有2048个字节限制,一般大文本告警信息,需要使用邮箱进行通知。邮箱更多优势在于:
方便简单,但运维成本高,常用于本地开发。
php server.php --host="0.0.0.0" --port="8001" --config="config.dev"
Supervisor是用Python开发的一个通用进程管理器,它可以将一个普通的命令行进程转换为守护进程,监视它的状态,并在它退出时自动重启它。它通过fork/exec来启动作为管理器子进程的被管理进程,这样它就可以简单地将被管理进程的可执行文件的路径写入到管理器配置文件中。它还意识到,当子进程被挂起时,父进程可以准确地获取被挂起子进程的信息,并可以自行选择是否启动报警。所以线上服务我们采用的是此方式进行部署。
[program:server]
command=php server.php --host="0.0.0.0" --port="8001" --config="config.dev"
autostart=true
startsecs=10
autorestart=true
startretries=3
user=tomcat
priority=999
redirect_stderr=true
stdout_logfile_maxbytes=20MB
stdout_logfile_backups = 20
stdout_logfile=/opt/logs/catalina.out
stopasgroup=false
killasgroup=false