Source code for testbot.testengine.caserunner

#!/usr/bin/env python
# -*- coding: utf-8 -*-


[docs]__author__ = "Nuanguang Gu(Sunny)"
[docs]__email__ = "nuanguang.gu@aliyun.com"
""" Test Engine """ import os import inspect import importlib import threading import traceback from enum import Enum from pkginfo import Installed from pkg_resources import get_distribution from testbot.config import CONFIG_PATH, RUNNER_LOGS_PATH, CASE_LOGS_PATH from testbot.case.precondition import IsTestCaseType, IsTestCasePriority, IsPreCasePassed, IsHigherPriorityPassed from testbot.config.setting import static_setting, SettingBase from testbot.result.testreporter import StepReporter, CaseEntry from testbot.case.base import TestCaseBase from testbot.resource.error import ResourceNotMeetConstraintError, ResourceLoadError, ResourceNotRelease from testbot.resource.pool import ResourcePool from testbot.testengine.testlist import TestList from testbot.config.logicmodule import ModuleManager, ModuleType from testbot.result.logger import logger_manager from testbot.utilities.time import get_time_stamp @static_setting.setting("CaseRunner")
[docs]class CaseRunnerSetting(SettingBase): """ The case runner setting """
[docs] default_case_setting_path = CONFIG_PATH
[docs] log_path = RUNNER_LOGS_PATH
[docs] case_log = CASE_LOGS_PATH
[docs] log_level = "INFO"
[docs]class CaseImportError(Exception): def __init__(self, msg, inner_ex=None): super().__init__(msg) self.inner_ex = inner_ex
[docs]class TestEngineNotReadyError(Exception): def __init__(self, msg): super().__init__(msg)
[docs]class RunningStatus(Enum):
[docs] Idle = 1
[docs] Running = 3
[docs]class CaseRunner(object): """ 测试用例执行器 """ def __init__(self): self.resource_pool = None self.list_setting = None self.test_list = None self.case_tree = dict() self.priority_list = list() self.pre_conditions = list() self.status = RunningStatus.Idle self.module_manager = ModuleManager() self.running_thread = None self.logger = logger_manager.register("CaseRunner", filename=os.path.join(RUNNER_LOGS_PATH, "CaseRunner.log"), default_level=CaseRunnerSetting.log_level, for_test=True) # self.result_report = ResultReporter(logger=self.logger) self.step_report = StepReporter.get_instance(logger=self.logger) self.logger.info("执行器装载完毕") self.case_log_folder = None self.case_result = dict()
[docs] def load_resource(self, file_name: str, username: str): """ 加载测试资源 :param file_name: 资源文件路径 :type file_name: str :param username: 资源拥有者 :type username: str :return: :rtype: """ self.resource_pool = ResourcePool() try: self.resource_pool.load(file_name, username) for key, device in self.resource_pool.topology.items(): if device.pre_connect: device_instance = device.get_comm_instance() if hasattr(device_instance, "connect"): device_instance.connect() self.resource_pool.discover_resources() except ResourceLoadError as rle: #资源文件读取错误 self.logger.exception(rle) self.resource_pool = None except ResourceNotRelease as rnr: #资源文件被占用 self.logger.exception(rnr) self.resource_pool = None except Exception as ex: self.logger.exception(ex) self.resource_pool = None self.logger.info("测试资源装载完毕")
@property
[docs] def resource_ready(self): """ 资源是否已准备好 :return: :rtype: """ return self.resource_pool is not None
@property
[docs] def test_list_ready(self): """ 测试用例列表是否已准备好 :return: :rtype: """ return self.test_list is not None
[docs] def load_test(self, test_name: str) -> TestCaseBase: """ 实例化测试用例 :param test_name: 测试名称 :type test_name: str :return: :rtype: """ # 获取测试用例的模块名和类名 case_module_name = ".".join(test_name.split(".")[0: -1]) case_name = test_name.split(".")[-1] try: self.logger.info(f"正在加载用例模块: {case_module_name}...") self.print_module_info() case_module = importlib.import_module(case_module_name) return getattr(case_module, case_name)(reporter=self.step_report) except Exception as ex: traceinfo = "".join(traceback.format_exception(type(ex), ex, ex.__traceback__)) # 导入测试用例失败,抛出异常 raise CaseImportError("Failed to Import Test Case %s" % test_name, traceinfo)
[docs] def print_module_info(self): self.logger.info("#" * 80) fwk_versions = { "testbot": None, "testbot_aw": None, "testbot_scripts": None, "testbot_tools": None } for key in fwk_versions: pkg = None location = None try: pkg = Installed(key) location = get_distribution(key) except: pass self.logger.info(f"############## {key}模块 【Version】:{pkg.version},【概要】:{pkg.summary},【安装路径】:{str(location)} ##############") self.logger.info("#" * 80)
[docs] def set_test_list(self, test_list: TestList): """ #装载测试列表 """ self.test_list = test_list self.list_setting = None self.case_tree.clear() self._import_list_case(self.case_tree, self.test_list) if any(self.test_list.setting.priority_to_run): self.priority_list = self.test_list.setting.priority_to_run self.logger.info("测试列表装载完毕")
[docs] def start(self): """ 测试引擎开始执行 """ if self.status == RunningStatus.Running: return if not self.resource_ready: raise TestEngineNotReadyError("测试引擎未准备就绪,测试资源未装载") if self.test_list is None: raise TestEngineNotReadyError("测试引擎未准备就绪,测试列表未装载") self.status = RunningStatus.Running self.case_log_folder = os.path.join(CaseRunnerSetting.case_log, get_time_stamp()) self.running_thread = threading.Thread(target=self.__main_test_thread) self.running_thread.start()
[docs] def wait_for_test_done(self): """ 等待测试完成 :return: :rtype: """ self.running_thread.join()
[docs] def run_case_lcm(self, test: TestCaseBase, node: CaseEntry): """ 执行测试用例生命周期管理 这个方法应该在子线程被运行 """ self.__init_precondition(test) if not self.__pre_check(test): return self.module_manager.run_module(ModuleType.PRE) self.module_manager.run_module(ModuleType.PARALLEL) self.__run_case(test=test, node=node) self.module_manager.stop_module() self.module_manager.run_module(ModuleType.POST)
[docs] def _import_list_case(self, case_tree_node, test_list, log_path=None): """ 递归导入测试列表中的测试用例 """ case_log_path = test_list.test_list_name if log_path: case_log_path = log_path + "/" + case_log_path case_tree_node["list_name"] = test_list.test_list_name case_tree_node["test_cases"] = list() for testcase in test_list.test_cases: if testcase.strip() == "": continue case_descriptor = dict() case_entry = testcase.split(",") case_name = case_entry[0] case_setting_file = "" if len(case_entry) > 1: case_setting_file = case_entry[1] try: # 导入测试用例 case_descriptor['case'] = self.load_test(case_name) case_descriptor['case_name'] = case_name.split(".")[-1] case_descriptor['log_path'] = case_log_path case_descriptor['filename'] = case_setting_file # 设置测试用例配置文件路径 if test_list.setting.case_setting_path: case_descriptor['setting_path'] = test_list.setting.case_setting_path else: case_descriptor['setting_path'] = os.path.dirname(inspect.getfile(case_descriptor['case'].__class__)) if case_descriptor['case'] else CaseRunnerSetting.default_case_setting_path case_priority = getattr(case_descriptor['case'], "priority", 999) if case_priority not in self.priority_list: self.priority_list.append(case_priority) except CaseImportError as cie: # 测试用例导入失败 self.logger.error(f"不能导入测试用例{case_name}") self.logger.exception(cie) case_tree_node['test_cases'].append(case_descriptor) case_tree_node['sub_list'] = list() for sub_list in test_list.sub_list: sub_list_dict = dict() case_tree_node['sub_list'].append(sub_list_dict) self._import_list_case(sub_list_dict, sub_list, log_path=case_log_path)
[docs] def __init_precondition(self, test: TestCaseBase): self.pre_conditions.clear() self.pre_conditions.append(IsTestCaseType(self.test_list.setting.run_type)) if any(self.test_list.setting.priority_to_run): self.pre_conditions.append(IsTestCasePriority(self.test_list.setting.priority_to_run)) if any(test.pre_tests): self.pre_conditions.append(IsPreCasePassed(self.case_result)) self.pre_conditions.append(IsHigherPriorityPassed(test.priority, self.case_result))
[docs] def __pre_check(self, test:TestCaseBase): for condition in self.pre_conditions: if not condition.is_meet(test, reporter=self.step_report): self.step_report.logger.info(f"{test.__class__.__name__}不能执行!") return False return True
[docs] def __get_case_log(self, path, case_name): log_path = os.path.join(self.case_log_folder, path, f"{case_name}.log") return logger_manager.register(case_name, filename=log_path, is_test=True)
[docs] def __main_test_thread(self): try: self.__run_test_list(self.case_tree) finally: self.status = RunningStatus.Idle
[docs] def __run_test_list(self, testlist): with self.step_report.root.start_node(headline="执行测试节点", message="").start_case(headline="执行测试用例") as node: for test in testlist['test_cases']: test["case"].get_setting(test["setting_path"], test["filename"]) self.step_report.logger = self.__get_case_log(test['log_path'], test['case_name']) self.case_result[test["case_name"]] = dict() self.case_result[test["case_name"]]['priority'] = test["case"].priority self.case_result[test["case_name"]]['result'] = False with node.start(headline="执行用例", message="", prefix="RUNCASE") as node2: self.run_case_lcm(test=test['case'], node=node2) self.step_report.logger = None logger_manager.unregister(test['case_name']) for list in testlist['sub_list']: self.__run_test_list(list)
[docs] def __run_case(self, test: TestCaseBase, node: CaseEntry): """ 测试用例执行线程 """ test._run_case(pool=self.resource_pool, node=node)
[docs] def __call_cleanup(self, test: TestCaseBase, node: CaseEntry): """ 执行清除操作 """ with node.start(headline="清理后置条件", message="", prefix="CLEANUP") as node2: try: test.cleanup(node=node2) except Exception as ex: traceinfo = "".join(traceback.format_exception(type(ex), ex, ex.__traceback__)) self.logger.error(traceinfo) node2.info(message=f"捕获异常: {traceinfo}") finally: pass