当前位置: 首页 > 编程笔记 >

python分布式编程实现过程解析

闾丘卓
2023-03-14
本文向大家介绍python分布式编程实现过程解析,包括了python分布式编程实现过程解析的使用技巧和注意事项,需要的朋友参考一下

分布式编程的难点在于:

1.服务器之间的通信,主节点如何了解从节点的执行进度,并在从节点之间进行负载均衡和任务调度;

2.如何让多个服务器上的进程访问同一资源的不同部分进行执行

第一部分涉及到网络编程的底层细节

第二个问题让我联想到hdfs的一些功能。

首先分布式进程还是解决的是单机单进程无法处理的大数据量大计算量的问题,希望能加通过一份代码(最多主+从两份)来并行执行一个大任务。

这就面临两个问题,首先将程序分布到多台服务器,其次将输入数据分配给多台服务器。

第一个问题相对比较简单,毕竟程序一般不会太长,即便是超级jar包的spark程序,也不过百兆。

但数据里不同,如今企业级别的数据动辄GB、TB,如果在分布式程序执行之前首先要进行大容量数据的转移,显然是不可取的。

这时候我们就需要一个中央共享数据源,所有服务器都可以对这个数据源进行并行存取(块block),这就已经非常接近hdfs的功能。

因为在hdfs中,集群中的多台服务器共享同一个hdfs,每台机器访问hdfs就像访问本地数据一样(还是稍微慢一点);

计算任务执行完之后,每台服务器还可以将自己的计算结果写回hdfs,每台服务器的结果被存储成了结果目录中的小文件

# task_master.py

import random, time, queue
from multiprocessing.managers import BaseManager

# 发送任务的队列:
task_queue = queue.Queue()
# 接收结果的队列:
result_queue = queue.Queue()

# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
  pass

# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
  n = random.randint(0, 10000)
  print('Put task %d...' % n)
  task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
  r = result.get(timeout=10)
  print('Result: %s' % r)
# 关闭:
manager.shutdown()
print('master exit.')
# task_worker.py

import time, sys, queue
from multiprocessing.managers import BaseManager

# 创建类似的QueueManager:
class QueueManager(BaseManager):
  pass

# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 连接到服务器,也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
  try:
    n = task.get(timeout=1)
    print('run task %d * %d...' % (n, n))
    r = '%d * %d = %d' % (n, n, n*n)
    time.sleep(1)
    result.put(r)
  except Queue.Empty:
    print('task queue is empty.')
# 处理结束:
print('worker exit.')

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持小牛知识库。

 类似资料:
  • 本文向大家介绍在Python程序中实现分布式进程的教程,包括了在Python程序中实现分布式进程的教程的使用技巧和注意事项,需要的朋友参考一下 在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。 Python的multiprocessing模块不但支持多进程,其中man

  • 本文向大家介绍Vue发布订阅模式实现过程图解,包括了Vue发布订阅模式实现过程图解的使用技巧和注意事项,需要的朋友参考一下 vue项目中不同组件间通信一般使用vuex,通常情况下vuex和EventBus不应该混用,不过某些场景下不同组件间只有消息的交互,这时使用EventBus消息通知的方式就更合适一些。 图解 html Dvue.js 以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大

  • 分布式程序是那些旨在在计算机网络上运行并且只能通过消息传递协调其活动的程序。 我们可能想要编写分布式应用程序的原因有很多。 这里是其中的一些。 Performance - 我们可以通过安排程序的不同部分在不同的机器上并行运行来使程序更快。 Reliability - 我们可以通过将系统结构化以在多台机器上运行来制造容错系统。 如果一台机器出现故障,我们可以继续使用另一台机器 Scalability

  • 本文向大家介绍Kafka多节点分布式集群搭建实现过程详解,包括了Kafka多节点分布式集群搭建实现过程详解的使用技巧和注意事项,需要的朋友参考一下 上一篇分享了单节点伪分布式集群搭建方法,本篇来分享一下多节点分布式集群搭建方法。多节点分布式集群结构如下图所示:   为了方便查阅,本篇将和上一篇一样从零开始一步一步进行集群搭建。 一、安装Jdk   具体安装步骤可参考linux安装jdk。 二、安装

  • 本文向大家介绍Python线程协作threading.Condition实现过程解析,包括了Python线程协作threading.Condition实现过程解析的使用技巧和注意事项,需要的朋友参考一下 领会下面这个示例吧,其实跟java中wait/nofity是一样一样的道理 Condition的底层实现了__enter__和 __exit__协议.所以可以使用with上下文管理器 由Condi

  • 本文向大家介绍Java编程redisson实现分布式锁代码示例,包括了Java编程redisson实现分布式锁代码示例的使用技巧和注意事项,需要的朋友参考一下 最近由于工作很忙,很长时间没有更新博客了,今天为大家带来一篇有关Redisson实现分布式锁的文章,好了,不多说了,直接进入主题。 1. 可重入锁(Reentrant Lock) Redisson的分布式可重入锁RLock Java对象实现

  • 本文向大家介绍Javascript异步编程async实现过程详解,包括了Javascript异步编程async实现过程详解的使用技巧和注意事项,需要的朋友参考一下 async官方DOC 介绍 node安装 npm install async --save 使用 var async = require('async') js文件 https://github.com/caolan/async/tre

  • 本章描述如何编写运行于Erlang节点网络上的分布式Erlang程序。我们描述了用于实现分布式系统的语言原语。Erlang进程可以自然地映射到分布式系统之中;同时,之前章节所介绍的Erlang并发原语和错误检测原语在分布式系统和单节点系统中仍保持原有属性。 动机 我们有很多理由去编写分布式应用,比如: 速度 我们可以把我们的程序切分成能够分别运行于多个不同节点的几个部分。比如,某个编译器可以将一个