
Python Concurrency and Asynchronous Programming Examples

祁飞翰
2023-03-14

Concepts such as concurrency, parallelism, synchronous/blocking, asynchronous/non-blocking, threads, processes and coroutines are hard to understand deeply from text alone. This article implements concurrent and asynchronous programs step by step in code and compares them. Python 3 is used as the interpreter: it is the future of Python, and its standard library already makes writing coroutines very convenient.

1. Preparation

The packages needed by all of the test code are listed below (aiohttp is a third-party library; install it with pip install aiohttp):

#! python3
# coding:utf-8

import socket
from concurrent import futures
from selectors import DefaultSelector,EVENT_WRITE,EVENT_READ
import asyncio
import aiohttp
import time
from time import ctime

For the comparison, the scenario is a typical crawler workload: issue a series of HTTP requests against a remote site and measure the elapsed time to judge each implementation. Concretely, one request means opening a socket connection to the Sina homepage and reading back the page source. First, a decorator that reports a function's execution time:

def tsfunc(func):
  def wrappedFunc(*args,**kargs):
    # time.clock() was removed in Python 3.8; perf_counter() is the replacement
    start = time.perf_counter()
    action = func(*args,**kargs)
    time_delta = time.perf_counter() - start
    print ('[{0}] {1}() called, time delta: {2}'.format(ctime(),func.__name__,time_delta))
    return action
  return wrappedFunc

The output format is: current time, the function that was called, and its execution time.
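
For example, decorating a trivial function (demo here is a hypothetical function used only for illustration) prints one timing line per call:

@tsfunc
def demo():
  time.sleep(0.1)   # pretend to do 0.1 s of work

demo()
# prints something like: [<current time>] demo() called, time delta: 0.10...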

2. Blocking/non-blocking and synchronous/asynchronous

These two pairs of concepts are easy to mix up. Starting from the definitions:

Blocking: during socket communication, a thread issues a request and, if no result is available yet, it goes to sleep. While suspended the thread can do nothing else, until a result arrives or the call times out (if a timeout is set).
Non-blocking: the same setup, except that the thread is not suspended while waiting. If the result cannot be obtained immediately, the call does not hang the current thread but returns right away, and the thread can go do other work (see the small socket sketch after these definitions).
Synchronous: similar to, but not the same as, blocking. Synchrony is about the ordering of work: one task must finish before the next one starts, and so on.
Asynchronous: the counterpart of synchronous and closely related to non-blocking. When an asynchronous call is issued, the caller does not get the result immediately; the component that actually handles the call reports back later through state, notifications or callbacks. Colloquially: "I'll tell you when it's done."
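
A minimal sketch, not part of the timing tests, that makes the blocking/non-blocking difference concrete: recv() on a blocking socket waits for data, while on a non-blocking socket it raises immediately when nothing has arrived yet.

import socket

s = socket.socket()
s.connect(('www.sina.com', 80))   # blocking connect: returns only once the connection is established
s.setblocking(False)              # switch the same socket to non-blocking mode
try:
  s.recv(4096)                    # no request has been sent, so no data can be ready yet
except BlockingIOError:
  pass                            # a blocking socket would have hung here; control comes back at once
s.close()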

1) Blocking approach

Back to the code. First, the blocking version of the request function:

def blocking_way():
  sock = socket.socket()
  sock.connect(('www.sina.com',80))
  request = 'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'
  sock.send(request.encode('ascii'))
  response = b''
  chunk = sock.recv(4096)   # blocks until data arrives or the peer closes the connection
  while chunk:
    response += chunk
    chunk = sock.recv(4096)
  return response

Test the single-threaded, multi-process and multi-threaded variants:

# blocking, no concurrency
@tsfunc
def sync_way():
  res = []
  for i in range(10):
    res.append(blocking_way())
  return len(res)

# blocking, multi-process
@tsfunc
def process_way():
  worker = 10
  with futures.ProcessPoolExecutor(worker) as executor:
    futs = {executor.submit(blocking_way) for i in range(10)}
  return len([fut.result() for fut in futs])

# blocking, multi-threaded
@tsfunc
def thread_way():
  worker = 10
  with futures.ThreadPoolExecutor(worker) as executor:
    futs = {executor.submit(blocking_way) for i in range(10)}
  return len([fut.result() for fut in futs])

Results:

[Wed Dec 13 16:52:25 2017] sync_way() called, time delta: 0.06371647809425328
[Wed Dec 13 16:52:28 2017] process_way() called, time delta: 2.31437644946734
[Wed Dec 13 16:52:28 2017] thread_way() called, time delta: 0.010172946070299727

Compared with the non-concurrent version, starting 10 processes for 10 requests takes by far the longest time: processes really do carry a large system overhead. Multithreading does much better: 10 concurrent threads finish roughly 6x faster than the sequential requests.
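
As a side note, the same thread-pool test could also be written with executor.map, which submits the calls and gathers the results in order; a minimal sketch, assuming the same blocking_way function (thread_map_way is a hypothetical name, not part of the original tests):

@tsfunc
def thread_map_way():
  with futures.ThreadPoolExecutor(10) as executor:
    # map() blocks until every call has finished and yields results in submission order
    results = list(executor.map(lambda _: blocking_way(), range(10)))
  return len(results)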

2) Non-blocking approach

The non-blocking request code differs from the blocking version in that waiting does not suspend the thread; the call returns immediately. To still read the complete response, the most primitive approach is to keep looping on the reads until they succeed and then break out of the loop:

def nonblocking_way():
  sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  sock.setblocking(False)
  try:
    sock.connect(('www.sina.com', 80))
  except BlockingIOError:
    # a non-blocking connect() raises immediately; the connection completes in the background
    pass
  request = 'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'
  data = request.encode('ascii')
  while True:
    try:
      sock.send(data)
      break
    except OSError:
      # not connected yet, keep retrying
      pass

  response = b''
  while True:
    try:
      chunk = sock.recv(4096)
      while chunk:
        response += chunk
        chunk = sock.recv(4096)
      break
    except OSError:
      # no data available yet, keep retrying
      pass

  return response

Testing the single-threaded non-blocking version:

@tsfunc
def async_way():
  res = []
  for i in range(10):
    res.append(nonblocking_way())
  return len(res)

The results, compared against the single-threaded blocking version:

[Wed Dec 13 17:18:30 2017] sync_way() called, time delta: 0.07342884475822574
[Wed Dec 13 17:18:30 2017] async_way() called, time delta: 0.06509009095694886

The non-blocking version helps a little, but not much. The reason is clear: the time saved by not suspending the thread is spent busy-looping on the reads instead, and if most of the time is still wasted polling, the power of asynchronous programming never shows. The fix is the event-driven approach, sketched below and developed in the next section.
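
The idea is to hand the "wait until this socket is ready" part to the operating system through a selector instead of spinning in send()/recv(). A minimal, self-contained sketch of the pattern (the real crawler implementation follows in the next section):

from selectors import DefaultSelector, EVENT_WRITE
import socket

sel = DefaultSelector()
sock = socket.socket()
sock.setblocking(False)
try:
  sock.connect(('www.sina.com', 80))
except BlockingIOError:
  pass

sel.register(sock.fileno(), EVENT_WRITE, data=None)
sel.select()                       # blocks here until the OS reports the socket is writable
sel.unregister(sock.fileno())
sock.send(b'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n')   # safe to send now
sock.close()
sel.close()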

3. Callbacks, generators and coroutines

a. Callbacks

The callback version registers interest in socket events with a selector; whenever the event loop reports an event, the registered callback runs. (The globals selector, stopped and urls_todo used here are defined in the full listing at the end of the article.)

class Crawler():
  def __init__(self,url):
    self.url = url
    self.sock = None
    self.response = b''

  def fetch(self):
    self.sock = socket.socket()
    self.sock.setblocking(False)
    try:
      self.sock.connect(('www.sina.com',80))
    except BlockingIOError:
      pass
    selector.register(self.sock.fileno(),EVENT_WRITE,self.connected)

  def connected(self,key,mask):
    selector.unregister(key.fd)
    get = 'GET {0} HTTP/1.0\r\nHost:www.sina.com\r\n\r\n'.format(self.url)
    self.sock.send(get.encode('ascii'))
    selector.register(key.fd,EVENT_READ,self.read_response)

  def read_response(self,key,mask):
    global stopped
    # called once per EVENT_READ notification; read one chunk per readiness event
    chunk = self.sock.recv(4096)
    if chunk:
      self.response += chunk
    else:
      # empty chunk: the server closed the connection, the response is complete
      selector.unregister(key.fd)
      urls_todo.remove(self.url)
      if not urls_todo:
        stopped = True

def loop():
  while not stopped:
    events = selector.select()
    for event_key,event_mask in events:
      callback = event_key.data
      callback(event_key,event_mask)

@tsfunc
def callback_way():
  for url in urls_todo:
    crawler = Crawler(url)
    crawler.fetch()
  loop()

This is asynchronous programming implemented with traditional callbacks. The result:

[Tue Mar 27 17:52:49 2018] callback_way() called, time delta: 0.054735804048789374

b. Generator-based coroutines

With callbacks, the logic of a single request is scattered across several methods. Generator-based coroutines let it read linearly again: Crawler2 below expresses a whole request as one generator. It relies on the helper coroutines connect and read_all and on the Future1 class, all defined in the full listing at the end; the driver function is shown after the Task and loop1 definitions below.

class Crawler2:
  def __init__(self, url):
    self.url = url
    self.response = b''

  def fetch(self):
    global stopped
    sock = socket.socket()
    yield from connect(sock, ('www.sina.com', 80))
    get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url)
    sock.send(get.encode('ascii'))
    self.response = yield from read_all(sock)
    urls_todo.remove(self.url)
    if not urls_todo:
      stopped = True

class Task:
  def __init__(self, coro):
    self.coro = coro
    f = Future1()
    f.set_result(None)
    self.step(f)

  def step(self, future):
    try:
      # send() resumes the coroutine (fetch) and runs it until the next yield
      # next_future is the Future1 object yielded there
      next_future = self.coro.send(future.result)
    except StopIteration:
      return
    next_future.add_done_callback(self.step)

def loop1():
  while not stopped:
    events = selector.select()
    for event_key,event_mask in events:
      callback = event_key.data
      callback()
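
The driver that ties Crawler2, Task and loop1 together (taken from the full listing at the end):

@tsfunc
def generate_way():
  for url in urls_todo:
    crawler = Crawler2(url)
    Task(crawler.fetch())   # Task primes the generator and keeps stepping it on each event
  loop1()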

The result:

[Tue Mar 27 17:54:27 2018] generate_way() called, time delta: 0.2914336347673473
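
What makes `yield from f` on a Future1 work is its __iter__ method (from the full listing below): it yields the future itself so that Task.step can attach a callback, and returns the result once the future has been resolved.

class Future1:
  ...
  def __iter__(self):
    yield self              # hand the pending future out to Task.step
    return self.result      # becomes the value of `yield from f` after resolution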

c. asyncio coroutines

Finally, the native approach: asyncio runs the event loop and aiohttp issues the HTTP requests, so none of the hand-written socket plumbing above is needed. This is the fetch coroutine and the driver from the full listing at the end of the article:

host = 'http://www.sina.com'
loop = asyncio.get_event_loop()

async def fetch(url):
  async with aiohttp.ClientSession(loop=loop) as session:
    async with session.get(url) as response:
      response = await response.read()
      return response

@tsfunc
def asyncio_way():
  tasks = [fetch(host+url) for url in urls_todo]
  loop.run_until_complete(asyncio.gather(*tasks))
  return len(tasks)

The result:

[Tue Mar 27 17:56:17 2018] asyncio_way() called, time delta: 0.43688060698484166
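
As a side note (not part of the original timing), on Python 3.7+ the event loop can be managed by asyncio.run instead of a module-level get_event_loop(); a minimal sketch, assuming the same urls_todo list (fetch37 and main are hypothetical names, and the session is deliberately created without the explicit loop argument):

async def fetch37(url):
  async with aiohttp.ClientSession() as session:     # no explicit loop argument
    async with session.get(url) as response:
      return await response.read()

async def main():
  tasks = [fetch37(host + url) for url in urls_todo]
  return await asyncio.gather(*tasks)

# asyncio.run() creates, runs and closes its own event loop (Python 3.7+)
# results = asyncio.run(main())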

That completes the tests of the concurrency and asynchronous programming examples. The full code is listed below so readers can run it themselves; as the number of tasks grows, the differences between the approaches should become much more pronounced.

#! python3
# coding:utf-8

import socket
from concurrent import futures
from selectors import DefaultSelector,EVENT_WRITE,EVENT_READ
import asyncio
import aiohttp
import time
from time import ctime

def tsfunc(func):
  def wrappedFunc(*args,**kargs):
    # time.clock() was removed in Python 3.8; perf_counter() is the replacement
    start = time.perf_counter()
    action = func(*args,**kargs)
    time_delta = time.perf_counter() - start
    print ('[{0}] {1}() called, time delta: {2}'.format(ctime(),func.__name__,time_delta))
    return action
  return wrappedFunc

def blocking_way():
  sock = socket.socket()
  sock.connect(('www.sina.com',80))
  request = 'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'
  sock.send(request.encode('ascii'))
  response = b''
  chunk = sock.recv(4096)
  while chunk:
    response += chunk
    chunk = sock.recv(4096)
  return response

def nonblocking_way():
  sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  sock.setblocking(False)
  try:
    sock.connect(('www.sina.com', 80))
  except BlockingIOError:
    pass
  request = 'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'
  data = request.encode('ascii')
  while True:
    try:
      sock.send(data)
      break
    except OSError:
      pass

  response = b''
  while True:
    try:
      chunk = sock.recv(4096)
      while chunk:
        response += chunk
        chunk = sock.recv(4096)
      break
    except OSError:
      pass

  return response


selector = DefaultSelector()
stopped = False
urls_todo = ['/','/1','/2','/3','/4','/5','/6','/7','/8','/9']


class Crawler():
  def __init__(self,url):
    self.url = url
    self.sock = None
    self.response = b''

  def fetch(self):
    self.sock = socket.socket()
    self.sock.setblocking(False)
    try:
      self.sock.connect(('www.sina.com',80))
    except BlockingIOError:
      pass
    selector.register(self.sock.fileno(),EVENT_WRITE,self.connected)

  def connected(self,key,mask):
    selector.unregister(key.fd)
    get = 'GET {0} HTTP/1.0\r\nHost:www.sina.com\r\n\r\n'.format(self.url)
    self.sock.send(get.encode('ascii'))
    selector.register(key.fd,EVENT_READ,self.read_response)

  def read_response(self,key,mask):
    global stopped
    # called once per EVENT_READ notification; read one chunk per readiness event
    chunk = self.sock.recv(4096)
    if chunk:
      self.response += chunk
    else:
      # empty chunk: the server closed the connection, the response is complete
      selector.unregister(key.fd)
      urls_todo.remove(self.url)
      if not urls_todo:
        stopped = True

def loop():
  while not stopped:
    events = selector.select()
    for event_key,event_mask in events:
      callback = event_key.data
      callback(event_key,event_mask)


# generator-based coroutines (plain yield)
class Future:
  def __init__(self):
    self.result = None
    self._callbacks = []

  def add_done_callback(self,fn):
    self._callbacks.append(fn)

  def set_result(self,result):
    self.result = result
    for fn in self._callbacks:
      fn(self)

class Crawler1():
  def __init__(self,url):
    self.url = url
    self.response = b''

  def fetch(self):
    sock = socket.socket()
    sock.setblocking(False)
    try:
      sock.connect(('www.sina.com',80))
    except BlockingIOError:
      pass

    f = Future()
    def on_connected():
      f.set_result(None)

    selector.register(sock.fileno(),EVENT_WRITE,on_connected)
    yield f
    selector.unregister(sock.fileno())
    get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url)
    sock.send(get.encode('ascii'))

    global stopped
    while True:
      f = Future()
      def on_readable():
        f.set_result(sock.recv(4096))
      selector.register(sock.fileno(),EVENT_READ,on_readable)
      chunk = yield f
      selector.unregister(sock.fileno())
      if chunk:
        self.response += chunk
      else:
        urls_todo.remove(self.url)
        if not urls_todo:
          stopped = True
        break


# generator coroutines refactored with yield from
class Future1:
  def __init__(self):
    self.result = None
    self._callbacks = []

  def add_done_callback(self,fn):
    self._callbacks.append(fn)

  def set_result(self,result):
    self.result = result
    for fn in self._callbacks:
      fn(self)

  def __iter__(self):
    yield self
    return self.result

def connect(sock, address):
  f = Future1()
  sock.setblocking(False)
  try:
    sock.connect(address)
  except BlockingIOError:
    pass

  def on_connected():
    f.set_result(None)

  selector.register(sock.fileno(), EVENT_WRITE, on_connected)
  yield from f
  selector.unregister(sock.fileno())

def read(sock):
  f = Future1()

  def on_readable():
    f.set_result(sock.recv(4096))

  selector.register(sock.fileno(), EVENT_READ, on_readable)
  chunk = yield from f
  selector.unregister(sock.fileno())
  return chunk

def read_all(sock):
  response = []
  chunk = yield from read(sock)
  while chunk:
    response.append(chunk)
    chunk = yield from read(sock)
  return b''.join(response)

class Crawler2:
  def __init__(self, url):
    self.url = url
    self.response = b''

  def fetch(self):
    global stopped
    sock = socket.socket()
    yield from connect(sock, ('www.sina.com', 80))
    get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url)
    sock.send(get.encode('ascii'))
    self.response = yield from read_all(sock)
    urls_todo.remove(self.url)
    if not urls_todo:
      stopped = True


class Task:
  def __init__(self, coro):
    self.coro = coro
    f = Future1()
    f.set_result(None)
    self.step(f)

  def step(self, future):
    try:
      # send() resumes the coroutine (fetch) and runs it until the next yield
      # next_future is the Future1 object yielded there
      next_future = self.coro.send(future.result)
    except StopIteration:
      return
    next_future.add_done_callback(self.step)

def loop1():
  while not stopped:
    events = selector.select()
    for event_key,event_mask in events:
      callback = event_key.data
      callback()


# asyncio coroutines
host = 'http://www.sina.com'
loop = asyncio.get_event_loop()

async def fetch(url):
  async with aiohttp.ClientSession(loop=loop) as session:
    async with session.get(url) as response:
      response = await response.read()
      return response

@tsfunc
def asyncio_way():
    tasks = [fetch(host+url) for url in urls_todo]
    loop.run_until_complete(asyncio.gather(*tasks))
    return (len(tasks))

@tsfunc
def sync_way():
  res = []
  for i in range(10):
    res.append(blocking_way())
  return len(res)

@tsfunc
def process_way():
  worker = 10
  with futures.ProcessPoolExecutor(worker) as executor:
    futs = {executor.submit(blocking_way) for i in range(10)}
  return len([fut.result() for fut in futs])

@tsfunc
def thread_way():
  worker = 10
  with futures.ThreadPoolExecutor(worker) as executor:
    futs = {executor.submit(blocking_way) for i in range(10)}
  return len([fut.result() for fut in futs])

@tsfunc
def async_way():
  res = []
  for i in range(10):
    res.append(nonblocking_way())
  return len(res)

@tsfunc
def callback_way():
  for url in urls_todo:
    crawler = Crawler(url)
    crawler.fetch()
  loop()

@tsfunc
def generate_way():
  for url in urls_todo:
    crawler = Crawler2(url)
    Task(crawler.fetch())
  loop1()

if __name__ == '__main__':

  #sync_way()
  #process_way()
  #thread_way()
  #async_way()
  #callback_way()
  #generate_way()
  asyncio_way()

That is all for this article. Hopefully it helps with your studies, and thanks for supporting 小牛知识库.
