当前位置: 首页 > 编程笔记 >

Python使用Redis实现作业调度系统(超简单)

暨成双
2023-03-14
本文向大家介绍Python使用Redis实现作业调度系统(超简单),包括了Python使用Redis实现作业调度系统(超简单)的使用技巧和注意事项,需要的朋友参考一下

概述

Redis是一个开源,先进的key-value存储,并用于构建高性能,可扩展的Web应用程序的完美解决方案。

Redis从它的许多竞争继承来的三个主要特点:

Redis数据库完全在内存中,使用磁盘仅用于持久性。

相比许多键值数据存储,Redis拥有一套较为丰富的数据类型。

Redis可以将数据复制到任意数量的从服务器。

Redis 优势

异常快速:Redis的速度非常快,每秒能执行约11万集合,每秒约81000+条记录。

支持丰富的数据类型:Redis支持最大多数开发人员已经知道像列表,集合,有序集合,散列数据类型。这使得它非常容易解决各种各样的问题,因为我们知道哪些问题是可以处理通过它的数据类型更好。

操作都是原子性:所有Redis操作是原子的,这保证了如果两个客户端同时访问的Redis服务器将获得更新后的值。

多功能实用工具:Redis是一个多实用的工具,可以在多个用例如缓存,消息,队列使用(Redis原生支持发布/订阅),任何短暂的数据,应用程序,如Web应用程序会话,网页命中计数等。

步入主题:

Redis作为内存数据库的一个典型代表,已经在很多应用场景中被使用,这里仅就Redis的pub/sub功能来说说怎样通过此功能来实现一个简单的作业调度系统。这里只是想展现一个简单的想法,所以还是有很多需要考虑的东西没有包括在这个例子中,比如错误处理,持久化等。

下面是实现上的想法

MyMaster:集群的master节点程序,负责产生作业,派发作业和获取执行结果。

MySlave:集群的计算节点程序,每个计算节点一个,负责获取作业并运行,并将结果发送会master节点。

channel CHANNEL_DISPATCH:每个slave节点订阅一个channel,比如“CHANNEL_DISPATCH_[idx或机器名]”,master会向此channel中publish被dispatch的作业。

channel CHANNEL_RESULT:用来保存作业结果的channel,master和slave共享此channel,master订阅此channel来获取作业运行结果,每个slave负责将作业执行结果发布到此channel中。

Master代码

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
import threading
import random
import redis
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'
CHANNEL_RESULT = 'CHANNEL_RESULT'
class MyMaster():
def __init__(self):
pass
def start(self):
MyServerResultHandleThread().start()
MyServerDispatchThread().start()
class MyServerDispatchThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
for i in range(1, 100):
channel = CHANNEL_DISPATCH + '_' + str(random.randint(1, 3))
print("Dispatch job %s to %s" % (str(i), channel))
ret = r.publish(channel, str(i))
if ret == 0:
print("Dispatch job %s failed." % str(i))
time.sleep(5)
class MyServerResultHandleThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
p = r.pubsub()
p.subscribe(CHANNEL_RESULT)
for message in p.listen():
if message['type'] != 'message':
continue
print("Received finished job %s" % message['data'])
if __name__ == "__main__":
MyMaster().start()
time.sleep(10000)

说明

MyMaster类 - master主程序,用来启动dispatch和resulthandler的线程

MyServerDispatchThread类 - 派发作业线程,产生作业并派发到计算节点

MyServerResultHandleThread类 - 作业运行结果处理线程,从channel里获取作业结果并显示

Slave代码

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from datetime import datetime
import time
import threading
import random
import redis
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'
CHANNEL_RESULT = 'CHANNEL_RESULT'
class MySlave():
def __init__(self):
pass
def start(self):
for i in range(1, 4):
MyJobWorkerThread(CHANNEL_DISPATCH + '_' + str(i)).start()
class MyJobWorkerThread(threading.Thread):
def __init__(self, channel):
threading.Thread.__init__(self)
self.channel = channel
def run(self):
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
p = r.pubsub()
p.subscribe(self.channel)
for message in p.listen():
if message['type'] != 'message':
continue
print("%s: Received dispatched job %s " % (self.channel, message['data']))
print("%s: Run dispatched job %s " % (self.channel, message['data']))
time.sleep(2)
print("%s: Send finished job %s " % (self.channel, message['data']))
ret = r.publish(CHANNEL_RESULT, message['data'])
if ret == 0:
print("%s: Send finished job %s failed." % (self.channel, message['data']))
if __name__ == "__main__":
MySlave().start()
time.sleep(10000)

说明

MySlave类 - slave节点主程序,用来启动MyJobWorkerThread的线程

MyJobWorkerThread类 - 从channel里获取派发的作业并将运行结果发送回master

测试

首先运行MySlave来定义派发作业channel。

然后运行MyMaster派发作业并显示执行结果。

有关Python使用Redis实现作业调度系统(超简单),小编就给大家介绍这么多,希望对大家有所帮助!

 类似资料:
  • 本文向大家介绍Python使用multiprocessing实现一个最简单的分布式作业调度系统,包括了Python使用multiprocessing实现一个最简单的分布式作业调度系统的使用技巧和注意事项,需要的朋友参考一下  mutilprocess像线程一样管理进程,这个是mutilprocess的核心,他与threading很是相像,对多核CPU的利用率会比threading好的多。 介绍 P

  • 系统调用实现 系统调用的英文名字是System Call。操作系统为什么需要实现系统调用呢?其实这是实现了用户进程后,自然引申出来需要实现的操作系统功能。用户进程只能在操作系统给它圈定好的“用户环境”中执行,但“用户环境”限制了用户进程能够执行的指令,即用户进程只能执行一般的指令,无法执行特权指令。如果用户进程想执行一些需要特权指令的任务,比如通过网卡发网络包等,只能让操作系统来代劳了。于是就需要

  • 实现系统调用 目前,我们实现 sys_read sys_write 和 sys_exit 三个简单的系统调用。通过学习它们的实现,更多的系统调用也并没有多难。 用户程序中调用系统调用 在用户程序中实现系统调用比较容易,就像我们之前在操作系统中使用 sbi_call 一样,只需要符合规则传递参数即可。而且这一次我们甚至不需要参考任何标准,每个人都可以为自己的操作系统实现自己的标准。 例如,在实验指导

  • 本文向大家介绍springboot集成redis实现简单秒杀系统,包括了springboot集成redis实现简单秒杀系统的使用技巧和注意事项,需要的朋友参考一下 本文实例为大家分享了springboot集成redis实现简单秒杀系统的具体代码,供大家参考,具体内容如下 项目是有地址的,我会放到文章的最后面 1. 直接service,我们会介绍两种秒杀模式 2. service实现类 3. con

  • 到目前为止,我们根据它们的到达时间(在FCFS调度中)调度这些进程。 但是,SJF调度算法根据其突发时间安排进程。 在SJF调度中,就绪队列中可用进程列表中的突发时间最短的进程将在下一个进行调度。 然而,预测一个过程所需的突发时间是非常困难的,因此这个算法在系统中很难实现。 SJF的优势 最大吞吐量 最低的平均等候时间和周转时间 SJF的缺点 可能会面临饥饿问题 这是不可实现的,因为一个进程的确切

  • 本文向大家介绍详解使用SSM实现简单工作流系统之实现篇,包括了详解使用SSM实现简单工作流系统之实现篇的使用技巧和注意事项,需要的朋友参考一下 项目说明 本项目是依据《轻量级 Java EE 企业应用实战 第4版》的最后一章中的项目实现的,原本项目使用的框架是Struts2 + Spring 4 + Hibernate,因为本人在学习Spring MVC + Spring + Mybatis,所以