当前位置: 首页 > 知识库问答 >
问题:

Mqtt客户端不能同时处理多条消息

段干兴业
2023-03-14
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session
from sqlalchemy import create_engine
from sqlalchemy import update
from sqlalchemy.ext.automap import generate_relationship
import sqlalchemy
import paho.mqtt.client as mqtt
import time  
#Function that define what to do on client conenction
def on_connect(client, userdata, rc):
    #Subscribe to all specified topics
    mqttc.subscribe(topic='/+/mysignals/sensors/+/')
def on_message(client,userdata,message):
    #Get the mysignals member id from the topic
    topic_split = message.topic.split('/')
    member_id = topic_split[1]
    session = Session(engine)
    sensor_id = topic_split[4]
    patient = session.query(Patient).filter(Patient.mysignalsid==member_id).first()
    if message.payload == None:
        payload = 0
    else:
        payload = message.payload
    if patient:
        current_time = time.time()
        if patient.id in pending.keys() and (current_time - pending[patient.id]['time_created']) <= 55:
            pending[patient.id]['record'].__dict__[sensor_id] = payload
            print time.time()
        else:
            pending.pop(patient.id,None)
            patientdata = PatientData()
            patientdata.__dict__[sensor_id] = payload
            print patientdata.__dict__[sensor_id]
            print payload
            print patientdata.temp
            patient.patientdata_collection.append(patientdata)
            session.add(patientdata)
            print time.time()
            pending.update({patient.id:{
                                    'time_created':time.time(),
                                    'record':patientdata,
                                    }})
        session.flush()
        session.commit()
        print('Wrote to database.')

pending = {}
Base = automap_base()
engine = create_engine('mysql+mysqlconnector://user:pass@localhost/db')
# reflect the tables
Base.prepare(engine, reflect=True)
Patient = Base.classes.patient
PatientData = Base.classes.patientdata
session = Session(engine)
#Create a mqtt client object
mqttc = mqtt.Client(client_id='database_logger',clean_session=False)
#Set mqtt client callbacks
mqttc.on_connect = on_connect
mqttc.on_message = on_message
#Set mqtt broker username and password
mqttc.username_pw_set('blah','blahblah')
#Connect to the mqtt broker with the specified hostname/ip adress
mqttc.connect('127.0.0.1')
mqttc.loop_forever()

ouput:

98
98
None
1500576377.3
Wrote to database.
1500576377.43
Wrote to database.

产出应为:

98
98
None
1500576377.3
Wrote to database.
25.4
25.4
25.4
1500576377.43
Wrote to database.

共有1个答案

斜向文
2023-03-14

这最终不是mqtt客户机的问题。代码是错误的,第二条消息没有写在数据库中。

为了让它工作,我不得不替换以下一行:

pending[patient.id]['record'].__dict__[sensor_id] = payload

有了这个:

setattr(pending[patient.id]['record'],sensor_id,payload)
session = Session(engine)
session.expunge_all()
session.commit()
 类似资料:
  • 我后来理解对了。实际上,我需要一条来自android客户端的MQTT消息发送到所有其他客户端,所以我想在消息正文中包含publish关键字,这是非常错误的。MQTT本身将接收到的消息发送给所有提供的客户端,如果客户端订阅了该主题的话。

  • 安装:composer require imiphp/imi-mqtt 项目配置文件: [ 'components' => [ 'MQTT' => 'Imi\MQTT', ], ] MQTT 功能要求 PHP >= 7.2 使用 事件监听类: <?php namespace Imi\MQTT\Test; use Imi\MQTT\Client\Con

  • 我是MQTT的新手,我有一些问题希望你们能帮助我。我正在做一个学校项目,需要我使用MQTT协议,程序需要用C语言编写。(只是一些背景信息) > MQTT客户端可以同时是发布服务器和订阅服务器吗?也就是说,在不断等待从代理接收消息并执行结果操作的同时,它还能够在需要时将消息发布到代理。 我对MQTT的理解是:MQTT发布者-->MQTT代理-->MQTT订阅者 用白痴的话来说,MQTT的异步模式到底

  • 最大消息长度为64K字节(尽管我可以将其减少到256字节)。 注意:这将运行在一个微小的嵌入式设备上,所以使用像ZMQ这样的消息传递层不是一个选项(没有足够的内存)。 我可以:

  • 假设我有一个出版商和两个消费者。每个消费者应该一次消费5条消息(并行)。 (一个交换,绑定到一个队列,直接模式) 发布者产生消息(1,2,3,...14,15) 消费者A消费(1,3,5,7,9) 消费者B消费(2,4,6,8,10) 消费者A处理完消息1并接收消息11 ...等 我怎样才能做到这一点?我意识到,消费者。接收事件仅在处理上一条消息时触发。 阅读rabbitmq文档时,这似乎正是我需

  • null 当MQTT代理变得不可用时,Paho MQTT客户机不能帮助我保证这些QoS2级别的消息将被重新传递,这是正确的说法吗? 因此,我如何区分以下情况,即Client.Publish导致了一个MqttException,其中Paho没有将消息持久化。 下面是它在飞行中坚持的地方 null 连接丢失(32109):PAHO保存消息 客户端当前正在断开连接(32102):PAHO丢失消息 等待服