Browse Source

同步sdk.

sat23 4 years ago
parent
commit
a62134aa96

+ 14 - 0
ssat_sdk/basessh2.py

@@ -155,6 +155,8 @@ class baseSSH2:
             print 'sftp_md5值空'
             return False
         
+        # 删除本地文件;
+        os.remove(local_path)
         print "SFTP MD5=", sftp_md5
         # 下载文件后计算;
         if self.sftp_download(ftp_path, local_path) is True:
@@ -177,6 +179,18 @@ class baseSSH2:
         print u'下载文件失败'
         return False
 
+def API_sftp_download(host, user, pwd, ftp_path, ftp_file, local_path, local_file):
+    myssh2 = baseSSH2()
+    myssh2.init_ssh2(user, pwd, host)
+    # 判断ftp上的文件是否存在;
+    cmd = "ls %s" % ftp_path
+    status, data = myssh2.execute_cmd(cmd)
+    if status is True and data.find(ftp_file) >= 0:
+        # 下载后验证md5是否正确;
+        return myssh2.sftp_download_md5(ftp_path + ftp_file, local_path + local_file)
+    return False
+        
+
 
 if __name__ == "__main__":
     host = "10.201.251.254"

+ 127 - 0
ssat_sdk/buildprop.py

@@ -0,0 +1,127 @@
+# -*- coding:utf-8 -*-
+import os
+import re # 正则表达式;
+import sys
+import time
+import datetime
+import shutil
+from ssat_sdk.basessh2 import baseSSH2
+
+cts_ssh2 = baseSSH2()
+
+def Init(host, user, pwd):
+    cts_ssh2.init_ssh2(user, pwd, host)
+
+
+def get_props(path, prefix):
+    cmd = "grep -rn 'ro.product.model\|ro.product.brand\|ro.product.name\|ro.build.flavor\|ro.build.date\|ro.build.fingerprint\|ro.build.date.utc\|ro.build.description\|ro.build.version.incremental' %s*/build.prop" % path
+    bolean, data = cts_ssh2.execute_cmd(cmd)
+    # 按换行分组;
+    props = data.split('\n')
+    # 正则表达式;
+    p = re.compile(r"(\w*)_(.*)_(\w*)/build.prop:(\d+):(.*)=(.*)", re.DOTALL)
+    ps = re.compile(r"(\d+):(.*)=(.*)", re.DOTALL) # 短正则;
+    # list
+    brand_props = {}
+    for prop in props:
+        # 减去多余字符串;
+        print u"完整属性:",prop
+        prop = prop.replace(path + prefix, "")
+        
+        if prop.__len__() == 0:
+            continue
+    
+        prop_group = {}
+        # 版本号_品牌名_日期/build.prop:行号:属性=值
+        mo = p.search(prop)
+        # 如果目录下只有一个品牌;
+        if mo is None:
+            print u"长正则失败,使用短正则(目录只有一文件夹):", prop
+            mo = ps.search(prop)
+            # 统一命名为none
+            prop_group['brand'] = 'none'
+            prop_group['version'] = 'none'
+            prop_group['date'] = 'none'
+            prop_group['property'] = {'name':mo.group(2), 'value':mo.group(3)}
+            if prop_group['property']['value'].find("test-keys"):
+                prop_group['property']['value'] = prop_group['property']['value'].replace("test-keys", "release-keys")
+
+            # 字典数组;
+            if 'none' in brand_props:
+                brand_props['none']['property'].append(prop_group['property'])
+            else:
+                brand_props['none'] = {}
+                brand_props['none']['property'] = []
+                brand_props['none']['property'].append(prop_group['property'])
+
+            continue
+
+        prop_group['brand'] = mo.group(2)
+        prop_group['version'] = mo.group(1)
+        prop_group['date'] = mo.group(3)
+        prop_group['property'] = {'name':mo.group(5), 'value':mo.group(6)}
+        if prop_group['property']['value'].find("test-keys"):
+            prop_group['property']['value'] = prop_group['property']['value'].replace("test-keys", "release-keys")
+
+        # 字典数组;
+        if mo.group(2) in brand_props:
+            brand_props[mo.group(2)]['property'].append(prop_group['property'])
+        else:
+            brand_props[mo.group(2)] = {}
+            brand_props[mo.group(2)]['property'] = []
+            brand_props[mo.group(2)]['property'].append(prop_group['property'])
+
+    # 替换none;
+    if 'none' in brand_props:
+        for item in brand_props['none']['property']:
+            if item['name'] == 'ro.product.brand':
+                brand_props[item['value']] = brand_props['none']
+                del brand_props['none']
+                break
+
+    return brand_props
+
+
+def save_props(brand_props, save_path):
+    # 如果保存路径不存在,创建;
+    if not os.path.exists(save_path):
+        os.makedirs(save_path)
+        
+    # 遍历字典;
+    prop_file = save_path
+    for brand in brand_props:
+        prop_file = "%s\\%s.prop" % (save_path, brand.lower())
+        print u"prop文件:",prop_file
+        with open(prop_file, 'ab+') as f:
+            for prop in brand_props[brand]['property']:
+                line = "%s=%s\n" % (prop['name'], prop['value'])
+                f.write(line)     
+
+
+if __name__ == "__main__":
+    print u"build-prop"
+    Init("10.118.1.89", "your name", "your password")
+
+    if 1: # 41目录
+        prefix = "V8-T841T01-LF1V"
+        save_path = r"F:\Q3-buildprop\41"
+        if os.path.exists(save_path):
+            shutil.rmtree(save_path)
+
+        brand_props = get_props(r"/home/ftpdata/SoftwareRelease/TEST/RT2841/CTS认证/RT2841P送测资料/SPL20200705/Approved/", prefix)
+        save_props(brand_props, save_path)
+
+        brand_props = get_props(r"/home/ftpdata/SoftwareRelease/TEST/RT2841/CTS认证/RT2841P送测资料/SPL20200705/WITH_HBBTV/Approved/", prefix)
+        save_props(brand_props, save_path)
+
+    if 1: # 51目录
+        prefix = "V8-T851T01-LF1V"
+        save_path = r"F:\Q3-buildprop\51"
+        if os.path.exists(save_path):
+            shutil.rmtree(save_path)
+
+        brand_props = get_props(r"/home/ftpdata/SoftwareRelease/TEST/RT2851/CTS认证/RT2851P送测资料/SPL20200705/Approved/", prefix)
+        save_props(brand_props, save_path)
+
+        brand_props = get_props(r"/home/ftpdata/SoftwareRelease/TEST/RT2851/CTS认证/RT2851P送测资料/SPL20200705/WITH_HBBTV/Approved/", prefix)
+        save_props(brand_props, save_path)

+ 2 - 2
ssat_sdk/client/ota_client.py

@@ -22,10 +22,10 @@ SAT_TYPE = "1"
 
 
 
-class OtaClient():
+class Ota_Client():
     def __init__(self):
         # 各服务器getParam接口
-        self.getParamDict = getOtaParamUrl_dict()
+        # self.getParamDict = getOtaParamUrl_dict()
 
         self.getParamDict = {
             "CN": "https://cn.ota.qhmoka.com/ota-services/strategy/getParam",

+ 6 - 0
ssat_sdk/debug_serial.py

@@ -24,9 +24,15 @@ class DebugSerial():
     :return 无
     '''
     def excuteCmd(self, command):
+        self.serialTool.sendCommand(command)
+        # ret = self.serialTool.readData(1024)
+        # print "excuteCmd, return:",ret
+
+    def excuteCmdlog(self, command):
         self.serialTool.sendCommand(command)
         ret = self.serialTool.readData(1024)
         print "excuteCmd, return:",ret
+        return ret
 
     '''
     另起一个线程,在终端显示日志。

+ 56 - 34
ssat_sdk/device_manage/testWizardClient.py

@@ -70,15 +70,15 @@ class TestWizardClient():
             recvBytes = bytearray(self.sock.recv(1024))
             data = self.parseRetData(recvBytes, phead.itemsize)
             if int(data) == 1:
-                print "Info:__send %s: %s success!"%(cmdType, command)
+                print "Info:__send %s: %s success!" % (cmdType, command)
                 return True
-            print "Error:__send %s: %s fail!"%(cmdType, command)
+            print "Error:__send %s: %s fail!" % (cmdType, command)
         except Exception, e:
             print "__send " + cmdType + "=> recv Error:", e
             return False
 
     def parseRetData(self, recvByteArray, headSize):
-        version = recvByteArray[0]&0xff
+        version = recvByteArray[0] & 0xff
         recvLen = self.byte4ToInt(recvByteArray, 1)
         if version == 0xAC and recvLen == recvByteArray.__len__():
             byteArr = bytes(recvByteArray)
@@ -86,20 +86,20 @@ class TestWizardClient():
             return data
         return "0"
 
-    def byte4ToInt(self,bytearray, start):
+    def byte4ToInt(self, bytearray, start):
         i = (bytearray[start] & 0xff) \
-            | (bytearray[start+1] & 0xff) << 8 \
-            | (bytearray[start+2] & 0xff) << 16\
-            | (bytearray[start+3] & 0xff) << 24
+            | (bytearray[start + 1] & 0xff) << 8 \
+            | (bytearray[start + 2] & 0xff) << 16 \
+            | (bytearray[start + 3] & 0xff) << 24
         return i
 
     # 发送消息;
     def sendKey(self, keyname, times=1, duration=1):
         while times > 0:
             times -= 1
-            self.__sendkey(keyname)
+            self.__sendCmd("ir", keyname)
             time.sleep(duration)
-    
+
     # 发送多个按键;
     def sendKeys(self, keyNames, duration):
         for key in keyNames:
@@ -110,22 +110,35 @@ class TestWizardClient():
         if TestWizardBLK.BLK_DIC.has_key(bleKeyName.lower()):
             cmdStr = "*INPUT BLEKEY " + TestWizardBLK.BLK_DIC[bleKeyName.lower()]
             ret = self.__sendCmd("cmd", cmdStr)
-            time.sleep(1)
+            # time.sleep(1)
             return ret
         else:
             return False
 
     '''
-    电源开关,所有电源同时控制,不区分LAN口
-    :param onOff: ON代表开,OFF代表关,区分大小写。
-    :return True代表成功,False达标失败
+    函数:电源开关,如果lanNum=-1设置所有LAN口;
+    参数:
+        onOff:字符串,只能是ON或OFF
+        lanNum:整型或None
+    返回:设置成功返回True
     '''
-    def setAllPower(self, onOff):
+
+    def setPower(self, onOff, lanNum=-1):
+        cmdStr = None
         if onOff <> "ON" and onOff <> "OFF":
             return False
-        cmdStr = "*SET " + onOff
+        if isinstance(lanNum, int) == False:
+            return False
+
+        if lanNum <= -1:  # 认为负数都是全控制;
+            cmdStr = "*SET POWER " + onOff
+        elif lanNum > 0:  # LAN0不是电源控制,排除掉;
+            cmdStr = "*SET LAN %d POWER %s" % (lanNum, onOff)
+
+        if cmdStr is None:
+            return False
         ret = self.__sendCmd("cmd", cmdStr)
-        time.sleep(1)
+        # time.sleep(1)
         return ret
 
     def sendUsbSwitch(self, index):
@@ -155,7 +168,6 @@ class TestWizardClient():
     def getCurrentDeviceName(self):
         pass
 
-
     '''断开连接'''
 
     def disconnect(self):
@@ -166,21 +178,31 @@ class TestWizardClient():
             print "TestWizardClient=> socket disconnect error:", e
 
 
+# 单例模块;
+tw_singleton = TestWizardClient()
+print u"tw_singleton地址", tw_singleton
+
+
+def API_UsbSwitch(index):
+    return tw_singleton.sendUsbSwitch(index)
+
+
+def API_SetPower(onOff, lanNum):
+    return tw_singleton.setPower(onOff, lanNum)
+
+
 if __name__ == "__main__":
-    tw = TestWizardClient()
-    tw.sendUsbSwitch(0)
-    time.sleep(2)
-    tw.sendUsbSwitch(1)
-    # tw.sendKey('POWER')
-    # tw.sendKey('down')
-    # print 'sleep'
-    # time.sleep(3)
-    # tw.sendKey('up')
-    # print 111111
-    # print tw.sendBLEKey("uparrow")
-    # time.sleep(3)
-    # print tw.sendBLEKey("downarrow")
-    # time.sleep(3)
-    # print 2222222
-    # print tw.setAllPower("OFF")
-    # print tw.setAllPower("ON")
+    API_SetPower("ON", -1)
+    time.sleep(3)
+    API_SetPower("OFF", -1)
+    time.sleep(3)
+    API_SetPower("ON", 3)
+    time.sleep(3)
+    API_SetPower("OFF", 3)
+    # 切换usb;
+    time.sleep(3)
+    API_UsbSwitch(0)
+    time.sleep(10)
+    API_UsbSwitch(1)
+    # 发送红外;
+    tw_singleton.sendKey("setting")

+ 1 - 1
ssat_sdk/somking/SomkePretreatment.py

@@ -89,7 +89,7 @@ class SomkePretreatment:
         if proc.returncode == 0:
             print u'执行成功'
         # 返回执行结果;
-        # print u"结果:\033[32m%s\033[0m" % stdout
+        # print u"__cmdExecute结果:%s" % stdout
         return stdout
 
     def __adbConnect(self):

+ 5 - 3
ssat_sdk/tv_operator.py

@@ -13,7 +13,8 @@ from ssat_sdk.uiautomator2.adb_keyevent import AdbKeyevent
 from ssat_sdk.utils import string_util
 from ssat_sdk.sat_environment import getRCUDeviceSelected
 # 导入测试精灵;
-from ssat_sdk.device_manage.testWizardClient import TestWizardClient
+#from ssat_sdk.device_manage.testWizardClient import TestWizardClient
+from ssat_sdk.device_manage import testWizardClient
 
 Operator_Device = "redrat2"
 
@@ -35,7 +36,8 @@ class TvOperator():
                 self.operator = RedRatHub4.redrat4_singleton
             LoggingUtil.printLog(u"初始化TvOperator(redrat3)")
         elif "tw" == rcuDevice:
-            self.operator = TestWizardClient()
+            print u"当前红外设备:测试精灵"
+            self.operator = testWizardClient.tw_singleton
         self.adbEvent = AdbKeyevent()
         # 创建视频采集对象
         self.ccard = CCardManager()
@@ -55,7 +57,7 @@ class TvOperator():
         elif string_util.strcmp("adb", deviceType):
             self.operator = self.adbEvent
         elif string_util.strcmp("tw", deviceType):
-            self.operator = TestWizardClient()
+            self.operator = testWizardClient.tw_singleton
 
     def sendKey(self, keyName, times=1, duration=1):
         try: