当前位置: 首页 > 工具软件 > ATX > 使用案例 >

基于ATX的app自动化

卓麒
2023-12-01
常规做 APP UI 自动化时,基本上都是采用 POM 模式,加上关键字驱动、数据驱动,实现测试数据分离。

需求是:

(1)实现基本业务流程的测试
(2)多设备同时运行
(3)自动拉取最新apk并自动安装
(4)持续集成

分析:

业务基本流程的覆盖:对于做过 Web 端自动化或者接口自动化的同学来说,实现 APP UI 自动化其实不难,思路也是相同的:POM 模式、关键字驱动、数据驱动、数据分离等。

1.先来自动拉取最新apk 并且实现自动化安装。

import os
import re
import shutil
from urllib.request import urlopen  # 用于获取网页
import requests
from bs4 import BeautifulSoup  # 用于解析网页

# Normalised path of the parent of the directory that contains this file.
path = os.path.normpath(os.path.dirname(os.path.dirname(__file__)))


class automatic(object):
    """Find and download the newest APK from the repository browse pages.

    The root page at ``self.url`` is expected to list one link per app
    version; the page of a version is expected to list the build artifacts.
    """

    # Numeric components of a version name, e.g. "1.10.0" -> 1, 10, 0.
    _NUMBER_RE = re.compile(r'\d+')
    # Build stamps that start with 2021 (pattern unchanged from the original).
    _BUILD_STAMP_RE = re.compile(r'(?:2021)\d+\.?\d*')

    def __init__(self):
        self.url = 'http://10.124.106.120:28759/service/rest/repository/browse/maven-releases/tcl/release/android/tclplus/'

    @staticmethod
    def _get_url(url):
        """
        Return every ``<a>`` tag found on the page at *url*.
        """
        # The parser reads the response in its constructor, so the connection
        # can be closed as soon as the soup object exists.
        with urlopen(url) as html:
            bsObj = BeautifulSoup(html, 'html.parser')
        return bsObj.find_all('a')

    @classmethod
    def _version_key(cls, name):
        """Return the numeric parts of *name* as a tuple so 1.10.0 > 1.9.0."""
        return tuple(int(part) for part in cls._NUMBER_RE.findall(name))

    # Look up the newest version that is published online.
    def get_latest_version(self):
        """
        Return ``(version_url, version)`` for the newest listed version.

        :raises ValueError: when the listing contains no usable version link
        """
        candidates = []
        for link in self._get_url(self.url):
            href = link.get('href')
            # Anchors without an href give None; skip them instead of crashing.
            # NOTE(review): this is a substring test, so it also hides names
            # such as "1.2.0" - confirm that is intended.
            if not href or '2.0' in href:
                continue
            name = str(href).split("/")[0].strip("..").strip("http:")
            # Navigation links ("../", absolute URLs) reduce to names without
            # digits; they are not versions.
            if self._NUMBER_RE.search(name):
                candidates.append(name)
        if not candidates:
            raise ValueError(f"no version link found at {self.url}")
        # Compare numerically: a plain string max() ranks "1.9.0" above "1.10.0".
        latest_version = max(candidates, key=self._version_key)
        print(f"最大版本号为:{latest_version}")
        result_url = self.url + str(latest_version)
        return result_url, latest_version

    def download_apk(self):
        """
        Download the newest build of the newest version and return its file name.

        The file is written below ``path + '/APK/'``.

        :raises ValueError: when the version page lists no 2021 build stamp
        """
        # Kept for backward compatibility: the file name is also published as
        # a module-level global.
        global apk_name
        urls, number = self.get_latest_version()
        url_list = []
        stamps = []
        for link in self._get_url(urls):
            href = link.get('href')
            if not href:
                continue
            url_list.append(href)
            found = self._BUILD_STAMP_RE.findall(href)
            if found:
                stamps.append(found[0])
        if not stamps:
            raise ValueError(f"no build stamp found at {urls}")
        Ak = max(stamps)
        file_name = f"{number}-{Ak}-jiagu-xtest-release.apk"
        # NOTE(review): every link that contains the stamp is written to the
        # same file, so the last matching link wins.
        for url in url_list:
            if Ak in url:
                file = requests.get(url, timeout=60)
                # Do not save an HTTP error page as if it were the APK.
                file.raise_for_status()
                with open(path + '/APK/' + file_name, 'wb') as zip_file:
                    zip_file.write(file.content)
        apk_name = file_name
        return apk_name

主要流程是:发起请求,取到最新版本号,再获取对应的链接地址;拿到下载地址之后,把 file.content 写入文件,保存为 apk 即可。

2.第二步,多设备链接并自动安装
Decorator.py

import time

from functools import wraps
from Common.BasePage import BasePage
from Common.ReportPath import ReportPath
from Common.Log import Log

# Marker the decorators below prepend to a screenshot file name in the
# exception message; a message that already contains it is re-raised as is.
flag = 'IMAGE:'
# Logger shared by every decorator in this module.
log = Log()


def _screenshot(name):
    """Save a screenshot of the current device into the report directory.

    :param name: prefix of the image file name
    :return: the image file name (not the full path), built as
        ``<name>-<YYYYmmddHHMMSS>.PNG``
    """
    stamp = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
    image_name = name + '-' + stamp + '.PNG'
    target = ReportPath().get_path() + '/' + image_name

    device, _session = BasePage().get_driver()
    device.screenshot(target)

    return image_name


def teststep(func):
    """Decorate a single test step: log its start and attach a screenshot on failure.

    On failure the error is logged, then re-raised as ``AssertionError`` or
    ``Exception``. A message that already carries ``flag`` is passed through;
    otherwise the new message is ``flag`` plus a fresh screenshot file name.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            log.i('\t--> %s', func.__qualname__)
            return func(*args, **kwargs)
        except AssertionError as err:
            log.e('AssertionError, %s', err)
            log.e('\t<-- %s, %s, %s', func.__qualname__, 'AssertionError', 'Error')

            # Keep an already tagged message, otherwise take the screenshot now.
            raise AssertionError(
                err if flag in str(err) else flag + _screenshot(func.__qualname__))
        except Exception as err:
            log.e('Exception, %s', err)
            log.e('\t<-- %s, %s, %s', func.__qualname__, 'Exception', 'Error')

            raise Exception(
                err if flag in str(err) else flag + _screenshot(func.__qualname__))

    return wrapper


def teststeps(func):
    """Decorate a group of test steps: log start and success, screenshot on failure.

    On failure the error is logged, then re-raised as ``AssertionError`` or
    ``Exception``. A message that already carries ``flag`` is passed through;
    otherwise the new message is ``flag`` plus a fresh screenshot file name.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            log.i('  --> %s', func.__qualname__)
            result = func(*args, **kwargs)
            log.i('  <-- %s, %s', func.__qualname__, 'Success')
            return result
        except AssertionError as err:
            log.e('AssertionError, %s', err)
            log.e('  <-- %s, %s, %s', func.__qualname__, 'AssertionError', 'Error')

            # Keep an already tagged message, otherwise take the screenshot now.
            raise AssertionError(
                err if flag in str(err) else flag + _screenshot(func.__qualname__))
        except Exception as err:
            log.e('Exception, %s', err)
            log.e('  <-- %s, %s, %s', func.__qualname__, 'Exception', 'Error')

            raise Exception(
                err if flag in str(err) else flag + _screenshot(func.__qualname__))

    return wrapper


def _wrapper(func):
    """Shared decorator behind testcase/setup/teardown/setupclass/teardownclass.

    Logs start and success of *func*. A failed assertion is logged as 'Fail',
    any other exception as 'Error'; both are re-raised with a message that
    carries ``flag`` (the existing message if it already has it, otherwise
    ``flag`` plus a fresh screenshot file name).
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            log.i('--> %s', func.__qualname__)
            result = func(*args, **kwargs)
            log.i('<-- %s, %s\n', func.__qualname__, 'Success')
            return result
        except AssertionError as err:
            log.e('AssertionError, %s', err)
            log.e('<-- %s, %s, %s\n', func.__qualname__, 'AssertionError', 'Fail')

            # Keep an already tagged message, otherwise take the screenshot now.
            raise AssertionError(
                err if flag in str(err) else flag + _screenshot(func.__qualname__))
        except Exception as err:
            log.e('Exception, %s', err)
            log.e('<-- %s, %s, %s\n', func.__qualname__, 'Exception', 'Error')

            raise Exception(
                err if flag in str(err) else flag + _screenshot(func.__qualname__))

    return wrapper


def testcase(func):
    """Decorate a test case method with the shared ``_wrapper``."""
    decorated = _wrapper(func)
    return decorated


def setup(func):
    """Decorate a per-test setup method with the shared ``_wrapper``."""
    decorated = _wrapper(func)
    return decorated


def teardown(func):
    """Decorate a per-test teardown method with the shared ``_wrapper``."""
    decorated = _wrapper(func)
    return decorated


def setupclass(func):
    """Decorate a class-level setup method with the shared ``_wrapper``."""
    decorated = _wrapper(func)
    return decorated


def teardownclass(func):
    """Decorate a class-level teardown method with the shared ``_wrapper``."""
    decorated = _wrapper(func)
    return decorated

基础定位方法封装

基础定位方法封装

BasePage.py

import time
import uiautomator2 as u2
from Common.chromedriver import ChromeDriver
from Common.Ports import Ports
from Common.ReportPath import ReportPath


class BasePage(object):
    """Base page object that shares one uiautomator2 device between all pages.

    ``set_driver`` stores the device (``d``) and an app session (``sess``) on
    the class itself, so every instance sees the same connection.
    """

    # Package of the app under test; also the prefix of its resource ids.
    PACKAGE = "com.tcl.tclplus"

    @classmethod
    def set_driver(cls, dri):
        """Connect to the device *dri* and open a session for the app under test."""
        cls.d = u2.connect(dri)
        cls.sess = cls.d.session(cls.PACKAGE)

    def get_driver(self):
        """Return the shared ``(device, session)`` pair."""
        return self.d, self.sess

    @classmethod
    def _res_id(cls, page):
        """Return the full resource id for the short id *page*."""
        return cls.PACKAGE + ":id/" + page

    def ClickText(self, text):
        """
        Click the element whose text equals *text*.
        """
        self.d(text=text).click()

    def ClickResId(self, page):
        """
        Click the element whose resource id is the short id *page*.
        """
        self.d(resourceId=self._res_id(page)).click()

    def ExistsResId(self, page):
        """
        Return whether an element with the short resource id *page* exists.
        """
        return self.d(resourceId=self._res_id(page)).exists()

    def ExistsText(self, text):
        """Return whether an element with the text *text* exists, as a bool."""
        # The original returned the ``exists`` attribute itself; convert it so
        # this method agrees with ExistsResId and always yields True/False.
        return bool(self.d(text=text).exists)

    def ClickResIdSendKeys(self, *args):
        """
        Click the element with short resource id ``args[0]``, then type ``args[1]``.
        """
        self.ClickResId(args[0])
        self.d.send_keys(args[1])

    def GetTextResID(self, page):
        """
        Return the text of the element with the short resource id *page*.
        """
        return self.d(resourceId=self._res_id(page)).get_text()

    def GetTextXpath(self, text):
        """
        Return the text of the element matched by the xpath ``//*[@text="<text>"]``.
        """
        return self.d.xpath(f'//*[@text="{text}"]').get_text()

    def ClickResIdText(self, page, text):
        """
        Click the element matching both the short resource id *page* and *text*.
        """
        self.d(resourceId=self._res_id(page), text=text).click()

    def SwipeOrExist(self, taper, text, up="up", max_swipe=50):
        """
        Swipe until the target element exists, then click it.

        Each round swipes first and checks afterwards, as before.

        :param taper: True to look for an element by its text (swipe scale 1),
            False to look for it by short resource id (swipe scale 0.5)
        :param text: the text, or the short resource id when *taper* is False
        :param up: swipe direction passed to ``swipe_ext`` (e.g. "up", "down")
        :param max_swipe: maximum number of swipes before giving up;
            ``None`` restores the previous unlimited behaviour
        :raises RuntimeError: when the element is still missing after
            *max_swipe* swipes (the original looped forever in that case)
        """
        scale = 1 if taper else 0.5
        swipes = 0
        while max_swipe is None or swipes < max_swipe:
            self.d.swipe_ext(up, scale)
            swipes += 1
            if taper:
                if self.d(text=text).exists:
                    self.ClickText(text)
                    return
            elif self.ExistsResId(text):
                self.ClickResId(text)
                return
        raise RuntimeError(
            'element %r not found after %d swipes' % (text, swipes))

    @classmethod
    def back(cls):
        """Press the back key, sleeping one second before and after.

        Going back can fail while the page is still loading, so make sure the
        page has finished loading before calling this.
        """
        time.sleep(1)
        cls.d.press('back')
        time.sleep(1)

    @classmethod
    def identify(cls):
        """Call ``open_identify`` on the shared device."""
        cls.d.open_identify()

    def set_chromedriver(self, device_ip=None, package=None, activity=None, process=None):
        """Return a selenium driver attached to the running app's WebView.

        A port is taken from ``Ports().get_ports(1)``; the remaining arguments
        are forwarded to ``ChromeDriver.driver`` with ``attach=True``.
        """
        port = Ports().get_ports(1)[0]
        return ChromeDriver(self.d, port).driver(
            device_ip=device_ip, package=package, attach=True,
            activity=activity, process=process)

    @classmethod
    def watch_device(cls, watch_list):
        """
        Register watchers that click an element as soon as it shows up.

        :param watch_list: texts to watch for, e.g. ['允许', 'yes', '跳过']
        """
        cls.d.watchers.watched = False
        for label in watch_list:
            cls.d.watcher(label).when(text=label).click(text=label)
        print('Starting watcher,parameter is %s' % watch_list)
        cls.d.watchers.watched = True

    @classmethod
    def unwatch_device(cls):
        """Switch all watchers off."""
        print('Stop all watcher')
        cls.d.watchers.watched = False

    @classmethod
    def get_toast_message(cls):
        """Return the toast from ``toast.get_message(3, 3)``, then reset the toast."""
        message = cls.d.toast.get_message(3, 3)
        cls.d.toast.reset()
        return message

    @classmethod
    def set_fastinput_ime(cls):
        """Enable the fast-input IME on the device."""
        cls.d.set_fastinput_ime(True)

    @classmethod
    def set_original_ime(cls):
        """Disable the fast-input IME on the device."""
        cls.d.set_fastinput_ime(False)

    @classmethod
    def screenshot(cls):
        """Save a screenshot and print ``IMAGE:<file name>`` so the report shows it."""
        date_time = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
        screenshot_name = cls.__qualname__ + '-' + date_time + '.PNG'
        path = ReportPath().get_path() + '/' + screenshot_name
        cls.d.screenshot(path)
        print('IMAGE:' + screenshot_name)

    @staticmethod
    def find_message(elements, text):
        """Return True when the text of any element in *elements* contains *text*."""
        # Walk from the last element to the first, like the original loop.
        for index in range(elements.count - 1, -1, -1):
            if text in elements[index].info['text']:
                return True
        return False

    def _get_window_size(self):
        """Return the first two values of the device's ``window_size()`` as ``(x, y)``."""
        window = self.d.window_size()
        return window[0], window[1]

    @staticmethod
    def _get_element_size(element):
        """Return ``(x_left, y_up, x_center, y_center, x_right, y_down)`` of *element*.

        The values come from ``element.info['bounds']``.
        """
        rect = element.info['bounds']
        x_left, x_right = rect['left'], rect['right']
        y_up, y_down = rect['top'], rect['bottom']
        x_center = (x_left + x_right) / 2
        y_center = (y_down + y_up) / 2

        return x_left, y_up, x_center, y_center, x_right, y_down

    # def _swipe(self, fromX, fromY, toX, toY, steps):
    #     self.d.swipe(fromX, fromY, toX, toY, steps)
    #
    # def swipe_up(self, element=None, steps=0.2):
    #     """
    #     swipe up
    #     :param element: UI element, if None while swipe window of phone
    #     :param steps: steps of swipe for Android, The lower the faster
    #     :return: None
    #     """
    #     if element:
    #         x_left, y_up, x_center, y_center, x_right, y_down = self._get_element_size(element)
    #         fromX = x_center
    #         fromY = y_center
    #         toX = x_center
    #         toY = y_up
    #     else:
    #         x, y = self._get_window_size()
    #         fromX = 0.5 * x
    #         fromY = 0.5 * y
    #         toX = 0.5 * x
    #         toY = 0.25 * y
    #
    #     self._swipe(fromX, fromY, toX, toY, steps)
    #
    # def swipe_down(self, element=None, steps=0.2):
    #     """
    #     swipe down
    #     :param element: UI element, if None while swipe window of phone
    #     :param steps: steps of swipe for Android, The lower the faster
    #     :return: None
    #     """
    #     if element:
    #         x_left, y_up, x_center, y_center, x_right, y_down = self._get_element_size(element)
    #
    #         fromX = x_center
    #         fromY = y_center
    #         toX = x_center
    #         toY = y_down
    #     else:
    #         x, y = self._get_window_size()
    #         fromX = 0.5 * x
    #         fromY = 0.5 * y
    #         toX = 0.5 * x
    #         toY = 0.75 * y
    #
    #     self._swipe(fromX, fromY, toX, toY, steps)
    #
    # def swipe_left(self, element=None, steps=0.2):
    #     """
    #     swipe left
    #     :param element: UI element, if None while swipe window of phone
    #     :param steps: steps of swipe for Android, The lower the faster
    #     :return: None
    #     """
    #     if element:
    #         x_left, y_up, x_center, y_center, x_right, y_down = self._get_element_size(element)
    #         fromX = x_center
    #         fromY = y_center
    #         toX = x_left
    #         toY = y_center
    #     else:
    #         x, y = self._get_window_size()
    #         fromX = 0.5 * x
    #         fromY = 0.5 * y
    #         toX = 0.25 * x
    #         toY = 0.5 * y
    #     self._swipe(fromX, fromY, toX, toY, steps)
    #
    # def swipe_right(self, element=None, steps=0.2):
    #     """
    #     swipe right
    #     :param element: UI element, if None while swipe window of phone
    #     :param steps: steps of swipe for Android, The lower the faster
    #     :return: None
    #     """
    #     if element:
    #         x_left, y_up, x_center, y_center, x_right, y_down = self._get_element_size(element)
    #         fromX = x_center
    #         fromY = y_center
    #         toX = x_right
    #         toY = y_center
    #     else:
    #         x, y = self._get_window_size()
    #         fromX = 0.5 * x
    #         fromY = 0.5 * y
    #         toX = 0.75 * x
    #         toY = 0.5 * y
    #     self._swipe(fromX, fromY, toX, toY, steps)
    #
    # def _find_element_by_swipe(self, direction, value, element=None, steps=0.2, max_swipe=6):
    #     """
    #     :param direction: swip direction exp: right left up down
    #     :param value: The value of the UI element location strategy. exp: d(text='Logina')
    #     :param element: UI element, if None while swipe window of phone
    #     :param steps: steps of swipe for Android, The lower the faster
    #     :param max_swipe: the max times of swipe
    #     :return: UI element
    #     """
    #     times = max_swipe
    #     for i in range(times):
    #         try:
    #             if value.exists:
    #                 return value
    #             else:
    #                 raise UiObjectNotFoundError
    #         except UiObjectNotFoundError:
    #             if direction == 'up':
    #                 self.swipe_up(element=element, steps=steps)
    #             elif direction == 'down':
    #                 self.swipe_down(element=element, steps=steps)
    #             elif direction == 'left':
    #                 self.swipe_left(element=element, steps=steps)
    #             elif direction == 'right':
    #                 self.swipe_right(element=element, steps=steps)
    #             if i == times - 1:
    #                 raise UiObjectNotFoundError
    #
    # def find_element_by_swipe_up(self, value, element=None, steps=0.2, max_swipe=6):
    #     return self._find_element_by_swipe('up', value,
    #                                        element=element, steps=steps, max_swipe=max_swipe)
    #
    # def find_element_by_swipe_down(self, value, element=None, steps=0.2, max_swipe=6):
    #     return self._find_element_by_swipe('down', value,
    #                                        element=element, steps=steps, max_swipe=max_swipe)
    #
    # def find_element_by_swipe_left(self, value, element=None, steps=0.2, max_swipe=6):
    #     return self._find_element_by_swipe('left', value,
    #                                        element=element, steps=steps, max_swipe=max_swipe)
    #
    # def find_element_by_swipe_right(self, value, element=None, steps=0.2, max_swipe=6):
    #     return self._find_element_by_swipe('right', value,
    #                                        element=element, steps=steps, max_swipe=max_swipe)

chromedriver.py

# !/usr/bin/env python
# -*- coding: utf-8 -*-
#
# extension for https://sites.google.com/a/chromium.org/chromedriver/
# Experimental, maybe change in the future
# Created by <hzsunshx> 2017-01-20


from __future__ import absolute_import

import atexit
import six
from selenium import webdriver
import psutil as pt
import os


# Bind `subprocess` and `URLError` for the running interpreter: on Python 3
# both come from the standard library; otherwise urllib2 and subprocess32 are
# used (the code below relies on `wait(timeout=...)` and `TimeoutExpired`).
if six.PY3:
    import subprocess
    from urllib.error import URLError
else:
    from urllib2 import URLError
    import subprocess32 as subprocess


def getPidByName(Str):
    """Return the PIDs (as ints) of all running processes whose name equals *Str*."""
    return [int(proc.pid) for proc in pt.process_iter() if proc.name() == Str]


class ChromeDriver(object):
    """Create selenium drivers for an Android WebView through a local chromedriver."""

    def __init__(self, d, port):
        """
        Args:
            - d: connected device; ``current_app()`` and ``serial`` are read from it
            - port(int): local port chromedriver listens on
        """
        self._d = d
        self._port = port

    def _launch_webdriver(self):
        """Start a chromedriver process on ``self._port``.

        Returns:
            False when the process exits within two seconds, True when it is
            still running after that wait.
        """
        # print("start chromedriver instance")
        p = subprocess.Popen(['chromedriver', '--port=' + str(self._port)])
        try:
            p.wait(timeout=2.0)
            return False
        except subprocess.TimeoutExpired:
            return True

    def driver(self, device_ip=None, package=None, attach=True, activity=None, process=None):
        """
        Args:
            - device_ip(string): device serial for chromedriver, default the serial of the connected device
            - package(string): default current running app
            - attach(bool): default true, Attach to an already-running app instead of launching the app with a clear data directory
            - activity(string): Name of the Activity hosting the WebView.
            - process(string): Process name of the Activity hosting the WebView (as given by ps).
                If not given, the process name is assumed to be the same as androidPackage.

        Returns:
            selenium driver
        """
        app = self._d.current_app()
        capabilities = {
            'chromeOptions': {
                'androidDeviceSerial': device_ip or self._d.serial,
                'androidPackage': package or app['package'],
                'androidUseRunningApp': attach,
                'androidProcess': process or app['package'],
                'androidActivity': activity or app['activity'],
            }
        }

        server = 'http://localhost:%d' % self._port
        try:
            dr = webdriver.Remote(server, capabilities)
        except URLError:
            # Nothing is listening yet: start chromedriver and try once more.
            self._launch_webdriver()
            dr = webdriver.Remote(server, capabilities)

        # always quit driver when done
        atexit.register(dr.quit)
        return dr

    @staticmethod
    def kill():
        """Kill every process named ``chromedriver`` with ``kill -9``."""
        # # for windows
        # pid = getPidByName('chromedriver.exe')
        # for i in pid:
        #     os.popen('taskkill /PID %d /F' % i)

        # # for mac
        pid = getPidByName('chromedriver')
        for i in pid:
            # close() waits for the command and releases the pipe.
            os.popen('kill -9 %d' % i).close()

        print('All chromedriver pid killed')


# Manual demo, kept commented out. The import below used to be live code
# indented under the commented-out guard, which made it run inside the class
# body of ChromeDriver on every import of this module.
# if __name__ == '__main__':
#     import uiautomator2 as u2
#
#     d = u2.connect()
#     driver = ChromeDriver(d).driver()
#     elem = driver.find_element_by_link_text(u"登录")
#     elem.click()
#     driver.quit()
#     ChromeDriver.kill()

业务方法封装,包含自动安装apk

业务方法封装

LoginPage.py

# -*- coding: utf-8 -*-
import random
import string
import threading
from Common.osDriver import *
from Common.Decorator import *

# Normalised path of the parent of the directory that contains this file.
filepath = os.path.normpath(os.path.dirname(os.path.dirname(__file__)))


class LoginPage(BasePage):

    # @teststep
    # def loginApps(self):
    #     self.ClickText("立即登录")
    #     time.sleep(1)
    #     if self.ExistsText("确认"):
    #         # if self.ExistsResId("bt_confirm"):
    #         log.i("存在确认,先输入账号")
    #         if self.d.device_info["brand"] == "Xiaomi":  # 根据设备型号指定账号
    #             CountUser = '13720140005'
    #             log.i(f"取到的账号为{CountUser}")
    #             self.ClickResIdSendKeys("ll_phone", CountUser)
    #             self.ClickText("确认")
    #         elif self.d.device_info["brand"] == "HUAWEI":  # 根据设备型号指定账号
    #             CountUser = '13720140008'
    #             log.i(f"取到的账号为{CountUser}")
    #             self.ClickResIdSendKeys("ll_phone", CountUser)
    #             self.ClickText("确认")
    # 
    #     time.sleep(3)
    #     if self.ExistsText("账号密码登录"):
    #         self.ClickText("账号密码登录")
    #     self.ClickResIdSendKeys("et_pwd", '123456wqw')
    #     if not self.ExistsResId("cb_login"):
    #         self.d.press("back")
    #     while True:
    #         self.ClickResId("cb_login")
    #         self.ClickText("登录")
    #         time.sleep(5)
    #         message = self.sess.toast.get_message(wait_timeout=0.5)  # 获取页面toast
    #         print(f'获取到的toast为:{message}')
    #         if message == u"请阅读并同意协议":
    #             continue
    #         else:
    #             break
    # 
    # @teststep
    # def Eor(self):
    #     """
    #     判断是否存在 存在即点击
    #     """
    #     if self.ExistsText("允许"):
    #         self.ClickText("允许")
    #     if self.ExistsText("始终允许"):
    #         self.ClickText("始终允许")
    # 
    # def Type_exit(self, types):
    #     """
    #     判断返回 是否为 左 或者 右
    #     左:确认离开  支付的状态
    #     右:确认离开  没有提交订单 只是点了立即购买
    #     @object:
    #     """
    #     if types == 'left':
    #         if self.ExistsResId("tv_left"):
    #             self.ClickResId("tv_left")
    #     elif types == 'right':
    # 
    #         if self.ExistsResId("tv_right"):
    #             self.ClickText("确认离开")
    #     elif types == 'give':
    #         ss2 = self.d(resourceId="com.tencent.mm:id/ffp").exists()
    #         if ss2:
    #             self.d(text='放弃').click()
    #             self.back()
    #             self.d(text='确认离开').click()
    #     elif types == 'li':
    #         if self.ExistsResId("tv_right"):
    #             self.ClickText("确认")
    # 
    #     else:
    #         pass
    # 
    # def openIndex(self, ides=''):
    #     """
    #     一直点返回按钮,当出现 提示语 就结束
    #     """
    # 
    #     while True:
    #         self.back()
    #         time.sleep(1)
    #         self.Type_exit(ides)
    #         message = self.sess.toast.get_message(wait_timeout=0.5)  # 获取页面toast
    #         if message == u"再按一次退出程序":
    #             break
    # 
    # @teststep
    # def ClearAndLogin(self):
    #     """
    #     执行清除缓存 ,然后登录
    #     """
    #     # dev_list = []
    #     # for d in adbutils.adb.device_list():
    #     #     dev_list.append(d)
    #     # for i in dev_list:
    #     #     s = str(i).split("=")[1].split(")")[0]
    #     #     print(f"""取到的设备id为:{s}""")
    #     #     os.system(f"adb shell -s {s} pm clear com.tcl.tclplus")
    #     # time.sleep(5)
    #     pass
    # 
    # @teststep
    # def update_nickname(self, *args):
    #     """修改昵称"""
    #     if self.d(text="保存").exists():
    #         pass
    #     else:
    #         self.d.xpath(
    #             '//*[@resource-id="com.tcl.tclplus:id/fl_head_view"]/android.widget.ImageView[5]').click()
    #     try:
    #         # self.ClickResIdSendKeys("et_nickname", args[0])
    #         self.ClickResId("et_nickname")
    #         time.sleep(1)
    #         # 点旁边的“x”情况昵称
    #         self.ClickResId("iv_clear")
    #         self.d.send_keys(args[0])
    #         time.sleep(1)
    #         self.ClickText("保存")
    #         time.sleep(1)
    #         args[1](args[0], self.GetTextResID("tv_nickName"))
    #         log.i("修改昵称成功")
    #     except Exception as e:
    #         raise log.i("修改昵称失败")
    # 
    # @teststep
    # def tv_sms(self, *args):
    #     """"""
    #     try:
    #         self.ClickResId(args[1])
    #         time.sleep(2)
    #         args[0](args[2], self.GetTextResID("toolbar_title"))
    #         log.i(f"成功进入{args[2]}页面")
    #     except Exception as e:
    #         raise log.i(f'进入{args[2]}页面失败')
    #     finally:
    #         self.d.press("back")
    # 
    # @teststep
    # def add_invoice(self, Invoice, types=''):
    #     """
    #     新增发票
    #     """
    #     a = random.sample(string.ascii_letters, 4)
    #     data = ''.join([str(x) for x in a])
    #     time.sleep(1)
    #     self.ClickText("添加发票抬头")
    #     if Invoice == '企业':
    #         if types == '电子发票':
    #             self.ClickText("企业")
    #             self.ClickResIdSendKeys("et_invoice_header", data)
    #             self.ClickResIdSendKeys("et_tax_no", "42112588888232" + str(random.randint(1, 88)))
    #             self.d.press("back")
    #             self.ClickText("完成")
    #         else:
    #             self.ClickText("企业")
    #             self.ClickResId("rb_VAT_invoice")
    #             time.sleep(2)
    #             self.ClickResIdSendKeys("et_invoice_header", data)
    #             self.ClickResIdSendKeys("et_tax_no", "42112588888232" + str(random.randint(1, 88)))
    #             self.ClickResIdSendKeys("et_bank_name", "建设银行坂田支行")
    #             self.ClickResIdSendKeys("et_bank_no", "6217007200031609020")
    #             self.d.press("back")
    #             self.ClickResIdSendKeys("et_address", "南山区")
    #             self.d.press("back")
    #             self.ClickResIdSendKeys("et_mobile", "07554243060")
    #             self.d.press("back")
    #             self.ClickText("完成")
    #     else:
    #         self.ClickResIdSendKeys("et_invoice_header", data)
    #         self.ClickResIdSendKeys("et_phone_no", "13599837022")
    #         self.ClickText("完成")
    # 
    # @teststep
    # def update_invoice(self, ys):
    #     """修改发票抬头"""
    #     ids = 1
    #     # 第一次进入 需要点编辑
    #     self.d.xpath(
    #         f'//*[@resource-id="com.tcl.tclplus:id/recyclerview"]/android.view.ViewGroup[1]/android.widget.ImageView[1]').click()
    #     while True:
    #         ids += 1
    #         time.sleep(2)
    #         if self.d(text=ys).exists():
    #             break
    #         else:
    #             time.sleep(2)
    #             self.d.press('back')
    #             time.sleep(1)
    #             self.d.xpath(
    #                 f'//*[@resource-id="com.tcl.tclplus:id/recyclerview"]/android.view.ViewGroup[{ids}]/android.widget.ImageView[1]').click()
    #             continue
    # 
    # @teststep
    # def add_address_phone(self):
    #     """
    #      新增地址时,填写信息
    #     """
    #     self.ClickResIdSendKeys("et_name", "李白")
    #     self.ClickResIdSendKeys("et_phone", "13699837021")
    #     self.ClickResId("tv_area")
    #     for i in ["安徽省", "安庆市", "大观区", "山口乡"]: self.ClickText(i)
    #     self.ClickResId("et_addr")
    #     self.d.send_keys("安徽省安庆市")
    #     self.ClickText("公司")
    #     self.d.press('back')
    #     self.ClickText("保存")
    # 
    # @teststep
    # def binding(self, *args):
    #     """
    #     绑定qq 或者微信
    #     custom_qq,custom_wechat
    #     """
    #     # 如果类型是qq或者微信,先点击取消绑定,然后进行绑定
    #     self.d.xpath(
    #         f'//*[@resource-id="com.tcl.tclplus:id/{args[0]}"]/android.widget.FrameLayout['
    #         '1]/android.view.ViewGroup[1]/android.widget.FrameLayout[1]').click()
    #     time.sleep(2)
    #     if self.d(text="继续解绑").exists():
    #         self.ClickText("继续解绑")
    #     else:
    #         time.sleep(2)
    #         if args[0] == "custom_qq":
    #             time.sleep(2)
    #             self.ClickText("授权登录")
    #         time.sleep(5)
    #         self.ClickText("确认绑定")
    #         time.sleep(2)
    #         args[1]("绑定成功", self.sess.toast.get_message())

    @teststep
    def local_install(self):
        """
        Download the newest APK and install it on the connected device.

        The install method depends on the brand in ``self.d.device_info``:
        HUAWEI devices are installed through the device's ``app_install``
        while a helper thread taps the installer prompts; Xiaomi devices are
        installed with ``adb install -r -d``. Other brands are left untouched.
        After installing, the app data is cleared and the app is started.

        The adb commands target ``self.d.serial`` instead of a hard-coded
        serial number, so the method works for any device of these brands.
        """
        # Local import: this module brings names in through star imports, so
        # subprocess is not guaranteed to be in scope.
        import subprocess

        # Empty the apk folder first, then download the newest APK.
        automatic().del_file('../apk')
        apk_name = automatic().download_apk()
        brand = self.d.device_info["brand"]
        # NOTE(review): the two branches build the APK path from different
        # roots (`path` vs `filepath`), and download_apk in this file writes
        # to '/APK/' (upper case) - confirm they all point at the same folder.
        if brand == "HUAWEI":
            threaded1 = install_thead(self.d)
            threaded1.start()
            threaded1.app_install(path + '/apk/' + apk_name)
            threaded1.join()  # wait until the prompt-clicking thread is done
            time.sleep(5)
            subprocess.run(
                ["adb", "-s", self.d.serial, "shell", "pm", "clear", "com.tcl.tclplus"])
            self.d.app_start("com.tcl.tclplus")
        elif brand == "Xiaomi":
            serial = self.d.serial
            # Argument lists keep paths with spaces intact.
            subprocess.run(
                ["adb", "-s", serial, "install", "-r", "-d", filepath + "/apk/" + apk_name])
            time.sleep(2)
            subprocess.run(
                ["adb", "-s", serial, "shell", "pm", "clear", "com.tcl.tclplus"])
            self.d.app_start("com.tcl.tclplus")


class install_thead(threading.Thread):
    """Thread that taps through the installer prompts while an APK is installed."""

    # Button texts that are tapped, in this order, on every round.
    _PROMPTS = ('继续安装', '打开')

    def __init__(self, devices):
        """
        :param devices: the connected device object (``self.d`` of a page)
        """
        self.drivers = devices
        threading.Thread.__init__(self)

    def usb_install(self):
        """Try three rounds of tapping each prompt button; missing buttons are skipped."""
        for _ in range(3):
            for label in self._PROMPTS:
                try:
                    self.drivers(text=label).click()
                except Exception:
                    # Best effort: the prompt may not be on screen. Unlike the
                    # former bare ``except:``, KeyboardInterrupt and SystemExit
                    # are no longer swallowed.
                    pass

    def run(self):
        """Thread body: runs ``usb_install`` once the thread is started."""
        self.usb_install()

    def app_install(self, app_file_path):
        """Install the APK at *app_file_path* through the device object."""
        self.drivers.app_install(app_file_path)

这里根据每个设备 self.d 的 device_info 中的 brand 来区分,执行不同的安装方法;安装时采用多线程运行,用来点掉安装过程中出现的警告提示等。其他品牌自行添加即可。

以上就完成了 基础的定位方法封装、业务定位封装、多设备启动、多设备安装。

3.同时运行
CaseStrategy.py

import os
import unittest


class CaseStrategy:
    """Collect unittest cases from the case directory into one TestSuite."""

    def __init__(self):
        # Substring that marks a suite directory in the current directory.
        self.suite_path = 'TestSuite_'
        # Start directory and file pattern handed to unittest discovery.
        self.case_path = 'TestCase'
        self.case_pattern = 'test*.py'

    def _collect_cases(self, cases, top_dir=None):
        """Discover cases below ``case_path`` and add each of them to *cases*."""
        discovered = unittest.defaultTestLoader.discover(
            self.case_path, pattern=self.case_pattern, top_level_dir=top_dir)
        for group in discovered:
            for case in group:
                print(f"需要执行的测试用例为:{case}")
                cases.addTest(case)

    def collect_cases(self, suite=False):
        """Collect cases.

        With ``suite=False`` discovery runs once with no top-level directory.
        With ``suite=True`` it runs once for every directory in the current
        working directory whose name contains ``suite_path``, using that
        directory as the top-level directory.

        return: a unittest.TestSuite() holding everything that was collected
        """
        cases = unittest.TestSuite()

        if not suite:
            self._collect_cases(cases, top_dir=None)
            return cases

        suite_dirs = [entry for entry in os.listdir('.')
                      if self.suite_path in entry and os.path.isdir(entry)]
        for suite_dir in suite_dirs:
            self._collect_cases(cases, top_dir=suite_dir)

        return cases

devices.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
多进程check_alive
Mac下需要配置  `export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES`到环境变量,不然python会挂掉
'''
from Common.ReadConfig import ReadConfig
from Common.ATX_Server import ATX_Server
import uiautomator2 as u2
import subprocess
import re

from multiprocessing import Pool


def get_devices():
    '''Get the devices from the configured devices IP list.

    Every configured entry is checked with ``check_alive`` in its own worker
    process.

    return: list of the truthy results of ``check_alive`` (alive devices);
    an empty list when no device is configured'''
    devices_ip = ReadConfig().get_devices_ip()
    print('Connect devices from config devices IP list %s' % devices_ip)
    # Pool(processes=0) raises ValueError, so handle an empty list up front.
    if not devices_ip:
        return []
    pool = Pool(processes=len(devices_ip))
    pending = [pool.apply_async(check_alive, args=(run,)) for run in devices_ip]
    pool.close()
    pool.join()
    devices_list = []
    for job in pending:
        info = job.get()
        if info:
            devices_list.append(info)
    return devices_list


def get_online_devices():
    """Return device info for the alive devices reported by the ATX-Server.

    The server URL is read via ``ReadConfig().get_server_url()``; each device
    the server reports as online is probed in its own worker process through
    ``check_alive``.

    Returns:
        A list with the truthy results of ``check_alive``.

    Raises:
        Exception: when the server reports no online device.
    """
    devices = ATX_Server(ReadConfig().get_server_url()).online_devices()
    print('There has %s online devices on ATX-Server' % len(devices))
    if not devices:
        raise Exception('ATX-Server has no online device!!! ')

    pool = Pool(processes=len(devices))
    pending = [pool.apply_async(check_alive, args=(dev,)) for dev in devices]
    pool.close()
    pool.join()

    # Keep only the probes that produced device info.
    alive = []
    for task in pending:
        info = task.get()
        if info:
            alive.append(info)
    return alive


def connect_devices():
    """Return device info for the alive devices listed by ``adb devices``.

    Only serials whose adb status is ``device`` are probed (``offline`` ones
    are matched by the pattern but filtered out); each probe runs in its own
    worker process through ``check_alive``.

    Returns:
        A list with the truthy results of ``check_alive``, or an empty list
        when adb lists no usable device.
    """
    adb_listing = subprocess.check_output(['adb', 'devices']).decode()
    line_re = re.compile(
        r'(?P<serial>[^\s]+)\t(?P<status>device|offline)')
    valid_serials = [
        serial for serial, status in line_re.findall(adb_listing)
        if status == 'device'
    ]

    if not valid_serials:
        print("No available android devices detected.")
        return []

    print('There has %s devices connected on PC: ' % len(valid_serials))
    pool = Pool(processes=len(valid_serials))
    pending = [pool.apply_async(check_alive, args=(serial,))
               for serial in valid_serials]
    pool.close()
    pool.join()

    # Keep only the probes that produced device info.
    alive = []
    for task in pending:
        info = task.get()
        if info:
            alive.append(info)
    return alive


def check_alive(device):
    """Connect to one device and report whether it is usable.

    Args:
        device: either a dict (connected via its ``'ip'`` entry and named in
            messages by its ``'udid'`` entry) or any other value, which is
            passed straight to ``u2.connect`` and used as-is in messages.

    Returns:
        ``d.device_info`` when both the agent and the device report alive,
        otherwise ``None``.
    """
    is_mapping = isinstance(device, dict)

    def label():
        # Looked up lazily, only when a message is actually printed.
        return device['udid'] if is_mapping else device

    d = u2.connect(device['ip'] if is_mapping else device)

    if not d.agent_alive:
        print('The device atx_agent %s  is not alive,please checkout!' % label())
        return None

    d.healthcheck()
    if d.alive:
        print('%s is alive' % label())
        return d.device_info

    print('%s is not alive' % label())
    return None





# if __name__ == '__main__':
    # devices_ip = get_devices()

    # devices = connect_devices()
    # devices = get_online_devices()
    # print(devices_ip)
    #
    # pool = Pool(processes=len(devices_ip))
    # tmp_list = []
    # for run in devices_ip:
    #     tmp_list.append(pool.apply_async(check_alive, args=(run,)))
    #     # alive_list.append(tmp)
    # pool.close()
    # pool.join()
    # print('All runs done........ ')
    # print(tmp_list)
    # for i in tmp_list:
    #     print(i.get())
    # print(get_devices())
    # print(get_online_devices())
    # print(connect_devices())

driver.py

from Common.Devices import *    # 多进程 check_alive ,Mac下需要配置  `export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES`到环境变量

from Common.RunCases import RunCases
from Common.ReportPath import ReportPath
from Common.BasePage import BasePage
from Common.Log import Log
from Common.ReadConfig import ReadConfig
from Common.chromedriver import ChromeDriver
from Common.Test_data import generate_test_data
from Common.Report import create_statistics_report


class Drivers:
    """Run a set of test cases on every available device, one process each."""

    @staticmethod
    def _run_cases(run, cases):
        """Execute ``cases`` for the device bound to ``run``.

        This is the function submitted to the worker pool by ``run``.

        Args:
            run: a ``RunCases`` object; its ``get_device()``, ``get_path()``
                and ``run()`` members are used here.
            cases: the test cases handed to ``run.run``.
        """
        log = Log()
        log.set_logger(run.get_device()['model'], run.get_path() + '/' + 'client.log')
        log.i('udid: %s', run.get_device()['udid'])

        # set cls.path, it must be call before operate on any page
        path = ReportPath()
        path.set_path(run.get_path())

        # set cls.driver, it must be call before operate on any page
        base_page = BasePage()
        if 'ip' in run.get_device():
            base_page.set_driver(run.get_device()['ip'])
        else:
            base_page.set_driver(run.get_device()['serial'])

        try:
            # run cases
            base_page.set_fastinput_ime()

            run.run(cases)

            # NOTE(review): the original IME is only restored when nothing
            # above raised - confirm whether that is intended.
            base_page.set_original_ime()
            base_page.identify()
        except AssertionError as e:
            log.e('AssertionError, %s', e)

    def run(self, cases):
        """Find devices, run ``cases`` on all of them in parallel, then report.

        The device source is chosen by ``ReadConfig().get_method()``:
        ``SERVER``, ``IP`` or ``USB``.

        Args:
            cases: the test cases to execute on every device.

        Raises:
            Exception: when the configured method is none of the three
                supported values.
        """
        # Pick the Android device source according to the configured method.
        method = ReadConfig().get_method().strip()
        if method == 'SERVER':
            # get ATX-Server Online devices
            print('Checking available online devices from ATX-Server...')
            devices = get_online_devices()
            print('\nThere has %s online devices in ATX-Server' % len(devices))
        elif method == 'IP':
            # get  devices from config devices list
            print('Checking available IP devices from config... ')
            devices = get_devices()
            print('\nThere has %s  devices alive in config IP list' % len(devices))
        elif method == 'USB':
            # get  devices connected PC with USB
            print('Checking available USB devices connected on PC... ')
            devices = connect_devices()
            print('\nThere has %s  USB devices alive ' % len(devices))
        else:
            raise Exception('Config.ini method illegal:method =%s' % method)

        if not devices:
            print('There is no device found,test over.')
            return

        # Prepare the test data for the selected devices.
        generate_test_data(devices)

        print('Starting Run test >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        runs = [RunCases(device) for device in devices]

        # Run on every device, one worker process per device.  The async
        # results are kept so that worker failures can be reported below.
        pool = Pool(processes=len(runs))
        pending = [pool.apply_async(self._run_cases, args=(run, cases,))
                   for run in runs]
        print('Waiting for all runs done........ ')
        pool.close()
        pool.join()
        print('All runs done........ ')
        ChromeDriver.kill()

        # An exception raised inside a worker only surfaces through
        # AsyncResult.get(); report it instead of dropping it, but keep
        # going so the statistics report is still produced.
        for device, outcome in zip(devices, pending):
            try:
                outcome.get()
            except Exception as exc:
                print('Run on device %s failed: %r' % (device, exc))

        # Generate the statistics report covering all device runs.
        create_statistics_report(runs)

# if __name__ == '__main__':
    # print(ATX_Server(ReadConfig().get_url()).online_devices())
    #
    # print(get_devices())
    # print(ReadConfig().get_atx_server('method'))

4. 持续集成:使用 Jenkins 实现即可,这里不再赘述。

 类似资料: