# -*- coding:utf-8 -*- import os import sys import time import re # import getopt import json import chardet import datetime import hashlib # md5验证; import subprocess import urllib2 import threading from ssat_sdk.basessh2 import baseSSH2 from ssat_sdk.tv_operator import TvOperator # 导入测试精灵; from ssat_sdk.device_manage.testWizardClient import TestWizardClient from SATClient import SATClient # 导入http相关库:python2.x中2个一起用,python3.x合并成一个urllib import urllib import urllib2 class SomkePretreatment: def __init__(self, taskId=None): self.taskId = taskId if self.taskId is None: self.__parseCommandLine() # 任务信息; self.taskInfo = {} self.ota_local_name = 'usb_ota.zip' # 上传升级包的位置; self.updateZip = ' /storage/905A-30AB/usb_ota.zip' # 弃用:改由usb切换器切换; # ota路径; self.ota_server_path = "/home/RT2841_2851_dailybuild/DailyBuild_RT2841_%s/" % str(time.strftime("%m%d")) # ota名称; self.ota_server_name = 'signed-ota_rt2841_update.zip' # apk安装ota的命令; self.installCmd = ' shell am start -n com.apps.ota/.ui.MainActivity' # ssh2; self.ssh2 = baseSSH2() # 遥控器; self.redhat3 = TvOperator() # 获取任务信息; self.getTaskInfo() ''' 函数:解析命令行参数 参数: 返回: 注意:argv[0]属于程序本身名称 ''' def __parseCommandLine(self): if sys.argv.__len__() > 1: try: args = json.loads(sys.argv[1].replace("'", '"')) # 解析出taskId; if "TaskId" in args: self.taskId = args["TaskId"] print "TaskId=", self.taskId except Exception, e: print u"解析json失败:%s" % sys.argv[1] # 不使用重定向; def __cmdExecute2(self, cmd): # 定义文件名称; file_name = str(time.time()) # 输出到文件中; cmd = cmd + ' >> ' + file_name print u'执行cmd:', cmd proc = subprocess.Popen(cmd, shell=True) # 等待完成; proc.wait() # 读取文件内容; data = '' with open(file_name, mode="rb") as f: data = f.read() os.remove(file_name) print u"cmd结果:", data return data def __cmdExecute(self, cmd): print u'执行cmd:', cmd proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) # 等待完成; stdout,stderr = proc.communicate() if proc.returncode == 0: print u'执行成功' # 返回执行结果; # print u"__cmdExecute结果:%s" % stdout return stdout def __adbConnect(self): try: # 启用adb的root权限; cmd = "adb connect " + self.taskInfo["deviceSerial"] proc = self.__cmdExecute(cmd) if proc.find("connected to " + self.taskInfo["deviceSerial"] + ":5555") < 0 and proc.find( "already connected to " + self.taskInfo["deviceSerial"] + ":5555") < 0: return False print "adb connect success" return True except Exception, e: print 'adb connect error:', e return False def adbCheck(self): try: cmd = "adb devices" proc = self.__cmdExecute(cmd) if proc.find(self.taskInfo["deviceSerial"] + ":5555\tdevice\r\n") > -1: print "adb connected" return True print "adb didn't connected" return False except Exception,e: return False def __adbDisconnect(self): # 启用adb的root权限; cmd = "adb disconnect " + self.taskInfo["deviceSerial"] proc = self.__cmdExecute(cmd) if proc.find("disconnected " + self.taskInfo["deviceSerial"]) < 0 and proc.find( "error: no such device '" + self.taskInfo["deviceSerial"] + ":5555'") < 0: return False print "adb disConnect success" return True def __adbRoot(self): try: # 启用adb的root权限; cmd = "adb -s " + self.taskInfo["deviceSerial"] + " root" proc = self.__cmdExecute(cmd) if proc.find("adbd is already running as root") < 0 and proc.find( "restarting adbd as root") < 0 and proc != '': return False print "adb root success" return True except Exception, e: print 'adb root error:', e return False def __adbRemount(self): # 启用adb的root权限; cmd = "adb -s " + self.taskInfo["deviceSerial"] + " remount /data" proc = self.__cmdExecute(cmd) if proc.find("adbd is already running as root") < 0 and proc.find("restarting adbd as root") < 0 and proc != '': return False print "adb root success" return True def __adbInstallApk(self, apk_path): if os.path.exists(apk_path): cmd = 'adb -s ' + self.taskInfo['deviceSerial'] + ' install -r -t ' + apk_path proc = self.__cmdExecute(cmd) if (proc.find('Success') >= 0): print "install ota_install.apk success" # 启动app; cmd = 'adb -s ' + self.taskInfo[ 'deviceSerial'] + r' shell am start -n com.apps.ota.myapplication/.DeleteZipActivity' proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True).communicate()[ 0] return True return False def __adbUninstallApk(self, apk_name): return False def __requestTaskInfo(self): # 请求服务器; data = {'data':{"taskId" : self.taskId}} msg = {'requestMsg' : json.dumps(data)} try: post_data = urllib.urlencode(msg) url = 'http://10.126.16.60:8580/btc_task_se/ajaxExternalInteractiveManage!getSmokeTaskInfo.action' response = urllib2.urlopen(url, post_data) data = response.read().decode("utf-8") res = json.loads(data) if str(res['responseMsg']['code']) == '00': self.taskInfo = { "host": res['responseMsg']['data']['compileInfo']['serverName'], "hostPort": res['responseMsg']['data']['compileInfo']['serverPort'], "hostUser": res['responseMsg']['data']['compileInfo']['serverAccount'], "hostPwd": res['responseMsg']['data']['compileInfo']['serverPassword'], "deviceSerial": res['responseMsg']['data']['deviceId'] } # 服务器路径; ser_path = res['responseMsg']['data']['compileInfo']['resultPath'] self.ota_server_path = "%sDailyBuild_RT2841_%s/" % (ser_path, str(time.strftime("%m%d"))) print u"taskInfo",self.taskInfo print u"ser_path",ser_path print u"ota_server_path", self.ota_server_path # 返回结果; return True except Exception, e: print u"获取任务信息失败",e # 默认返回False return False # 获取Windows下的USB路径(多个时默认使用第一个识别的) def getWindowsUsbPath(self): disks = self.__cmdExecute("wmic logicaldisk get deviceid, description") charsInfo = chardet.detect(disks) print u'当前编码信息=',charsInfo if charsInfo['encoding'] == "GB2312": disks = disks.decode("GB2312").encode('utf-8') elif charsInfo['encoding'] == "UTF-16": disks = disks.decode("utf-16").encode('utf-8').decode('utf-8-sig') disks = disks.split('\r\n') for disk in disks: if u'可移动磁盘' in disk: return re.search(r'\w:', disk).group() return None # 适用于Android 9以上 def getTVUsbPath(self): # 获取storage下的目录; sdcards = self.__cmdExecute('adb shell ls storage') if sdcards.__len__() == 0: return None # 按换行符分隔; sdcards = sdcards.split('\r\n') # 要过滤的目录; sdcards = list(set(sdcards) ^ set(['emulated','self',''])) if sdcards.__len__() == 0: return None # 返回第一个元素; return sdcards[0] def __AdbPush(self, file, tvdir): data = self.__cmdExecute('adb push %s %s\n'%(file, tvdir)) str = '%s: 1 file pushed.'%file if data.find(str) < 0 or data.find("[100%%] %s" % tvdir) < 0: return False print u'push %s 成功'%file return True def AdbPush(self, file, tvdir, trys=3): if trys <= 0: trys = 1 while trys > 0: trys = trys - 1 if self.__AdbPush(file, tvdir) is True: return True return False def __AdbInstallAPK(self, file): data = self.__cmdExecute('adb install -r %s'%file) if data != "Success\r\n": return False return True def AdbInstallAPK(self, file, trys=3): if trys <= 0: trys = 1 while trys > 0: trys = trys - 1 if self.__AdbInstallAPK(file) is True: return True return False # adb shell pm list packages | findstr /x "" def IsAPKInstalled(self, apk_name): apk_name = 'package:%s'%apk_name data = self.__cmdExecute('adb shell pm list packages | findstr /x "%s"'%apk_name) return True if data.lower() == apk_name.lower() else False ''' 函数:获取任务信息; 参数: 返回: ''' def getTaskInfo(self): # 任务信息; if self.taskId is None or self.taskInfo.__len__() == 0: #测试开关; self.taskId = 5520 self.taskInfo = { "host": "10.201.251.254", "hostPort": 22, "hostUser": "wjf", "hostPwd": "wjf2019", "deviceSerial": '192.168.18.210' } # 请求服务器; if self.__requestTaskInfo() is False: return # 初始化ssh2; self.ssh2.init_ssh2(self.taskInfo['hostUser'], self.taskInfo['hostPwd'], self.taskInfo['host'], self.taskInfo['hostPort']) self.redhat3.sendKey('home',2) self.redhat3.sendKey('home',2) self.__adbConnect() ''' 函数:下载apk 参数: 返回: ''' def downloadApk(self, apk_url, save_path): pass ''' 函数:apk是否已在本地; 参数: 返回: ''' def isApkExist(self, apk_md5, apk_path): local_md5 = '' file_data = [] md5_local = hashlib.md5() if os.path.exists(apk_path): with open(apk_path, mode='rb') as f: while True: data = f.read(8192) if not data: break md5_local.update(data) local_md5 = md5_local.hexdigest() print u"本地MD5=", local_md5 return True if apk_md5 == local_md5 else False def installApk(self, apk_path): # 先连接才能root; if self.__adbConnect() is False: return False # 安装apk if self.__adbInstallApk(apk_path) is False: return False # 断开adb连接; # self.__adbDisconnect() return True def unInstallApk(self, apk_name): # 先连接才能root; if self.__adbConnect() is False: return # 启用adb的root权限; if self.__adbRoot() is False: return if self.__adbUninstallApk(apk_name) is False: return def parseOTAImageName(self): pass def isOTAImageOnServer(self): cmd = "ls %s" % self.ota_server_path status, data = self.ssh2.execute_cmd(cmd) if status is True and data.find(self.ota_server_name) >= 0: return True return False def downloadOTAImage(self): print u"downloadOTAImage start" sftp_path = self.ota_server_path + self.ota_server_name usbSwitch = TestWizardClient() ''' 尝试5次切换U盘 ''' tryCount = 5 switchRet = False while tryCount > 0: if usbSwitch.sendUsbSwitch(1) is True: switchRet = True break time.sleep(2) tryCount = tryCount - 1 if switchRet is False: print u"切换U盘到电脑失败" return False time.sleep(5) # 等待Windows识别U盘。 usbPath = self.getWindowsUsbPath() if usbPath is None: print u"获取U盘盘符失败" return False local_path = usbPath + "\\" + self.ota_local_name return self.ssh2.sftp_download_md5(sftp_path, local_path) def isOTAImageOnLocal(self): pass def uploadOTAImage2TV(self): # 必须先root才能push; if self.__adbRoot() is False: print u"adb root失败" return False ''' 尝试5次切换U盘 ''' tryCount = 5 switchRet = False usbSwitch = TestWizardClient() while tryCount > 0: if usbSwitch.sendUsbSwitch(0) is True: switchRet = True break time.sleep(2) tryCount = tryCount - 1 if switchRet is False: print u"切换U盘到TV失败" return False return True def reomveOTAImageOnTV(self): pass def installOTAImage(self): print u"installOTAImage start" try: self.redhat3.sendKey("home") time.sleep(1.5) cmd = 'adb -s ' + self.taskInfo['deviceSerial'] + self.installCmd proc = self.__cmdExecute(cmd) print proc self.redhat3.sendKey("down") time.sleep(1.5) self.redhat3.sendKey("ok") time.sleep(1.5) self.redhat3.sendKey("ok") # 等待60秒启动安装; time.sleep(60) except Exception, e: print u'安装失败=', e def checkOTAInstall(self): try: # 先connect,再devices cmd = 'adb connect ' + self.taskInfo['deviceSerial'] proc = self.__cmdExecute(cmd) if proc.find('connected') >= 0: cmd = 'adb devices' proc = self.__cmdExecute(cmd) if proc.find(self.taskInfo['deviceSerial'] + ':5555\tdevice') >= 0: print "device connect success" print "OTA update success!" return True time.sleep(10) return False except Exception, e: print u'检测失败=', e def initAtx(self): # 重启后,重连adb; client = SATClient() if client.send_add_device(self.taskInfo['deviceSerial']) is False: print u"重连adb设备失败",self.taskInfo['deviceSerial'] self.__adbConnect() # 等待10; print "等待30秒,让TV完全启动。" time.sleep(30) # os.system(r'D:\SAT\tools\atx-init\atx-init.bat') # push 文件; if self.AdbPush(r'D:\SAT\tools\atx-init\atx-agent', '/data/local/tmp') is False: return False if self.AdbPush(r'D:\SAT\tools\atx-init\minicap', '/data/local/tmp') is False: return False if self.AdbPush(r'D:\SAT\tools\atx-init\minicap.so', '/data/local/tmp') is False: return False if self.AdbPush(r'D:\SAT\tools\atx-init\minitouch', '/data/local/tmp') is False: return False self.__cmdExecute('adb shell chmod 755 /data/local/tmp/atx-agent') self.__cmdExecute('adb shell chmod 755 /data/local/tmp/minicap') self.__cmdExecute('adb shell chmod 755 /data/local/tmp/minicap.so') self.__cmdExecute('adb shell chmod 755 /data/local/tmp/minitouch') if self.IsAPKInstalled('com.github.uiautomator') is True: print u'卸载:uiautomator' self.__cmdExecute('adb uninstall com.github.uiautomator') else: print 'uiautomator false' if self.IsAPKInstalled('com.github.uiautomator.test') is True: print u'卸载:uiautomator.test' self.__cmdExecute('adb uninstall com.github.uiautomator.test') else: print 'uiautomator.test false' # 安装apk if self.AdbInstallAPK(r'D:\SAT\tools\atx-init\app-uiautomator.apk') is False: return False if self.AdbInstallAPK(r'D:\SAT\tools\atx-init\app-uiautomator-test.apk') is False: return False if self.AdbInstallAPK(r'D:\SAT\tools\atx-init\exoplayer-demo-2.8.4.apk') is False: return False def turnOnTV(self): tryCount = 5 while tryCount > 0: if self.adbCheck() is True: print u'开机成功' return True # 按下Power键; self.redhat3.sendKey("POWER") # 等待60秒; time.sleep(60) tryCount = tryCount -1 # 尝试连接; # self.__adbConnect() print u'开机失败' return False if __name__ == "__main__": smp = SomkePretreatment() smp.getTaskInfo() smp.adbCheck() # print smp.getWindowsUsbPath() # if smp.isOTAImageOnServer(): # if smp.downloadOTAImage(): # print u'下载完成' # else: # print u'下载失败' # smp.taskInfo = {"deviceSerial":'192.168.1.101'} # #if smp.installApk('F:\SAT\SAT_Runner\ota_apk\RT2841\ota_install.apk') is True: # #if smp.uploadOTAImage2TV('F:\signed-ota_rt2841_update.zip') is True: # smp.installOTAImage() # time.sleep(30) # upStatus = False # for i in range(30): # if smp.checkOTAInstall() is True: # upStatus = True # break # if upStatus is False: # print "OTA update time out 15min"