# -*- coding:utf-8 -*- import os, sys, time import abc import socket import json import numpy as np import struct import TestWizardBLK import binascii # 创建ProHeader数据类型 ProHead = np.dtype({'names': ['version', 'len'], 'formats': ['u1', 'u4']}) class TestWizardClient(): def __init__(self): '''设备通讯超时值:毫秒''' self.device_timeout = 300 '''通信sock''' self.sock = None '''通信端口号''' self.port = 5566 '''通信状态''' self.constate = False '''连接服务器''' self.connect() def __def__(self): self.disconnect() '''连接服务器''' def connect(self): try: self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect(('127.0.0.1', self.port)) # 没有返回值; self.sock.sendall('111') self.constate = True except Exception, e: print "TestWizardClient=> socket connect error:", e, self.port self.constate = False return self.constate # 发送消息; def __sendCmd(self, cmdType, command): '''是否连接''' if self.constate is False: if self.connect() is False: return False '''发送请求数据''' try: self.sock.settimeout(3) command = cmdType + ">" + command phead = np.array([(0xAC, command.__len__() + np.array([(0xAA, 0)], dtype=ProHead).itemsize)], dtype=ProHead) '''一次性发送完整包''' self.sock.sendall(bytearray(phead.tobytes() + bytes(command))) self.sock.settimeout(None) except Exception, e: print "__send " + cmdType + "=> send Error:", e import traceback traceback.print_exc() self.disconnect() self.connect() return False try: self.sock.settimeout(2) recvBytes = bytearray(self.sock.recv(1024)) data = self.parseRetData(recvBytes, phead.itemsize) if int(data) == 1: print "Info:__send %s: %s success!" % (cmdType, command) return True print "Error:__send %s: %s fail!" % (cmdType, command) except Exception, e: print "__send " + cmdType + "=> recv Error:", e return False def parseRetData(self, recvByteArray, headSize): version = recvByteArray[0] & 0xff recvLen = self.byte4ToInt(recvByteArray, 1) if version == 0xAC and recvLen == recvByteArray.__len__(): byteArr = bytes(recvByteArray) data = byteArr[headSize: recvByteArray.__len__()] return data return "0" def byte4ToInt(self, bytearray, start): i = (bytearray[start] & 0xff) \ | (bytearray[start + 1] & 0xff) << 8 \ | (bytearray[start + 2] & 0xff) << 16 \ | (bytearray[start + 3] & 0xff) << 24 return i # 发送消息; def sendKey(self, keyname, times=1, duration=1): while times > 0: times -= 1 self.__sendCmd("ir", keyname) time.sleep(duration) # 发送多个按键; def sendKeys(self, keyNames, duration): for key in keyNames: self.__sendCmd("ir", key) time.sleep(duration) def sendBLEKey(self, bleKeyName): if TestWizardBLK.BLK_DIC.has_key(bleKeyName.lower()): cmdStr = "*INPUT BLEKEY " + TestWizardBLK.BLK_DIC[bleKeyName.lower()] ret = self.__sendCmd("cmd", cmdStr) # time.sleep(1) return ret else: return False ''' 函数:电源开关,如果lanNum=-1设置所有LAN口; 参数: onOff:字符串,只能是ON或OFF lanNum:整型或None 返回:设置成功返回True ''' def setPower(self, onOff, lanNum=-1): cmdStr = None if onOff <> "ON" and onOff <> "OFF": return False if isinstance(lanNum, int) == False: return False if lanNum <= -1: # 认为负数都是全控制; cmdStr = "*SET POWER " + onOff elif lanNum > 0: # LAN0不是电源控制,排除掉; cmdStr = "*SET LAN %d POWER %s" % (lanNum, onOff) if cmdStr is None: return False ret = self.__sendCmd("cmd", cmdStr) # time.sleep(1) return ret def sendUsbSwitch(self, index): if index < 0 or index > 1: print u"usb 索引只能0、1。当前=", index return False return self.__sendCmd('usb', str(index)) def loadSignalDataSet(self, file): pass def addSignalDataSet(self, file): pass def getDeviceName(self): pass def getScriptName(self): pass def getKeyName(self): pass def close(self): self.disconnect() def getCurrentDeviceName(self): pass '''断开连接''' def disconnect(self): try: self.sock.shutdown(2) self.sock.close() except Exception, e: print "TestWizardClient=> socket disconnect error:", e # 单例模块; tw_singleton = TestWizardClient() print u"tw_singleton地址", tw_singleton def API_UsbSwitch(index): return tw_singleton.sendUsbSwitch(index) def API_SetPower(onOff, lanNum): return tw_singleton.setPower(onOff, lanNum) if __name__ == "__main__": API_SetPower("ON", -1) time.sleep(3) API_SetPower("OFF", -1) time.sleep(3) API_SetPower("ON", 3) time.sleep(3) API_SetPower("OFF", 3) # 切换usb; time.sleep(3) API_UsbSwitch(0) time.sleep(10) API_UsbSwitch(1) # 发送红外; tw_singleton.sendKey("setting")