当前位置: 首页 > 工具软件 > eventlet > 使用案例 >

Python协程、Eventlet原理剖析及并发场景下性能测试

陈俊誉
2023-12-01

1  协程

1.1  什么是协程

       协程,又称微线程,纤程。英文名Coroutine。协程是一条执行序列,拥有自己独立的栈、局部变量和指令指针,同时又与其他的协同程序共享全局变量。子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。协程看上去也是子程序,但执行过程中,在“子程序”(协程)内部可中断,然后转而执行别的“子程序”,在适当的时候再返回来接着执行。

1.2  线程与协程

       一个线程可以多个协程,一个进程也可以单独拥有多个协程,这样python中则能使用多核CPU。线程进程都是同步机制,而协程则是异步。协程能保留上一次调用时的状态,每次过程重入时,就相当于进入上一次调用的状态,协程需要人为的设置使其互相让渡CPU控制权,而不是抢占。协程既能够共享数据结构,又不需要显式的互斥控制,因为只有当一个协程让出了控制权后其他的协程才能访问彼此共享的数据结构。

1.3  协程的优点

  • 每个协程都有自己私有的栈和局部变量;
  • 同一时间只有一个协程在运行,由于在同一个线程上,因此可以避免竞争关系而使用锁;
  • 顺序可控,完全是由程序控制执行的顺序。而通常多线程一旦启动,运行时序是没法预测的,因此通常会给测试所有的程序带来问题。

2  Eventlet

2.1  简介

官方解释:

Eventlet is a concurrent networking library for Python that allows you to change how you run your code, not how you write it.

  • It uses epoll or kqueue or libevent for highly scalable non-blocking I/O.
  • Coroutines ensure that the developer uses a blocking style of programming that is similar to threading, but provide the benefits of non-blocking I/O.
  • The event dispatch is implicit, which means you can easily use Eventlet from the Python interpreter, or as a small part of a larger application.

大意:高度可扩展的非阻塞I / O,这意味着非常适合高并发!!!

       Eventlet改写了系统IO库,并通过一些python的魔术方法和其他方法,将这些改写的IO库patch到spawn产生的GreenThread(绿色线程,即协程)上,从而在代码执行IO操作的时候,实现自动非阻塞。

2.2  Greenthread Spawn

eventlet.spawn(func,* args,** kw)
       这将启动一个greenthread来调用func。产生多个greenthreads可以并行完成工作。from的返回值spawn是一个greenthread.GreenThread对象,可用于检索func的返回值。

eventlet.spawn_n(func,* args,** kw)
      作用与spawn()相同,但无法知道函数如何终止(即没有返回值或异常)。这使执行速度更快。

2.3  Greenthread Control

eventlet.sleep(seconds=0)
       挂起当前的绿色线程,并让其他有机会进行处理。

eventlet.GreenPool
       池控制并发。在应用程序中,仅消耗有限数量的内存,或者限制一部分代码保持打开状态的连接数量,以便为其余部分保留更多空间,或者在遇到不可预测的输入时表现一致,这在应用程序中很常见数据。

eventlet.GreenPile
       GreenPile对象代表工作块。本质上,GreenPile是一个迭代器,可以填充工作,并且在以后读取结果。

2.4  Patching Functions

eventlet.import_patched(modulename, *additional_modules, **kw_additional_modules)

        以确保模块使用标准库模块的“绿色”版本的方式导入模块,以使所有内容都畅通无阻地工作。唯一需要的参数是要导入的模块的名称。具体用法待会会看到。

eventlet.monkey_patch(all=True, os=False, select=False, socket=False, thread=False, time=False)
       全局修补某些系统模块以使其对Greenthread友好。关键字参数提供了对修补哪些模块的控制。如果all为True,则修补所有模块,而不考虑其他参数。如果为False,则其余关键字参数控制标准库中特定子节的修补。多次调用monkey_patch是安全的。

       本文只列出了常用接口函数,参阅更多请访问官方文档:http://eventlet.net/doc/index.html

3  并发下性能测试

       环境:python2.7、win7+centos7*2

       本次测试使用ssh远程连接两台主机,上传一个文件,目的是更直观的看到Eventlet的非阻塞I/O运行方式,以及并发状态下的性能。废话不多说,直接上代码!!

# !/usr/bin/env python2.7.16
# -*- coding: UTF-8 -*-
# @author: hkf
# @License:
# @file: eventlet_test.py
# @time: 2020/4/23 19:40

import sys
import datetime
import eventlet
import paramiko
# 默认不加任何参数的情况下,所有的module都会被patch。读者可以试试注释以后的运行结果
eventlet.monkey_patch()


class SSHError(Exception):
    pass


class SSHConnectError(SSHError):
    pass


class SSHCmdError(SSHError):
    pass


class SSHDownloadError(SSHError):
    pass


class SSHUploadError(SSHError):
    pass


class SSHConnection(object):
    """
    SFTP传输管理类,负责SFTP操作的管理。
    """

    # 这里的host不能为空,否则后面会出现gaierror错误
    # @ArgumentValidator(rule_dict={
    #     ValueError: {
    #         'host': [NotNone(), InstanceOf(basestring)],
    #         'port': [InstanceOf(int), Range(0, 65535)],
    #         'username': [NotNone(), InstanceOf(basestring)],
    #         'password': [InstanceOf(basestring)]
    #     },
    # })
    def __init__(self, host, port, username, password):
        """
        初始化类需要的一些属性。
        :param host:服务器IP
        :param port:服务器端口
        :param username:用户名
        :param password:密码
        """

        self._host = host
        self._port = port
        self._username = username
        self._password = password
        self.__transport = None

    def connect(self):
        """
        通过SSH协议连接到远端服务器。
        :return:
        """
        try:
            transport = paramiko.Transport(self._host, self._port)
        except paramiko.ssh_exception.SSHException as ex:
            # logging.error("Build transport error,error is {error}".format(error=ex))
            raise SSHConnectError("Build transport error,error is {error}".format(error=ex))(None).with_traceback(
                sys.exc_info()[2])

        try:
            transport.connect(username=self._username, password=self._password)
            self.__transport = transport
        except paramiko.ssh_exception.AuthenticationException as ex:
            # logging.error("SSH connect error,error is {error}".format(error=ex))
            raise SSHConnectError("SSH connect error,error is {error}".format(error=ex))(None).with_traceback(
                sys.exc_info()[2])

    def close(self):
        """
        关闭和远端服务器的SSH连接。
        :return:
        """
        if self.__transport:
            self.__transport.close()

    # @ArgumentValidator(rule_dict={
    #     ValueError: {
    #         'local_path': [NotNone(), InstanceOf(basestring), PathExist()],
    #         'remote_path': [NotNone(), InstanceOf(basestring)]
    #     },
    # })
    def upload(self, local_path, remote_path):
        """
        上传本地的文件到远端服务器。
        :param local_path:本地存放文件的路径
        :param remote_path:远端服务器存放文件的路径
        :return:
        """
        try:
            sftp = paramiko.SFTPClient.from_transport(self.__transport)
        except AttributeError as ex:
            # logging.error("SFTP upload sftpclient fail, error info is :{error}".format(error=ex))
            raise SSHUploadError("SFTP upload sftpclient fail, error info is :{error}".format(error=ex))
        try:

            sftp.put(local_path, remote_path)
            print(self._host, 'block')
        except IOError as ex:
            # logging.error("SFTP upload file fail, error info is :{error}".format(error=ex))
            raise SSHUploadError("SFTP upload file fail, error info is :{error}".format(error=ex))


def con_ssh(ip, username, password):
    print(ip, 'block')
    ssh_build = SSHConnection(ip, 22, username, password)
    ssh_build.connect()
    print(ip, 'block')
    ssh_build.upload('D:/random_results_total.rar', '/home/random_results_total.rar')
    ssh_build.close()


def main():
    # 计时器开始计时
    start_time = datetime.datetime.now()
    # 创建"协程池"(类似于线程池,但系统会自动进行非阻塞调度)
    pool = eventlet.GreenPool(1000)
    # 添加任务
    pool.spawn_n(con_ssh, '10.230.4.122', '****', '****')
    pool.spawn_n(con_ssh, '10.230.4.111', '****', '****')
    # 等待全部任务运行完成
    pool.waitall()
    # con_ssh('10.230.4.122', '****', '****')
    # con_ssh('10.230.4.111', '****', '****')
    # 计时器停止计时
    end_time = datetime.datetime.now()
    interval = (end_time - start_time).seconds
    print(interval)


if __name__ == "__main__":
    main()

运行结果:

('10.230.4.122', 'block')
('10.230.4.111', 'block')
('10.230.4.122', 'block')
('10.230.4.111', 'block')
('10.230.4.122', 'block')
('10.230.4.111', 'block')
14

        分析:122运行到ssh连接时阻塞,此时返回去调用111的ssh连接,等到122连接结束时,开始向122上传文件,又一次阻塞,此时111的ssh连接已完成,继续去执行111到上传操作,随后122和111的上传任务相继结束,程序结束!运行时间14秒

       之前为什么说是非阻塞,为什么说有利于高并发,一目了然!

接着我们尝试常规的阻塞执行方式:

def main():
    # 计时器开始计时
    start_time = datetime.datetime.now()
    # 创建"协程池"(类似于线程池,但系统会自动进行非阻塞调度)
    # pool = eventlet.GreenPool(1000)
    # # 添加任务
    # pool.spawn_n(con_ssh, '10.230.4.122', '****', '****')
    # pool.spawn_n(con_ssh, '10.230.4.111', '****', '****')
    # # 等待全部任务运行完成
    # pool.waitall()
    con_ssh('10.230.4.122', '****', '****')
    con_ssh('10.230.4.111', '****', '****')
    # 计时器停止计时
    end_time = datetime.datetime.now()
    interval = (end_time - start_time).seconds
    print(interval)

运行结果:

('10.230.4.122', 'block')
('10.230.4.122', 'block')
('10.230.4.122', 'block')
('10.230.4.111', 'block')
('10.230.4.111', 'block')
('10.230.4.111', 'block')
16

        不用解释大家也清楚了,快了大概2秒,本次测试只有两台机器,当在高并发环境下及I/O过程比较久的情况下,效果会更明显!!

 

 

 类似资料: