当前位置: 首页 > 工具软件 > Locust > 使用案例 >

Locust 之 User

薛扬
2023-12-01

        User类代表虚拟用户,User对象被孵化出来攻击需要进行压力测试的系统。我们来看看它的真身。User对象的行为取决于它内部的tasks。即可以直接在User类内部通过@tasks装饰器修饰方法的方式来声明tasks,也可以通过设置tasks属性来声明。

        User类通常应该被定义了一些列client的子类继承。比如当压力测试一个http系统时,你可能就想要使用HttpUser类。

一、成员变量

        1、host: Optional[str] = None

        代表被攻击的主机

        2、wait_time = constant(0)

        wait_time其实是一个函数类型,返回的是连续两个task执行的间隔时间,单位是秒。可以被覆写。

        3、tasks: List[Union[TaskSet, Callable]] = []

        这个属性就和TaskSet中的tasks属性定义的是一模一样了。

        4、weight = 1

        在执行任务时,user被旋转的可能性,值越大,越可能被选中

        5、abstract = True

        如果abstract = True,说明这个类将被子类继承。并且locust在测试过程中不会孵化这个类的模拟用户。

二、成员函数

1、__init__(self, environment) 构造器

def __init__(self, environment):
    super().__init__()
    self.environment = environment
    """A reference to the :py:class:`Environment <locust.env.Environment>` in which this user is running"""
    self._state = None
    self._greenlet: greenlet.Greenlet = None
    self._group: Group
    self._taskset_instance: TaskSet = None      
  •         创建User对象需要传入environment对象,这个对象具体是什么后面再研究;
  •         初始化user的状态self._state = None;
  •         初始化Greenlet对象self._greenlet: greenlet.Greenlet = None,这个涉及到协程,要好好研究下!
  •         初始化所属组self._group: Group
  •         最后初始化一个TaskSet任务集合对象self._taskset_instance: TaskSet = None

2、on_start(self)和on_stop(self) 开始和结束测试

        实现体为空,由子类实现。

3、run(self) 任务执行器

@final
def run(self):
    self._state = LOCUST_STATE_RUNNING
    self._taskset_instance = DefaultTaskSet(self)
    try:
        # run the TaskSet on_start method, if it has one
        self.on_start()

        self._taskset_instance.run()
    except (GreenletExit, StopUser):
        # run the on_stop method, if it has one
        self.on_stop()

        首先使用了@final注解,表示这个函数不能被子类覆写。所以user执行任务的处理逻辑是不变的。

        1、设置状态为running,表示要开始进行测试self._state = LOCUST_STATE_RUNNING;

        2、创建taskset实例,使用的是DefaultTaskSet类,传入User实例自身self作为TaskSet构造器的parent参数。即User是TaskSet的父亲。

User.py

self._taskset_instance = DefaultTaskSet(self)

TaskSet.py

def __init__(self, parent: "User") -> None:

        3、执行self.on_start() 执行user实例自身的on_start()方法;

        4、执行self._taskset_instance.run() 执行user中tasks中的任务

        5、当出现异常时被捕获,在ctach中执行self.on_stop结束测试。

 类似资料: