Recently, driven by a service load-testing requirement, I did some secondary development on top of the Locust framework. Overall, Locust meets our current needs. Its main drawback is that even with a distributed deployment, losing a worker easily changes the generated load and degrades the testing experience. It is therefore worth understanding how the master and the workers divide and coordinate their work, so we can grasp the framework's strengths and weaknesses and lay a foundation for future service upgrades.
1. What main() does (master side):
(1) The command line parser validates the locustfile passed via -f and then imports it, yielding three things: docstring, user_classes, and shape_class. If we do not specify a shape at startup, docstring holds the module docstring and user_classes holds the user-defined classes in the locustfile that inherit from User or HttpUser. If the user definitions in the locustfile are malformed, Locust exits at startup.
(2) parse_options parses all command line options. If we do not explicitly pass headless, options.headless = False and Locust's test-configuration page is reachable through the web UI.
(3) Logging setup (not configured here).
(4) Check user_classes: verify that the definitions are correct and that they include the users named on the command line, then collect the users to be started into a container.
(5) N/A.
(6) Create the Environment:
environment = create_environment(user_classes, options, events=locust.events, shape_class=shape_class)
This creates the Environment instance for the load-test process from the startup options:
def create_environment(user_classes, options, events=None, shape_class=None):
"""
Create an Environment instance from options
"""
return Environment(
user_classes=user_classes,
shape_class=shape_class,
tags=options.tags,
exclude_tags=options.exclude_tags,
events=events,
host=options.host,
reset_stats=options.reset_stats,
stop_timeout=options.stop_timeout,
parsed_options=options,
)
Let's dig a bit deeper and see what Environment does:
def __init__(
    self,
    *,
    user_classes=[],
    shape_class=None,
    tags=None,
    exclude_tags=None,
    events=None,          # falls back to a fresh Events() if not given
    host=None,
    reset_stats=False,
    stop_timeout=None,
    catch_exceptions=True,
    parsed_options=None,
):
(7) Create the runner for the master's environment:
def create_master_runner(self, master_bind_host="*", master_bind_port=5557):
return self._create_runner(
MasterRunner,
master_bind_host=master_bind_host,
master_bind_port=master_bind_port,
)
What happens inside:
def __init__(self, environment, master_bind_host, master_bind_port):
"""
:param environment: Environment instance
:param master_bind_host: Host/interface to use for incoming worker connections
:param master_bind_port: Port to use for incoming worker connections
"""
super().__init__(environment)
self.worker_cpu_warning_emitted = False
self.master_bind_host = master_bind_host
self.master_bind_port = master_bind_port
class WorkerNodesDict(dict):
def get_by_state(self, state):
return [c for c in self.values() if c.state == state]
@property
def all(self):
return self.values()
@property
def ready(self):
return self.get_by_state(STATE_INIT)
@property
def spawning(self):
return self.get_by_state(STATE_SPAWNING)
@property
def running(self):
return self.get_by_state(STATE_RUNNING)
@property
def missing(self):
return self.get_by_state(STATE_MISSING)
self.clients = WorkerNodesDict()
try:
self.server = rpc.Server(master_bind_host, master_bind_port)
except RPCError as e:
if e.args[0] == "Socket bind failure: Address already in use":
port_string = master_bind_host + ":" + str(master_bind_port) if master_bind_host != "*" else str(master_bind_port)
logger.error(
f"The Locust master port ({port_string}) was busy. Close any applications using that port - perhaps an old instance of Locust master is still running? ({e.args[0]})"
)
sys.exit(1)
else:
raise
self.greenlet.spawn(self.heartbeat_worker).link_exception(greenlet_exception_handler)
self.greenlet.spawn(self.client_listener).link_exception(greenlet_exception_handler)
# listener that gathers info on how many users the worker has spawned
def on_worker_report(client_id, data):
if client_id not in self.clients:
logger.info("Discarded report from unrecognized worker %s", client_id)
return
self.clients[client_id].user_count = data["user_count"]
self.environment.events.worker_report.add_listener(on_worker_report)
# register listener that sends quit message to worker nodes
def on_quitting(environment, **kw):
self.quit()
self.environment.events.quitting.add_listener(on_quitting)
To sum up: resolve the bind address for workers, create the RPC server side, then spawn two greenlets: one monitors the workers' heartbeats; the other listens for the state reports the workers send up.
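The heartbeat greenlet on the master is essentially a liveness check: each worker must keep reporting within a timeout window or it gets marked missing, which is exactly the failure mode mentioned in the introduction. The sketch below illustrates the idea only; the constants, the WorkerNode class, and the string states are simplifications I made up, not Locust's exact implementation:

```python
HEARTBEAT_INTERVAL = 1   # assumed: seconds the real loop sleeps between passes
HEARTBEAT_LIVENESS = 3   # assumed: missed intervals before a worker counts as missing

class WorkerNode:
    """Minimal stand-in for the per-worker bookkeeping on the master."""
    def __init__(self, client_id):
        self.client_id = client_id
        self.state = "ready"
        self.heartbeat = HEARTBEAT_LIVENESS  # countdown, refreshed on every heartbeat

def on_heartbeat(worker):
    # called when a heartbeat message arrives from this worker
    worker.heartbeat = HEARTBEAT_LIVENESS

def check_workers(workers):
    # one pass of the master's heartbeat loop: decrement every countdown
    # and flag workers whose countdown reached zero
    for w in workers:
        if w.state != "missing":
            w.heartbeat -= 1
            if w.heartbeat <= 0:
                w.state = "missing"

workers = [WorkerNode("w1"), WorkerNode("w2")]
for _ in range(HEARTBEAT_LIVENESS):
    check_workers(workers)
    on_heartbeat(workers[0])   # only w1 keeps reporting

print([w.state for w in workers])  # → ['ready', 'missing']
```

Once a worker is missing, the users it was running are simply gone from the test, which is why worker loss shifts the total load.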
(8) Start the web UI.
(9) Fire the locust init event, which end-user code can use to run setup code that needs access to the Environment, Runner or WebUI:
environment.events.init.fire(environment=environment, runner=runner, web_ui=web_ui)
(10)input_listener_greenlet
(11)gevent.spawn(stats_history, runner)
2. What main() does (worker side):
(1)~(4) Same as on the master.
(5) Check the system's open-file limit; if it is low, raise it to 10,000.
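Each simulated user can hold a socket open, so the worker needs enough file descriptors. A sketch of what raising the limit looks like with the stdlib resource module (Unix-only); the function name and the fallback behavior here are my own, not Locust's code:

```python
import resource  # Unix-only stdlib module

TARGET_LIMIT = 10000  # the value the text above says the limit is raised to

def ensure_open_file_limit(minimum=TARGET_LIMIT):
    """Raise the soft RLIMIT_NOFILE to `minimum` if it is lower.
    Returns the (soft, hard) limits after the attempt."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if soft != resource.RLIM_INFINITY and soft < minimum:
        # the soft limit may never exceed the hard limit
        new_soft = minimum if hard == resource.RLIM_INFINITY else min(minimum, hard)
        try:
            resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, hard))
        except (ValueError, OSError):
            pass  # not permitted; a real tool would log a warning here
    return resource.getrlimit(resource.RLIMIT_NOFILE)

print(ensure_open_file_limit())
```

If the limit cannot be raised (e.g. the hard limit is lower), the worker will hit "too many open files" errors once the user count exceeds the soft limit.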
(6) Same as on the master.
(7) Create the runner for the worker's environment:
runner = environment.create_worker_runner(options.master_host, options.master_port)
def create_worker_runner(self, master_host, master_port):
return self._create_runner(
WorkerRunner,
master_host=master_host,
master_port=master_port,
)
def _create_runner(self, runner_class, *args, **kwargs):
if self.runner is not None:
raise RunnerAlreadyExistsError("Environment.runner already exists (%s)" % self.runner)
self.runner = runner_class(self, *args, **kwargs)
return self.runner
This brings in Locust's core class, the runner. Here a WorkerRunner connects to a MasterRunner; let's first look at what WorkerRunner's initialization does:
def __init__(self, environment, master_host, master_port):
"""
:param environment: Environment instance
:param master_host: Host/IP to use for connection to the master
:param master_port: Port to use for connecting to the master
"""
super().__init__(environment)
self.worker_state = STATE_INIT
self.client_id = socket.gethostname() + "_" + uuid4().hex
self.master_host = master_host
self.master_port = master_port
self.client = rpc.Client(master_host, master_port, self.client_id)
self.greenlet.spawn(self.heartbeat).link_exception(greenlet_exception_handler)
self.greenlet.spawn(self.worker).link_exception(greenlet_exception_handler)
self.client.send(Message("client_ready", None, self.client_id))
self.greenlet.spawn(self.stats_reporter).link_exception(greenlet_exception_handler)
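The client_ready message sent during initialization travels over the RPC link as a simple (type, data, node_id) triple. A stdlib-only sketch of that shape (Locust itself serializes with msgpack; json is used here only to keep the example self-contained):

```python
import json

class Message:
    """Simplified stand-in for Locust's RPC message: a
    (type, data, node_id) triple."""
    def __init__(self, message_type, data, node_id):
        self.type = message_type
        self.data = data
        self.node_id = node_id

    def serialize(self):
        # real Locust packs this with msgpack, not json
        return json.dumps([self.type, self.data, self.node_id]).encode()

    @classmethod
    def unserialize(cls, raw):
        return cls(*json.loads(raw.decode()))

# the first thing a worker sends after connecting:
ready = Message("client_ready", None, "myhost_1a2b3c")
echo = Message.unserialize(ready.serialize())
print(echo.type, echo.node_id)  # → client_ready myhost_1a2b3c
```

The node_id (client_id) is how the master tells workers apart in its WorkerNodesDict, which is why it combines the hostname with a uuid.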
The worker establishes an RPC connection to the MasterRunner process via master_host and master_port, then spawns three greenlets: the first sends heartbeats to the master; the second is the one that does the real work; the third reports stats. Let's look at the greenlet that does the real work:
def worker(self):
while True:
try:
msg = self.client.recv()
except RPCError as e:
logger.error("RPCError found when receiving from master: %s" % (e))
continue
if msg.type == "spawn":
self.worker_state = STATE_SPAWNING
self.client.send(Message("spawning", None, self.client_id))
job = msg.data
self.spawn_rate = job["spawn_rate"]
self.target_user_count = job["num_users"]
self.environment.host = job["host"]
self.environment.stop_timeout = job["stop_timeout"]
if self.spawning_greenlet:
# kill existing spawning greenlet before we launch new one
self.spawning_greenlet.kill(block=True)
self.spawning_greenlet = self.greenlet.spawn(
lambda: self.start(user_count=job["num_users"], spawn_rate=job["spawn_rate"])
)
self.spawning_greenlet.link_exception(greenlet_exception_handler)
elif msg.type == "stop":
self.stop()
self.client.send(Message("client_stopped", None, self.client_id))
self.client.send(Message("client_ready", None, self.client_id))
self.worker_state = STATE_INIT
elif msg.type == "quit":
logger.info("Got quit message from master, shutting down...")
self.stop()
self._send_stats() # send a final report, in case there were any samples not yet reported
self.greenlet.kill(block=True)
This greenlet blocks receiving messages from the master. There are three message types, "spawn", "stop", and "quit", corresponding to spawning load, stopping, and exiting. Looking first at the spawn message, the core is this line:
self.spawning_greenlet = self.greenlet.spawn(
lambda: self.start(user_count=job["num_users"], spawn_rate=job["spawn_rate"])
)
Here start() launches a load task, the source of our pressure. Let's look at start(); at this point the runner's state is STATE_SPAWNING:
def start(self, user_count, spawn_rate, wait=False):
if self.state != STATE_RUNNING and self.state != STATE_SPAWNING:
self.stats.clear_all()
self.exceptions = {}
self.cpu_warning_emitted = False
self.worker_cpu_warning_emitted = False
self.target_user_count = user_count
if self.state != STATE_INIT and self.state != STATE_STOPPED:
logger.debug(
"Updating running test with %d users, %.2f spawn rate and wait=%r" % (user_count, spawn_rate, wait)
)
self.update_state(STATE_SPAWNING)
if self.user_count > user_count:
# Stop some users
stop_count = self.user_count - user_count
self.stop_users(stop_count, spawn_rate)
elif self.user_count < user_count:
# Spawn some users
spawn_count = user_count - self.user_count
self.spawn_users(spawn_count=spawn_count, spawn_rate=spawn_rate)
else:
self.environment.events.spawning_complete.fire(user_count=self.user_count)
else:
self.spawn_rate = spawn_rate
self.spawn_users(user_count, spawn_rate=spawn_rate, wait=wait)
The logic is simple: on the master's "spawn" message we compare user_count with the current number of users. If there are too few, we add users via spawn_users(); if there are too many, we remove them via stop_users(); if the counts match, we fire a spawning-complete event. Next, spawn_users:
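The branching above (too many users stop, too few spawn, equal fire spawning_complete) reduces to a small pure function. This is a sketch for illustration only, not Locust code; the spawn_users listing continues right after it:

```python
def adjust_plan(current, target):
    """Mirror the decision in start(): stop users, spawn users,
    or report that spawning is already complete."""
    if current > target:
        return ("stop", current - target)
    if current < target:
        return ("spawn", target - current)
    return ("complete", 0)

print(adjust_plan(50, 30))  # → ('stop', 20)
print(adjust_plan(10, 30))  # → ('spawn', 20)
print(adjust_plan(30, 30))  # → ('complete', 0)
```

This delta-based design is what lets the master ramp a running test up or down without restarting it.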
def spawn_users(self, spawn_count, spawn_rate, wait=False):
bucket = self.weight_users(spawn_count)
spawn_count = len(bucket)
if self.state == STATE_INIT or self.state == STATE_STOPPED:
self.update_state(STATE_SPAWNING)
existing_count = len(self.user_greenlets)
logger.info(
"Spawning %i users at the rate %g users/s (%i users already running)..."
% (spawn_count, spawn_rate, existing_count)
)
occurrence_count = dict([(l.__name__, 0) for l in self.user_classes])
def spawn():
sleep_time = 1.0 / spawn_rate
while True:
if not bucket:
logger.info(
"All users spawned: %s (%i total running)"
% (
", ".join(["%s: %d" % (name, count) for name, count in occurrence_count.items()]),
len(self.user_greenlets),
)
)
self.environment.events.spawning_complete.fire(user_count=len(self.user_greenlets))
return
user_class = bucket.pop(random.randint(0, len(bucket) - 1))
occurrence_count[user_class.__name__] += 1
new_user = user_class(self.environment)
new_user.start(self.user_greenlets)
if len(self.user_greenlets) % 10 == 0:
logger.debug("%i users spawned" % len(self.user_greenlets))
if bucket:
gevent.sleep(sleep_time)
spawn()
if wait:
self.user_greenlets.join()
logger.info("All users stopped\n")
spawn() here is still an infinite loop; the key point is new_user.start, which should be where the user-defined tasks get invoked. Once the expected user_count is reached, it logs a message and returns; unwinding the call stack layer by layer, we are back in worker(), blocked again waiting for the master's next order.
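Two details of the spawn loop above are worth pinning down: the pacing comes from sleeping 1.0/spawn_rate seconds between users (so users start at roughly spawn_rate per second), and the bucket that random.randint pops from is built by weight_users, which repeats each user class in proportion to its weight attribute. A simplified, self-contained sketch of that weighting (real Locust also smooths rounding so the bucket length matches spawn_count exactly):

```python
def weight_users(user_classes, spawn_count):
    """Build a bucket in which each user class appears in proportion
    to its `weight` attribute (default weight 1)."""
    total = sum(getattr(u, "weight", 1) for u in user_classes)
    bucket = []
    for u in user_classes:
        bucket += [u] * round(spawn_count * getattr(u, "weight", 1) / total)
    return bucket

# hypothetical user classes, standing in for User subclasses in a locustfile
class Browser: weight = 3
class ApiUser: weight = 1

bucket = weight_users([Browser, ApiUser], 8)
print(sorted(c.__name__ for c in bucket))
# → ['ApiUser', 'ApiUser', 'Browser', 'Browser', 'Browser', 'Browser', 'Browser', 'Browser']
```

Popping from this bucket at random is why a mixed locustfile converges to its weight ratio only as the user count grows.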
(8) N/A.
(9) Fire the locust init event, which end-user code can use to run setup code that needs access to the Environment, Runner or WebUI:
environment.events.init.fire(environment=environment, runner=runner, web_ui=web_ui)
(10) N/A.
(11) gevent.spawn(stats_history, runner)
(12) Wait for the stop signal.
(13) Shut down.