第三方类库Python接口
urequests
--- 网络请求模块
功能相关函数
urequests.request(method, url, data=None, json=None, headers={})
发送网络请求, 它会阻塞返回网络的响应数据,参数:
- method 建立网络请求的方法,例如
HEAD
,GET
,POST
,PUT
,PATCH
,DELETE
。 - url 网络请求的URL(网址)。
- \data**(可选)在请求正文中发送的字典或元组列表[(键,值)](将是表单编码的),字节或类文件对象。
- \json**(可选)在请求正文中发送的json数据。
- \headers**(可选)要与请求一起发送的HTTP标头字典。
urequests.head(url, **kw)
发送一个 HEAD 请求,返回类型是 request 的响应,参数:
- url 网络请求的URL(网址)。
- \*kw* request可选的参数
urequests.get(url, **kw)
发送一个 GET 请求,返回类型是 request 的响应,参数:
- url 网络请求的URL(网址)。
- \*kw* request可选的参数
urequests.post(url, **kw)
发送一个 POST 请求,返回类型是 request 的响应,参数:
- url 网络请求的URL(网址)。
- \*kw* request可选的参数
urequests.put(url, **kw)
发送一个 PUT 请求,返回类型是 request 的响应,参数:
- url 网络请求的URL(网址)。
- \*kw* request可选的参数
urequests.patch(url, **kw)
发送一个 PATCH 请求,返回类型是 request 的响应,参数:
- url 网络请求的URL(网址)。
- \*kw* request可选的参数
urequests.delete(url, **kw)
发送一个 DELETE 请求,返回类型是 request 的响应,参数:
- url 网络请求的URL(网址)。
- \*kw* request可选的参数
程序示例1:
import codey
import urequests as requests
import time
# 此处需填入自己路由器的 ssid 和 密码
codey.wifi.start('wifi_ssid', 'password')
codey.led.show(0,0,0)
while True:
if codey.wifi.is_connected():
codey.led.show(0,0,255)
res = requests.get(url='http://www.baidu.com/')
print(res.text)
time.sleep(3)
else:
codey.led.show(0,0,0)
程序示例2:
import codey
import urequests as requests
import time
# 此处需填入自己路由器的 ssid 和 密码
codey.wifi.start('wifi_ssid', 'password')
codey.led.show(0,0,0)
hour = minite = second = "00"
while True:
if codey.wifi.is_connected():
try:
res = requests.get(url = 'http://www.time.ac.cn/timeflash.asp?user=flash').text
hour_begin = res.find('<hour>') + len('<hour>')
hour_end = res.find('</hour>')
minite_begin = res.find('<minite>') + len('<minite>')
minite_end = res.find('</minite>')
second_begin = res.find('<second>') + len('<second>')
second_end = res.find('</second>')
if hour_begin > len('<hour>') and hour_end > hour_begin and \
minite_begin > len('<minite>') and minite_end > minite_begin and \
second_begin > len('<second>') and second_end > second_begin:
if hour_end - hour_begin == 1:
hour = '0' + res[hour_begin:hour_end]
elif hour_end - hour_begin == 2:
hour = res[hour_begin:hour_end]
if minite_end - minite_begin == 1:
minite = '0' + res[minite_begin:minite_end]
elif minite_end - minite_begin == 2:
minite = res[minite_begin:minite_end]
if second_end - second_begin == 1:
second = '0' + res[second_begin:second_end]
elif second_end - second_begin == 2:
second = res[second_begin:second_end]
print(hour + ":" + minite + ":" + second)
cur_time = hour + ':' + minite;
codey.display.show(cur_time)
except:
print("get error data")
else:
codey.led.show(0,0,0)
程序示例3:
import codey
import urequests as requests
import ujson
# user_account 和 password 的账户信息就是mblock的账户
def get_user_request_header():
post_data = ujson.dumps({ 'account': 'user_account', 'password': 'password'})
request_url = 'http://passport2.makeblock.com/v1/user/login'
res = requests.post(request_url, headers = {'content-type': 'application/json'}, data = post_data).json()
header_data = ''
if res['code'] == 0:
header_data = { "content-type": 'application/json; charset=utf-8', "devicetype": '1'}
header_data["uid"] = str(res['data']['user']['uid'])
header_data["deviceid"] = '30AEA427EC60'
return header_data
# 获取天气信息
# cid: 检查站id
# arg: 需要查询的信息
# aqi: 空气质量指数
# pm25: PM2.5浓度
# pm10: PM10浓度
# co: 一氧化碳浓度
# so2: 二氧化硫浓度
# no2: 二氧化氮浓度
deget_air_quality_info(cid, arg):
if not codey.wifi.is_connected():
return ''
post_data = ujson.dumps({ "cid": cid, "arg": arg})
request_url = 'http://msapi.passport3.makeblock.com/' + 'air/getone'
res = requests.post(request_url, headers = get_user_request_header(), data = post_data)
text = res.text
return float(text)
# 此处需填入自己路由器的 ssid 和 密码
codey.wifi.start('wifi_ssid', 'password')
codey.led.show(0,0,0)
while True:
if codey.wifi.is_connected():
codey.led.show(0,0,255)
data = get_air_quality_info('1539','aqi') #1539 表示深圳测试点
codey.display.show(data)
else:
codey.led.show(0,0,0)
mqtt
--- 消息队列遥测传输
功能相关函数
class mqtt.MQTTClient(client_id, server, port=0, user=None, password=None, keepalive=0, ssl=False, ssl_params={})
实例化MQTT客户端的接口对象,参数:
- client_id 连接到代理时使用的唯一客户端ID字符串,如果client_id为零长度或无,则将随机生成一个。在这种情况下,
connect
函数的clean_session参数必须为True。 - server 远程服务器的主机名或IP地址。
- \port**(可选)要连接的服务器主机的网络端口。 默认为1883,请注意,MQTT over SSL / TLS的默认端口是8883。
- \user**(可选)在服务器上注册的用户名。
- \password**(可选)在服务器上注册的密码。
- \keepalive**(可选)客户端的keepalive超时值。 默认为60秒。
- \ssl**(可选)是否使能 SSL/TLS 支持。
- \ssl_params**(可选)SSL/TLS 参数。
connect(clean_session=True)
将客户端连接到服务器。 这是一个阻塞函数,参数:
- clean_session 一个确定客户端类型的布尔值。 如果为
True
,服务器将在断开连接时删除有关此客户端的所有信息。 如果为False
,则客户端是持久客户端,并且当客户端断开连接时,将保留订阅信息和排队消息。
reconnect()
使用先前提供的详细信息重新连接到服务器。 在调用此函数之前,您必须调用 connect
。
disconnect()
与服务器断开连接。
ping()
测试客户端与服务器的连通性。
set_last_will(topic, msg, retain=False, qos=0)
设置要发送给服务器的遗嘱。 如果客户端断开而没有调用 disconnect()
,服务器将代表它发布消息。
- topic 该遗嘱消息发布的主题。
- msg 要发送的遗嘱消息。
- retain 如果设置为 True,遗嘱消息将被设置为该主题的 最后已知良好 /保留消息。
- qos 用于遗嘱的服务质量等级。
publish(topic, msg, retain=False, qos=0)
从客户端向代理发送消息,然后从代理发送到订阅匹配主题的任何客户端。 参数:
- topic 应该发布消息的主题。
- msg 要发送的实际消息。
- retain 如果设置为
True
,遗嘱消息将被设置为该主题的 最后已知良好 /保留消息。 - qos 要使用的服务质量水平。
subscribe(topic, qos=0)
订阅服务的某个主题,该模块提供了一些辅助函数,可以直接订阅和处理消息。例如 set_callback
。
- topic 要订阅消息的主题。
- qos 要使用的服务质量水平。
set_callback(f)
设置主题订阅的回调函数,当服务器响应我们的订阅请求时调用。参数:
- f 回调函数。
wait_msg()
等待服务器直到服务器无待处理消息。该函数是阻塞函数。
check_msg()
检查服务器是否有待处理消息。如果没有,直接返回,如果有的话,同 wait_msg 的处理。
程序示例:
from mqtt import MQTTClient
import codey
import time
MQTTHOST = "mq.makeblock.com"
MQTTPORT = 1883
# 任意填写
client_id = "20180911203800"
# 示例
Topic = "/sensors/temperature/#"
mqttClient = MQTTClient(client_id, MQTTHOST, port=MQTTPORT, user='test', password='test', keepalive=0, ssl=False)
# 连接MQTT服务器
def on_mqtt_connect():
mqttClient.connect()
# 发布消息
def on_publish(topic, payload, retain=False, qos = 0):
mqttClient.publish(topic, payload, retain, qos)
# 消息处理函数
def on_message_come(topic, msg):
print(topic + " " + ":" + str(msg))
codey.display.show(msg)
# subscribe 消息
def on_subscribe():
mqttClient.set_callback(on_message_come)
mqttClient.subscribe(Topic, qos = 1)
# 此处填入自己家的wiif账户和密码
codey.wifi.start('wifi_ssid', 'password')
codey.led.show(0,0,0)
codey.display.show(0)
while True:
if codey.wifi.is_connected():
on_mqtt_connect()
on_subscribe()
codey.led.show(0,0,255)
while True:
# Blocking wait for message
on_publish("/sensors/temperature/home", str(38), qos = 1)
mqttClient.wait_msg()
time.sleep(1)
else:
codey.led.show(0,0,0)