当前位置: 首页 > 工具软件 > HTTPSQS > 使用案例 >

Httpsqs 类:HTTPSQS 消息队列的 PHP 客户端封装,文末附使用示例。

鄢雅畅
2023-12-01
<?php

class Service_Httpsqs
{
    /**
     * Shared instance handed out by getInstance().
     * @var Service_Httpsqs|null
     */
    private static $_instance = null;

    /**
     * Server address in "host:port" form; empty until connect() is given
     * a non-empty host and port.
     * @var string
     */
    protected $_serverUrl = '';

    /**
     * Queue name used by the most recent request.
     * @var string
     */
    protected $_key = '';

    /**
     * Full URL of the request being prepared; '' when no valid request
     * could be built.
     * @var string
     */
    protected $_url = '';

    /**
     * Character set (not used by this class itself; kept for subclasses).
     * @var string
     */
    protected $_charset = '';

    /**
     * Reset flag (not used by this class itself; kept for subclasses).
     * @var bool
     */
    protected $_change = false;

    /**
     * Name of the last queue operation requested ('get' or 'put').
     * @var string
     */
    protected $_opt = '';

    /**
     * Cached curl handles, keyed by HTTP method ('get' / 'post').
     * @var array
     */
    protected $_ch = array();

    /**
     * Value passed to CURLOPT_TIMEOUT (seconds).
     * @var int
     */
    protected $_timeOut = 2;

    /**
     * Creates a client and, when both arguments are non-empty, records the
     * server address.
     *
     * @param string $host
     * @param string $port
     */
    public function __construct($host = '', $port = '')
    {
        if (!empty($host) && !empty($port)) {
            $this->connect($host, $port);
        }
    }

    /**
     * Cloning is disabled so the shared instance cannot be duplicated.
     */
    private function __clone()
    {
    }

    /**
     * Returns the shared instance, creating it on first use or whenever
     * $change is true.
     *
     * @param string $host
     * @param int    $port
     * @param bool   $change force a new instance with the given host/port
     * @return Service_Httpsqs
     */
    public static function getInstance($host = '', $port = 1218, $change = false)
    {
        if (null === self::$_instance || true == $change) {
            self::$_instance = new self($host, $port);
        }

        return self::$_instance;
    }

    /**
     * Records the server address. Empty arguments leave the current
     * address untouched.
     *
     * @param string $host
     * @param string $port
     * @return void
     */
    public function connect($host, $port)
    {
        if (!empty($host) && !empty($port)) {
            $this->_serverUrl = $host . ':' . $port;
        }
    }

    /**
     * Builds the request URL for a queue operation and stores it in $_url.
     *
     * The queue name and charset are URL-encoded. $opt is appended verbatim
     * because view() passes extra query parameters through it.
     *
     * When the queue name or the server address is missing, $_url is reset
     * to '' so that a stale URL from an earlier call can never be reused.
     *
     * @param string $key     queue name
     * @param string $opt     operation (already query-string safe)
     * @param string $charset optional charset parameter
     * @return bool true when a URL was built, false otherwise
     */
    protected function _setUrl($key, $opt, $charset = '')
    {
        if ((string) $key === '' || $this->_serverUrl === '') {
            $this->_url = '';
            return false;
        }

        $this->_key = $key;

        $url = $this->_serverUrl . '/?name=' . rawurlencode($key);
        if (!empty($charset)) {
            $url .= '&charset=' . rawurlencode($charset);
        }
        $this->_url = $url . '&opt=' . $opt;

        return true;
    }

    /**
     * Reads the queue item stored at a given position (the original notes
     * this needs HTTPSQS 1.2 or later).
     *
     * @param string $queueName queue name
     * @param string $pos       queue position
     * @return string the item, or '' when the request failed
     */
    public function view($queueName, $pos)
    {
        $this->_setUrl($queueName, 'view&pos=' . rawurlencode((string) $pos));
        $this->_opt = 'get';

        return $this->_fetchData();
    }

    /**
     * Dequeues the next item from a queue.
     *
     * @param string $key queue name
     * @return mixed the item, or '' when the queue is empty or the request failed
     */
    public function get($key)
    {
        $this->_setUrl($key, 'get');
        $this->_opt = 'get';

        return $this->_fetchData();
    }

    /**
     * Dequeues the next item and also reports the "Pos" response header.
     *
     * @param string $key queue name
     * @return array array('body' => item, 'pos' => header value or null),
     *               or an empty array when the request failed
     */
    public function getWithPos($key)
    {
        $this->_setUrl($key, 'get');
        $this->_opt = 'get';
        $result = $this->_sendRequest('get', '', true);

        if (!isset($result->valid) || !isset($result->data) || !isset($result->header)) {
            return array();
        }

        $pos = null;
        foreach (explode("\n", $result->header) as $line) {
            // Split on the first colon only; trim() also drops the trailing "\r".
            $fields = explode(':', $line, 2);
            if (count($fields) == 2 && strtolower(trim($fields[0])) == 'pos') {
                $pos = trim($fields[1]);
                break;
            }
        }

        return array(
            'body' => $result->data,
            'pos'  => $pos,
        );
    }

    /**
     * Appends an item to a queue.
     *
     * @param string $key     queue name
     * @param string $charset charset parameter sent with the request
     * @param mixed  $data    payload sent as the POST body
     * @return bool true when the server accepted the item
     */
    public function put($key, $charset, $data)
    {
        $this->_setUrl($key, 'put', $charset);
        $this->_opt = 'put';
        $result = $this->_sendRequest('post', $data);

        return isset($result->valid) && $result->valid == 1;
    }

    /**
     * Sends the prepared GET request and returns its payload.
     *
     * @return mixed the payload, or '' on any failure
     */
    private function _fetchData()
    {
        $result = $this->_sendRequest();
        if (isset($result->valid) && isset($result->data)) {
            return $result->data;
        }

        return '';
    }

    /**
     * Returns the cached curl handle for the given HTTP method, creating
     * and configuring it on first use.
     *
     * Handles are keyed by method rather than by URL: view() produces a
     * different URL for every position, and a per-URL cache would grow
     * without bound in a long-running process.
     *
     * @param string $opt 'post' or anything else for GET
     * @return mixed the curl handle, or false when curl_init() failed
     */
    private function _getCurlHandle($opt = 'get')
    {
        $slot = ($opt == 'post') ? 'post' : 'get';

        if (empty($this->_ch[$slot])) {
            $handle = curl_init();
            if ($handle === false) {
                return false;
            }

            curl_setopt($handle, CURLOPT_HTTPHEADER, array('Connection: Keep-Alive', 'Expect:'));
            curl_setopt($handle, CURLOPT_TIMEOUT, $this->_timeOut);
            curl_setopt($handle, CURLOPT_RETURNTRANSFER, 1);

            $this->_ch[$slot] = $handle;
        }

        return $this->_ch[$slot];
    }

    /**
     * Executes the request prepared by _setUrl().
     *
     * The returned object carries `valid` (and `data`) on success or
     * `error` on failure; `header` holds the raw response headers when
     * $header is true and the transfer succeeded.
     *
     * @param string $opt    'post' to send $data as the POST body, otherwise GET
     * @param mixed  $data   POST body
     * @param bool   $header whether to capture the response headers
     * @return stdClass
     */
    private function _sendRequest($opt = 'get', $data = '', $header = true)
    {
        $result = new stdClass();

        // _setUrl() clears the URL when the queue name or server is missing.
        if ($this->_url === '') {
            $result->error = 'request url not set';
            return $result;
        }

        $handle = $this->_getCurlHandle($opt);
        if (!$handle) {
            $result->error = 'curl init failed';
            return $result;
        }

        curl_setopt($handle, CURLOPT_URL, $this->_url);
        // Set explicitly on every call: the handle is reused, so a value
        // left over from a previous request must not leak into this one.
        curl_setopt($handle, CURLOPT_HEADER, (bool) $header);
        if ($opt == 'post') {
            curl_setopt($handle, CURLOPT_POST, true);
            curl_setopt($handle, CURLOPT_POSTFIELDS, $data);
        }

        $response = curl_exec($handle);
        if ($response === false) {
            $result->error = 'curl err' . curl_error($handle);
            return $result;
        }

        if ($header) {
            $headerSize = curl_getinfo($handle, CURLINFO_HEADER_SIZE);
            $result->header = substr($response, 0, $headerSize);
            // Older PHP returns false from substr() for an empty remainder.
            $response = (string) substr($response, $headerSize);
        }

        return $this->_parseResult($response, $result);
    }

    /**
     * Translates a raw response body into `valid` / `data` / `error`
     * fields on $result.
     *
     * Comparisons are strict and made against the whole body. The previous
     * version also indexed the string with the non-numeric offset 'data',
     * which throws a TypeError on PHP 8 and, on older versions, read the
     * first character instead, so payloads beginning with "0" were
     * reported as failures.
     *
     * @param string|false $data   response body
     * @param stdClass     $result object to populate
     * @return stdClass the same $result object
     */
    private function _parseResult($data, $result)
    {
        if ($data === false || $data === null || $data === '' || $data === 'HTTPSQS_ERROR') {
            $result->error = '请求数据失败';
        } elseif ($data === 'HTTPSQS_PUT_END') {
            $result->error = '队列已满';
        } elseif ($data === 'HTTPSQS_GET_END') {
            $result->error = '队列中已无数据';
        } elseif ($data === 'HTTPSQS_PUT_OK') {
            $result->valid = 1;
        } else {
            $result->valid = 1;
            $result->data = $data;
        }

        return $result;
    }
}






// Usage example: enqueue a few items, then consume the queue forever.
$sqs = Service_Httpsqs::getInstance('192.168.1.1', 1218);
$item = array('a', 'b', '1');

foreach ($item as $val) {
    $sqs->put('cbupdate11', 'utf-8', $val);
}

while (true) {
    // get() returns '' when the queue is empty or the request failed; it
    // never returns the raw "HTTPSQS_GET_END" / "HTTPSQS_ERROR" markers.
    // Testing for those markers meant the sleep branch was never reached
    // and the loop polled the server without pause.
    $data = $sqs->get('cbupdate11');
    if ($data !== '') {
        echo $data;
    } else {
        sleep(1); // nothing to read: wait one second before polling again
    }
}

 类似资料: