Python 是一种广泛使用的解释型、高级编程、通用型编程语言。Python 的设计哲学强调代码的可读性和简洁的语法(尤其是使用空格缩进划分代码块,而非使用大括号或者关键词)。Python 让开发者能够用更少的代码表达想法,不管是小型还是大型程序,该语言都试图让程序的结构清晰明了。
MQTT 是一种基于发布/订阅模式的 轻量级物联网消息传输协议 ,可以用极少的代码和带宽为联网设备提供实时可靠的消息服务,它广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等行业。
本文主要介绍如何在 Python 项目中使用 paho-mqtt 客户端库 ,实现客户端与 MQTT 服务器的连接、订阅、取消订阅、收发消息等功能。
本项目使用 Python 3.6 进行开发测试,读者可用如下命令确认 Python 的版本。
➜ ~ python3 --version Python 3.6.7
paho-mqtt 是目前 Python 中使用较多的 MQTT 客户端库,它在 Python 2.7 或 3.x 上为客户端类提供了对 MQTT v3.1 和 v3.1.1 的支持。它还提供了一些帮助程序功能,使将消息发布到 MQTT 服务器变得非常简单。
Pip 是 Python 包管理工具,该工具提供了对 Python 包的查找、下载、安装、卸载的功能。
pip3 install -i https://pypi.doubanio.com/simple paho-mqtt
本文将使用 EMQ X 提供的 免费公共 MQTT 服务器 ,该服务基于 EMQ X 的 MQTT 物联网云平台 创建。服务器接入信息如下:
from paho.mqtt import client as mqtt_client
设置 MQTT Broker 连接地址,端口以及 topic,同时我们调用 Python random.randint 函数随机生成 MQTT 客户端 id。
broker = 'broker.emqx.io' port = 1883 topic = "/python/mqtt" client_id = f'python-mqtt-{random.randint(0, 1000)}'
编写连接回调函数 on_connect ,该函数将在客户端连接后被调用,在该函数中可以依据 rc 来判断客户端是否连接成功。通常同时我们将创建一个 MQTT 客户端,该客户端将连接到 broker.emqx.io 。
def connect_mqtt(): def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) # Set Connecting Client ID client = mqtt_client.Client(client_id) client.on_connect = on_connect client.connect(broker, port) return client
首先定义一个 while 循环语句,在循环中我们将设置每秒调用 MQTT 客户端 publish 函数向 /python/mqtt 主题发送消息。
def publish(client): msg_count = 0 while True: time.sleep(1) msg = f"messages: {msg_count}" result = client.publish(topic, msg) # result: [0, 1] status = result[0] if status == 0: print(f"Send `{msg}` to topic `{topic}`") else: print(f"Failed to send message to topic {topic}") msg_count += 1
编写消息回调函数 on_message ,该函数将在客户端从 MQTT Broker 收到消息后被调用,在该函数中我们将打印出订阅的 topic 名称以及接收到的消息内容。
def subscribe(client: mqtt_client): def on_message(client, userdata, msg): print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic") client.subscribe(topic) client.on_message = on_message
# python 3.6 import random import time from paho.mqtt import client as mqtt_client broker = 'broker.emqx.io' port = 1883 topic = "/python/mqtt" # generate client ID with pub prefix randomly client_id = f'python-mqtt-{random.randint(0, 1000)}' def connect_mqtt(): def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) client = mqtt_client.Client(client_id) client.on_connect = on_connect client.connect(broker, port) return client def publish(client): msg_count = 0 while True: time.sleep(1) msg = f"messages: {msg_count}" result = client.publish(topic, msg) # result: [0, 1] status = result[0] if status == 0: print(f"Send `{msg}` to topic `{topic}`") else: print(f"Failed to send message to topic {topic}") msg_count += 1 def run(): client = connect_mqtt() client.loop_start() publish(client) if __name__ == '__main__': run()
# python 3.6 import random import time from paho.mqtt import client as mqtt_client broker = 'broker.emqx.io' port = 1883 topic = "/python/mqtt" # generate client ID with pub prefix randomly client_id = f'python-mqtt-{random.randint(0, 1000)}' def connect_mqtt(): def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) client = mqtt_client.Client(client_id) client.on_connect = on_connect client.connect(broker, port) return client def publish(client): msg_count = 0 while True: time.sleep(1) msg = f"messages: {msg_count}" result = client.publish(topic, msg) # result: [0, 1] status = result[0] if status == 0: print(f"Send `{msg}` to topic `{topic}`") else: print(f"Failed to send message to topic {topic}") msg_count += 1 def run(): client = connect_mqtt() client.loop_start() publish(client) if __name__ == '__main__': run() 消息订阅代码 # python3.6 import random from paho.mqtt import client as mqtt_client broker = 'broker.emqx.io' port = 1883 topic = "/python/mqtt" # generate client ID with pub prefix randomly client_id = f'python-mqtt-{random.randint(0, 100)}' def connect_mqtt() -> mqtt_client: def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code %d\n", rc) client = mqtt_client.Client(client_id) client.on_connect = on_connect client.connect(broker, port) return client def subscribe(client: mqtt_client): def on_message(client, userdata, msg): print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic") client.subscribe(topic) client.on_message = on_message def run(): client = connect_mqtt() subscribe(client) client.loop_forever() if __name__ == '__main__': run()
运行 MQTT 消息发布代码,我们将看到客户端连接成功,并且成功将消息发布。
python3 pub.py
运行 MQTT 消息订阅代码,我们将看到客户端连接成功,并且成功接收到发布的消息。
python3 sub.py
至此,我们完成了使用 paho-mqtt 客户端连接到 公共 MQTT 服务器 ,并实现了测试客户端与 MQTT 服务器的连接、消息发布和订阅。
与 C ++ 或 Java 之类的高级语言不同,Python 比较适合设备侧的业务逻辑实现,使用 Python 您可以减少代码上的逻辑复杂度,降低与设备的交互成本。我们相信在物联网领域 Python 将会有更广泛的应用。
接下来我们将会陆续发布更多关于物联网开发及 Python 的相关文章,敬请关注。
以上就是在 Python 中使用 MQTT的方法的详细内容,更多关于Python 中使用 MQTT的资料请关注小牛知识库其它相关文章!
我该怎么修好它?
本文向大家介绍在Python中使用Neo4j的方法,包括了在Python中使用Neo4j的方法的使用技巧和注意事项,需要的朋友参考一下 Neo4j是面向对象基于Java的 ,被设计为一个建立在Java之上、可以直接嵌入应用的数据存储。此后,其他语言和平台的支持被引入,Neo4j社区获得持续增长,获得了越来越多的技术支持者。目前已支持.NET、Ruby、Python、Node.js及PHP等。因此,
本文向大家介绍Random 在 Python 中的使用方法,包括了Random 在 Python 中的使用方法的使用技巧和注意事项,需要的朋友参考一下 1.random.random(): 会随机生成0-1之间的小数 例如: 2.random.uniform(min,max): 会随机生成 min - max 之间的小数,其中min 和 max 的位置可以互换而不会报错: 3.random.ran
本文向大家介绍在 Python 应用中使用 MongoDB的方法,包括了在 Python 应用中使用 MongoDB的方法的使用技巧和注意事项,需要的朋友参考一下 在这篇文章中,将向您展示如何使用Python链接目前主流的MongoDB(V3.4.0)数据库,主要使用PyMongo(v3.4.0)和MongoEngine(V0.10.7)。同时比较SQL和NoSQL。 英文原文:https://r
我还要求通过启用TLS/SSL(根据Microsoft Azure文档:https://docs.Microsoft.com/en-us/Azure/iot-hub/iot-hub-mqtt-support#tlsssl-configuration)(如https://github.com/mqttjs/mqtt.js#client上的mqtt.js文档所述)。 如何使用第三方MQTT库和SAS令
mosquitto.conf将最大飞行消息设置为0,持久性为true。 发布服务器QOS=2 用户QOS=2 保持活力=60 还有其他参数我应该看吗?