Django捕捉Redis的hset和hdel事件,并进行线程分配操作

尉迟国发
2023-12-01

前言

本人是django新手,最近因为项目需求Redis用来管理视频流地址,要搭建一个后台会捕捉到Redis的hash变化,增加时为其分配一个线程并把新增的流地址给到处理接口中,而删除时也会关闭对应的线程。这是我三天里从0开始学django研究出来的方法,拿出来分享一下,也是给自己留个足迹,这里也没考虑线程锁之类的数据安全问题,有需要的话可以自己根据需求修改加上去。以下主要代码为项目中新建的threadOP.py文件里的代码。

简述

实现上其实就是监听Redis的变化,要做到这一点可以参考这篇
10Redis键空间通知(keyspace notifications).
具体还要用点正则的知识,这些根据自己来修改吧,我的版本就是监听0到99数据库频道的数据修改消息。在监听到之后就分配线程给数据处理就行。

全局变量

#threadOP.py
global lock #数据库的进程锁
global mainThread #Redis监听线程
global processDic #进程字典

定义监听线程的类

#threadOP.py
class MainThread(threading.Thread):
    """Thread class with a stop() method. The thread itself has to check
    regularly for the stopped() condition."""
 
    def __init__(self,request):
        super(MainThread, self).__init__()
        push_service(request)#监听Redis函数
        self._stop_event = threading.Event()
 
    def stop(self):
        self._stop_event.set()
 
    def stopped(self):
        return self._stop_event.is_set()

全局变量的初始化函数

#threadOP.py
def init():
    global lock
    lock = RLock()
    global processDic
    processDic = {}

启动监听线程的函数

#threadOP.py
def start_main_thread(request):
    global mainThread
    
    init()
    try:
        print("Start: 启动主线程")
        mainThread = MainThread(request)
        mainThread.start()
        print('time to start')
        return HttpResponse('starting thread sucessful')
        
    except Exception:
        
        print("Error: 主线程出错")
        traceback.print_exc()
        return HttpResponse('cant start tread')

关闭所有线程的函数

#threadOP.py
def stop_main_thread(request):
    global mainThread

    print("尝试关闭所有进程")

    try:      
        for p in processDic.values():
            p.terminate()
        
        for p in processDic.values():
            p.close()

        mainThread.stop()
        mainThread.join()
        return HttpResponse('done')
        
    except Exception:
        print("Error: 线程关闭失败")
        traceback.print_exc()
        return HttpResponse('error')

关闭指定数据处理进程的函数

#threadOP.py
def stop_process(id):
    global processDic

    processDic[id].terminate()
    processDic[id].close()

监听Redis的事件函数

我这里Redis中监听的hash键是’test’,根据自己的修改

#threadOP.py
def push_service(request):
    print("start push_service")

    global processDic
    global threds
    global lock
    global pool

	# 订阅频道
    con = get_redis_connection("default")
    ps = con.pubsub()
    ps.psubscribe('__key*__:*')  #订阅所有消息


    # 为已有的流地址分配线程
    if con.hlen('test') >0: #test为要监听的key值
        for i in con.hgetall('test'):
            p = Process(target=Event,args=(i.decode(),con.hget('test',i).decode(),lock,))
            processDic[i.decode()]=p
            p.start()
            

	# 监听redis的变化
    for item in ps.listen():

        if item['type'] == 'pmessage':
            channel = item['channel'].decode()
            message = item['data'].decode()
            
            #redis添加时的操作
            if re.match(r'__keyevent@([0-9]{1,2})__:hset',channel,re.I) and message=="test":#正则匹配0-99号数据库的通道里的hset事件
                count = 0
                len = con.hlen('test')
                for i in con.hgetall('test'):
                    count+=1
                    if count == len:
                        print("starting process for ",i.decode())
                        p = Process(target=Event, args=(i.decode(),con.hget(message,i).decode(),lock,))
                        processDic[i.decode()] = p
                        p.start()

            #redis删除时的操作
            elif re.match(r'__keyevent@([0-9]{1,2})__:hdel',channel,re.I) and message=="test":#正则匹配0-99号数据库的通道里的hset事件
                fieldList=[]
                for field in con.hgetall('test'):
                    fieldList.append(field.decode())
                #寻找被删除的那一条记录
                for id in processDic:
                    if id not in fieldList:
                        processDic[id].terminate()

数据处理函数

#threadOP.py
def Event(id,url,lock):
	print('父进程 id:', os.getppid())  # 获取父进程id()
    print('当前子进程 id:', os.getpid())  # 获取自己的进程id
    print('------------------------')
	...

在url.py文件中添加启动配置

我是直接用url来启动服务的,可以自己根据需求改

#urls.py
from django.urls import path
from . import threadOP
urlpatterns = [
	...
	path('start/',threadOP.start_main_thread),
	path('stop/',threadOP.stop_main_thread),
]

结尾

django配置Redis那些网上一堆,我这里就不多阐述了。这是我第一次发文章,希望能帮到大家,或者起码给同是Django初学者的一点启发吧。


更新

之前文章是用线程来做的,后来发现django原生单线程,所以就改用mutilprocessing,然后也将开启与存储进程简化了。

 类似资料: