123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- # -*- coding:utf-8 -*-
- import os, sys, time
- import abc
- import socket
- import json
- import numpy as np
- import struct
- import TestWizardBLK
- import binascii
- # 创建ProHeader数据类型
- ProHead = np.dtype({'names': ['version', 'len'], 'formats': ['u1', 'u4']})
- class TestWizardClient():
- def __init__(self):
- '''设备通讯超时值:毫秒'''
- self.device_timeout = 300
- '''通信sock'''
- self.sock = None
- '''通信端口号'''
- self.port = 5566
- '''通信状态'''
- self.constate = False
- '''连接服务器'''
- self.connect()
- def __def__(self):
- self.disconnect()
- '''连接服务器'''
- def connect(self):
- try:
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.sock.connect(('127.0.0.1', self.port)) # 没有返回值;
- self.sock.sendall('111')
- self.constate = True
- except Exception, e:
- print "TestWizardClient=> socket connect error:", e, self.port
- self.constate = False
- return self.constate
- # 发送消息;
- def __sendCmd(self, cmdType, command):
- '''是否连接'''
- if self.constate is False:
- if self.connect() is False:
- return False
- '''发送请求数据'''
- try:
- self.sock.settimeout(3)
- command = cmdType + ">" + command
- phead = np.array([(0xAC, command.__len__() + np.array([(0xAA, 0)], dtype=ProHead).itemsize)], dtype=ProHead)
- '''一次性发送完整包'''
- self.sock.sendall(bytearray(phead.tobytes() + bytes(command)))
- self.sock.settimeout(None)
- except Exception, e:
- print "__send " + cmdType + "=> send Error:", e
- import traceback
- traceback.print_exc()
- self.disconnect()
- self.connect()
- return False
- try:
- self.sock.settimeout(2)
- recvBytes = bytearray(self.sock.recv(1024))
- data = self.parseRetData(recvBytes, phead.itemsize)
- if int(data) == 1:
- print "Info:__send %s: %s success!"%(cmdType, command)
- return True
- print "Error:__send %s: %s fail!"%(cmdType, command)
- except Exception, e:
- print "__send " + cmdType + "=> recv Error:", e
- return False
- def parseRetData(self, recvByteArray, headSize):
- version = recvByteArray[0]&0xff
- recvLen = self.byte4ToInt(recvByteArray, 1)
- if version == 0xAC and recvLen == recvByteArray.__len__():
- byteArr = bytes(recvByteArray)
- data = byteArr[headSize: recvByteArray.__len__()]
- return data
- return "0"
- def byte4ToInt(self,bytearray, start):
- i = (bytearray[start] & 0xff) \
- | (bytearray[start+1] & 0xff) << 8 \
- | (bytearray[start+2] & 0xff) << 16\
- | (bytearray[start+3] & 0xff) << 24
- return i
- # 发送消息;
- def sendKey(self, keyname, times=1, duration=1):
- while times > 0:
- times -= 1
- self.__sendkey(keyname)
- time.sleep(duration)
-
- # 发送多个按键;
- def sendKeys(self, keyNames, duration):
- for key in keyNames:
- self.__sendCmd("ir", key)
- time.sleep(duration)
- def sendBLEKey(self, bleKeyName):
- if TestWizardBLK.BLK_DIC.has_key(bleKeyName.lower()):
- cmdStr = "*INPUT BLEKEY " + TestWizardBLK.BLK_DIC[bleKeyName.lower()]
- ret = self.__sendCmd("cmd", cmdStr)
- time.sleep(1)
- return ret
- else:
- return False
- '''
- 电源开关,所有电源同时控制,不区分LAN口
- :param onOff: ON代表开,OFF代表关,区分大小写。
- :return True代表成功,False达标失败
- '''
- def setAllPower(self, onOff):
- if onOff <> "ON" and onOff <> "OFF":
- return False
- cmdStr = "*SET " + onOff
- ret = self.__sendCmd("cmd", cmdStr)
- time.sleep(1)
- return ret
- def sendUsbSwitch(self, index):
- if index < 0 or index > 1:
- print u"usb 索引只能0、1。当前=", index
- return False
- return self.__sendCmd('usb', str(index))
- def loadSignalDataSet(self, file):
- pass
- def addSignalDataSet(self, file):
- pass
- def getDeviceName(self):
- pass
- def getScriptName(self):
- pass
- def getKeyName(self):
- pass
- def close(self):
- self.disconnect()
- def getCurrentDeviceName(self):
- pass
- '''断开连接'''
- def disconnect(self):
- try:
- self.sock.shutdown(2)
- self.sock.close()
- except Exception, e:
- print "TestWizardClient=> socket disconnect error:", e
- if __name__ == "__main__":
- tw = TestWizardClient()
- tw.sendUsbSwitch(0)
- time.sleep(2)
- tw.sendUsbSwitch(1)
- # tw.sendKey('POWER')
- # tw.sendKey('down')
- # print 'sleep'
- # time.sleep(3)
- # tw.sendKey('up')
- # print 111111
- # print tw.sendBLEKey("uparrow")
- # time.sleep(3)
- # print tw.sendBLEKey("downarrow")
- # time.sleep(3)
- # print 2222222
- # print tw.setAllPower("OFF")
- # print tw.setAllPower("ON")
|