| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549 | # -*- coding:utf-8 -*-import osimport sysimport timeimport re# import getoptimport jsonimport chardetimport datetimeimport hashlib  # md5验证;import subprocessimport urllib2import threadingfrom ssat_sdk.basessh2 import baseSSH2from ssat_sdk.tv_operator import TvOperator# 导入测试精灵;from ssat_sdk.device_manage.testWizardClient import TestWizardClientfrom SATClient import SATClient# 导入http相关库:python2.x中2个一起用,python3.x合并成一个urllibimport urllibimport urllib2class SomkePretreatment:    def __init__(self, taskId=None):        self.taskId = taskId        if self.taskId is None:            self.__parseCommandLine()        # 任务信息;        self.taskInfo = {}        self.ota_local_name = 'usb_ota.zip'        # 上传升级包的位置;        self.updateZip = ' /storage/905A-30AB/usb_ota.zip' # 弃用:改由usb切换器切换;        # ota路径;        self.ota_server_path = "/home/RT2841_2851_dailybuild/DailyBuild_RT2841_%s/" % str(time.strftime("%m%d"))        # ota名称;        self.ota_server_name = 'signed-ota_rt2841_update.zip'        # apk安装ota的命令;        self.installCmd = ' shell am start -n com.apps.ota/.ui.MainActivity'        # ssh2;        self.ssh2 = baseSSH2()        # 遥控器;        self.redhat3 = TvOperator()        # 获取任务信息;        self.getTaskInfo()    '''    函数:解析命令行参数    参数:    返回:    注意:argv[0]属于程序本身名称    '''    def __parseCommandLine(self):        if sys.argv.__len__() > 1:            try:                args = json.loads(sys.argv[1].replace("'", '"'))                # 解析出taskId;                if "TaskId" in args:                    self.taskId = args["TaskId"]                print "TaskId=", self.taskId            except Exception, e:                print u"解析json失败:%s" % sys.argv[1]    # 不使用重定向;    def __cmdExecute2(self, cmd):        # 定义文件名称;        file_name = str(time.time())        # 输出到文件中;        cmd = cmd + ' >> ' + file_name        print u'执行cmd:', cmd        proc = subprocess.Popen(cmd, shell=True)        # 等待完成;        proc.wait()        # 读取文件内容;        data = ''        with open(file_name, mode="rb") as f:            data = f.read()        os.remove(file_name)        print u"cmd结果:", data        return data    def __cmdExecute(self, cmd):        print u'执行cmd:', cmd        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)        # 等待完成;        stdout,stderr = proc.communicate()        if proc.returncode == 0:            print u'执行成功'        # 返回执行结果;        # print u"__cmdExecute结果:%s" % stdout        return stdout    def __adbConnect(self):        try:            # 启用adb的root权限;            cmd = "adb connect " + self.taskInfo["deviceSerial"]            proc = self.__cmdExecute(cmd)            if proc.find("connected to " + self.taskInfo["deviceSerial"] + ":5555") < 0 and proc.find(                    "already connected to " + self.taskInfo["deviceSerial"] + ":5555") < 0:                return False            print "adb connect success"            return True        except Exception, e:            print 'adb connect error:', e            return False    def adbCheck(self):        try:            cmd = "adb devices"            proc = self.__cmdExecute(cmd)            if proc.find(self.taskInfo["deviceSerial"] + ":5555\tdevice\r\n") > -1:                print "adb connected"                return True            print "adb didn't connected"            return False        except Exception,e:            return False    def __adbDisconnect(self):        # 启用adb的root权限;        cmd = "adb disconnect " + self.taskInfo["deviceSerial"]        proc = self.__cmdExecute(cmd)        if proc.find("disconnected " + self.taskInfo["deviceSerial"]) < 0 and proc.find(                "error: no such device '" + self.taskInfo["deviceSerial"] + ":5555'") < 0:            return False        print "adb disConnect success"        return True    def __adbRoot(self):        try:            # 启用adb的root权限;            cmd = "adb -s " + self.taskInfo["deviceSerial"] + " root"            proc = self.__cmdExecute(cmd)            if proc.find("adbd is already running as root") < 0 and proc.find(                    "restarting adbd as root") < 0 and proc != '':                return False            print "adb root success"            return True        except Exception, e:            print 'adb root error:', e            return False    def __adbRemount(self):        # 启用adb的root权限;        cmd = "adb -s " + self.taskInfo["deviceSerial"] + " remount /data"        proc = self.__cmdExecute(cmd)        if proc.find("adbd is already running as root") < 0 and proc.find("restarting adbd as root") < 0 and proc != '':            return False        print "adb root success"        return True    def __adbInstallApk(self, apk_path):        if os.path.exists(apk_path):            cmd = 'adb -s ' + self.taskInfo['deviceSerial'] + ' install -r -t ' + apk_path            proc = self.__cmdExecute(cmd)            if (proc.find('Success') >= 0):                print "install ota_install.apk success"                # 启动app;                cmd = 'adb -s ' + self.taskInfo[                    'deviceSerial'] + r' shell am start -n com.apps.ota.myapplication/.DeleteZipActivity'                proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True).communicate()[                    0]                return True        return False    def __adbUninstallApk(self, apk_name):        return False        def __requestTaskInfo(self):        # 请求服务器;        data = {'data':{"taskId" : self.taskId}}        msg = {'requestMsg' : json.dumps(data)}        try:            post_data = urllib.urlencode(msg)            url = 'http://10.126.16.60:8580/btc_task_se/ajaxExternalInteractiveManage!getSmokeTaskInfo.action'            response = urllib2.urlopen(url, post_data)            data = response.read().decode("utf-8")            res = json.loads(data)            if str(res['responseMsg']['code']) == '00':                self.taskInfo = {                    "host": res['responseMsg']['data']['compileInfo']['serverName'],                    "hostPort": res['responseMsg']['data']['compileInfo']['serverPort'],                    "hostUser": res['responseMsg']['data']['compileInfo']['serverAccount'],                    "hostPwd": res['responseMsg']['data']['compileInfo']['serverPassword'],                    "deviceSerial": res['responseMsg']['data']['deviceId']                }                # 服务器路径;                ser_path = res['responseMsg']['data']['compileInfo']['resultPath']                self.ota_server_path = "%sDailyBuild_RT2841_%s/" % (ser_path, str(time.strftime("%m%d")))                print u"taskInfo",self.taskInfo                print u"ser_path",ser_path                print u"ota_server_path", self.ota_server_path                # 返回结果;                return True        except Exception, e:            print u"获取任务信息失败",e        # 默认返回False        return False     # 获取Windows下的USB路径(多个时默认使用第一个识别的)    def getWindowsUsbPath(self):        disks = self.__cmdExecute("wmic logicaldisk get deviceid, description")        charsInfo = chardet.detect(disks)        print u'当前编码信息=',charsInfo        if charsInfo['encoding'] == "GB2312":            disks = disks.decode("GB2312").encode('utf-8')        elif charsInfo['encoding'] == "UTF-16":            disks = disks.decode("utf-16").encode('utf-8').decode('utf-8-sig')        disks = disks.split('\r\n')        for disk in disks:            if u'可移动磁盘' in disk:                return re.search(r'\w:', disk).group()        return None    # 适用于Android 9以上    def getTVUsbPath(self):        # 获取storage下的目录;        sdcards = self.__cmdExecute('adb shell ls storage')        if sdcards.__len__() == 0:            return None        # 按换行符分隔;        sdcards = sdcards.split('\r\n')        # 要过滤的目录;        sdcards = list(set(sdcards) ^ set(['emulated','self','']))        if sdcards.__len__() == 0:            return None        # 返回第一个元素;        return sdcards[0]    def __AdbPush(self, file, tvdir):        data = self.__cmdExecute('adb push %s %s\n'%(file, tvdir))        str = '%s: 1 file pushed.'%file        if data.find(str) < 0 or data.find("[100%%] %s" % tvdir) < 0:            return False        print u'push %s 成功'%file        return True    def AdbPush(self, file, tvdir, trys=3):        if trys <= 0:            trys = 1                while trys > 0:            trys = trys - 1            if self.__AdbPush(file, tvdir) is True:                return True        return False        def __AdbInstallAPK(self, file):        data = self.__cmdExecute('adb install -r %s'%file)        if data != "Success\r\n":            return False        return True        def AdbInstallAPK(self, file, trys=3):        if trys <= 0:            trys = 1                while trys > 0:            trys = trys - 1            if self.__AdbInstallAPK(file) is True:                return True        return False    # adb shell pm list packages | findstr /x "<package name>"    def IsAPKInstalled(self, apk_name):        apk_name = 'package:%s'%apk_name        data = self.__cmdExecute('adb shell pm list packages | findstr /x "%s"'%apk_name)        return True if data.lower() == apk_name.lower() else False        '''    函数:获取任务信息;    参数:    返回:    '''    def getTaskInfo(self):        # 任务信息;        if self.taskId is None or self.taskInfo.__len__() == 0:  #测试开关;            self.taskId = 5520            self.taskInfo = {                "host": "10.201.251.254",                "hostPort": 22,                "hostUser": "wjf",                "hostPwd": "wjf2019",                "deviceSerial": '192.168.18.210'            }        # 请求服务器;        if self.__requestTaskInfo() is False:            return        # 初始化ssh2;        self.ssh2.init_ssh2(self.taskInfo['hostUser'], self.taskInfo['hostPwd'], self.taskInfo['host'], self.taskInfo['hostPort'])        self.redhat3.sendKey('home',2)        self.redhat3.sendKey('home',2)        self.__adbConnect()    '''    函数:下载apk    参数:    返回:    '''    def downloadApk(self, apk_url, save_path):        pass    '''    函数:apk是否已在本地;    参数:    返回:    '''    def isApkExist(self, apk_md5, apk_path):        local_md5 = ''        file_data = []        md5_local = hashlib.md5()        if os.path.exists(apk_path):            with open(apk_path, mode='rb') as f:                while True:                    data = f.read(8192)                    if not data:                        break                    md5_local.update(data)            local_md5 = md5_local.hexdigest()            print u"本地MD5=", local_md5        return True if apk_md5 == local_md5 else False    def installApk(self, apk_path):        # 先连接才能root;        if self.__adbConnect() is False:            return False        # 安装apk        if self.__adbInstallApk(apk_path) is False:            return False        # 断开adb连接;        # self.__adbDisconnect()        return True    def unInstallApk(self, apk_name):        # 先连接才能root;        if self.__adbConnect() is False:            return            # 启用adb的root权限;        if self.__adbRoot() is False:            return        if self.__adbUninstallApk(apk_name) is False:            return    def parseOTAImageName(self):        pass    def isOTAImageOnServer(self):        cmd = "ls %s" % self.ota_server_path        status, data = self.ssh2.execute_cmd(cmd)        if status is True and data.find(self.ota_server_name) >= 0:            return True        return False    def downloadOTAImage(self):        print u"downloadOTAImage start"        sftp_path = self.ota_server_path + self.ota_server_name        usbSwitch = TestWizardClient()        '''        尝试5次切换U盘        '''        tryCount = 5        switchRet = False        while tryCount > 0:            if usbSwitch.sendUsbSwitch(1) is True:                switchRet = True                break            time.sleep(2)            tryCount = tryCount - 1        if switchRet is False:            print u"切换U盘到电脑失败"            return False        time.sleep(5) # 等待Windows识别U盘。        usbPath = self.getWindowsUsbPath()        if usbPath is None:            print u"获取U盘盘符失败"            return False        local_path = usbPath + "\\" + self.ota_local_name        return self.ssh2.sftp_download_md5(sftp_path, local_path)    def isOTAImageOnLocal(self):        pass    def uploadOTAImage2TV(self):        # 必须先root才能push;        if self.__adbRoot() is False:            print u"adb root失败"            return False        '''        尝试5次切换U盘        '''        tryCount = 5        switchRet = False        usbSwitch = TestWizardClient()        while tryCount > 0:            if usbSwitch.sendUsbSwitch(0) is True:                switchRet = True                break            time.sleep(2)            tryCount = tryCount - 1        if switchRet is False:            print u"切换U盘到TV失败"            return False                return True    def reomveOTAImageOnTV(self):        pass    def installOTAImage(self):        print u"installOTAImage start"        try:            self.redhat3.sendKey("home")            time.sleep(1.5)            cmd = 'adb -s ' + self.taskInfo['deviceSerial'] + self.installCmd            proc = self.__cmdExecute(cmd)            print proc            self.redhat3.sendKey("down")            time.sleep(1.5)            self.redhat3.sendKey("ok")            time.sleep(1.5)            self.redhat3.sendKey("ok")            # 等待60秒启动安装;            time.sleep(60)        except Exception, e:            print u'安装失败=', e    def checkOTAInstall(self):        try:            # 先connect,再devices            cmd = 'adb connect ' + self.taskInfo['deviceSerial']            proc = self.__cmdExecute(cmd)            if proc.find('connected') >= 0:                cmd = 'adb devices'                proc = self.__cmdExecute(cmd)                if proc.find(self.taskInfo['deviceSerial'] + ':5555\tdevice') >= 0:                    print "device connect success"                    print "OTA update success!"                    return True            time.sleep(10)            return False        except Exception, e:            print u'检测失败=', e    def initAtx(self):        # 重启后,重连adb;        client = SATClient()        if client.send_add_device(self.taskInfo['deviceSerial']) is False:            print u"重连adb设备失败",self.taskInfo['deviceSerial']            self.__adbConnect()        # 等待10;        print "等待30秒,让TV完全启动。"        time.sleep(30)        # os.system(r'D:\SAT\tools\atx-init\atx-init.bat')        # push 文件;        if self.AdbPush(r'D:\SAT\tools\atx-init\atx-agent', '/data/local/tmp') is False:            return False        if self.AdbPush(r'D:\SAT\tools\atx-init\minicap', '/data/local/tmp') is False:            return False        if self.AdbPush(r'D:\SAT\tools\atx-init\minicap.so', '/data/local/tmp') is False:            return False        if self.AdbPush(r'D:\SAT\tools\atx-init\minitouch', '/data/local/tmp') is False:            return False        self.__cmdExecute('adb shell chmod 755 /data/local/tmp/atx-agent')        self.__cmdExecute('adb shell chmod 755 /data/local/tmp/minicap')        self.__cmdExecute('adb shell chmod 755 /data/local/tmp/minicap.so')        self.__cmdExecute('adb shell chmod 755 /data/local/tmp/minitouch')        if self.IsAPKInstalled('com.github.uiautomator') is True:            print u'卸载:uiautomator'            self.__cmdExecute('adb uninstall com.github.uiautomator')        else:            print 'uiautomator false'        if self.IsAPKInstalled('com.github.uiautomator.test') is True:            print u'卸载:uiautomator.test'            self.__cmdExecute('adb uninstall com.github.uiautomator.test')        else:            print 'uiautomator.test false'                # 安装apk        if self.AdbInstallAPK(r'D:\SAT\tools\atx-init\app-uiautomator.apk') is False:            return False        if self.AdbInstallAPK(r'D:\SAT\tools\atx-init\app-uiautomator-test.apk') is False:            return False        if self.AdbInstallAPK(r'D:\SAT\tools\atx-init\exoplayer-demo-2.8.4.apk') is False:            return False    def turnOnTV(self):        tryCount = 5        while tryCount > 0:             if self.adbCheck() is True:                print u'开机成功'                return True                        # 按下Power键;            self.redhat3.sendKey("POWER")            # 等待60秒;            time.sleep(60)            tryCount = tryCount -1            # 尝试连接;            # self.__adbConnect()                print u'开机失败'        return Falseif __name__ == "__main__":    smp = SomkePretreatment()    smp.getTaskInfo()    smp.adbCheck()    # print smp.getWindowsUsbPath()    # if smp.isOTAImageOnServer():    #     if smp.downloadOTAImage():    #         print u'下载完成'    #     else:    #         print u'下载失败'    # smp.taskInfo = {"deviceSerial":'192.168.1.101'}    # #if smp.installApk('F:\SAT\SAT_Runner\ota_apk\RT2841\ota_install.apk') is True:    # #if smp.uploadOTAImage2TV('F:\signed-ota_rt2841_update.zip') is True:    # smp.installOTAImage()    # time.sleep(30)    # upStatus = False    # for i in range(30):    #     if smp.checkOTAInstall() is True:    #         upStatus = True    #         break    # if upStatus is False:    #     print "OTA update time out 15min"
 |