当前位置: 首页 > 编程笔记 >

python实现接口并发测试脚本

周飞语
2023-03-14
本文向大家介绍python实现接口并发测试脚本,包括了python实现接口并发测试脚本的使用技巧和注意事项,需要的朋友参考一下

常用的网站性能测试指标有:并发数、响应时间、吞吐量、性能计数器等。

1、并发数

并发数是指系统同时能处理的请求数量,这个也是反应了系统的负载能力。

2、响应时间

响应时间是一个系统最重要的指标之一,它的数值大小直接反应了系统的快慢。响应时间是指执行一个请求从开始到最后收到响应数据所花费的总体时间。

3、吞吐量

吞吐量是指单位时间内系统能处理的请求数量,体现系统处理请求的能力,这是目前最常用的性能测试指标。
QPS(每秒查询数)、TPS(每秒事务数)是吞吐量的常用量化指标,另外还有HPS(每秒HTTP请求数)。
跟吞吐量有关的几个重要是:并发数、响应时间。
QPS(TPS),并发数、响应时间它们三者之间的关系是:
QPS(TPS)= 并发数/平均响应时间

4、性能计数器

性能计数器是描述服务器或操作系统性能的一些数据指标,如使用内存数、进程时间,在性能测试中发挥着"监控和分析"的作用,尤其是在分析统统可扩展性、进行新能瓶颈定位时有着非常关键的作用。
Linux中可以使用top或者uptime命令看到当前系统的负载及资源利用率情况。
资源利用率:指系统各种资源的使用情况,如cpu占用率为68%,内存占用率为55%,一般使用"资源实际使用/总的资源可用量"形成资源利用率。

压测脚本(下单的接口):

#!/usr/bin/env python
#-*- coding:utf-8 -*-

import requests,time,json,threading,random

class Presstest(object):
  headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36',
    'Content-Type': 'application/json; charset=UTF-8',
  }
  def __init__(self,login_url,press_url,phone="1376193000",password="123456"):
    self.login_url = login_url
    self.press_url = press_url
    self.phone = phone
    self.password = password
    self.session = requests.Session()
    self.session.headers = self.headers

  def login(self):
    '''登陆获取session'''
    data = data = {'t': int(time.time() * 1000), 'userName': self.phone, 'passWord': self.password}
    res = self.session.post(self.login_url,data=json.dumps(data))
    XToken = res.json().get('data').get('companyToken')
    self.session.headers['X-Token'] = XToken

  def testinterface(self):
    '''压测接口'''
    self.session.headers['X-UnionId'] = 'of6uw1CUVhP533sQok'
    data = {"id": int(''.join(str(random.choice(range(10))) for _ in range(10))),
        "openId": "oMr0c5LGJjlTc", "addressId": 10, "shipType": "SELF", "totalAmount": 5,
        "receivable": 5, "carts": [
        {"amount": 1, "barcode": "1234567890", "skuId": 1, "spec": "34", "itemAmount": 5, "price": 0,
         "cover": "xxxx-dd.oss-cn-shanghai.aliyuncs.com/dfc91fd067ac464c096c90af33a196a5.png",
         "name": "沙宣洗发水", "packingType": "瓶", "placeOfOrigin": "上海", "productId": "310153323435134976",
         "retailPrice": 5, "suitableAge": "1-100"}], "formId": "the formId is a mock one", "comments": "aa"}
    global ERROR_NUM
    try:
      html = self.session.post(self.press_url, data=json.dumps(data))
      if html.json().get('code') != 0:
        print(html.json())
        ERROR_NUM += 1
    except Exception as e:
      print(e)
      ERROR_NUM += 1

  def testonework(self):
    '''一次并发处理单个任务'''
    i = 0
    while i < ONE_WORKER_NUM:
      i += 1
      self.work()
    time.sleep(LOOP_SLEEP)

  def run(self):
    '''使用多线程进程并发测试'''
    t1 = time.time()
    Threads = []

    for i in range(THREAD_NUM):
      t = threading.Thread(target=self.testonework, name="T" + str(i))
      t.setDaemon(True)
      Threads.append(t)

    for t in Threads:
      t.start()
    for t in Threads:
      t.join()
    t2 = time.time()

    print("===============压测结果===================")
    print("URL:", self.press_url)
    print("任务数量:", THREAD_NUM, "*", ONE_WORKER_NUM, "=", THREAD_NUM * ONE_WORKER_NUM)
    print("总耗时(秒):", t2 - t1)
    print("每次请求耗时(秒):", (t2 - t1) / (THREAD_NUM * ONE_WORKER_NUM))
    print("每秒承载请求数:", 1 / ((t2 - t1) / (THREAD_NUM * ONE_WORKER_NUM)))
    print("错误数量:", ERROR_NUM)


if __name__ == '__main__':
  login_url = 'https://ds.xxxxx.com/sys/sysUser/login'
  press_url = 'https://ds.xxxxx.com/weshop/order/checkout'
  phone = "1376193000"
  password = "123456"
  
  THREAD_NUM = 1     # 并发线程总数
  ONE_WORKER_NUM = 5   # 每个线程的循环次数
  LOOP_SLEEP = 0.1    # 每次请求时间间隔(秒)
  ERROR_NUM = 0      # 出错数
  
  obj = Presstest(login_url=login_url,press_url=press_url,phone=phone,password=password)
  obj.login()
  obj.run()

输出结果:

===============压测结果===================
URL: https://ds.xxxxx.com/weshop/order/checkout
任务数量: 1 * 5 = 5
总耗时(秒): 1.9810078144073486
每次请求耗时(秒): 0.39620156288146974
每秒承载请求数: 2.5239678327547805
错误数量: 0

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持小牛知识库。

 类似资料:
  • 本文向大家介绍Python接口自动化测试的实现,包括了Python接口自动化测试的实现的使用技巧和注意事项,需要的朋友参考一下 1)环境准备:   接口测试的方式有很多,比如可以用工具(jmeter,postman)之类,也可以自己写代码进行接口测试,工具的使用相对来说都比较简单,重点是要搞清楚项目接口的协议是什么,然后有针对性的进行选择,甚至当工具不太适合项目时需要自己进行开发。   在我们项目

  • 本文向大家介绍Python+threading模块对单个接口进行并发测试,包括了Python+threading模块对单个接口进行并发测试的使用技巧和注意事项,需要的朋友参考一下 本文实例为大家分享了Python threading模块对单个接口进行并发测试的具体代码,供大家参考,具体内容如下 本文知识点 通过在threading.Thread继承类中重写run()方法实现定制输出结果 代码如下

  • 问题内容: 这可能以前曾被问过,但是快速搜索只提出了询问C#的相同问题。看这里。 我基本上想做的是检查给定对象是否实现了给定接口。 我有点想出了一个解决方案,但这不足以在if或case语句中频繁使用它,我想知道Java是否没有内置解决方案。 编辑:好的,谢谢您的回答。特别是对于Damien Pollet和Noldorin,您让我重新考虑了设计,因此我不再测试接口。 问题答案: 该运营商确实在工作安

  • 我正在使用POM设计为开放式MRS应用程序创建一个演示框架。应用程序的凭据是“admin/Admin123” 我创建了以下3个类: 1)登录测试:包含所有登录(1ve和2ve场景)和注销测试(总共4个@tests) 2)注册患者测试:它将注册患者,并验证患者是否已成功注册(总共3次ve@Tests) 3)捕获生命体征测试:将捕获上述登记患者的生命体征,并验证捕获的细节是否正确。(总共2次ve@测试

  • 我们的spring webapp实现了很多autowired接口。接口实现中的一些方法做了很多工作。方法本身有一个公共入口点,许多逻辑被抽象成许多私有方法。因此,单元和集成测试相当庞大,因为有很多要测试。在编写集成测试方面,我考虑的是一种模式,即让测试自动与实现而不是接口相关,将这些私有方法的作用域更改为公共的,但不将它们公开给拥有的接口。 这将允许对这些子例程进行单元测试,同时保持契约中没有很多

  • 本文向大家介绍Python接口开发实现步骤详解,包括了Python接口开发实现步骤详解的使用技巧和注意事项,需要的朋友参考一下 一、操作步骤 1. 导入:import flask,json 2. 实例化:api = flask.Flask(__name__) 3. 定义接口访问路径及访问方式:@api.route('/index',methods=['get/post/PUT/DELETE'])

  • 本文向大家介绍python+requests+unittest API接口测试实例(详解),包括了python+requests+unittest API接口测试实例(详解)的使用技巧和注意事项,需要的朋友参考一下 我在网上查找了下接口测试相关的资料,大都重点是以数据驱动的形式,将用例维护在文本或表格中,而没有说明怎么样去生成想要的用例, 问题: 测试接口时,比如参数a,b,c,我要先测a参数,有

  • 接口测试 可以在兑吧后台接口配置处测试接口是否可以ping通和查看请求URL 1.免登接口测试 免登录地址接口开发完成之后,将免登录接口地址配置到兑吧后台,并在商品(或者活动)列表中获取商品(或者活动)链接,链接地址格式为:免登录接口地址+dbredirect+商品链接(链接经过encode编码)。开发者可以将该商品(或者活动)链接配置到客户端投放入口,点击测试访问,看是否可以正常跳转到商城指定页