123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- # -*- coding:utf-8 -*-
- import os, sys, time
- import abc
- import socket
- import json
- import numpy as np
- import struct
- import TestWizardBLK
- import binascii
# Protocol header record: a 1-byte unsigned 'version' field followed by a
# 4-byte unsigned 'len' field, packed without padding (itemsize 5).
ProHead = np.dtype([('version', 'u1'), ('len', 'u4')])
class TestWizardClient(object):
    """TCP client for the TestWizard service listening on 127.0.0.1.

    Every request is one frame: a 5-byte header (1-byte version ``0xAC``
    followed by a 4-byte little-endian total frame length, header included)
    and a payload of the form ``"<cmdType>><command>"``.  Replies use the
    same header layout; a payload that parses as the integer ``1`` means
    success.

    The code is written to run unchanged on Python 2.6+ and Python 3.
    """

    # Header layout: unsigned byte + unsigned 32-bit int, little-endian to
    # match the decoding done by byte4ToInt().
    _HEADER = struct.Struct('<BI')
    # Protocol version byte expected in every header.
    _VERSION = 0xAC

    def __init__(self):
        """Initialise the connection settings and connect immediately."""
        # Device communication timeout, in milliseconds.
        self.device_timeout = 300
        # Communication socket; None while disconnected.
        self.sock = None
        # TCP port of the service.
        self.port = 5566
        # Connection state: True while the link is believed to be up.
        self.constate = False
        # Connect to the server right away.
        self.connect()

    def __del__(self):
        """Release the socket when the client is garbage-collected."""
        self.disconnect()

    # The original source spelled the finalizer ``__def__`` (so it never
    # ran); the old name is kept as an alias for backward compatibility.
    __def__ = __del__

    def connect(self):
        """Open a new connection to the server.

        Any socket still held by the client is closed first.  After
        connecting, the literal bytes ``111`` are sent (presumably a
        handshake expected by the server -- TODO confirm).

        Returns:
            True if the connection was established, False otherwise.
        """
        # Drop a previous socket so reconnecting never leaks a descriptor.
        self.disconnect()
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.connect(('127.0.0.1', self.port))  # returns nothing
            sock.sendall(b'111')
        except Exception as e:
            print("TestWizardClient=> socket connect error: %s %s" % (e, self.port))
            if sock is not None:
                try:
                    sock.close()
                except Exception:
                    pass  # best effort: the connect error was already reported
            self.sock = None
            self.constate = False
        else:
            self.sock = sock
            self.constate = True
        return self.constate

    def __sendCmd(self, cmdType, command):
        """Send one ``cmdType>command`` frame and wait for the reply.

        Args:
            cmdType: command category placed before ``>`` (this class uses
                "ir", "cmd" and "usb").
            command: command text placed after ``>``.

        Returns:
            True only if the reply payload parses as the integer 1; False on
            connection failure, send/receive error or any other reply.
        """
        # Reconnect lazily if the link is down.
        if not self.constate and not self.connect():
            return False

        # Send the request.
        try:
            self.sock.settimeout(3)
            command = cmdType + ">" + command
            # Text is encoded once here; byte strings pass through untouched.
            if isinstance(command, bytes):
                payload = command
            else:
                payload = command.encode('utf-8')
            header = self._HEADER.pack(self._VERSION,
                                       self._HEADER.size + len(payload))
            # Send the complete frame in one call.
            self.sock.sendall(header + payload)
            self.sock.settimeout(None)
        except Exception as e:
            print("__send %s=> send Error: %s" % (cmdType, e))
            import traceback
            traceback.print_exc()
            # Reset the link so the next call starts from a fresh socket.
            self.disconnect()
            self.connect()
            return False

        # Read and evaluate the reply.
        try:
            self.sock.settimeout(2)
            reply = bytearray(self.sock.recv(1024))
            data = self.parseRetData(reply, self._HEADER.size)
            if int(data) == 1:
                print("Info:__send %s: %s success!" % (cmdType, command))
                return True
            print("Error:__send %s: %s fail!" % (cmdType, command))
        except Exception as e:
            print("__send %s=> recv Error: %s" % (cmdType, e))
        return False

    def parseRetData(self, recvByteArray, headSize):
        """Validate a reply frame and return its payload.

        Args:
            recvByteArray: bytearray holding the received frame.
            headSize: number of leading header bytes to strip.

        Returns:
            The payload bytes when the version byte is 0xAC and the length
            field equals the number of bytes received; otherwise the string
            "0" (also for frames too short to contain a header).
        """
        total = len(recvByteArray)
        # A frame shorter than the header cannot be decoded at all.
        if total < self._HEADER.size:
            return "0"
        version = recvByteArray[0] & 0xff
        recvLen = self.byte4ToInt(recvByteArray, 1)
        if version == self._VERSION and recvLen == total:
            return bytes(recvByteArray)[headSize:total]
        return "0"

    def byte4ToInt(self, bytearray, start):
        """Decode 4 bytes starting at ``start`` as a little-endian integer.

        Args:
            bytearray: indexable sequence of integer byte values (the
                parameter name is kept for keyword-argument callers).
            start: index of the least significant byte.

        Returns:
            The decoded non-negative integer.
        """
        value = 0
        for offset in range(4):
            value |= (bytearray[start + offset] & 0xff) << (8 * offset)
        return value

    def sendKey(self, keyname, times=1, duration=1):
        """Send the IR key ``keyname`` ``times`` times.

        Args:
            keyname: key name sent as an "ir" command.
            times: number of repetitions.
            duration: seconds to sleep after each send.
        """
        while times > 0:
            times -= 1
            self.__sendCmd("ir", keyname)
            time.sleep(duration)

    def sendKeys(self, keyNames, duration=1):
        """Send several IR keys in order.

        Args:
            keyNames: iterable of key names, each sent as an "ir" command.
            duration: seconds to sleep after each key (defaults to 1, the
                same default as sendKey).
        """
        for key in keyNames:
            self.__sendCmd("ir", key)
            time.sleep(duration)

    def sendBLEKey(self, bleKeyName):
        """Send a BLE key looked up (case-insensitively) in TestWizardBLK.BLK_DIC.

        Returns:
            The result of the "cmd" request, or False if the key name is not
            present in the table.
        """
        key = bleKeyName.lower()
        if key not in TestWizardBLK.BLK_DIC:
            return False
        cmdStr = "*INPUT BLEKEY " + TestWizardBLK.BLK_DIC[key]
        return self.__sendCmd("cmd", cmdStr)

    def setPower(self, onOff, lanNum=-1):
        """Switch power on or off for one LAN port or for all of them.

        Args:
            onOff: the string "ON" or "OFF"; anything else is rejected.
            lanNum: integer port number.  Any negative value addresses all
                ports; 0 is rejected because LAN0 is not a power control.

        Returns:
            True if the command was acknowledged, False on invalid arguments
            or failure.
        """
        if onOff != "ON" and onOff != "OFF":
            return False
        if not isinstance(lanNum, int):
            return False

        if lanNum <= -1:  # every negative value means "all ports"
            cmdStr = "*SET POWER " + onOff
        elif lanNum > 0:  # LAN0 is not a power control, so it is excluded
            cmdStr = "*SET LAN %d POWER %s" % (lanNum, onOff)
        else:
            return False
        return self.__sendCmd("cmd", cmdStr)

    def sendUsbSwitch(self, index):
        """Switch the USB port; ``index`` must lie between 0 and 1.

        Returns:
            The result of the "usb" request, or False if ``index`` is out of
            range.
        """
        if index < 0 or index > 1:
            print(u"usb 索引只能0、1。当前= %s" % (index,))
            return False
        return self.__sendCmd('usb', str(index))

    def loadSignalDataSet(self, file):
        """Placeholder: not implemented, returns None."""
        pass

    def addSignalDataSet(self, file):
        """Placeholder: not implemented, returns None."""
        pass

    def getDeviceName(self):
        """Placeholder: not implemented, returns None."""
        pass

    def getScriptName(self):
        """Placeholder: not implemented, returns None."""
        pass

    def getKeyName(self):
        """Placeholder: not implemented, returns None."""
        pass

    def close(self):
        """Close the connection (same as disconnect)."""
        self.disconnect()

    def getCurrentDeviceName(self):
        """Placeholder: not implemented, returns None."""
        pass

    def disconnect(self):
        """Shut down and close the socket, if there is one.

        The client is always left in the disconnected state (``sock`` is
        None and ``constate`` is False) so the next request reconnects.
        Errors raised while closing are reported, never propagated.
        """
        # getattr: the finalizer may run on a partially initialised object.
        sock = getattr(self, 'sock', None)
        self.sock = None
        self.constate = False
        if sock is None:
            return
        try:
            try:
                sock.shutdown(socket.SHUT_RDWR)
            finally:
                # Close even when shutdown fails (e.g. peer already gone).
                sock.close()
        except Exception as e:
            print("TestWizardClient=> socket disconnect error: %s" % (e,))
# Module-level singleton: the client is created (and connects) once, at
# import time, and is shared by the API_* helpers below.
tw_singleton = TestWizardClient()
# Single-argument print form: emits the same text on Python 2 and Python 3.
print(u"tw_singleton地址 %s" % (tw_singleton,))
def API_UsbSwitch(index):
    """Switch the USB port through the shared module-level client.

    Args:
        index: USB index, forwarded unchanged to ``sendUsbSwitch``.

    Returns:
        Whatever ``tw_singleton.sendUsbSwitch(index)`` returns.
    """
    switched = tw_singleton.sendUsbSwitch(index)
    return switched
def API_SetPower(onOff, lanNum=-1):
    """Switch power through the shared module-level client.

    Args:
        onOff: "ON" or "OFF", forwarded unchanged to ``setPower``.
        lanNum: LAN port number; defaults to -1 (all ports), the same
            default that ``TestWizardClient.setPower`` uses.

    Returns:
        Whatever ``tw_singleton.setPower(onOff, lanNum)`` returns.
    """
    return tw_singleton.setPower(onOff, lanNum)
if __name__ == "__main__":
    # Manual smoke test: toggle power for all ports (-1) and then for LAN 3,
    # pausing 3 seconds after every command.
    for lan in (-1, 3):
        for state in ("ON", "OFF"):
            API_SetPower(state, lan)
            time.sleep(3)
    # Switch USB to index 0, wait 10 seconds, then switch to index 1.
    API_UsbSwitch(0)
    time.sleep(10)
    API_UsbSwitch(1)
    # Send one infrared key.
    tw_singleton.sendKey("setting")
|