123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424 |
- # -*- coding:utf-8 -*-
- from ssat_sdk.utils.LoggingUtil import printLog
- from ssat_sdk.pic_tool import ImageCMP
- from ssat_sdk.sat_environment import getSATTmpDIR
- from ssat_sdk.sat_environment import getMenuTree3SelectedProjectCfgPath
- import os, sys, time
- import cv2 as cv
- import json
- class TSourceImpl():
- def __init__(self, ocr, ccard, tvOperator, tfocus, tconfig, uitRunner, \
- ocrDict=[{"lan": "ChinesePRC+English", "type": 4}, {"lan": "ChinesePRC+English", "type": 253}, \
- {"lan": "ChinesePRC+English", "type": 10001}]):
- self.ocr = ocr
- self.ccard = ccard
- self.tvOperator = tvOperator
- self.tFocus = tfocus
- self.tConfig = tconfig
- self.uitRunner = uitRunner
- self.imgCMP = ImageCMP()
- if not self.get_ocr_list():
- self.ocrDict = ocrDict
- # 某数据是否读取过;
- self.got_dict = {}
- self.sourceKeyErrCount = 0
- def get_ocr_list(self):
- if self.tConfig.has_option('First', 'source.ocr'):
- self.ocrDict = self.tConfig.get_dict(self.tConfig.get_value('First', 'source.ocr'))
- return True
- else:
- return False
- # 截图并返回当前截图路径
- def getCurrentUIPath(self):
- current_uiPic = os.path.join(getSATTmpDIR(), "menutree_runpath.png")
- self.ccard.takePicture(current_uiPic)
- return current_uiPic
- def getFocusTextBox2(self, level, root, focus_box):
- icon_path = os.path.join(getMenuTree3SelectedProjectCfgPath(), "icon\\",
- str(root) + "." + str(level) + ".dir.png")
- if icon_path in self.got_dict:
- x, y = focus_box[0] - self.got_dict[icon_path]['ref_box'][0], focus_box[1] - \
- self.got_dict[icon_path]['ref_box'][1]
- return [x, y, x + self.got_dict[icon_path]['width'], y + self.got_dict[icon_path]['height']]
- else:
- if os.path.exists(icon_path) is True:
- # 读取例图,宽高;
- img = cv.imread(icon_path)
- # 在例图中查找轮廓;
- result, box = self.tFocus.findRectByIcon(icon_path, level, root)
- if result is True:
- self.got_dict[icon_path] = {"ref_box": box, "width": img.shape[1], "height": img.shape[0]}
- x, y = focus_box[0] - box[0], focus_box[1] - box[1]
- return [x, y, x + img.shape[1], y + img.shape[0]]
- # endif
- # endif
- # 如果没有字段原样返回;
- return focus_box
- '''
- 检测信源界面是否存在,分成两部分:1 检测焦点框;2 检测文字(可选 checkOCR控制)。
- '''
- def checkSourceView(self, level, first_parent, parent, option, value_for_ocr, isForValue=False, checkOCR=False):
- print "checkSourceView:", level, first_parent, parent, option
- current_uiPic = self.getCurrentUIPath()
- if isForValue:
- isFind, contourRect = self.tFocus.findRectByIcon(current_uiPic, "value", first_parent, option=parent)
- else:
- isFind, contourRect = self.tFocus.findRectByIcon(current_uiPic, level, first_parent, option=option)
- if not isFind or contourRect == []:
- return False
- else:
- if checkOCR is False:
- return True
- else:
- isFind, ocrStr = self.getFocusText(current_uiPic, level, first_parent, parent, option, value_for_ocr)
- return isFind
- def getFocusText(self, current_uiPic, level, first_parent, parent, option, list_str, isSource=False,
- isForValue=False):
- if isForValue:
- isFind, contourRect = self.tFocus.findRectByIcon(current_uiPic, "value", first_parent, option=parent)
- else:
- isFind, contourRect = self.tFocus.findRectByIcon(current_uiPic, level, first_parent, option=option)
- if not isFind or contourRect == []:
- return False, ""
- found, ocr_str = False, ''
- # tmpPic = os.path.join(getSATTmpDIR(), "meuttree_area_text11.png")
- # self.imgCMP.saveCropPic(current_uiPic, tmpPic,
- # (contourRect[0], contourRect[1], contourRect[2], contourRect[3]))
- # 是否有文字方向字段存在在ini里,如果有则转换文本框坐标;
- tmpPic = os.path.join(getSATTmpDIR(), "meuttree_area_text.png")
- contourRect = self.getFocusTextBox2(level, first_parent, contourRect)
- self.imgCMP.saveCropPic(current_uiPic, tmpPic,
- (contourRect[0], contourRect[1], contourRect[2], contourRect[3]))
- # print "self.ocrDict:", self.ocrDict
- # 获取ocr_list;
- ocr_list = self.uitRunner.uitData.UITree.get_ocr_list(level, parent, option, True if parent == option else False)
- print "%s,%s,%s =>ocr_list=%s" % (level, parent, option, str(ocr_list))
- # 遍历ocr类型;
- for item in self.ocrDict:
- # 识别ocr;
- thresholdDict = self.tConfig.getThresholdDict(first_parent)
- ocr_str = self.ocr.getStrWithImgProcess(tmpPic, {}, item["lan"], item["type"], reconTimes=1)
- # print "\n\ngetCurrentFocusTextEx ==========================:OCR-Type=%s, OCR-Text=%s\n\n" % (
- # str(item), str(ocr_str))
- printLog(u"OCR识别的类型字典:%s" % str(item))
- printLog(u"OCR识别出的ocr_str为:%s" % str(ocr_str))
- # print(u"getCurrentFocusTextEx.ocr_str land = %s, type =%d, ocr_str=%s:"%(item["lan"], item["type"], ocr_str.encode('GB18030')))
- ocr_str = unicode(ocr_str).lower()
- if isSource is False:
- parent_ocr_dict = self.uitRunner.uitData.UITree.get_parent_ocr_dict(option)
- print "getCurrentFocusTextEx.parent_ocr_dict:", parent_ocr_dict
- list_parent_ocr_value = list(parent_ocr_dict.keys())
- print "getCurrentFocusTextEx.list_parent_ocr_value:", list_parent_ocr_value
- current_parent_ocr_value = ""
- for parent_ocr_value in list_parent_ocr_value:
- parent_ocr_value = str(parent_ocr_value).lower()
- if parent_ocr_value in ocr_str or parent_ocr_value == ocr_str:
- current_parent_ocr_value = parent_ocr_value
- break
- try:
- curentOption = parent_ocr_dict[current_parent_ocr_value]
- except Exception, e:
- curentOption = ""
- printLog(u"OCR识别出的current_parent_ocr_value为:%s" % str(current_parent_ocr_value))
- printLog(u"OCR识别出的curentOCROption为:%s" % str(curentOption))
- printLog(u"传入的目标option为:%s" % str(option))
- # 通过OCR来反向识别判断当前识别的option是否为传入的option
- if str(curentOption).lower() != "" and str(curentOption).lower() != str(option).lower():
- # print "非焦点框=%s" % (ocr_str)
- printLog(u"非焦点框=%s" % str(ocr_str))
- return False, ocr_str
- # 再遍历目标焦点ocr;
- for std_str in list_str:
- std_str = str(std_str).lower()
- # print 'std_str:', std_str, type(std_str)
- # print 'ocr_str:', ocr_str, type(ocr_str)
- if std_str in ocr_str or std_str == ocr_str:
- found = True
- break
- # endfor
- if found is True:
- break
- else:
- if isSource is True:
- # 如果是信源 而且识别出是非目标信源则跳出该语言
- for ocr_std_str_list in ocr_list:
- # print "ocr_std_str_list:",ocr_std_str_list
- for ocr_std_str in ocr_std_str_list:
- # print "ocr_std_str:",ocr_std_str
- ocr_std_str = str(ocr_std_str).lower()
- # print 'ocr_std_str:', ocr_std_str, type(ocr_std_str)
- # print 'ocr_str:', ocr_str, type(ocr_str)
- if ocr_std_str in ocr_str or ocr_std_str == ocr_str:
- printLog(u"非焦点信源=%s" % str(ocr_str))
- return False, ocr_str
- # endfor
- return found, ocr_str
- # 遍历获取ocr结果;
- def getCurrentFocusTextEx(self, level, first_parent, parent, option, list_str, isSource=False,
- isForValue=False):
- print level, first_parent, parent, option
- current_uiPic = self.getCurrentUIPath()
- if isForValue:
- isFind, contourRect = self.tFocus.findRectByIcon(current_uiPic, "value", first_parent, option=parent)
- else:
- isFind, contourRect = self.tFocus.findRectByIcon(current_uiPic, level, first_parent, option=option)
- # print "getCurrentFocusText isFind:", isFind, contourRect
- printLog(u"获取当前聚焦效果isFind:%s 聚焦区域contourRect:%s" % (str(isFind), str(contourRect)))
- if not isFind or contourRect == []:
- return -1, ""
- else:
- found, ocr_str = False, ''
- # tmpPic = os.path.join(getSATTmpDIR(), "meuttree_area_text11.png")
- # self.imgCMP.saveCropPic(current_uiPic, tmpPic,
- # (contourRect[0], contourRect[1], contourRect[2], contourRect[3]))
- # 是否有文字方向字段存在在ini里,如果有则转换文本框坐标;
- tmpPic = os.path.join(getSATTmpDIR(), "meuttree_area_text.png")
- contourRect = self.getFocusTextBox2(level, first_parent, contourRect)
- self.imgCMP.saveCropPic(current_uiPic, tmpPic,
- (contourRect[0], contourRect[1], contourRect[2], contourRect[3]))
- if isSource:
- self.tvOperator.sendKey("ok")
- # print "self.ocrDict:", self.ocrDict
- # 获取ocr_list;
- ocr_list = self.uitRunner.uitData.UITree.get_ocr_list(level, parent, option, True if parent == option else False)
- print "%s,%s,%s =>ocr_list=%s" % (level, parent, option, str(ocr_list))
- # 遍历ocr类型;
- for item in self.ocrDict:
- # 识别ocr;
- thresholdDict = self.tConfig.getThresholdDict(first_parent)
- ocr_str = self.ocr.getStrWithImgProcess(tmpPic, thresholdDict, item["lan"], item["type"], reconTimes=1)
- # print "\n\ngetCurrentFocusTextEx ==========================:OCR-Type=%s, OCR-Text=%s\n\n" % (
- # str(item), str(ocr_str))
- printLog(u"OCR识别的类型字典:%s" % str(item))
- printLog(u"OCR识别出的ocr_str为:%s" % str(ocr_str))
- # print(u"getCurrentFocusTextEx.ocr_str land = %s, type =%d, ocr_str=%s:"%(item["lan"], item["type"], ocr_str.encode('GB18030')))
- ocr_str = unicode(ocr_str).lower()
- if isSource is False:
- parent_ocr_dict = self.uitRunner.uitData.UITree.get_parent_ocr_dict(option)
- print "getCurrentFocusTextEx.parent_ocr_dict:", parent_ocr_dict
- list_parent_ocr_value = list(parent_ocr_dict.keys())
- print "getCurrentFocusTextEx.list_parent_ocr_value:", list_parent_ocr_value
- current_parent_ocr_value = ""
- for parent_ocr_value in list_parent_ocr_value:
- parent_ocr_value = str(parent_ocr_value).lower()
- if parent_ocr_value in ocr_str or parent_ocr_value == ocr_str:
- current_parent_ocr_value = parent_ocr_value
- break
- try:
- curentOption = parent_ocr_dict[current_parent_ocr_value]
- except Exception, e:
- curentOption = ""
- printLog(u"OCR识别出的current_parent_ocr_value为:%s" % str(current_parent_ocr_value))
- printLog(u"OCR识别出的curentOCROption为:%s" % str(curentOption))
- printLog(u"传入的目标option为:%s" % str(option))
- # 通过OCR来反向识别判断当前识别的option是否为传入的option
- if str(curentOption).lower() != "" and str(curentOption).lower() != str(option).lower():
- # print "非焦点框=%s" % (ocr_str)
- printLog(u"非焦点框=%s" % str(ocr_str))
- return 0, ocr_str
- # 再遍历目标焦点ocr;
- for std_str in list_str:
- std_str = str(std_str).lower()
- # print 'std_str:', std_str, type(std_str)
- # print 'ocr_str:', ocr_str, type(ocr_str)
- if std_str in ocr_str or std_str == ocr_str:
- found = True
- break
- # endfor
- if found is True:
- break
- else:
- if isSource is True:
- # 如果是信源 而且识别出是非目标信源则跳出该语言
- for ocr_std_str_list in ocr_list:
- # print "ocr_std_str_list:",ocr_std_str_list
- for ocr_std_str in ocr_std_str_list:
- # print "ocr_std_str:",ocr_std_str
- ocr_std_str = str(ocr_std_str).lower()
- # print 'ocr_std_str:', ocr_std_str, type(ocr_std_str)
- # print 'ocr_str:', ocr_str, type(ocr_str)
- if ocr_std_str in ocr_str or ocr_std_str == ocr_str:
- printLog(u"非焦点信源=%s" % str(ocr_str))
- return 0, ocr_str
- # endfor
- return int(found), ocr_str
- def checkSource(self, value_for_ocr, option,enter_key, count=0):
- printLog(u"checkSource。开始检测当前信源")
- sourceWaitTime = self.tConfig.getParentWaitTime("source")
- self.tvOperator.sendKey('source', duration=sourceWaitTime)
- isFind, text = self.getCurrentFocusTextEx('First', 'source', 'source', option, value_for_ocr)
- # 未找到焦点
- if isFind == -1:
- isFind, text = self.getCurrentFocusTextEx('First', 'source', 'source', option, value_for_ocr)
- if isFind == -1 and count < 3:
- self.sourceKeyErrCount += 1
- count += 1
- return self.checkSource(value_for_ocr, option, enter_key, count)
- elif isFind == 1:
- self.tvOperator.sendKey(enter_key)
- return True
- else:
- return False
- # 信源不匹配
- elif isFind == 0:
- #画面卡顿处理
- isFind, text = self.getCurrentFocusTextEx('First', 'source', 'source', option, value_for_ocr)
- if isFind == 1:
- self.tvOperator.sendKey(enter_key)
- return True
- else:
- return False
- else:
- self.tvOperator.sendKey(enter_key)
- return True
- def toNextSource(self, moveKey, enterKey, option, value_for_ocr, count=0):
- printLog(u"进入下一个信源.moveKey, enterKey:" + moveKey + enterKey)
- sourceWaitTime = self.tConfig.getParentWaitTime("source")
- self.tvOperator.sendKey('source', duration=sourceWaitTime)
- ret = self.checkSourceView('First', 'source', 'source', option, value_for_ocr)
- # 未弹出source界面
- if ret is False:
- printLog(u"未进入信源界面")
- ret = self.checkSourceView('First', 'source', 'source', option, value_for_ocr)
- if ret is False and count < 3:
- count += 1
- self.tvOperator.sendKeys('exit', "return") # 在某些界面弹不出信源界面
- return self.toNextSource(moveKey, enterKey, option, value_for_ocr, count)
- elif count >= 3:
- return False, ""
- printLog(u"已进入信源界面")
- self.tvOperator.sendKey(moveKey)
- current_uiPic = self.getCurrentUIPath()
- self.tvOperator.sendKey(enterKey)
- isFind, text = self.getFocusText(current_uiPic, 'First', 'source', 'source', option, value_for_ocr,
- isSource=True)
- return isFind, text
- def exitUSB(self):
- self.tvOperator.sendKey("exit", duration=2)
- self.tvOperator.sendKey("return", duration=2)
- def exitSourceView(self, level, first_parent, parent, option, value_for_ocr, isForValue=False):
- print u"退出信源界面"
- count = 0
- while count < 3:
- count += 1
- ret = self.checkSourceView(level, first_parent, parent, option, value_for_ocr, isForValue, checkOCR=True)
- if ret is False:
- break
- self.tvOperator.sendKey("ok", duration=2)
- ret = self.checkSourceView(level, first_parent, parent, option, value_for_ocr, isForValue, checkOCR=True)
- if ret is False:
- break
- self.tvOperator.sendKey("exit", duration=2)
- ret = self.checkSourceView(level, first_parent, parent, option, value_for_ocr, isForValue, checkOCR=True)
- if ret is False:
- break
- self.tvOperator.sendKey("return", duration=2)
- ret = self.checkSourceView(level, first_parent, parent, option, value_for_ocr, isForValue, checkOCR=True)
- if ret is False:
- break
- def setSourceValue(self, option, value, sourceWaitTime=1.0, Max_Try=10):
- printLog(u"开始执行setSourceValue。option:%s value:%s" % (str(option), str(value)))
- # self.get_ocr_list('First', 'source')
- # 获取menu path;
- value_params, path_params = self.uitRunner.uitPathManage.get_menu_paths(option, value)
- # print "value_params:", value_params
- printLog(u"获取设置信源value:%s的值字典value_params:%s" % (str(value), str(value_params)))
- value_for_ocr = value_params['value_for_ocr']
- old_text = "init_old_text"
- enter_key = value_params["enter_key"]
- move_keyArr = value_params["move_key"]
- if enter_key.__len__() < 1 or move_keyArr.__len__() < 2:
- printLog(u"Error:Enter Key or Move Key错误:%s" % (str(enter_key), str(move_keyArr)))
- return False
- # 正向寻找的次数计数和最大次数
- count = 0
- # 读取menutree中的信源数量
- source_list = self.uitRunner.uitData.UITree.getSubOptionList(option)
- printLog(u"setSourceValue.%s source_list:%s" % (option, source_list))
- if source_list.__len__() != 0:
- Max_Try = source_list.__len__()
- isOldSouceCount = 0
- # 反向寻找的次数计数和最大次数
- Reversecount = 0
- Reverse_Max_Try = Max_Try
- sourceWaitTime = self.tConfig.getParentWaitTime("source")
- print "sourceWaitTime:", sourceWaitTime, type(sourceWaitTime)
- # 第一次直接判断是不是目标信源
- ret = self.checkSource(value_for_ocr, option, enter_key)
- printLog(u"第一次判断是否目标信源:" + str(ret))
- if ret is True:
- # 退出信源
- self.exitSourceView('First', 'source', 'source', option, value_for_ocr)
- return True
- if self.sourceKeyErrCount >= 3:
- printLog(u"第一次信源判断,Source界面弹出异常次数>3")
- return False
- # 用于信源切换不适用场景,快速结束脚本
- if self.enableSourceChange() is False:
- printLog(u"设定:不允许切换信源。")
- return False
- while True:
- # 循环遍历控制
- if count < Max_Try and Reversecount < Reverse_Max_Try and isOldSouceCount < 1:
- count += 1
- moveKey = move_keyArr[0]
- elif count >= Max_Try and Reversecount < Reverse_Max_Try and isOldSouceCount < 1:
- Reversecount += 1
- moveKey = move_keyArr[1]
- else:
- break
- # 切入下一个信源
- result, text = self.toNextSource(moveKey, enter_key, option, value_for_ocr)
- if result is True:
- return True
- elif result is False and ("usb" in text.lower() or "media" in text.lower()):
- isOldSouceCount = 0
- self.exitUSB()
- count = Max_Try
- elif result is False and (old_text in text):
- isOldSouceCount += 1
- # 信源与之前信源一样,超过3次调转方向,如果已经调转方向,则返回失败结果
- if isOldSouceCount > 3 and count >= Max_Try:
- Reversecount += Reverse_Max_Try
- isOldSouceCount = 0
- elif isOldSouceCount > 3 and count < Max_Try:
- count = Max_Try
- isOldSouceCount = 0
- # 退出信源
- self.exitSourceView('First', 'source', 'source', option, value_for_ocr)
- return False
- def enableSourceChange(self):
- jsonStr = self.tConfig.get_value("First", "source")
- sourceDict = json.loads(jsonStr)
- if sourceDict.has_key("enable"):
- enable = sourceDict["enable"]
- return enable == 1
- else:
- return True
|