当前位置: 首页 > 工具软件 > HTTPSQS > 使用案例 >

python封装类_Python httpsqs封装类

方永贞
2023-12-01

"""HTTPSQS client wrapper class.

@author xiaopeng

a = HttpsqsClient('192.168.0.218', '1218', 'httpsqsmmall.com')

print(a.put('logtest', 'sdfsfsf'))

print(a.get('logtest'))
"""

import contextlib
import json
import urllib

try:
    import urllib2
except ImportError:
    # Python 3: urllib2 was split into urllib.request / urllib.parse.
    # urllib.request re-exports HTTPError and URLError, so aliasing it keeps
    # the urllib2.* names used below valid on both major versions.
    import urllib.parse
    import urllib.request as urllib2
    _urlencode = urllib.parse.urlencode
else:
    _urlencode = urllib.urlencode


class HttpsqsClient(object):
    """Minimal client for an HTTPSQS message-queue server.

    Every operation is a plain HTTP request against
    ``http://<host>:<port>/?opt=...&name=...&auth=...&charset=...``.
    Transport errors are reported on stdout and surface to the caller as a
    ``None`` result (or ``False`` for :meth:`put`); they are never raised.
    """

    def __init__(self, host='127.0.0.1', port='1218', auth='', charset='utf-8',
                 timeout=3):
        """Store the connection settings; no request is made here.

        @param host: server host name or IP address
        @param port: server port; a string or an int
        @param auth: value sent as the ``auth`` query parameter
        @param charset: value sent as the ``charset`` query parameter, also
            used to decode responses on Python 3
        @param timeout: per-request timeout in seconds
        """
        # str() so that an integer port does not break the concatenation.
        self.httpsqs_url = 'http://' + host + ':' + str(port) + '/?'
        self.httpsqs_auth = auth
        self.httpsqs_charset = charset
        self.httpsqs_timeout = timeout

    @staticmethod
    def _to_bytes(data):
        """Return *data* as bytes, UTF-8 encoding it only when it is text.

        Byte strings pass through untouched: calling ``.encode`` on a
        Python 2 ``str`` triggers an implicit ASCII decode that fails on any
        non-ASCII payload.
        """
        if isinstance(data, bytes):
            return data
        return data.encode('utf8')

    @staticmethod
    def _to_text(raw, charset):
        """Return *raw* as the native ``str`` type.

        On Python 2 the response already is a ``str`` and is returned as is;
        on Python 3 the bytes are decoded so that comparisons against the
        ``'HTTPSQS_...'`` status literals work.
        """
        if isinstance(raw, str):
            return raw
        # NOTE(review): assumes queue payloads are text in `charset`;
        # undecodable bytes are replaced rather than raising after the
        # message has already been consumed from the queue.
        return raw.decode(charset, 'replace')

    def _request(self, params, body=None):
        """Send one request and return the response body, or None on error.

        @param params: query parameters; ``auth`` and ``charset`` are added
            to a copy, the caller's dict is not modified
        @param body: bytes to POST, or None for a GET
        @return: response text, or None when the request failed
        """
        query = dict(params, auth=self.httpsqs_auth,
                     charset=self.httpsqs_charset)
        final_url = self.httpsqs_url + _urlencode(query)
        try:
            req = urllib2.Request(final_url, body)
            # closing() guarantees the connection is released on both
            # Python 2 (no context-manager support) and Python 3.
            with contextlib.closing(
                    urllib2.urlopen(req, timeout=self.httpsqs_timeout)) as resp:
                page = resp.read()
        except urllib2.HTTPError as e:  # must precede URLError (subclass)
            print("Error Code: %s" % (e.code,))
            return None
        except urllib2.URLError as e:
            print("Error Reason: %s" % (e.reason,))
            return None
        return self._to_text(page, self.httpsqs_charset)

    def __http_get(self, params):
        """Issue an HTTP GET.

        @param params: e.g. ``{"wd": "a", "b": "2"}``
        @return: response text, or None on error
        """
        return self._request(params)

    def __http_post(self, params, post_data):
        """Issue an HTTP POST.

        @param params: e.g. ``{"wd": "a", "b": "2"}``
        @param post_data: JSON or plain string payload
        @type post_data: string
        @return: response text, or None on error
        """
        return self._request(params, self._to_bytes(post_data))

    def put(self, queue_name, queue_data):
        """Enqueue *queue_data*; True only if the server answers HTTPSQS_PUT_OK."""
        params = {'opt': 'put', 'name': queue_name}
        return self.__http_post(params, queue_data) == 'HTTPSQS_PUT_OK'

    def get(self, queue_name):
        """Dequeue one message.

        @return: the message text, or None when the server answers
            HTTPSQS_GET_END or when the request failed
        """
        params = {'opt': 'get', 'name': queue_name}
        r = self.__http_get(params)
        if r == 'HTTPSQS_GET_END':
            return None
        return r

    def status(self, queue_name):
        """Return the server's reply to ``opt=status`` for the queue."""
        params = {'opt': 'status', 'name': queue_name}
        return self.__http_get(params)

    def status_json(self, queue_name):
        """Return the server's reply to ``opt=status_json`` (not parsed)."""
        params = {'opt': 'status_json', 'name': queue_name}
        return self.__http_get(params)

    def reset(self, queue_name):
        """Send ``opt=reset`` for the queue and return the server's reply."""
        params = {'opt': 'reset', 'name': queue_name}
        return self.__http_get(params)

    def maxqueue(self, queue_name, num):
        """Send ``opt=maxqueue`` with ``num`` and return the server's reply."""
        params = {'opt': 'maxqueue', 'name': queue_name, 'num': str(num)}
        return self.__http_get(params)

    def synctime(self, queue_name, num):
        """Send ``opt=synctime`` with ``num`` and return the server's reply."""
        params = {'opt': 'synctime', 'name': queue_name, 'num': str(num)}
        return self.__http_get(params)

 类似资料: