当前位置: 首页 > 编程笔记 >

python 多进程队列数据处理详解

王佐
2023-03-14
本文向大家介绍python 多进程队列数据处理详解,包括了python 多进程队列数据处理详解的使用技巧和注意事项,需要的朋友参考一下

我就废话不多说了,直接上代码吧!

# -*- coding:utf8 -*-
import paho.mqtt.client as mqtt
from multiprocessing import Process, Queue
import time, random, os
import camera_person_num
 
MQTTHOST = "172.19.4.4"
MQTTPORT = 1883
mqttClient = mqtt.Client()
q = Queue() 
 
 
# 连接MQTT服务器
def on_mqtt_connect():
  mqttClient.connect(MQTTHOST, MQTTPORT, 60)
  mqttClient.loop_start()
 
 
# 消息处理函数
def on_message_come(lient, userdata, msg):
  # print(msg.topic + ":" + str(msg.payload.decode("utf-8")))
  
  q.put(msg.payload.decode("utf-8")) # 放入队列
  print("产生消息", msg.payload.decode("utf-8"))
  # 消息处理开启多进程
  # p = Process(target=talk, args=("/camera/person/num/result", msg.payload.decode("utf-8")))
  # p.start()
 
 
def consumer(q, pid):
  print("开启消费序列进程", pid)
  while True:
    msg = q.get()
    # p = Process(target=talk, args=("/camera/person/num/result", msg, pid))
    # p.start()
    talk("/camera/person/num/result", msg, pid) 
 
 
# subscribe 消息订阅
def on_subscribe():
  mqttClient.subscribe("test123", 1) # 主题为"test"
  mqttClient.on_message = on_message_come # 消息到来处理函数
 
 
# publish 消息发布
def on_publish(topic, msg, qos):
  mqttClient.publish(topic, msg, qos);
 
 
# 多进程中发布消息需要重新初始化mqttClient
def talk(topic, msg, pid):
  cameraPsersonNum = camera_person_num.CameraPsersonNum(msg)
  t_max, t_mean, t_min = cameraPsersonNum.personNum()
  # time.sleep(20)
  print("消费消息", pid, msg) 
  mqttClient2 = mqtt.Client()
  mqttClient2.connect(MQTTHOST, MQTTPORT, 60)
  mqttClient2.loop_start()
  mqttClient2.publish(topic, '{"max":' + str(t_max) + ',"mean":' + str(t_mean) + ',"min:"' + t_min + '}', 1)
  mqttClient2.disconnect()
 
 
def main():
  
  on_mqtt_connect()
  on_subscribe()
  for i in range(1, 3):
    c1 = Process(target=consumer, args=(q, i))
    c1.start()
  while True:
    pass
 
 
if __name__ == '__main__':
  main()

以上这篇python 多进程队列数据处理详解就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持小牛知识库。

 类似资料:
  • 默认情况下,C++容器应该是线程安全的。我必须错误地使用多线程,因为对于此代码: 我得到了:

  • 问题内容: 我正在尝试在Python中的多处理库中使用队列。执行下面的代码后(打印语句起作用),但是在调用Queue上的join之后,这些进程没有退出,并且仍然存在。我如何终止其余过程? 谢谢! 问题答案: 尝试这个:

  • 本文向大家介绍C#多线程处理多个队列数据的方法,包括了C#多线程处理多个队列数据的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了C#多线程处理多个队列数据的方法。分享给大家供大家参考。具体实现方法如下: 希望本文所述对大家的C#程序设计有所帮助。

  • 本文向大家介绍详解Java多线程处理List数据,包括了详解Java多线程处理List数据的使用技巧和注意事项,需要的朋友参考一下 实例1: 解决问题:如何让n个线程顺序遍历含有n个元素的List集合 实例2: List多线程并发读取读取现有的list对象 实例3: 多线程分段处理List集合 场景:大数据List集合,需要对List集合中的数据同标准库中数据进行对比,生成新增,更新,取消数据 解

  • 问题内容: 等待(不旋转)直到两个(多处理)队列中的任何一个都可用的最佳方法是什么(两者都驻留在同一系统上)? 问题答案: 似乎还没有一种正式的方式来解决这个问题。或至少不是基于此: http://bugs.python.org/issue3831 您可以尝试类似本文所进行的操作-访问基础管道文件句柄: http://haltcondition.net/?p=2319 然后使用选择。

  • 本文向大家介绍python数据预处理 :数据共线性处理详解,包括了python数据预处理 :数据共线性处理详解的使用技巧和注意事项,需要的朋友参考一下 何为共线性: 共线性问题指的是输入的自变量之间存在较高的线性相关度。共线性问题会导致回归模型的稳定性和准确性大大降低,另外,过多无关的维度计算也很浪费时间 共线性产生原因: 变量出现共线性的原因: 数据样本不够,导致共线性存在偶然性,这其实反映了缺