<?php
/**
 * Minimal HTTP client for an HTTPSQS message-queue server.
 *
 * Every operation is a plain HTTP request of the form
 * "host:port/?name=<queue>&opt=<operation>" sent through curl. The server
 * answers either with the message body or with one of the status markers
 * HTTPSQS_PUT_OK, HTTPSQS_PUT_ERROR, HTTPSQS_PUT_END, HTTPSQS_GET_END or
 * HTTPSQS_ERROR, which _parseResult() translates into a result object.
 */
class Service_Httpsqs
{
    /**
     * Shared instance handed out by getInstance().
     * @var Service_Httpsqs|null
     */
    private static $_instance = null;

    /**
     * Server address as "host:port"; empty until connect() received both parts.
     * @var string
     */
    protected $_serverUrl = '';

    /**
     * Queue name (key) of the most recently prepared request.
     * @var string
     */
    protected $_key = '';

    /**
     * Full URL of the most recently prepared request; empty when the last
     * call to _setUrl() could not build one.
     * @var string
     */
    protected $_url = '';

    /**
     * Character set. Not read or written by the code in this class.
     * @var string
     */
    protected $_charset = '';

    /**
     * "Reset request" flag. Not read or written by the code in this class.
     * @var bool
     */
    protected $_change = false;

    /**
     * Name of the last operation started through get()/view()/put().
     * @var string
     */
    protected $_opt = '';

    /**
     * Cache of curl handles, keyed by request URL.
     * @var array
     */
    protected $_ch = array();

    /**
     * Value passed to CURLOPT_TIMEOUT for every request (seconds).
     * @var int
     */
    protected $_timeOut = 2;

    /**
     * Creates a client and, when both host and port are given, remembers the
     * server address.
     *
     * @param string $host
     * @param string $port
     */
    public function __construct($host = '', $port = '')
    {
        // connect() itself ignores empty values, so no extra guard is needed.
        $this->connect($host, $port);
    }

    /**
     * Cloning is disabled so the shared instance cannot be duplicated.
     */
    private function __clone()
    {
    }

    /**
     * Returns the shared client, creating it on first use.
     *
     * @param string $host
     * @param int    $port
     * @param bool   $change when truthy, the shared instance is rebuilt with
     *                       the given host and port
     * @return Service_Httpsqs
     */
    public static function getInstance($host = '', $port = 1218, $change = false)
    {
        if (self::$_instance === null || $change) {
            self::$_instance = new self($host, $port);
        }

        return self::$_instance;
    }

    /**
     * Sets the server address. Does nothing unless both values are non-empty.
     *
     * @param string $host
     * @param string $port
     * @return void
     */
    public function connect($host, $port)
    {
        if (!empty($host) && !empty($port)) {
            $this->_serverUrl = $host . ':' . $port;
        }
    }

    /**
     * Builds the request URL for the given queue and operation and stores it
     * in $this->_url.
     *
     * @param string $key     queue name
     * @param string $opt     operation, appended verbatim after "&opt="
     * @param string $charset optional charset query parameter
     * @return void
     */
    protected function _setUrl($key, $opt, $charset = '')
    {
        if (!$key || !$this->_serverUrl) {
            // Do not keep the previous request's URL around: otherwise a call
            // with a missing key would silently hit the previous queue again.
            $this->_url = '';
            return;
        }

        $this->_key = $key;

        $url = $this->_serverUrl . '/?name=' . $key;
        if (!empty($charset)) {
            $url .= '&charset=' . $charset;
        }

        $this->_url = $url . '&opt=' . $opt;
    }

    /**
     * Reads the message stored at a given queue position without consuming it
     * (supported by HTTPSQS 1.2 and later).
     *
     * @param string $queueName queue name
     * @param string $pos       queue position
     * @return string message body, or '' when nothing usable was returned
     */
    public function view($queueName, $pos)
    {
        $this->_setUrl($queueName, 'view&pos=' . $pos);
        $this->_opt = 'get';

        return $this->_dataOrEmpty($this->_sendRequest());
    }

    /**
     * Fetches the next message from a queue.
     *
     * @param string $key queue name
     * @return mixed message body, or '' when the queue is empty or the
     *               request failed
     */
    public function get($key)
    {
        $this->_setUrl($key, 'get');
        $this->_opt = 'get';

        return $this->_dataOrEmpty($this->_sendRequest());
    }

    /**
     * Fetches the next message together with its queue position.
     *
     * The position is taken from the "Pos" response header, which HTTPSQS
     * sends from version 1.2 on; it is null when the header is absent.
     *
     * @param string $key queue name
     * @return array array('body' => ..., 'pos' => ...) on success, an empty
     *               array otherwise
     */
    public function getWithPos($key)
    {
        $this->_setUrl($key, 'get');
        $this->_opt = 'get';

        $result = $this->_sendRequest('get', '', true);
        if (!isset($result->valid) || !isset($result->data) || !isset($result->header)) {
            return array();
        }

        return array(
            "body" => $result->data,
            "pos" => $this->_extractPos($result->header)
        );
    }

    /**
     * Appends a message to a queue.
     *
     * @param string $key     queue name
     * @param string $charset charset query parameter sent with the request
     * @param string $data    message body
     * @return bool true only when the server confirmed the put
     */
    public function put($key, $charset, $data)
    {
        $this->_setUrl($key, 'put', $charset);
        $this->_opt = 'put';

        $result = $this->_sendRequest('post', $data);

        return isset($result->valid) && $result->valid == 1;
    }

    /**
     * Returns the message body of a successful result, '' otherwise.
     *
     * @param stdClass $result
     * @return mixed
     */
    private function _dataOrEmpty($result)
    {
        if (isset($result->valid) && isset($result->data)) {
            return $result->data;
        }

        return '';
    }

    /**
     * Finds the value of the "Pos" header in a raw response header block.
     *
     * @param string $rawHeader
     * @return string|null
     */
    private function _extractPos($rawHeader)
    {
        foreach (explode("\n", $rawHeader) as $line) {
            $fields = explode(":", $line);
            if (count($fields) == 2 && strtolower(trim($fields[0])) == "pos") {
                return trim($fields[1]);
            }
        }

        return null;
    }

    /**
     * Returns the cached curl handle for the current URL, creating and
     * configuring it on first use.
     *
     * @return mixed the curl handle, or false when curl_init() failed
     */
    private function _getCurlHandle()
    {
        if (empty($this->_ch[$this->_url])) {
            $handle = curl_init($this->_url);
            if ($handle === false) {
                return false;
            }

            // Keep the connection open between requests and send an empty
            // Expect header, which stops curl from adding its own.
            curl_setopt($handle, CURLOPT_HTTPHEADER, array('Connection: Keep-Alive', 'Expect:'));
            curl_setopt($handle, CURLOPT_TIMEOUT, $this->_timeOut);
            curl_setopt($handle, CURLOPT_RETURNTRANSFER, 1);

            $this->_ch[$this->_url] = $handle;
        }

        return $this->_ch[$this->_url];
    }

    /**
     * Sends the request prepared by _setUrl() and parses the response.
     *
     * @param string $opt    'post' sends $data as the request body; anything
     *                       else leaves the handle's method untouched
     * @param mixed  $data   request body for 'post'
     * @param bool   $header whether to capture the response headers in
     *                       $result->header
     * @return stdClass carries ->valid (and ->data) on success, ->error on
     *                  failure, ->header when requested and a response arrived
     */
    private function _sendRequest($opt = 'get', $data = '', $header = true)
    {
        $result = new stdClass();

        if ($this->_url === '') {
            // No server configured or no queue name given: nothing to send.
            $result->error = 'request url not set';
            return $result;
        }

        $handle = $this->_getCurlHandle();
        if (!$handle) {
            $result->error = 'curl err' . 'init failed';
            return $result;
        }

        if ($opt == 'post') {
            curl_setopt($handle, CURLOPT_POST, true);
            curl_setopt($handle, CURLOPT_POSTFIELDS, $data);
        }
        // Handles are cached per URL, so set the flag both ways instead of
        // only ever switching it on.
        curl_setopt($handle, CURLOPT_HEADER, (bool) $header);

        $response = curl_exec($handle);
        if ($response === false) {
            if (curl_errno($handle) != 0) {
                $result->error = 'curl err' . curl_error($handle);
            }
            return $result;
        }

        $body = $response;
        if ($header) {
            $headerSize = curl_getinfo($handle, CURLINFO_HEADER_SIZE);
            $result->header = (string) substr($response, 0, $headerSize);
            // Cast: older PHP versions return false instead of '' here.
            $body = (string) substr($response, $headerSize);
        }

        // Compare against '' explicitly; a body of "0" is a valid message.
        if ($body !== '') {
            $this->_parseResult($body, $result);
        }

        return $result;
    }

    /**
     * Translates a response body into result fields.
     *
     * Status markers become ->error (or a bare ->valid for HTTPSQS_PUT_OK);
     * every other non-empty body is treated as message data.
     *
     * @param string   $data   response body
     * @param stdClass $result object to fill in
     * @return stdClass the same $result object
     */
    private function _parseResult($data, $result)
    {
        // Strict comparisons throughout: the body is a plain string, and
        // values such as "0" must not be mistaken for a failure.
        if ($data === false || $data === null || $data === ''
            || $data === 'HTTPSQS_ERROR'
            || $data === 'HTTPSQS_PUT_ERROR') {
            $result->error = '请求数据失败';
        } elseif ($data === 'HTTPSQS_PUT_END') {
            $result->error = '队列已满';
        } elseif ($data === 'HTTPSQS_GET_END') {
            $result->error = '队列中已无数据';
        } elseif ($data === 'HTTPSQS_PUT_OK') {
            $result->valid = 1;
        } else {
            $result->valid = 1;
            $result->data = $data;
        }

        return $result;
    }
}
// Demo: push a few items onto the queue, then poll it forever and print
// every message that arrives.
$sqs = Service_Httpsqs::getInstance('192.168.1.1', 1218);
$item = array('a', 'b', '1');
foreach ($item as $val) {
    $sqs->put('cbupdate11', 'utf-8', $val);
}
while (true) {
    // Content of the current queue message.
    $data = $sqs->get('cbupdate11');
    // get() returns '' when the queue is empty or the request failed; it never
    // hands back the raw HTTPSQS_GET_END / HTTPSQS_ERROR markers. Without the
    // '' check the loop would never sleep and would hammer the server.
    if ($data !== '' && $data != "HTTPSQS_GET_END" && $data != "HTTPSQS_ERROR") {
        echo $data;
    } else {
        sleep(1); // wait one second before polling again
    }
}