# Source: pqAutomationApp/app/pq/pq_result.py
# Snapshot: 2026-04-21 16:03:11 +08:00 (586 lines, 20 KiB, Python)
#
# NOTE: the hosting page flagged this file as containing ambiguous Unicode
# characters, i.e. characters that can be confused with other characters.
# Some punctuation inside comments and strings may therefore look unusual.
import json
import os
import datetime
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
@dataclass
class TestItemResult:
    """Result record for a single test item.

    Instances can be serialised with :meth:`to_dict` and rebuilt with
    :meth:`from_dict`; timestamps travel as ISO-8601 strings.
    """

    # Machine-readable item key, e.g. "gamut", "gamma", "cct".
    item_name: str
    # Human-readable label shown to the user.
    item_display_name: str
    # Lifecycle state, e.g. "pending", "running", "completed", "failed",
    # "skipped".
    status: str
    start_time: Optional[datetime.datetime] = None
    end_time: Optional[datetime.datetime] = None
    # Data captured while the item is running. ``None`` is accepted on input
    # and replaced with a fresh dict in ``__post_init__`` so instances never
    # share a mutable default.
    intermediate_data: Optional[Dict[str, Any]] = None
    # Final outcome of the item; normalised the same way as above.
    final_result: Optional[Dict[str, Any]] = None
    # Error description; left as ``None`` when nothing was reported.
    error_message: Optional[str] = None

    def __post_init__(self):
        """Replace ``None`` containers with fresh, per-instance dicts."""
        if self.intermediate_data is None:
            self.intermediate_data = {}
        if self.final_result is None:
            self.final_result = {}

    def to_dict(self):
        """Return a plain-dict copy with timestamps rendered as ISO strings.

        Unset timestamps stay ``None``.
        """
        data = asdict(self)
        if self.start_time:
            data["start_time"] = self.start_time.isoformat()
        if self.end_time:
            data["end_time"] = self.end_time.isoformat()
        return data

    @classmethod
    def from_dict(cls, data):
        """Build an instance from a mapping such as the one ``to_dict`` returns.

        Args:
            data: Mapping of field names to values. ``start_time`` and
                ``end_time`` may be ISO-8601 strings, ``None``, or values that
                are already parsed.

        Returns:
            A new ``TestItemResult``.

        Raises:
            TypeError: If ``data`` has keys that are not fields of this class
                or lacks a required field.
            ValueError: If a timestamp string is not valid ISO-8601.
        """
        # Work on a shallow copy: parsing the timestamps must not rewrite the
        # caller's dictionary (it may be reused or re-serialised afterwards).
        payload = dict(data)
        for key in ("start_time", "end_time"):
            raw = payload.get(key)
            # Only non-empty strings need parsing; anything else is passed
            # through unchanged.
            if isinstance(raw, str) and raw:
                payload[key] = datetime.datetime.fromisoformat(raw)
        return cls(**payload)
class PQResult:
    """Collects configuration, per-item results and metadata for one PQ run.

    The object tracks the run as a whole (id, type, name, status, timestamps),
    a mapping of named test items, free-form global data, and two shortcut
    attributes (``fix_pattern_rgb`` / ``fix_pattern_gray``) holding the most
    recently stored "rgb" and "gray" intermediate data.
    """

    # Sections that ``global_data`` is expected to expose. Centralised so that
    # construction, loading and updating all agree on them.
    _GLOBAL_DATA_SECTIONS = (
        "device_info",
        "environment_info",
        "measurement_settings",
    )

    def __init__(
        self, test_type: str = "", test_name: str = "", output_dir: str = "results"
    ):
        """Initialise an empty result for a run that starts now.

        Args:
            test_type: Test type, e.g. "screen_module", "sdr_movie",
                "hdr_movie".
            test_name: Display name of the test.
            output_dir: Directory under which exported files are written.
        """
        self.test_id = self._generate_test_id()
        self.test_type = test_type
        self.test_name = test_name
        self.output_dir = output_dir

        # Run-level bookkeeping.
        self.start_time = datetime.datetime.now()
        self.end_time = None
        self.status = "running"  # "running", "completed", "failed", "stopped"

        # Configuration the run was started with.
        self.test_config = {}

        # Per-item results keyed by item name.
        self.test_items: Dict[str, TestItemResult] = {}

        # Run-wide data, one dict per section.
        self.global_data = {section: {} for section in self._GLOBAL_DATA_SECTIONS}

        self._ensure_output_dir()

        # Shortcuts to the latest "rgb" / "gray" intermediate data of this run.
        self.fix_pattern_rgb = None
        self.fix_pattern_gray = None

    def _generate_test_id(self) -> str:
        """Return a timestamp-based id of the form ``PQ_YYYYmmdd_HHMMSS_ffffff``."""
        return datetime.datetime.now().strftime("PQ_%Y%m%d_%H%M%S_%f")

    def _ensure_output_dir(self):
        """Do nothing.

        Kept as a hook called from ``__init__``; constructing a result does not
        touch the filesystem. ``export_item_data`` creates the directory it
        needs at write time.
        """
        pass

    def set_test_config(self, config: Dict[str, Any]):
        """Store a shallow copy of ``config`` as the run configuration."""
        self.test_config = config.copy()

    def set_global_data(
        self,
        device_info: Optional[Dict] = None,
        environment_info: Optional[Dict] = None,
        measurement_settings: Optional[Dict] = None,
    ):
        """Merge the given mappings into the matching ``global_data`` sections.

        Arguments that are ``None`` or empty leave their section untouched.
        """
        updates = {
            "device_info": device_info,
            "environment_info": environment_info,
            "measurement_settings": measurement_settings,
        }
        for section, values in updates.items():
            if values:
                # setdefault: ``global_data`` may have been replaced by a
                # mapping that lacks this section (e.g. loaded from a file).
                self.global_data.setdefault(section, {}).update(values)

    def add_test_item(self, item_name: str, item_display_name: str) -> "TestItemResult":
        """Register a test item in state "pending" and return it.

        An existing item with the same name is replaced.
        """
        test_item = TestItemResult(
            item_name=item_name, item_display_name=item_display_name, status="pending"
        )
        self.test_items[item_name] = test_item
        return test_item

    def start_test_item(self, item_name: str):
        """Mark a registered item as "running" and stamp its start time.

        Unknown item names are ignored.
        """
        item = self.test_items.get(item_name)
        if item is None:
            return
        item.status = "running"
        item.start_time = datetime.datetime.now()

    def add_intermediate_data(self, item_name: str, data_key: str, data_value: Any):
        """Store one piece of intermediate data.

        The value is recorded on the item when ``item_name`` is registered.
        Independently of that, values stored under the keys "rgb" and "gray"
        are mirrored into ``fix_pattern_rgb`` / ``fix_pattern_gray``.
        """
        if item_name in self.test_items:
            self.test_items[item_name].intermediate_data[data_key] = data_value

        # NOTE(review): the shortcuts are updated even when the item is not
        # registered, which is what lets ``get_intermediate_data("shared", ...)``
        # serve data for the unregistered "shared" name - confirm this matches
        # the intended nesting.
        if data_key == "rgb":
            self.fix_pattern_rgb = data_value
        if data_key == "gray":
            self.fix_pattern_gray = data_value

    def get_intermediate_data(self, item_name: str, data_key: str) -> Any:
        """Look up one piece of intermediate data.

        Args:
            item_name: Item name, e.g. "gamut", "gamma", "cct", or the special
                name "shared".
            data_key: Data key, e.g. "rgb", "gray", "measurement_points".

        Returns:
            The stored value, or ``None`` when nothing is found. For example,
            ``get_intermediate_data("shared", "gray")`` returns whatever was
            last stored under the key "gray".
        """
        # First choice: data recorded on the item itself.
        if item_name in self.test_items:
            recorded = self.test_items[item_name].intermediate_data
            if data_key in recorded:
                return recorded[data_key]

        # Fallback for "shared": the shortcut attributes (``None`` when unset).
        if item_name == "shared":
            if data_key == "rgb":
                return self.fix_pattern_rgb
            if data_key == "gray":
                return self.fix_pattern_gray

        return None

    def has_intermediate_data(self, item_name: str, data_key: str) -> bool:
        """Return whether ``get_intermediate_data`` yields a non-``None`` value."""
        return self.get_intermediate_data(item_name, data_key) is not None

    def get_all_intermediate_data(self, item_name: str) -> Dict[str, Any]:
        """Return all intermediate data of an item.

        Returns:
            A shallow copy of the item's data; for the unregistered name
            "shared", the two shortcut values; otherwise an empty dict.
        """
        if item_name in self.test_items:
            return self.test_items[item_name].intermediate_data.copy()
        if item_name == "shared":
            return {
                "rgb": self.fix_pattern_rgb,
                "gray": self.fix_pattern_gray,
            }
        return {}

    def clear_intermediate_data(self, item_name: Optional[str] = None):
        """Discard intermediate data.

        Args:
            item_name: Item to clear. ``None`` clears every item and also
                resets the shortcut attributes; a specific name clears only
                that item and leaves the shortcuts alone. Unknown names are
                ignored.
        """
        if item_name is None:
            for item in self.test_items.values():
                item.intermediate_data.clear()
            self.fix_pattern_rgb = None
            self.fix_pattern_gray = None
        elif item_name in self.test_items:
            self.test_items[item_name].intermediate_data.clear()

    def set_test_item_result(
        self,
        item_name: str,
        result_data: Dict[str, Any],
        status: str = "completed",
        error_message: Optional[str] = None,
    ):
        """Record the final result of a registered item and stamp its end time.

        Unknown item names are ignored. ``error_message`` is only written when
        it is non-empty, so an earlier message is not erased.
        """
        item = self.test_items.get(item_name)
        if item is None:
            return
        item.final_result = result_data
        item.status = status
        item.end_time = datetime.datetime.now()
        if error_message:
            item.error_message = error_message

    def complete_test(self, status: str = "completed"):
        """Close the run: stamp the end time and set the final status."""
        self.end_time = datetime.datetime.now()
        self.status = status

    def get_test_summary(self) -> Dict[str, Any]:
        """Return run-level summary information.

        ``duration_seconds`` is ``None`` until both timestamps are set.
        """
        statuses = [item.status for item in self.test_items.values()]

        duration = None
        if self.start_time and self.end_time:
            duration = (self.end_time - self.start_time).total_seconds()

        return {
            "test_id": self.test_id,
            "test_type": self.test_type,
            "test_name": self.test_name,
            "status": self.status,
            "start_time": self.start_time.isoformat() if self.start_time else None,
            "end_time": self.end_time.isoformat() if self.end_time else None,
            "duration_seconds": duration,
            "total_items": len(statuses),
            "completed_items": statuses.count("completed"),
            "failed_items": statuses.count("failed"),
        }

    def to_dict(self) -> Dict[str, Any]:
        """Return the complete result as a dictionary (format version "1.0")."""
        return {
            "test_summary": self.get_test_summary(),
            "test_config": self.test_config,
            "global_data": self.global_data,
            "test_items": {
                name: item.to_dict() for name, item in self.test_items.items()
            },
            "export_timestamp": datetime.datetime.now().isoformat(),
            "format_version": "1.0",
        }

    def save_to_file(self, file_path: str) -> bool:
        """Stub: performs no I/O and always returns ``False``."""
        return False

    def save_to_json(self, filename: Optional[str] = None) -> str:
        """Stub: performs no I/O and always returns an empty string."""
        return ""

    @classmethod
    def load_from_json(cls, file_path: str) -> "PQResult":
        """Rebuild a result from a JSON file shaped like ``to_dict`` output.

        Missing summary fields keep the values of a freshly constructed
        instance, except ``status``, which falls back to "unknown".

        Raises:
            OSError: If the file cannot be opened.
            json.JSONDecodeError: If the file is not valid JSON.
        """
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        summary = data.get("test_summary", {})
        pq_result = cls(
            test_type=summary.get("test_type", ""),
            test_name=summary.get("test_name", ""),
        )

        pq_result.test_id = summary.get("test_id", pq_result.test_id)
        pq_result.status = summary.get("status", "unknown")
        if summary.get("start_time"):
            pq_result.start_time = datetime.datetime.fromisoformat(
                summary["start_time"]
            )
        if summary.get("end_time"):
            pq_result.end_time = datetime.datetime.fromisoformat(summary["end_time"])

        pq_result.test_config = data.get("test_config", {})

        # Keep whatever the file provides, but guarantee the standard sections
        # exist so a later ``set_global_data`` call cannot hit a missing key.
        global_data = dict(data.get("global_data") or {})
        for section in cls._GLOBAL_DATA_SECTIONS:
            if global_data.get(section) is None:
                global_data[section] = {}
        pq_result.global_data = global_data

        for item_name, item_data in data.get("test_items", {}).items():
            pq_result.test_items[item_name] = TestItemResult.from_dict(item_data)

        return pq_result

    def export_item_data(self, item_name: str, export_format: str = "json") -> str:
        """Write one item's data to a file and return the file path.

        The file goes to ``output_dir/test_type`` (or ``output_dir`` when
        ``test_type`` is empty); the directory is created if needed.

        Args:
            item_name: Name of a registered item.
            export_format: "json" (full ``to_dict`` dump) or "csv" (basic
                info followed by the final result as key/value rows).

        Returns:
            Path of the written file.

        Raises:
            ValueError: If the item is unknown or the format is unsupported.
        """
        if item_name not in self.test_items:
            raise ValueError(f"测试项 {item_name} 不存在")
        # Reject bad formats before anything is created on disk.
        if export_format not in ("json", "csv"):
            raise ValueError(f"不支持的导出格式: {export_format}")

        item = self.test_items[item_name]
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{self.test_id}_{item_name}_{timestamp}.{export_format}"

        if self.test_type:
            target_dir = os.path.join(self.output_dir, self.test_type)
        else:
            target_dir = self.output_dir
        # An empty target means "current directory"; makedirs("") would fail.
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        file_path = os.path.join(target_dir, filename)

        if export_format == "json":
            with open(file_path, "w", encoding="utf-8") as f:
                json.dump(item.to_dict(), f, ensure_ascii=False, indent=2)
        else:
            import csv

            with open(file_path, "w", newline="", encoding="utf-8") as f:
                writer = csv.writer(f)
                # Basic information.
                writer.writerow(["测试项", item.item_display_name])
                writer.writerow(["状态", item.status])
                writer.writerow(
                    ["开始时间", item.start_time.isoformat() if item.start_time else ""]
                )
                writer.writerow(
                    ["结束时间", item.end_time.isoformat() if item.end_time else ""]
                )
                writer.writerow([])
                # Final result, one key/value pair per row.
                writer.writerow(["最终结果"])
                for key, value in item.final_result.items():
                    writer.writerow([key, str(value)])

        return file_path

    def get_progress_info(self) -> Dict[str, Any]:
        """Return progress counters for the run.

        Items in state "completed" or "failed" count as completed; every item
        that is neither completed nor "running" counts as pending.
        """
        statuses = [item.status for item in self.test_items.values()]
        total_items = len(statuses)
        completed_items = sum(
            1 for status in statuses if status in ["completed", "failed"]
        )
        running_items = statuses.count("running")
        progress_percentage = (
            (completed_items / total_items * 100) if total_items > 0 else 0
        )
        return {
            "total_items": total_items,
            "completed_items": completed_items,
            "running_items": running_items,
            "pending_items": total_items - completed_items - running_items,
            "progress_percentage": progress_percentage,
            "current_status": self.status,
        }
class PQResultStore:
    """Holds one ``PQResult`` per test type and proxies to the active one.

    It addresses two needs:

    1. The store exists before any test has started, so code holding a
       reference to it never has to deal with a missing attribute or ``None``.
    2. Results of several test types (e.g. "screen_module", "sdr_movie",
       "hdr_movie") are kept side by side instead of each new run replacing
       the previous one.

    Usage::

        self.results = PQResultStore()                # in the owner's __init__
        self.results.new("screen_module", "...")      # when a run starts

        # Attribute reads are forwarded to the active result:
        self.results.add_intermediate_data("gamut", "rgb", data)
        self.results.get_intermediate_data("shared", "gray")

        # Explicit access to the result of another test type:
        pq = self.results.get("sdr_movie")
        if pq is not None:
            data = pq.get_intermediate_data("gamut", "rgb")

    Only attribute *reads* are forwarded (via ``__getattr__``); assigning an
    attribute on the store sets it on the store itself.
    """

    # Read-style methods that, when no result is active, resolve to a harmless
    # stub instead of raising - for "try to read, data not there yet" callers.
    _SAFE_GETTERS = frozenset({
        "get_intermediate_data",
        "has_intermediate_data",
        "get_all_intermediate_data",
        "get_test_summary",
        "get_progress_info",
        "to_dict",
    })

    def __init__(self):
        """Create an empty store with no active result."""
        self._results: Dict[str, PQResult] = {}
        self._active_type: Optional[str] = None

    # ---------- Explicit API ----------

    def new(self, test_type: str, test_name: str) -> "PQResult":
        """Create a ``PQResult`` for ``test_type`` and make it active.

        An existing result for the same ``test_type`` is replaced.
        """
        result = PQResult(test_type=test_type, test_name=test_name)
        self._results[test_type] = result
        self._active_type = test_type
        return result

    def get(self, test_type: str) -> Optional["PQResult"]:
        """Return the result stored for ``test_type``, or ``None``."""
        return self._results.get(test_type)

    def has(self, test_type: str) -> bool:
        """Return whether a result is stored for ``test_type``."""
        return test_type in self._results

    def set_active(self, test_type: str) -> bool:
        """Make the stored result for ``test_type`` active.

        Returns:
            ``True`` on success, ``False`` when no such result exists (the
            active result is then left unchanged).
        """
        if test_type not in self._results:
            return False
        self._active_type = test_type
        return True

    def clear(self, test_type: Optional[str] = None) -> None:
        """Drop stored results.

        Without an argument everything is dropped. With a ``test_type`` only
        that entry is dropped, and the store becomes inactive if it was the
        active one.
        """
        if test_type is None:
            self._results.clear()
            self._active_type = None
            return
        self._results.pop(test_type, None)
        if self._active_type == test_type:
            self._active_type = None

    @property
    def current(self) -> Optional["PQResult"]:
        """The active ``PQResult``, or ``None`` when nothing is active."""
        if self._active_type is None:
            return None
        return self._results.get(self._active_type)

    @property
    def current_test_type(self) -> Optional[str]:
        """The test type of the active result, or ``None``."""
        return self._active_type

    @property
    def all_types(self) -> List[str]:
        """Test types that currently have a stored result."""
        return list(self._results.keys())

    # ---------- Compatibility: transparent proxy to the active result ----------

    def __bool__(self) -> bool:
        """Truthy exactly when there is an active result."""
        return self.current is not None

    def __getattr__(self, name: str) -> Any:
        """Forward unknown public attribute reads to the active result.

        Python calls this only after normal attribute lookup has failed.

        Raises:
            AttributeError: For underscore-prefixed names, for names the
                active result does not have, and for any name outside
                ``_SAFE_GETTERS`` while no result is active.
        """
        # Never proxy private/dunder names: protocol probes such as
        # ``__setstate__`` must fail fast rather than reach the result.
        if name.startswith("_"):
            raise AttributeError(name)

        # Read the state straight from ``__dict__`` instead of going through
        # the ``current`` property: on an instance whose ``__init__`` has not
        # run, the property would raise AttributeError and re-enter this
        # method without end.
        state = self.__dict__
        active_type = state.get("_active_type")
        current = None
        if active_type is not None:
            current = state.get("_results", {}).get(active_type)

        if current is not None:
            return getattr(current, name)

        # No active result: read-style methods get an empty stub, everything
        # else gets an explicit error.
        if name in self._SAFE_GETTERS:
            # ``has_intermediate_data`` is a boolean query, so its stub
            # answers ``False``; the other getters answer ``None``.
            fallback = False if name == "has_intermediate_data" else None

            def _null_getter(*_args, **_kwargs):
                return fallback

            return _null_getter

        raise AttributeError(
            f"PQResultStore 当前没有活跃的 PQResult还未调用 new()"
            f"无法访问 '{name}'。如需按 test_type 取历史结果,请用 .get(test_type)。"
        )
# 使用示例和工具函数
def create_pq_result_from_config(config: Dict[str, Any]) -> PQResult:
    """Build a ``PQResult`` from a configuration mapping.

    Reads ``test_type`` and ``test_name`` (both default to ""), stores the
    configuration on the result, and registers one test item per entry of
    ``test_items``. Display names come from ``test_items_chinese`` by
    position; an item without a matching entry uses its own name.
    """
    result = PQResult(
        test_type=config.get("test_type", ""),
        test_name=config.get("test_name", ""),
    )
    result.set_test_config(config)

    item_keys = config.get("test_items", [])
    labels = config.get("test_items_chinese", [])
    for position, key in enumerate(item_keys):
        # Fall back to the item key when the label list is shorter.
        if position < len(labels):
            label = labels[position]
        else:
            label = key
        result.add_test_item(key, label)

    return result
if __name__ == "__main__":
    # Manual smoke test: walks one item through a complete run and prints
    # the resulting summary.
    print("PQResult类测试")

    pq_result = PQResult("screen_module", "屏模组性能测试")

    config = {
        "test_type": "screen_module",
        "test_name": "屏模组性能测试",
        "test_items": ["gamut", "gamma", "cct"],
        "test_items_chinese": ["色域", "Gamma", "色温一致性"],
    }
    pq_result.set_test_config(config)

    # Register the three items with their display names.
    for item_key, item_label in (
        ("gamut", "色域"),
        ("gamma", "Gamma"),
        ("cct", "色温一致性"),
    ):
        pq_result.add_test_item(item_key, item_label)

    # Simulate the measurement of a single item.
    pq_result.start_test_item("gamut")
    measurement_points = [[0.64, 0.33], [0.30, 0.60]]
    pq_result.add_intermediate_data("gamut", "measurement_points", measurement_points)
    pq_result.set_test_item_result("gamut", {"coverage": 95.2, "accuracy": 98.5})

    # Close the run and report.
    pq_result.complete_test()
    saved_path = pq_result.save_to_json()
    print(f"测试结果已保存到: {saved_path}")
    print("测试摘要:", pq_result.get_test_summary())