123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- # coding=utf-8
- #
- # Simple Python test program to use the RedRatHub.
- #
- # Ethan Johnson, David Blight, Chris Dodge - RedRat Ltd.
- #
- import sys
- import time
- import socket
- from ssat_sdk.service import service_config
- from ssat_sdk.utils import LoggingUtil
- from ssat_sdk.utils.AbnormalClient import abnormal_client
class RedRat4():
    """Client for the RedRatHub (RedRatHubCmd.exe) text protocol over TCP.

    Every request is a single text line terminated by a newline. A reply is
    either a single line (e.g. ``"OK\\n"``) or a multi-line block delimited by
    ``{`` and ``}``. The connection is opened lazily on the first request and
    re-opened automatically after it has been closed or has failed.

    Attributes:
        sock: The TCP socket used to talk to the hub.
        socketOpen: True while ``sock`` is believed to be connected.
        Port: Hub port, read from the service configuration.
        deviceName: Cached name of the RedRat device ("" until resolved).
    """

    # Constructor
    def __init__(self):
        LoggingUtil.printLog("初始化RedRat4客户端")
        self.sock = socket.socket()
        self.socketOpen = False
        self.Port = service_config.ServiceConfig().getRedRatHubPort()
        self.deviceName = ""
        print(u'\n\n\n \t\t Init RedRat3 \n\n\n'.encode('gbk'))
        # No connection is made here: __ReadData connects on first use.

    # Destructor
    def __del__(self):
        """Close the connection when the client is garbage collected."""
        try:
            print(u'\n\n\n \t\t Close RedRat3 \n\n\n'.encode('gbk'))
            if getattr(self, 'socketOpen', False):
                self.__CloseSocket()
        except Exception:
            # A destructor may run during interpreter shutdown, when stdout
            # or the socket are already gone; it must never raise.
            pass

    # The destructor used to be misspelled "__def__" (and so never ran
    # automatically). The old name is kept for any caller invoking it directly.
    __def__ = __del__

    # Open the connection
    def __OpenSocket(self, ip="127.0.0.1"):
        """Connect to the hub at ``ip`` on ``self.Port``.

        Returns:
            True when connected, False when the connection attempt failed
            (the failure is also reported through ``abnormal_client``).
        """
        try:
            # A socket object cannot be connected again once it has been
            # closed, so every (re)connect starts from a fresh socket.
            self.sock.close()
            self.sock = socket.socket()
            self.sock.connect((ip, self.Port))
            self.socketOpen = True
            return True
        except Exception as e:
            abnormal_client.sendAbnormal('RCU', str(e))
            self.socketOpen = False
            print(u'\n\n\n \t\t Init RedRat3 Error \n\n\n'.encode('gbk'))
            print(u"连接RedRatHubCmd.exe服务端失败")
            return False

    # Close the connection
    def __CloseSocket(self):
        """Close the socket if it is open; otherwise only print a notice."""
        if self.socketOpen:
            try:
                self.sock.close()
            finally:
                # Even if close() fails the socket is no longer usable.
                self.socketOpen = False
        else:
            print("\n\n\n \t\t Socket was to close.\n\n\n")

    # Send a message and read the reply
    def __ReadData(self, message):
        """Send ``message`` to the hub and return the complete reply.

        Args:
            message: Command text, without the trailing newline.

        Returns:
            The reply text (including its trailing newline / braces), or
            None when the hub could not be reached or an I/O error occurred.
        """
        if not self.socketOpen:
            self.__OpenSocket()
        if not self.socketOpen:
            print("\tRedratHub4 Socket has not been opened. Call 'OpenSocket()' first.")
            return None
        try:
            self.sock.settimeout(100)
            # sendall() keeps writing until the whole command has been sent.
            self.sock.sendall((message + '\n').encode())
            received = ""
            # Read until the end-of-message marker arrives. There is no cap on
            # the number of chunks: stopping early would truncate long replies
            # and leave their tail in the socket to corrupt the next reply.
            while True:
                chunk = self.sock.recv(64)
                if not chunk:
                    # Peer closed the connection; hand back what we have and
                    # make the next request reconnect.
                    self.__CloseSocket()
                    return received
                received += chunk.decode()
                if self.__CheckEOM(received):
                    return received
        except Exception as e:
            abnormal_client.sendAbnormal('RCU', str(e))
            # The stream state is unknown after an error (e.g. a timeout in
            # the middle of a reply), so drop the connection.
            self.__CloseSocket()
            return None

    # Check whether the end of the message has been reached
    def __CheckEOM(self, message):
        """Return True when ``message`` holds a complete reply."""
        # Multi-line message: complete once the closing brace has arrived.
        if '{' in message:
            return '}' in message
        # Single-line message: complete once the newline has arrived.
        return "\n" in message

    def __SplitReply(self, reply):
        """Split a multi-line reply into its lines, without the braces.

        Returns "" (not an empty list) for a missing or blank reply, which is
        the failure value the public list methods have always returned.
        """
        if reply is not None and reply.strip():
            return reply.lstrip('{\n').rstrip('\n}\n').split('\n')
        return ""

    def __FirstDataSet(self):
        """Return the first loaded dataset name, or None when there is none."""
        names = self.getScriptName()
        if names:
            return names[0]
        return None

    def __SendSignal(self, KeyName, options=""):
        """Send signal ``KeyName`` from the first dataset via the cached device.

        Args:
            KeyName: Name of the signal to send.
            options: Extra ``,key="value"`` text appended to the command.

        Returns:
            The hub reply, or None when no dataset is loaded or the request
            failed.
        """
        dataset = self.__FirstDataSet()
        if dataset is None:
            return None
        return self.__ReadData(
            'name="' + self.deviceName + '",dataset="' + dataset +
            '",signal="' + KeyName + '"' + options)

    def __IsOk(self, result):
        """Return True when ``result`` is the hub's "OK" acknowledgement."""
        # Single-line replies always carry their trailing newline, so the
        # text has to be stripped before it is compared.
        return result is not None and result.strip() == "OK"

    # Add a signal dataset XML file; a dataset with the same name is overwritten
    def addSignalDataSet(self, file):
        """Ask the hub to add the IR signal file ``file``."""
        self.__ReadData('hubquery="add ir-file",file="' + file + '"')
        return

    # Load a signal dataset XML file; all previously loaded files are dropped
    def loadSignalDataSet(self, file):
        """Ask the hub to load the IR signal file ``file``."""
        self.__ReadData('hubquery="load ir-file",file="' + file + '"')
        return

    # Return the remote-control device name
    def getDeviceName(self):
        """Query the hub for its RedRat devices and cache the first name.

        The name is taken from the second line of the reply, between ``"] "``
        and ``"(connected)"``.
        NOTE(review): the exact reply layout is assumed from this parsing;
        confirm against the hub documentation.

        Returns:
            The device name, or "" when the reply is missing or too short.
        """
        devs = self.__ReadData('hubquery="list redrats"')
        if devs is None or not devs.strip():
            return ""
        lines = devs.split('\n')
        if len(lines) < 2:
            return ""
        entry = lines[1]
        start = entry.find('] ') + 2
        end = entry.find('(connected)') - 1
        if end < 0:
            # No "(connected)" suffix: keep everything up to the line end.
            end = None
        self.deviceName = entry[start:end]
        return self.deviceName

    def getCurrentDeviceName(self):
        """Return the cached device name, querying the hub if it is blank."""
        if not self.deviceName.strip():
            self.getDeviceName()
        return self.deviceName

    # Return the names of the loaded key-data files (datasets)
    def getScriptName(self):
        """Return the list of dataset names, or "" when none are reported."""
        return self.__SplitReply(self.__ReadData('hubquery="list datasets"'))

    # Return every key name contained in the key-data file
    def getKeyName(self):
        """Return the signal names of the first dataset, or "" on failure."""
        dataset = self.__FirstDataSet()
        if dataset is None:
            return ""
        reply = self.__ReadData(
            'hubquery="list signals in dataset",dataset="' + dataset + '"')
        return self.__SplitReply(reply)

    # Return the key names that correspond to the remote key values
    def getKeyNumber(self):
        """Return the same signal list as :meth:`getKeyName`."""
        return self.getKeyName()

    # Send the given key; returns 0 on success, else the number of failures
    def sendKey(self, KeyName, Times=1, sleep_time=1):
        """Send ``KeyName`` ``Times`` times, pausing after each send.

        Args:
            KeyName: Name of the signal to send.
            Times: How many times to send it.
            sleep_time: Seconds to sleep after each send.

        Returns:
            0 when every send was acknowledged with "OK", otherwise the
            number of sends that failed.
        """
        status = 0
        if not self.deviceName.strip():
            self.getDeviceName()
        while Times > 0:
            Times -= 1
            try:
                result = self.__SendSignal(KeyName)
                print("RedRatHub4.sendKey:%s, result:%s" % (KeyName, result))
            except Exception as e:
                print(u"执行按键:%s异常!error:%s" % (str(KeyName), str(e)))
                result = ""
            time.sleep(sleep_time)
            if not self.__IsOk(result):
                status += 1
        return status

    # Send several keys; returns 0 when all succeed, else the number of failures
    def sendKeys(self, KeyNames, sleep_time=0.1):
        """Send each key in ``KeyNames`` once.

        Returns:
            The number of keys that could not be sent (0 on full success).
        """
        print("RedRatHub4:sendKeys: %s %s" % (KeyNames, sleep_time))
        status = 0
        for key in KeyNames:
            # sendKey reports a failure count, where 0 means success.
            if self.sendKey(key, sleep_time=sleep_time) != 0:
                status += 1
        return status

    def sendrepeats(self, KeyName, repeats=2):
        """Send ``KeyName`` with a ``repeats`` option; return 0 on "OK", else 1."""
        if not self.deviceName.strip():
            self.getDeviceName()
        result = self.__SendSignal(KeyName, ',repeats="' + str(repeats) + '"')
        time.sleep(1)
        return 0 if self.__IsOk(result) else 1

    def sendduration(self, KeyName, duration=1500, sleep_time=0.1):
        """Send ``KeyName`` with a ``duration`` option; return 0 on "OK", else 1."""
        if not self.deviceName.strip():
            self.getDeviceName()
        result = self.__SendSignal(KeyName, ',duration="' + str(duration) + '"')
        time.sleep(sleep_time)
        return 0 if self.__IsOk(result) else 1

    def sendrepeats2(self, KeyName, repeats=2, duration=1000):
        """Send ``KeyName`` with ``repeats`` and ``duration``; 0 on "OK", else 1."""
        if not self.deviceName.strip():
            self.getDeviceName()
        result = self.__SendSignal(
            KeyName,
            ',repeats="' + str(repeats) + '",duration="' + str(duration) + '"')
        time.sleep(1)
        return 0 if self.__IsOk(result) else 1

    # Close the socket connection
    def close(self):
        """Close the connection to the hub."""
        self.__CloseSocket()
        return
# Shared module-level instance; it is constructed as soon as this module is
# executed or imported.
redrat4_singleton = RedRat4()

if __name__ == "__main__":
    # Manual smoke test: list the key names known to the hub.
    client = RedRat4()
    key_names = client.getKeyName()
    print("keyList: %s" % (key_names,))
    # Further usage examples (double-press effect):
    # client.sendKey('power')
    # client.sendKey('home', 2, 0.1)
    # client.sendrepeats('home', 2)
|