| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 | # -*- coding:utf-8 -*-import os, sys, timeimport abcimport socketimport jsonimport numpy as npimport structimport TestWizardBLKimport binascii# 创建ProHeader数据类型ProHead = np.dtype({'names': ['version', 'len'], 'formats': ['u1', 'u4']})class TestWizardClient():    def __init__(self):        '''设备通讯超时值:毫秒'''        self.device_timeout = 300        '''通信sock'''        self.sock = None        '''通信端口号'''        self.port = 5566        '''通信状态'''        self.constate = False        '''连接服务器'''        self.connect()    def __def__(self):        self.disconnect()    '''连接服务器'''    def connect(self):        try:            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)            self.sock.connect(('127.0.0.1', self.port))  # 没有返回值;            self.sock.sendall('111')            self.constate = True        except Exception, e:            print "TestWizardClient=> socket connect error:", e, self.port            self.constate = False        return self.constate    # 发送消息;    def __sendCmd(self, cmdType, command):        '''是否连接'''        if self.constate is False:            if self.connect() is False:                return False        '''发送请求数据'''        try:            self.sock.settimeout(3)            command = cmdType + ">" + command            phead = np.array([(0xAC, command.__len__() + np.array([(0xAA, 0)], dtype=ProHead).itemsize)], dtype=ProHead)            '''一次性发送完整包'''            self.sock.sendall(bytearray(phead.tobytes() + bytes(command)))            self.sock.settimeout(None)        except Exception, e:            print "__send " + cmdType + "=> send Error:", e            import traceback            traceback.print_exc()            self.disconnect()            self.connect()            return False        try:            self.sock.settimeout(2)            recvBytes = bytearray(self.sock.recv(1024))            data = self.parseRetData(recvBytes, phead.itemsize)            if int(data) == 1:                print "Info:__send %s: %s success!" % (cmdType, command)                return True            print "Error:__send %s: %s fail!" % (cmdType, command)        except Exception, e:            print "__send " + cmdType + "=> recv Error:", e            return False    def parseRetData(self, recvByteArray, headSize):        version = recvByteArray[0] & 0xff        recvLen = self.byte4ToInt(recvByteArray, 1)        if version == 0xAC and recvLen == recvByteArray.__len__():            byteArr = bytes(recvByteArray)            data = byteArr[headSize: recvByteArray.__len__()]            return data        return "0"    def byte4ToInt(self, bytearray, start):        i = (bytearray[start] & 0xff) \            | (bytearray[start + 1] & 0xff) << 8 \            | (bytearray[start + 2] & 0xff) << 16 \            | (bytearray[start + 3] & 0xff) << 24        return i    # 发送消息;    def sendKey(self, keyname, times=1, duration=1):        while times > 0:            times -= 1            self.__sendCmd("ir", keyname)            time.sleep(duration)    # 发送多个按键;    def sendKeys(self, keyNames, duration):        for key in keyNames:            self.__sendCmd("ir", key)            time.sleep(duration)    def sendBLEKey(self, bleKeyName):        if TestWizardBLK.BLK_DIC.has_key(bleKeyName.lower()):            cmdStr = "*INPUT BLEKEY " + TestWizardBLK.BLK_DIC[bleKeyName.lower()]            ret = self.__sendCmd("cmd", cmdStr)            # time.sleep(1)            return ret        else:            return False    '''    函数:电源开关,如果lanNum=-1设置所有LAN口;    参数:        onOff:字符串,只能是ON或OFF        lanNum:整型或None    返回:设置成功返回True    '''    def setPower(self, onOff, lanNum=-1):        cmdStr = None        if onOff <> "ON" and onOff <> "OFF":            return False        if isinstance(lanNum, int) == False:            return False        if lanNum <= -1:  # 认为负数都是全控制;            cmdStr = "*SET POWER " + onOff        elif lanNum > 0:  # LAN0不是电源控制,排除掉;            cmdStr = "*SET LAN %d POWER %s" % (lanNum, onOff)        if cmdStr is None:            return False        ret = self.__sendCmd("cmd", cmdStr)        # time.sleep(1)        return ret    def sendUsbSwitch(self, index):        if index < 0 or index > 1:            print u"usb 索引只能0、1。当前=", index            return False        return self.__sendCmd('usb', str(index))    def loadSignalDataSet(self, file):        pass    def addSignalDataSet(self, file):        pass    def getDeviceName(self):        pass    def getScriptName(self):        pass    def getKeyName(self):        pass    def close(self):        self.disconnect()    def getCurrentDeviceName(self):        pass    '''断开连接'''    def disconnect(self):        try:            self.sock.shutdown(2)            self.sock.close()        except Exception, e:            print "TestWizardClient=> socket disconnect error:", e# 单例模块;tw_singleton = TestWizardClient()print u"tw_singleton地址", tw_singletondef API_UsbSwitch(index):    return tw_singleton.sendUsbSwitch(index)def API_SetPower(onOff, lanNum):    return tw_singleton.setPower(onOff, lanNum)if __name__ == "__main__":    API_SetPower("ON", -1)    time.sleep(3)    API_SetPower("OFF", -1)    time.sleep(3)    API_SetPower("ON", 3)    time.sleep(3)    API_SetPower("OFF", 3)    # 切换usb;    time.sleep(3)    API_UsbSwitch(0)    time.sleep(10)    API_UsbSwitch(1)    # 发送红外;    tw_singleton.sendKey("setting")
 |