本人是django新手,最近因为项目需求Redis用来管理视频流地址,要搭建一个后台会捕捉到Redis的hash变化,增加时为其分配一个线程并把新增的流地址给到处理接口中,而删除时也会关闭对应的线程。这是我三天里从0开始学django研究出来的方法,拿出来分享一下,也是给自己留个足迹,这里也没考虑线程锁之类的数据安全问题,有需要的话可以自己根据需求修改加上去。以下主要代码为项目中新建的threadOP.py文件里的代码。
实现上其实就是监听Redis的变化,要做到这一点可以参考这篇
10Redis键空间通知(keyspace notifications).
具体还要用点正则的知识,这些根据自己来修改吧,我的版本就是监听0到99数据库频道的数据修改消息。在监听到之后就分配线程给数据处理就行。
#threadOP.py
global lock #数据库的进程锁
global mainThread #Redis监听线程
global processDic #进程字典
#threadOP.py
class MainThread(threading.Thread):
"""Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition."""
def __init__(self,request):
super(MainThread, self).__init__()
push_service(request)#监听Redis函数
self._stop_event = threading.Event()
def stop(self):
self._stop_event.set()
def stopped(self):
return self._stop_event.is_set()
#threadOP.py
def init():
global lock
lock = RLock()
global processDic
processDic = {}
#threadOP.py
def start_main_thread(request):
global mainThread
init()
try:
print("Start: 启动主线程")
mainThread = MainThread(request)
mainThread.start()
print('time to start')
return HttpResponse('starting thread sucessful')
except Exception:
print("Error: 主线程出错")
traceback.print_exc()
return HttpResponse('cant start tread')
#threadOP.py
def stop_main_thread(request):
global mainThread
print("尝试关闭所有进程")
try:
for p in processDic.values():
p.terminate()
for p in processDic.values():
p.close()
mainThread.stop()
mainThread.join()
return HttpResponse('done')
except Exception:
print("Error: 线程关闭失败")
traceback.print_exc()
return HttpResponse('error')
#threadOP.py
def stop_process(id):
global processDic
processDic[id].terminate()
processDic[id].close()
我这里Redis中监听的hash键是’test’,根据自己的修改
#threadOP.py
def push_service(request):
print("start push_service")
global processDic
global threds
global lock
global pool
# 订阅频道
con = get_redis_connection("default")
ps = con.pubsub()
ps.psubscribe('__key*__:*') #订阅所有消息
# 为已有的流地址分配线程
if con.hlen('test') >0: #test为要监听的key值
for i in con.hgetall('test'):
p = Process(target=Event,args=(i.decode(),con.hget('test',i).decode(),lock,))
processDic[i.decode()]=p
p.start()
# 监听redis的变化
for item in ps.listen():
if item['type'] == 'pmessage':
channel = item['channel'].decode()
message = item['data'].decode()
#redis添加时的操作
if re.match(r'__keyevent@([0-9]{1,2})__:hset',channel,re.I) and message=="test":#正则匹配0-99号数据库的通道里的hset事件
count = 0
len = con.hlen('test')
for i in con.hgetall('test'):
count+=1
if count == len:
print("starting process for ",i.decode())
p = Process(target=Event, args=(i.decode(),con.hget(message,i).decode(),lock,))
processDic[i.decode()] = p
p.start()
#redis删除时的操作
elif re.match(r'__keyevent@([0-9]{1,2})__:hdel',channel,re.I) and message=="test":#正则匹配0-99号数据库的通道里的hset事件
fieldList=[]
for field in con.hgetall('test'):
fieldList.append(field.decode())
#寻找被删除的那一条记录
for id in processDic:
if id not in fieldList:
processDic[id].terminate()
#threadOP.py
def Event(id,url,lock):
print('父进程 id:', os.getppid()) # 获取父进程id()
print('当前子进程 id:', os.getpid()) # 获取自己的进程id
print('------------------------')
...
我是直接用url来启动服务的,可以自己根据需求改
#urls.py
from django.urls import path
from . import threadOP
urlpatterns = [
...
path('start/',threadOP.start_main_thread),
path('stop/',threadOP.stop_main_thread),
]
django配置Redis那些网上一堆,我这里就不多阐述了。这是我第一次发文章,希望能帮到大家,或者起码给同是Django初学者的一点启发吧。
之前文章是用线程来做的,后来发现django原生单线程,所以就改用mutilprocessing,然后也将开启与存储进程简化了。