常规做 APP UI 自动化时,基本上都是采用 POM 模式,再加上关键字驱动、数据驱动,实现测试数据分离。本文要实现的目标如下:
(1)实现基本业务流程的测试
(2)多设备同时运行
(3)自动拉取最新apk并自动安装
(4)持续集成
关于业务基本流程的覆盖,相信对做过 Web 端 UI 自动化或者接口自动化的同学来说,实现 APP UI 自动化其实不难,思路也是相同的:POM 模式、关键字驱动、数据驱动、数据分离等。
1.先来自动拉取最新apk 并且实现自动化安装。
import os
import re
import shutil
from urllib.request import urlopen # 用于获取网页
import requests
from bs4 import BeautifulSoup # 用于解析网页
# Project root: the normalized directory two levels above this file.
path = os.path.normpath(os.path.dirname(os.path.dirname(__file__)))
class automatic(object):
    """Find, download and clean up APK builds listed on the artifact server.

    The server exposes a browse page with one link per released version;
    each version page in turn links to the build artifacts.
    """

    def __init__(self):
        # Browse page listing one sub-directory link per released version.
        self.url = 'http://10.124.106.120:28759/service/rest/repository/browse/maven-releases/tcl/release/android/tclplus/'

    @staticmethod
    def _get_url(url):
        """Fetch ``url`` and return every ``<a>`` element found in the page."""
        # The context manager closes the HTTP response once it is parsed.
        with urlopen(url) as html:
            bsObj = BeautifulSoup(html, 'html.parser')
        return bsObj.find_all('a')

    @staticmethod
    def _version_key(version):
        """Return the numeric parts of ``version`` as a tuple of ints.

        Comparing these tuples orders "1.10.0" after "1.9.0", which plain
        string comparison gets wrong.
        """
        return tuple(int(number) for number in re.findall(r'\d+', version))

    def get_latest_version(self):
        """Return ``(version_url, version)`` for the newest listed version.

        Links without an ``href``, links whose first path segment does not
        start with a digit (for example the parent-directory link) and links
        containing ``'2.0'`` are ignored.

        Raises:
            ValueError: if the page lists no usable version.
        """
        candidates = []
        for anchor in self._get_url(self.url):
            href = anchor.get('href')
            # NOTE(review): the original comment says "filter out versions
            # 2.0 and above", but this is a substring test; it also drops
            # e.g. "1.2.0" and keeps "2.1". Kept unchanged; confirm intent.
            if not href or '2.0' in href:
                continue
            name = str(href).split("/")[0]
            if name[:1].isdigit():
                candidates.append(name)
        if not candidates:
            raise ValueError(f"no version link found at {self.url}")
        # Compare numerically, not lexicographically.
        latest_version = max(candidates, key=self._version_key)
        print(f"最大版本号为:{latest_version}")
        result_url = self.url + str(latest_version)
        return result_url, latest_version

    def download_apk(self, stamp_prefix='2021'):
        """Download the newest build of the latest version and return its file name.

        Args:
            stamp_prefix: leading digits of the build stamp to look for in the
                artifact links. Defaults to ``'2021'``, the value that used to
                be hard-coded.

        Returns:
            The file name the APK was saved under (inside ``path + '/APK/'``).

        Raises:
            ValueError: if no artifact link carries a matching build stamp.
            requests.HTTPError: if the server answers with an error status.
        """
        # Kept so code that reads the module-level ``apk_name`` after a
        # download keeps working.
        global apk_name
        urls, number = self.get_latest_version()
        pattern = re.compile(r'(?:' + re.escape(stamp_prefix) + r')\d+\.?\d*')
        url_list = []
        stamps = []
        for anchor in self._get_url(urls):
            href = anchor.get('href')
            if not href:
                continue
            url_list.append(href)
            match = pattern.search(href)
            if match:
                stamps.append(match.group(0))
        if not stamps:
            raise ValueError(f"no build stamp starting with {stamp_prefix!r} found at {urls}")
        newest = max(stamps)
        matching = [link for link in url_list if newest in link]
        # A version page may also link to checksum/metadata files carrying the
        # same stamp; prefer the real .apk so its content is what gets saved.
        apk_links = [link for link in matching if link.lower().endswith('.apk')]
        download_url = (apk_links or matching)[-1]
        response = requests.get(download_url, timeout=60)
        # Fail loudly instead of saving an HTML error page as an "apk".
        response.raise_for_status()
        apk_name = f"{number}-{newest}-jiagu-xtest-release.apk"
        # NOTE(review): this writes under 'APK' while the installer in
        # LoginPage reads from 'apk'; on a case-sensitive file system these
        # are different directories. Confirm which name is intended.
        with open(path + '/APK/' + apk_name, 'wb') as apk_file:
            apk_file.write(response.content)
        return apk_name

    @staticmethod
    def del_file(dir_path):
        """Remove everything inside ``dir_path`` but keep the directory itself.

        Does nothing when ``dir_path`` is not an existing directory.
        """
        if not os.path.isdir(dir_path):
            return
        for entry in os.listdir(dir_path):
            full_path = os.path.join(dir_path, entry)
            if os.path.isdir(full_path) and not os.path.islink(full_path):
                shutil.rmtree(full_path)
            else:
                os.remove(full_path)
主要就是发起请求,取到最新版本号,然后获取链接地址;拿到下载地址之后,把响应的 file.content 以二进制方式写入文件,保存为 apk 即可。
2.第二步,多设备连接并自动安装
Decorator.py
import time
from functools import wraps
from Common.BasePage import BasePage
from Common.ReportPath import ReportPath
from Common.Log import Log
# Marker prefix put in front of a screenshot file name inside exception messages.
flag = 'IMAGE:'
# Logger instance used by the functions in this module.
log = Log()
def _screenshot(name):
    """Save a device screenshot in the report directory and return its file name.

    The file is named ``<name>-<YYYYmmddHHMMSS>.PNG``. Only the file name,
    not the full path, is returned.
    """
    stamp = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
    file_name = '-'.join((name, stamp)) + '.PNG'
    target = '/'.join((ReportPath().get_path(), file_name))
    # get_driver() returns (device, session); only the device is needed here.
    device, _session = BasePage().get_driver()
    device.screenshot(target)
    return file_name
def teststep(func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
log.i('\t--> %s', func.__qualname__)
ret = func(*args, **kwargs)
return ret
except AssertionError as e:
log.e('AssertionError, %s', e)
log.e('\t<-- %s, %s, %s', func.__qualname__, 'AssertionError', 'Error')
if flag in str(e):
raise AssertionError(e)
else:
raise AssertionError(flag + _screenshot(func.__qualname__))
except Exception as e:
log.e('Exception, %s', e)
log.e('\t<-- %s, %s, %s', func.__qualname__, 'Exception', 'Error')
if flag in str(e):
raise Exception(e)
else:
raise Exception(flag + _screenshot(func.__qualname__))
return wrapper
def teststeps(func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
log.i(' --> %s', func.__qualname__)
ret = func(*args, **kwargs)
log.i(' <-- %s, %s', func.__qualname__, 'Success')
return ret
except AssertionError as e:
log.e('AssertionError, %s', e)
log.e(' <-- %s, %s, %s', func.__qualname__, 'AssertionError', 'Error')
if flag in str(e):
raise AssertionError(e)
else:
raise AssertionError(flag + _screenshot(func.__qualname__))
except Exception as e:
log.e('Exception, %s', e)
log.e(' <-- %s, %s, %s', func.__qualname__, 'Exception', 'Error')
if flag in str(e):
raise Exception(e)
else:
raise Exception(flag + _screenshot(func.__qualname__))
return wrapper
def _wrapper(func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
log.i('--> %s', func.__qualname__)
ret = func(*args, **kwargs)
log.i('<-- %s, %s\n', func.__qualname__, 'Success')
return ret
except AssertionError as e:
log.e('AssertionError, %s', e)
log.e('<-- %s, %s, %s\n', func.__qualname__, 'AssertionError', 'Fail')
if flag in str(e):
raise AssertionError(e)
else:
raise AssertionError(flag + _screenshot(func.__qualname__))
except Exception as e:
log.e('Exception, %s', e)
log.e('<-- %s, %s, %s\n', func.__qualname__, 'Exception', 'Error')
if flag in str(e):
raise Exception(e)
else:
raise Exception(flag + _screenshot(func.__qualname__))
return wrapper
def testcase(func):
    """Decorator for test-case methods; delegates to ``_wrapper``."""
    decorated = _wrapper(func)
    return decorated


def setup(func):
    """Decorator for ``setUp`` methods; delegates to ``_wrapper``."""
    decorated = _wrapper(func)
    return decorated


def teardown(func):
    """Decorator for ``tearDown`` methods; delegates to ``_wrapper``."""
    decorated = _wrapper(func)
    return decorated


def setupclass(func):
    """Decorator for ``setUpClass`` methods; delegates to ``_wrapper``."""
    decorated = _wrapper(func)
    return decorated


def teardownclass(func):
    """Decorator for ``tearDownClass`` methods; delegates to ``_wrapper``."""
    decorated = _wrapper(func)
    return decorated
基础定位方法封装
BasePage.py
import time
import uiautomator2 as u2
from Common.chromedriver import ChromeDriver
from Common.Ports import Ports
from Common.ReportPath import ReportPath
class BasePage(object):
    """Base page object wrapping a uiautomator2 device for the app under test.

    ``set_driver`` stores the connected device (``d``) and the app session
    (``sess``) as class attributes; the helper methods read them back through
    ``self.d`` / ``cls.d``.
    """

    # Package of the app under test; resource ids are "<package>:id/<name>".
    APP_PACKAGE = "com.tcl.tclplus"

    @classmethod
    def set_driver(cls, dri):
        """Connect to the device addressed by ``dri`` and open an app session."""
        cls.d = u2.connect(dri)
        cls.sess = cls.d.session(cls.APP_PACKAGE)

    def get_driver(self):
        """Return the ``(device, session)`` pair stored by ``set_driver``."""
        return self.d, self.sess

    def _res_id(self, page):
        """Build the full resource id for the short id ``page``."""
        return self.APP_PACKAGE + ":id/" + page

    def ClickText(self, text):
        """Click the element whose text equals ``text``."""
        self.d(text=text).click()

    def ClickResId(self, page):
        """Click the element whose resource id is ``page`` (short form)."""
        self.d(resourceId=self._res_id(page)).click()

    def ExistsResId(self, page):
        """Return whether an element with resource id ``page`` exists."""
        return self.d(resourceId=self._res_id(page)).exists()

    def ExistsText(self, text):
        """Return whether an element with text ``text`` exists, as a bool.

        ``bool()`` evaluates the selector's ``exists`` value now, so the result
        is a plain boolean like the one ``ExistsResId`` returns.
        """
        return bool(self.d(text=text).exists)

    def ClickResIdSendKeys(self, *args):
        """Click the element with resource id ``args[0]`` and type ``args[1]``."""
        self.ClickResId(args[0])
        self.d.send_keys(args[1])

    def GetTextResID(self, page):
        """Return the text of the element with resource id ``page``."""
        return self.d(resourceId=self._res_id(page)).get_text()

    def GetTextXpath(self, text):
        """Return the text of the element matched by the XPath ``@text="<text>"``."""
        return self.d.xpath(f'//*[@text="{text}"]').get_text()

    def ClickResIdText(self, page, text):
        """Click the element matching both resource id ``page`` and ``text``."""
        self.d(resourceId=self._res_id(page), text=text).click()

    def SwipeOrExist(self, taper, text, up="up", max_swipes=20):
        """Swipe until the target element exists, then click it.

        Each round swipes first and checks afterwards.

        Args:
            taper: when truthy ``text`` is matched as element text (swipe
                scale 1); otherwise as a short resource id (swipe scale 0.5).
            text: the text or short resource id to look for.
            up: swipe direction passed to ``swipe_ext`` (e.g. "up" or "down").
            max_swipes: upper bound on swipes, so a missing element fails the
                step instead of looping forever.

        Raises:
            RuntimeError: if the element is still missing after ``max_swipes``.
        """
        for _ in range(max_swipes):
            if taper:
                self.d.swipe_ext(up, 1)
                if self.d(text=text).exists:
                    self.ClickText(text)
                    return
            else:
                self.d.swipe_ext(up, 0.5)
                if self.ExistsResId(text):
                    self.ClickResId(text)
                    return
        raise RuntimeError(
            'element %r not found after %d swipes (%s)' % (text, max_swipes, up))

    @classmethod
    def back(cls):
        """Press the back key, pausing one second before and after.

        Going back can fail while a page is still loading, so make sure the
        page has finished loading before calling this.
        """
        time.sleep(1)
        cls.d.press('back')
        time.sleep(1)

    @classmethod
    def identify(cls):
        """Call ``open_identify`` on the device."""
        cls.d.open_identify()

    def set_chromedriver(self, device_ip=None, package=None, activity=None, process=None):
        """Return a selenium driver attached to the app's WebView.

        The port comes from ``Ports().get_ports(1)[0]``; the remaining
        arguments are forwarded to ``ChromeDriver.driver`` with ``attach=True``.
        """
        driver = ChromeDriver(self.d, Ports().get_ports(1)[0]). \
            driver(device_ip=device_ip, package=package, attach=True, activity=activity, process=process)
        return driver

    @classmethod
    def watch_device(cls, watch_list):
        """Register a watcher that clicks each text in ``watch_list`` when it appears.

        :param watch_list: e.g. ``['允许', 'yes', '跳过']``
        """
        cls.d.watchers.watched = False
        for i in watch_list:
            cls.d.watcher(i).when(text=i).click(text=i)
        print('Starting watcher,parameter is %s' % watch_list)
        cls.d.watchers.watched = True

    @classmethod
    def unwatch_device(cls):
        """Switch all watchers off."""
        print('Stop all watcher')
        cls.d.watchers.watched = False

    @classmethod
    def get_toast_message(cls):
        """Return the current toast message and reset the toast cache."""
        message = cls.d.toast.get_message(3, 3)
        cls.d.toast.reset()
        return message

    @classmethod
    def set_fastinput_ime(cls):
        """Enable the fast-input IME on the device."""
        cls.d.set_fastinput_ime(True)

    @classmethod
    def set_original_ime(cls):
        """Disable the fast-input IME on the device."""
        cls.d.set_fastinput_ime(False)

    @classmethod
    def screenshot(cls):
        """Take a screenshot and print ``IMAGE:<file name>`` so reports can show it."""
        date_time = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
        screenshot_name = cls.__qualname__ + '-' + date_time + '.PNG'
        path = ReportPath().get_path() + '/' + screenshot_name
        cls.d.screenshot(path)
        print('IMAGE:' + screenshot_name)

    @staticmethod
    def find_message(elements, text):
        """Return True if ``text`` is a substring of any element's text.

        Elements are checked from the last index down to the first; an empty
        collection yields False.
        """
        return any(
            text in elements[index].info['text']
            for index in reversed(range(elements.count))
        )

    def _get_window_size(self):
        """Return the device window size as ``(x, y)``."""
        window = self.d.window_size()
        return window[0], window[1]

    @staticmethod
    def _get_element_size(element):
        """Return ``(left, top, x_center, y_center, right, bottom)`` of ``element``.

        Values come from ``element.info['bounds']``.
        """
        rect = element.info['bounds']
        x_center = (rect['left'] + rect['right']) / 2
        y_center = (rect['bottom'] + rect['top']) / 2
        return rect['left'], rect['top'], x_center, y_center, rect['right'], rect['bottom']
# def _swipe(self, fromX, fromY, toX, toY, steps):
# self.d.swipe(fromX, fromY, toX, toY, steps)
#
# def swipe_up(self, element=None, steps=0.2):
# """
# swipe up
# :param element: UI element, if None while swipe window of phone
# :param steps: steps of swipe for Android, The lower the faster
# :return: None
# """
# if element:
# x_left, y_up, x_center, y_center, x_right, y_down = self._get_element_size(element)
# fromX = x_center
# fromY = y_center
# toX = x_center
# toY = y_up
# else:
# x, y = self._get_window_size()
# fromX = 0.5 * x
# fromY = 0.5 * y
# toX = 0.5 * x
# toY = 0.25 * y
#
# self._swipe(fromX, fromY, toX, toY, steps)
#
# def swipe_down(self, element=None, steps=0.2):
# """
# swipe down
# :param element: UI element, if None while swipe window of phone
# :param steps: steps of swipe for Android, The lower the faster
# :return: None
# """
# if element:
# x_left, y_up, x_center, y_center, x_right, y_down = self._get_element_size(element)
#
# fromX = x_center
# fromY = y_center
# toX = x_center
# toY = y_down
# else:
# x, y = self._get_window_size()
# fromX = 0.5 * x
# fromY = 0.5 * y
# toX = 0.5 * x
# toY = 0.75 * y
#
# self._swipe(fromX, fromY, toX, toY, steps)
#
# def swipe_left(self, element=None, steps=0.2):
# """
# swipe left
# :param element: UI element, if None while swipe window of phone
# :param steps: steps of swipe for Android, The lower the faster
# :return: None
# """
# if element:
# x_left, y_up, x_center, y_center, x_right, y_down = self._get_element_size(element)
# fromX = x_center
# fromY = y_center
# toX = x_left
# toY = y_center
# else:
# x, y = self._get_window_size()
# fromX = 0.5 * x
# fromY = 0.5 * y
# toX = 0.25 * x
# toY = 0.5 * y
# self._swipe(fromX, fromY, toX, toY, steps)
#
# def swipe_right(self, element=None, steps=0.2):
# """
# swipe right
# :param element: UI element, if None while swipe window of phone
# :param steps: steps of swipe for Android, The lower the faster
# :return: None
# """
# if element:
# x_left, y_up, x_center, y_center, x_right, y_down = self._get_element_size(element)
# fromX = x_center
# fromY = y_center
# toX = x_right
# toY = y_center
# else:
# x, y = self._get_window_size()
# fromX = 0.5 * x
# fromY = 0.5 * y
# toX = 0.75 * x
# toY = 0.5 * y
# self._swipe(fromX, fromY, toX, toY, steps)
#
# def _find_element_by_swipe(self, direction, value, element=None, steps=0.2, max_swipe=6):
# """
# :param direction: swip direction exp: right left up down
# :param value: The value of the UI element location strategy. exp: d(text='Logina')
# :param element: UI element, if None while swipe window of phone
# :param steps: steps of swipe for Android, The lower the faster
# :param max_swipe: the max times of swipe
# :return: UI element
# """
# times = max_swipe
# for i in range(times):
# try:
# if value.exists:
# return value
# else:
# raise UiObjectNotFoundError
# except UiObjectNotFoundError:
# if direction == 'up':
# self.swipe_up(element=element, steps=steps)
# elif direction == 'down':
# self.swipe_down(element=element, steps=steps)
# elif direction == 'left':
# self.swipe_left(element=element, steps=steps)
# elif direction == 'right':
# self.swipe_right(element=element, steps=steps)
# if i == times - 1:
# raise UiObjectNotFoundError
#
# def find_element_by_swipe_up(self, value, element=None, steps=0.2, max_swipe=6):
# return self._find_element_by_swipe('up', value,
# element=element, steps=steps, max_swipe=max_swipe)
#
# def find_element_by_swipe_down(self, value, element=None, steps=0.2, max_swipe=6):
# return self._find_element_by_swipe('down', value,
# element=element, steps=steps, max_swipe=max_swipe)
#
# def find_element_by_swipe_left(self, value, element=None, steps=0.2, max_swipe=6):
# return self._find_element_by_swipe('left', value,
# element=element, steps=steps, max_swipe=max_swipe)
#
# def find_element_by_swipe_right(self, value, element=None, steps=0.2, max_swipe=6):
# return self._find_element_by_swipe('right', value,
# element=element, steps=steps, max_swipe=max_swipe)
chromedriver.py
# !/usr/bin/env python
# -*- coding: utf-8 -*-
#
# extension for https://sites.google.com/a/chromium.org/chromedriver/
# Experimental, maybe change in the future
# Created by <hzsunshx> 2017-01-20
from __future__ import absolute_import
import atexit
import six
from selenium import webdriver
import psutil as pt
import os
if six.PY3:
import subprocess
from urllib.error import URLError
else:
from urllib2 import URLError
import subprocess32 as subprocess
def getPidByName(Str):
    """Return the PIDs of all running processes whose name equals ``Str``.

    Passing the attribute list to ``process_iter`` lets psutil fetch the name
    while iterating, so a process that exits mid-scan, or one whose name cannot
    be read, is skipped instead of raising out of the loop.
    """
    return [proc.pid for proc in pt.process_iter(['name'])
            if proc.info['name'] == Str]
class ChromeDriver(object):
    """Start chromedriver and attach selenium to a WebView on an Android device."""

    def __init__(self, d, port):
        """Remember the uiautomator2 device ``d`` and the chromedriver ``port``."""
        self._d = d
        self._port = port

    def _launch_webdriver(self):
        """Start a chromedriver process on ``self._port``.

        Returns:
            True if the process is still running after two seconds, False if
            it exited within that time.
        """
        p = subprocess.Popen(['chromedriver', '--port=' + str(self._port)])
        try:
            p.wait(timeout=2.0)
            return False
        except subprocess.TimeoutExpired:
            return True

    def driver(self, device_ip=None, package=None, attach=True, activity=None, process=None):
        """Return a selenium driver attached to the app's WebView.

        Args:
            device_ip: device serial/address; defaults to the device's serial.
            package: app package; defaults to the currently running app.
            attach: attach to the already-running app instead of relaunching
                it with a cleared data directory.
            activity: name of the Activity hosting the WebView; defaults to
                the current activity.
            process: process name of that Activity (as given by ps); defaults
                to the package name.

        Returns:
            A selenium remote driver; ``quit`` is registered to run at exit.
        """
        app = self._d.current_app()
        capabilities = {
            'chromeOptions': {
                'androidDeviceSerial': device_ip or self._d.serial,
                'androidPackage': package or app['package'],
                'androidUseRunningApp': attach,
                'androidProcess': process or app['package'],
                'androidActivity': activity or app['activity'],
            }
        }
        endpoint = 'http://localhost:%d' % self._port
        try:
            dr = webdriver.Remote(endpoint, capabilities)
        except URLError:
            # Nothing is listening yet: start chromedriver and retry once.
            if not self._launch_webdriver():
                print('chromedriver exited right after launch on port %s' % self._port)
            dr = webdriver.Remote(endpoint, capabilities)
        # Always quit the driver when the interpreter exits.
        atexit.register(dr.quit)
        return dr

    @staticmethod
    def kill():
        """Kill every running chromedriver process.

        Uses psutil instead of shelling out to ``kill -9``, so the same code
        covers both the Unix ('chromedriver') and Windows ('chromedriver.exe')
        process names and leaves no pipes open.
        """
        for name in ('chromedriver', 'chromedriver.exe'):
            for pid in getPidByName(name):
                try:
                    pt.Process(pid).kill()
                except pt.NoSuchProcess:
                    # The process ended between listing and killing.
                    pass
                except pt.AccessDenied:
                    print('No permission to kill chromedriver pid %d' % pid)
        print('All chromedriver pid killed')
# if __name__ == '__main__':
import uiautomator2 as u2
# d = u2.connect()
# driver = ChromeDriver(d).driver()
# elem = driver.find_element_by_link_text(u"登录")
# elem.click()
# driver.quit()
# ChromeDriver.kill()
业务方法封装,包含自动安装apk
LoginPage.py
# -*- coding: utf-8 -*-
import random
import string
import threading
from Common.osDriver import *
from Common.Decorator import *
# Project root: the normalized directory two levels above this file.
filepath = os.path.normpath(os.path.dirname(os.path.dirname(__file__)))
class LoginPage(BasePage):
# @teststep
# def loginApps(self):
# self.ClickText("立即登录")
# time.sleep(1)
# if self.ExistsText("确认"):
# # if self.ExistsResId("bt_confirm"):
# log.i("存在确认,先输入账号")
# if self.d.device_info["brand"] == "Xiaomi": # 根据设备型号指定账号
# CountUser = '13720140005'
# log.i(f"取到的账号为{CountUser}")
# self.ClickResIdSendKeys("ll_phone", CountUser)
# self.ClickText("确认")
# elif self.d.device_info["brand"] == "HUAWEI": # 根据设备型号指定账号
# CountUser = '13720140008'
# log.i(f"取到的账号为{CountUser}")
# self.ClickResIdSendKeys("ll_phone", CountUser)
# self.ClickText("确认")
#
# time.sleep(3)
# if self.ExistsText("账号密码登录"):
# self.ClickText("账号密码登录")
# self.ClickResIdSendKeys("et_pwd", '123456wqw')
# if not self.ExistsResId("cb_login"):
# self.d.press("back")
# while True:
# self.ClickResId("cb_login")
# self.ClickText("登录")
# time.sleep(5)
# message = self.sess.toast.get_message(wait_timeout=0.5) # 获取页面toast
# print(f'获取到的toast为:{message}')
# if message == u"请阅读并同意协议":
# continue
# else:
# break
#
# @teststep
# def Eor(self):
# """
# 判断是否存在 存在即点击
# """
# if self.ExistsText("允许"):
# self.ClickText("允许")
# if self.ExistsText("始终允许"):
# self.ClickText("始终允许")
#
# def Type_exit(self, types):
# """
# 判断返回 是否为 左 或者 右
# 左:确认离开 支付的状态
# 右:确认离开 没有提交订单 只是点了立即购买
# @object:
# """
# if types == 'left':
# if self.ExistsResId("tv_left"):
# self.ClickResId("tv_left")
# elif types == 'right':
#
# if self.ExistsResId("tv_right"):
# self.ClickText("确认离开")
# elif types == 'give':
# ss2 = self.d(resourceId="com.tencent.mm:id/ffp").exists()
# if ss2:
# self.d(text='放弃').click()
# self.back()
# self.d(text='确认离开').click()
# elif types == 'li':
# if self.ExistsResId("tv_right"):
# self.ClickText("确认")
#
# else:
# pass
#
# def openIndex(self, ides=''):
# """
# 一直点返回按钮,当出现 提示语 就结束
# """
#
# while True:
# self.back()
# time.sleep(1)
# self.Type_exit(ides)
# message = self.sess.toast.get_message(wait_timeout=0.5) # 获取页面toast
# if message == u"再按一次退出程序":
# break
#
# @teststep
# def ClearAndLogin(self):
# """
# 执行清除缓存 ,然后登录
# """
# # dev_list = []
# # for d in adbutils.adb.device_list():
# # dev_list.append(d)
# # for i in dev_list:
# # s = str(i).split("=")[1].split(")")[0]
# # print(f"""取到的设备id为:{s}""")
# # os.system(f"adb shell -s {s} pm clear com.tcl.tclplus")
# # time.sleep(5)
# pass
#
# @teststep
# def update_nickname(self, *args):
# """修改昵称"""
# if self.d(text="保存").exists():
# pass
# else:
# self.d.xpath(
# '//*[@resource-id="com.tcl.tclplus:id/fl_head_view"]/android.widget.ImageView[5]').click()
# try:
# # self.ClickResIdSendKeys("et_nickname", args[0])
# self.ClickResId("et_nickname")
# time.sleep(1)
# # 点旁边的“x”情况昵称
# self.ClickResId("iv_clear")
# self.d.send_keys(args[0])
# time.sleep(1)
# self.ClickText("保存")
# time.sleep(1)
# args[1](args[0], self.GetTextResID("tv_nickName"))
# log.i("修改昵称成功")
# except Exception as e:
# raise log.i("修改昵称失败")
#
# @teststep
# def tv_sms(self, *args):
# """"""
# try:
# self.ClickResId(args[1])
# time.sleep(2)
# args[0](args[2], self.GetTextResID("toolbar_title"))
# log.i(f"成功进入{args[2]}页面")
# except Exception as e:
# raise log.i(f'进入{args[2]}页面失败')
# finally:
# self.d.press("back")
#
# @teststep
# def add_invoice(self, Invoice, types=''):
# """
# 新增发票
# """
# a = random.sample(string.ascii_letters, 4)
# data = ''.join([str(x) for x in a])
# time.sleep(1)
# self.ClickText("添加发票抬头")
# if Invoice == '企业':
# if types == '电子发票':
# self.ClickText("企业")
# self.ClickResIdSendKeys("et_invoice_header", data)
# self.ClickResIdSendKeys("et_tax_no", "42112588888232" + str(random.randint(1, 88)))
# self.d.press("back")
# self.ClickText("完成")
# else:
# self.ClickText("企业")
# self.ClickResId("rb_VAT_invoice")
# time.sleep(2)
# self.ClickResIdSendKeys("et_invoice_header", data)
# self.ClickResIdSendKeys("et_tax_no", "42112588888232" + str(random.randint(1, 88)))
# self.ClickResIdSendKeys("et_bank_name", "建设银行坂田支行")
# self.ClickResIdSendKeys("et_bank_no", "6217007200031609020")
# self.d.press("back")
# self.ClickResIdSendKeys("et_address", "南山区")
# self.d.press("back")
# self.ClickResIdSendKeys("et_mobile", "07554243060")
# self.d.press("back")
# self.ClickText("完成")
# else:
# self.ClickResIdSendKeys("et_invoice_header", data)
# self.ClickResIdSendKeys("et_phone_no", "13599837022")
# self.ClickText("完成")
#
# @teststep
# def update_invoice(self, ys):
# """修改发票抬头"""
# ids = 1
# # 第一次进入 需要点编辑
# self.d.xpath(
# f'//*[@resource-id="com.tcl.tclplus:id/recyclerview"]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click()
# while True:
# ids += 1
# time.sleep(2)
# if self.d(text=ys).exists():
# break
# else:
# time.sleep(2)
# self.d.press('back')
# time.sleep(1)
# self.d.xpath(
# f'//*[@resource-id="com.tcl.tclplus:id/recyclerview"]/android.view.ViewGroup[{ids}]/android.widget.ImageView[1]').click()
# continue
#
# @teststep
# def add_address_phone(self):
# """
# 新增地址时,填写信息
# """
# self.ClickResIdSendKeys("et_name", "李白")
# self.ClickResIdSendKeys("et_phone", "13699837021")
# self.ClickResId("tv_area")
# for i in ["安徽省", "安庆市", "大观区", "山口乡"]: self.ClickText(i)
# self.ClickResId("et_addr")
# self.d.send_keys("安徽省安庆市")
# self.ClickText("公司")
# self.d.press('back')
# self.ClickText("保存")
#
# @teststep
# def binding(self, *args):
# """
# 绑定qq 或者微信
# custom_qq,custom_wechat
# """
# # 如果类型是qq或者微信,先点击取消绑定,然后进行绑定
# self.d.xpath(
# f'//*[@resource-id="com.tcl.tclplus:id/{args[0]}"]/android.widget.FrameLayout['
# '1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]').click()
# time.sleep(2)
# if self.d(text="继续解绑").exists():
# self.ClickText("继续解绑")
# else:
# time.sleep(2)
# if args[0] == "custom_qq":
# time.sleep(2)
# self.ClickText("授权登录")
# time.sleep(5)
# self.ClickText("确认绑定")
# time.sleep(2)
# args[1]("绑定成功", self.sess.toast.get_message())
@teststep
def local_install(self):
    """Download the newest APK, install it on this device and start the app.

    The install method depends on the device brand:
      * HUAWEI: install through uiautomator2 while a helper thread clicks
        through the on-device confirmation dialogs.
      * Xiaomi: install with ``adb install -r -d``.
    After installing, the app data is cleared and the app is started. Other
    brands are logged and skipped.
    """
    # Local import: this module does not import subprocess at file level.
    import subprocess

    # Empty the apk folder first, then fetch the newest build.
    # NOTE(review): '../apk' is resolved against the current working
    # directory, not against this file; confirm it is the download folder.
    automatic().del_file('../apk')
    apk_name = automatic().download_apk()

    apk_path = os.path.join(filepath, 'apk', apk_name)
    if not os.path.exists(apk_path):
        # download_apk saves under 'APK'; on a case-sensitive file system
        # that is a different directory than 'apk'.
        upper_case_path = os.path.join(filepath, 'APK', apk_name)
        if os.path.exists(upper_case_path):
            apk_path = upper_case_path

    package = "com.tcl.tclplus"
    brand = self.d.device_info["brand"]
    # Use the serial of the device this page drives, so the same code works
    # for every connected phone instead of two hard-coded ones.
    # NOTE(review): assumes self.d.serial is the serial adb knows; confirm
    # for devices connected over Wi-Fi.
    serial = self.d.serial

    if brand == "HUAWEI":
        threaded1 = install_thead(self.d)
        threaded1.start()
        threaded1.app_install(apk_path)
        threaded1.join()  # wait for the dialog-clicking thread to finish
        time.sleep(5)
    elif brand == "Xiaomi":
        # Argument list instead of a shell string: paths with spaces are safe.
        subprocess.run(["adb", "-s", serial, "install", "-r", "-d", apk_path])
        time.sleep(2)
    else:
        log.e('local_install: no install method for brand %s', brand)
        return

    subprocess.run(["adb", "-s", serial, "shell", "pm", "clear", package])
    self.d.app_start(package)
class install_thead(threading.Thread):
    """Thread that clicks through install dialogs while an APK is installing."""

    def __init__(self, devices):
        """Store the uiautomator2 device (``self.d`` of a page) to operate on."""
        super().__init__()
        self.drivers = devices

    def usb_install(self):
        """Try three times to click '继续安装' and then '打开'.

        Clicking is best effort: a dialog may simply not be on screen, so
        ordinary errors are ignored. Only ``Exception`` is caught, so
        KeyboardInterrupt and SystemExit are no longer swallowed.
        """
        for _ in range(3):
            for label in ('继续安装', '打开'):
                try:
                    self.drivers(text=label).click()
                except Exception:
                    # The button is not there (yet); move on to the next one.
                    pass

    def run(self):
        """Thread entry point: runs ``usb_install`` once the thread is started."""
        self.usb_install()

    def app_install(self, app_file_path):
        """Install the APK at ``app_file_path`` through the device object."""
        self.drivers.app_install(app_file_path)
这里根据每个设备 self.d 的 device_info 中的 brand 来区分设备,执行不同的安装方法;安装时采用多线程运行,用来自动点掉安装过程中出现的警告提示等。其他品牌的设备自行添加对应分支即可。
以上就完成了 基础的定位方法封装、业务定位封装、多设备启动、多设备安装。
3.同时运行
CaseStrategy.py
import os
import unittest
class CaseStrategy:
def __init__(self):
self.suite_path = 'TestSuite_'
self.case_path = 'TestCase'
self.case_pattern = 'test*.py'
def _collect_cases(self, cases, top_dir=None):
suites = unittest.defaultTestLoader.discover(self.case_path,
pattern=self.case_pattern, top_level_dir=top_dir)
for suite in suites:
for case in suite:
print(f"需要执行的测试用例为:{case}")
cases.addTest(case)
def collect_cases(self, suite=False):
"""collect cases
collect cases from the giving path by case_path via the giving pattern by case_pattern
return: all cases that collected by the giving path and pattern, it is a unittest.TestSuite()
"""
cases = unittest.TestSuite()
if suite:
test_suites = []
for file in os.listdir('.'):
if self.suite_path in file:
if os.path.isdir(file):
test_suites.append(file)
for test_suite in test_suites:
self._collect_cases(cases, top_dir=test_suite)
else:
self._collect_cases(cases, top_dir=None)
return cases
devices.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
多进程check_alive
Mac下需要配置 `export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES`到环境变量,不然python会挂掉
'''
from Common.ReadConfig import ReadConfig
from Common.ATX_Server import ATX_Server
import uiautomator2 as u2
import subprocess
import re
from multiprocessing import Pool
def _alive_devices(candidates):
    """Run ``check_alive`` on every candidate in parallel and keep the live ones.

    Returns the truthy results of ``check_alive`` in input order. An empty
    input returns ``[]`` without creating a pool, because a pool with zero
    processes raises ValueError.
    """
    candidates = list(candidates)
    if not candidates:
        return []
    # The context manager releases the workers even if a check raises.
    with Pool(processes=len(candidates)) as pool:
        infos = pool.map(check_alive, candidates)
    return [info for info in infos if info]


def get_devices():
    """Return the alive devices among the device IPs listed in the config."""
    devices_ip = ReadConfig().get_devices_ip()
    print('Connect devices from config devices IP list %s' % devices_ip)
    return _alive_devices(devices_ip)


def get_online_devices():
    """Return the alive devices among those the ATX-Server reports online.

    Raises:
        Exception: if the server reports no online device.
    """
    devices = ATX_Server(ReadConfig().get_server_url()).online_devices()
    print('There has %s online devices on ATX-Server' % len(devices))
    if not devices:
        raise Exception('ATX-Server has no online device!!! ')
    return _alive_devices(devices)


def connect_devices():
    """Return the alive devices among those ``adb devices`` lists as 'device'.

    Returns ``[]`` when adb lists no usable device.
    """
    output = subprocess.check_output(['adb', 'devices'])
    pattern = re.compile(
        r'(?P<serial>[^\s]+)\t(?P<status>device|offline)')
    # Offline entries are matched by the pattern but skipped here.
    valid_serials = [
        match.group('serial')
        for match in pattern.finditer(output.decode())
        if match.group('status') == 'device'
    ]
    if not valid_serials:
        print("No available android devices detected.")
        return []
    print('There has %s devices connected on PC: ' % len(valid_serials))
    return _alive_devices(valid_serials)
def check_alive(device):
    """Connect to ``device`` and return its ``device_info`` if it is alive.

    ``device`` is either a dict (connected through its 'ip', reported by its
    'udid') or a plain address/serial. Returns None when the atx agent or the
    device itself is not alive.
    """
    from_record = isinstance(device, dict)

    def label():
        # Dict records are reported by their 'udid', anything else as is.
        return device['udid'] if from_record else device

    d = u2.connect(device['ip'] if from_record else device)
    if not d.agent_alive:
        print('The device atx_agent %s is not alive,please checkout!' % label())
        return None
    d.healthcheck()
    if not d.alive:
        print('%s is not alive' % label())
        return None
    print('%s is alive' % label())
    return d.device_info
# if __name__ == '__main__':
# devices_ip = get_devices()
# devices = connect_devices()
# devices = get_online_devices()
# print(devices_ip)
#
# pool = Pool(processes=len(devices_ip))
# tmp_list = []
# for run in devices_ip:
# tmp_list.append(pool.apply_async(check_alive, args=(run,)))
# # alive_list.append(tmp)
# pool.close()
# pool.join()
# print('All runs done........ ')
# print(tmp_list)
# for i in tmp_list:
# print(i.get())
# print(get_devices())
# print(get_online_devices())
# print(connect_devices())
driver.py
from Common.Devices import * # 多进程 check_alive ,Mac下需要配置 `export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES`到环境变量
from Common.RunCases import RunCases
from Common.ReportPath import ReportPath
from Common.BasePage import BasePage
from Common.Log import Log
from Common.ReadConfig import ReadConfig
from Common.chromedriver import ChromeDriver
from Common.Test_data import generate_test_data
from Common.Report import create_statistics_report
class Drivers:
    """Find the available devices and run the test cases on all of them in parallel."""

    @staticmethod
    def _run_cases(run, cases):
        """Run ``cases`` on the device described by ``run`` (one worker process).

        Sets up the per-device log file, report path and driver, switches to
        the fast-input IME for the run, and restores the original IME
        afterwards, even when the run raises.
        """
        log = Log()
        log.set_logger(run.get_device()['model'], run.get_path() + '/' + 'client.log')
        log.i('udid: %s', run.get_device()['udid'])
        # Set the report path; must happen before operating on any page.
        path = ReportPath()
        path.set_path(run.get_path())
        # Set the driver; must happen before operating on any page.
        base_page = BasePage()
        if 'ip' in run.get_device():
            base_page.set_driver(run.get_device()['ip'])
        else:
            base_page.set_driver(run.get_device()['serial'])
        try:
            base_page.set_fastinput_ime()
            try:
                run.run(cases)
            finally:
                # Do not leave the device on the test IME after a failure.
                base_page.set_original_ime()
            base_page.identify()
        except AssertionError as e:
            log.e('AssertionError, %s', e)

    def run(self, cases):
        """Run ``cases`` on every available device and build the summary report.

        The device source is chosen by the configured method: 'SERVER'
        (ATX-Server), 'IP' (config list) or 'USB' (adb).

        Raises:
            Exception: if the configured method is none of the above.
        """
        method = ReadConfig().get_method().strip()
        if method == 'SERVER':
            print('Checking available online devices from ATX-Server...')
            devices = get_online_devices()
            print('\nThere has %s online devices in ATX-Server' % len(devices))
        elif method == 'IP':
            print('Checking available IP devices from config... ')
            devices = get_devices()
            print('\nThere has %s devices alive in config IP list' % len(devices))
        elif method == 'USB':
            print('Checking available USB devices connected on PC... ')
            devices = connect_devices()
            print('\nThere has %s USB devices alive ' % len(devices))
        else:
            raise Exception('Config.ini method illegal:method =%s' % method)
        if not devices:
            print('There is no device found,test over.')
            return
        # Prepare the test data (data.json).
        generate_test_data(devices)
        print('Starting Run test >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        runs = [RunCases(device) for device in devices]
        # One worker process per device.
        pool = Pool(processes=len(runs))
        pending = [
            pool.apply_async(self._run_cases, args=(run, cases,))
            for run in runs
        ]
        print('Waiting for all runs done........ ')
        pool.close()
        pool.join()
        # apply_async keeps worker exceptions inside the result object; read
        # them back so a crashed run is reported instead of vanishing.
        for run, result in zip(runs, pending):
            try:
                result.get()
            except Exception as e:
                print('Run on device %s failed: %r' % (run.get_device()['udid'], e))
        print('All runs done........ ')
        ChromeDriver.kill()
        # Build one HTML report covering all devices.
        create_statistics_report(runs)
# if __name__ == '__main__':
# print(ATX_Server(ReadConfig().get_url()).online_devices())
#
# print(get_devices())
# print(ReadConfig().get_atx_server('method'))
4.持续集成,使用 Jenkins 即可,这里不多做说明。