#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Test Engine
"""
# The docstring has to be the first statement of the module to be picked up
# as ``__doc__``; the metadata assignments therefore follow it.
__copyright__ = "Copyright (c) 2024 Nuanguang Gu(Sunny) Reserved"
__author__ = "Nuanguang Gu(Sunny)"
__email__ = "nuanguang.gu@aliyun.com"
import os
import inspect
import importlib
import threading
import traceback
from enum import Enum
from pkginfo import Installed
from pkg_resources import get_distribution
from testbot.config import CONFIG_PATH, RUNNER_LOGS_PATH, CASE_LOGS_PATH
from testbot.case.precondition import IsTestCaseType, IsTestCasePriority, IsPreCasePassed, IsHigherPriorityPassed
from testbot.config.setting import static_setting, SettingBase
from testbot.result.testreporter import StepReporter, CaseEntry
from testbot.case.base import TestCaseBase
from testbot.resource.error import ResourceNotMeetConstraintError, ResourceLoadError, ResourceNotRelease
from testbot.resource.pool import ResourcePool
from testbot.testengine.testlist import TestList
from testbot.config.logicmodule import ModuleManager, ModuleType
from testbot.result.logger import logger_manager
from testbot.utilities.time import get_time_stamp
@static_setting.setting("CaseRunner")
class CaseRunnerSetting(SettingBase):
    """
    Static settings of the case runner, registered under the name "CaseRunner".

    NOTE(review): the runner also reads ``CaseRunnerSetting.log_level``, which
    is not declared here -- presumably supplied by ``SettingBase`` or the
    ``static_setting`` decorator; confirm.
    """

    # Fallback directory for a case's setting file.
    default_case_setting_path = CONFIG_PATH
    # Directory of the runner's own logs.
    log_path = RUNNER_LOGS_PATH
    # Root directory under which a time-stamped folder of case logs is created per run.
    case_log = CASE_LOGS_PATH
class CaseImportError(Exception):
    """
    Raised when a test case cannot be imported or instantiated.

    :param msg: error message
    :param inner_ex: optional detail about the underlying failure, kept on
        the ``inner_ex`` attribute
    """

    def __init__(self, msg, inner_ex=None):
        Exception.__init__(self, msg)
        self.inner_ex = inner_ex
class TestEngineNotReadyError(Exception):
    """
    Raised when the engine is started before its resources or test list are loaded.

    :param msg: error message
    """

    def __init__(self, msg):
        Exception.__init__(self, msg)
class RunningStatus(Enum):
    """
    Execution state of the case runner.

    NOTE(review): the member list was missing from this class body. Only the
    members referenced elsewhere in this file (``Idle`` and ``Running``) are
    restored here, and their numeric values are placeholders -- confirm both
    against the original definition.
    """

    Idle = 1
    Running = 2
[docs]class CaseRunner(object):
"""
Test case runner: loads the test resources and a test list, then executes the cases on a worker thread.
"""
def __init__(self):
    """
    Create an idle runner with no resource pool and no test list loaded.
    """
    # Nothing is loaded yet.
    self.resource_pool = None
    self.test_list = None
    self.list_setting = None

    # Containers filled while a test list is imported and executed.
    self.case_tree = {}
    self.priority_list = []
    self.pre_conditions = []
    self.case_result = {}
    self.case_log_folder = None

    # Execution state.
    self.status = RunningStatus.Idle
    self.running_thread = None

    self.module_manager = ModuleManager()

    runner_log_file = os.path.join(RUNNER_LOGS_PATH, "CaseRunner.log")
    self.logger = logger_manager.register(
        "CaseRunner",
        filename=runner_log_file,
        default_level=CaseRunnerSetting.log_level,
        for_test=True,
    )
    self.step_report = StepReporter.get_instance(logger=self.logger)
    self.logger.info("执行器装载完毕")
def load_resource(self, file_name: str, username: str):
    """
    Load the test resources and connect the devices flagged for pre-connection.

    A fresh ``ResourcePool`` is created and loaded; every device in its
    topology whose ``pre_connect`` is truthy gets its communication instance
    connected (when that instance has a ``connect`` attribute), then
    ``discover_resources()`` is called. On any failure the exception is logged
    and ``self.resource_pool`` is reset to ``None``; nothing is re-raised.

    :param file_name: path of the resource file
    :type file_name: str
    :param username: owner of the resources
    :type username: str
    :return: None
    :rtype: None
    """
    self.resource_pool = ResourcePool()
    try:
        self.resource_pool.load(file_name, username)
        for device in self.resource_pool.topology.values():
            if not device.pre_connect:
                continue
            device_instance = device.get_comm_instance()
            if hasattr(device_instance, "connect"):
                device_instance.connect()
        self.resource_pool.discover_resources()
    except (ResourceLoadError, ResourceNotRelease, Exception) as ex:
        # ResourceLoadError: the resource file could not be read.
        # ResourceNotRelease: the resource file is occupied.
        # Every failure is handled the same way: log it and drop the pool.
        self.logger.exception(ex)
        self.resource_pool = None
    else:
        # Report success only when loading really succeeded.
        self.logger.info("测试资源装载完毕")
@property
def resource_ready(self):
    """
    Tell whether a resource pool is currently loaded.

    :return: ``True`` when ``self.resource_pool`` is set, ``False`` when it is ``None``
    :rtype: bool
    """
    pool_missing = self.resource_pool is None
    return not pool_missing
@property
def test_list_ready(self):
    """
    Tell whether a test list has been set.

    :return: ``True`` when ``self.test_list`` is set, ``False`` when it is ``None``
    :rtype: bool
    """
    list_missing = self.test_list is None
    return not list_missing
def load_test(self, test_name: str) -> TestCaseBase:
    """
    Import a test case class by its dotted name and instantiate it.

    :param test_name: dotted path of the case, ``<module path>.<class name>``
    :type test_name: str
    :return: the case instance, created with ``reporter=self.step_report``
    :rtype: TestCaseBase
    :raises CaseImportError: when importing or instantiating the case fails;
        the formatted traceback is passed as ``inner_ex`` and the original
        exception is chained as ``__cause__``
    """
    # Everything before the last dot is the module path, the rest is the class name.
    case_module_name, _, case_name = test_name.rpartition(".")
    try:
        self.logger.info(f"正在加载用例模块: {case_module_name}...")
        self.print_module_info()
        case_module = importlib.import_module(case_module_name)
        return getattr(case_module, case_name)(reporter=self.step_report)
    except Exception as ex:
        traceinfo = "".join(traceback.format_exception(type(ex), ex, ex.__traceback__))
        # Chain the cause so the real failure stays reachable from the wrapper.
        raise CaseImportError("Failed to Import Test Case %s" % test_name, traceinfo) from ex
def print_module_info(self):
    """
    Log version, summary and install location of the framework packages.

    The lookup is best effort: when the package metadata cannot be read, the
    missing values are logged as ``None`` instead of raising.

    :return: None
    :rtype: None
    """
    banner = "#" * 80
    self.logger.info(banner)
    for key in ("testbot", "testbot_aw", "testbot_scripts", "testbot_tools"):
        pkg = None
        location = None
        try:
            pkg = Installed(key)
            location = get_distribution(key)
        except Exception:
            # Purely informational output: keep whatever was resolved so far.
            pass
        # pkg stays None when the lookup itself failed; do not dereference it blindly.
        version = getattr(pkg, "version", None)
        summary = getattr(pkg, "summary", None)
        self.logger.info(f"############## {key}模块 【Version】:{version},【概要】:{summary},【安装路径】:{str(location)} ##############")
    self.logger.info(banner)
def set_test_list(self, test_list: TestList):
    """
    Load a test list and import all of its cases into the case tree.

    :param test_list: the test list to run
    :type test_list: TestList
    :return: None
    :rtype: None
    """
    self.test_list = test_list
    self.list_setting = None
    self.case_tree.clear()
    # Start from a clean priority list: priorities collected for a previous
    # test list must not leak into this one, and a list object that belongs
    # to a previous setting must not be appended to.
    self.priority_list = []
    self._import_list_case(self.case_tree, self.test_list)
    priority_to_run = self.test_list.setting.priority_to_run
    # NOTE(review): any() is falsy for a list holding only 0 -- confirm that
    # 0 is not a valid priority.
    if any(priority_to_run):
        # Copy so that later changes never touch the setting's own list.
        self.priority_list = list(priority_to_run)
    self.logger.info("测试列表装载完毕")
def start(self):
    """
    Start executing the loaded test list on a worker thread.

    Does nothing when a run is already in progress.

    :return: None
    :rtype: None
    :raises TestEngineNotReadyError: when the resources or the test list are not loaded
    """
    if self.status == RunningStatus.Running:
        return
    if not self.resource_ready:
        raise TestEngineNotReadyError("测试引擎未准备就绪,测试资源未装载")
    if not self.test_list_ready:
        raise TestEngineNotReadyError("测试引擎未准备就绪,测试列表未装载")
    self.status = RunningStatus.Running
    try:
        self.case_log_folder = os.path.join(CaseRunnerSetting.case_log, get_time_stamp())
        self.running_thread = threading.Thread(target=self.__main_test_thread)
        self.running_thread.start()
    except Exception:
        # The worker never started, so nothing would ever reset the status.
        self.status = RunningStatus.Idle
        raise
def wait_for_test_done(self):
    """
    Block until the worker thread has finished.

    Returns immediately when no run was ever started.

    :return: None
    :rtype: None
    """
    worker = self.running_thread
    if worker is None:
        # start() has not been called yet: there is nothing to wait for.
        return
    worker.join()
def run_case_lcm(self, test: TestCaseBase, node: CaseEntry):
    """
    Run the life cycle of one test case.

    Steps: build and check the preconditions, run the PRE modules, start the
    PARALLEL modules, run the case, stop the modules, run the POST modules.
    The case is skipped entirely when a precondition is not met.
    The original note says this method should be run in a sub thread.

    :param test: the test case to run
    :type test: TestCaseBase
    :param node: report node the case reports into
    :type node: CaseEntry
    :return: None
    :rtype: None
    """
    self.__init_precondition(test)
    if not self.__pre_check(test):
        return
    self.module_manager.run_module(ModuleType.PRE)
    self.module_manager.run_module(ModuleType.PARALLEL)
    try:
        self.__run_case(test=test, node=node)
    finally:
        # Stop the modules even when the case raises, so they do not keep
        # running into the next case.
        self.module_manager.stop_module()
    self.module_manager.run_module(ModuleType.POST)
def _import_list_case(self, case_tree_node, test_list, log_path=None):
    """
    Recursively import the cases of a test list into a case-tree node.

    The node receives three keys: ``list_name``, ``test_cases`` (one
    descriptor dict per successfully imported case) and ``sub_list`` (one
    node of the same layout per sub list). Cases that fail to import are
    logged and left out of the tree.

    :param case_tree_node: dict to fill for this test list
    :param test_list: test list whose cases are imported
    :param log_path: log path of the parent list, ``None`` for the root list
    :return: None
    """
    case_log_path = test_list.test_list_name
    if log_path:
        case_log_path = log_path + "/" + case_log_path
    case_tree_node["list_name"] = test_list.test_list_name
    case_tree_node["test_cases"] = []
    for testcase in test_list.test_cases:
        if not testcase.strip():
            continue
        # Entry format: "<dotted case name>[,<setting file>]"; surrounding
        # blanks around either field are ignored.
        case_entry = [field.strip() for field in testcase.split(",")]
        case_name = case_entry[0]
        case_setting_file = case_entry[1] if len(case_entry) > 1 else ""
        try:
            case = self.load_test(case_name)
        except CaseImportError as cie:
            # Skip the case: a descriptor without a "case" entry cannot be run.
            self.logger.error(f"不能导入测试用例{case_name}")
            self.logger.exception(cie)
            continue
        # Directory of the case's setting file: the list setting wins, then
        # the directory of the case's source file, then the runner default.
        if test_list.setting.case_setting_path:
            setting_path = test_list.setting.case_setting_path
        elif case:
            setting_path = os.path.dirname(inspect.getfile(case.__class__))
        else:
            setting_path = CaseRunnerSetting.default_case_setting_path
        case_priority = getattr(case, "priority", 999)
        if case_priority not in self.priority_list:
            self.priority_list.append(case_priority)
        case_tree_node["test_cases"].append({
            "case": case,
            "case_name": case_name.split(".")[-1],
            "log_path": case_log_path,
            "filename": case_setting_file,
            "setting_path": setting_path,
        })
    case_tree_node["sub_list"] = []
    for sub_list in test_list.sub_list:
        sub_list_dict = {}
        case_tree_node["sub_list"].append(sub_list_dict)
        self._import_list_case(sub_list_dict, sub_list, log_path=case_log_path)
def __init_precondition(self, test: TestCaseBase):
    """
    Rebuild the list of preconditions that apply to the given test case.

    :param test: the test case about to run
    :type test: TestCaseBase
    :return: None
    :rtype: None
    """
    list_setting = self.test_list.setting
    conditions = self.pre_conditions
    conditions.clear()
    # The case type is always checked.
    conditions.append(IsTestCaseType(list_setting.run_type))
    # The priority filter only applies when the list names priorities to run.
    if any(list_setting.priority_to_run):
        conditions.append(IsTestCasePriority(list_setting.priority_to_run))
    # The pre-case check only applies when the case declares pre-tests.
    if any(test.pre_tests):
        conditions.append(IsPreCasePassed(self.case_result))
    conditions.append(IsHigherPriorityPassed(test.priority, self.case_result))
def __pre_check(self, test:TestCaseBase):
    """
    Check the current preconditions against the test case, in order.

    Evaluation stops at the first precondition that is not met.

    :param test: the test case to check
    :type test: TestCaseBase
    :return: ``True`` when every precondition is met, ``False`` otherwise
    :rtype: bool
    """
    unmet = next(
        (
            condition
            for condition in self.pre_conditions
            if not condition.is_meet(test, reporter=self.step_report)
        ),
        None,
    )
    if unmet is None:
        return True
    self.step_report.logger.info(f"{test.__class__.__name__}不能执行!")
    return False
def __get_case_log(self, path, case_name):
    """
    Register a logger for one test case.

    The log file is ``<case_log_folder>/<path>/<case_name>.log``.

    :param path: log path of the case, relative to the run's log folder
    :param case_name: name of the case, used as logger name and file name
    :return: whatever ``logger_manager.register`` returns for the case
    """
    # NOTE(review): the runner's own logger is registered with ``for_test=True``
    # while this call passes ``is_test=True`` -- confirm which keyword
    # ``logger_manager.register`` actually expects.
    log_file = f"{case_name}.log"
    log_path = os.path.join(self.case_log_folder, path, log_file)
    return logger_manager.register(case_name, filename=log_path, is_test=True)
def __main_test_thread(self):
    """
    Worker-thread entry point: run the whole case tree.

    The status is reset to idle when the run ends, whether it succeeded or not.

    :return: None
    :rtype: None
    """
    try:
        self.__run_test_list(self.case_tree)
    except Exception as ex:
        # Record the failure in the runner log before it leaves the thread.
        self.logger.exception(ex)
        raise
    finally:
        self.status = RunningStatus.Idle
[docs] def __run_test_list(self, testlist):
"""
Run the cases of one case-tree node and recurse into its sub lists.
:param testlist: case-tree dict with a "test_cases" list of case descriptors and a "sub_list" list of child dicts
:return: None
"""
# Open a report node for this list, starting from the reporter's root.
with self.step_report.root.start_node(headline="执行测试节点", message="").start_case(headline="执行测试用例") as node:
for test in testlist['test_cases']:
# Load the case's settings from its setting path and setting file name.
test["case"].get_setting(test["setting_path"], test["filename"])
# Route step-report logging to a logger dedicated to this case.
self.step_report.logger = self.__get_case_log(test['log_path'], test['case_name'])
# Seed the result entry for this case: its priority, and a result of False.
self.case_result[test["case_name"]] = dict()
self.case_result[test["case_name"]]['priority'] = test["case"].priority
self.case_result[test["case_name"]]['result'] = False
with node.start(headline="执行用例", message="", prefix="RUNCASE") as node2:
self.run_case_lcm(test=test['case'], node=node2)
# Detach and unregister the per-case logger.
# NOTE(review): not inside a finally block, so presumably skipped when the case raises -- confirm.
self.step_report.logger = None
logger_manager.unregister(test['case_name'])
# NOTE(review): the loop variable shadows the builtin ``list``.
for list in testlist['sub_list']:
self.__run_test_list(list)
def __run_case(self, test: TestCaseBase, node: CaseEntry):
    """
    Run one test case against the loaded resource pool.

    :param test: the test case to run
    :type test: TestCaseBase
    :param node: report node handed to the case
    :type node: CaseEntry
    :return: None
    :rtype: None
    """
    pool = self.resource_pool
    test._run_case(pool=pool, node=node)
def __call_cleanup(self, test: TestCaseBase, node: CaseEntry):
    """
    Run the cleanup of a test case inside its own report node.

    An exception raised by the cleanup is not propagated: its traceback is
    written to the runner log and to the report node.

    :param test: the test case to clean up
    :type test: TestCaseBase
    :param node: parent report node
    :type node: CaseEntry
    :return: None
    :rtype: None
    """
    with node.start(headline="清理后置条件", message="", prefix="CLEANUP") as cleanup_node:
        try:
            test.cleanup(node=cleanup_node)
        except Exception:
            # Inside the handler this is the traceback of the caught exception.
            trace_text = traceback.format_exc()
            self.logger.error(trace_text)
            cleanup_node.info(message=f"捕获异常: {trace_text}")