# coding=utf-8 # # Simple Python test program to use the RedRatHub. # # Ethan Johnson, David Blight, Chris Dodge - RedRat Ltd. # import sys import time import socket from ssat_sdk.service import service_config from ssat_sdk.utils import LoggingUtil from ssat_sdk.utils.AbnormalClient import abnormal_client class RedRat4(): # 构造函数 def __init__(self): LoggingUtil.printLog("初始化RedRat4客户端") self.sock = socket.socket() self.socketOpen = False self.Port = service_config.ServiceConfig().getRedRatHubPort() self.deviceName = "" print u'\n\n\n \t\t Init RedRat3 \n\n\n'.encode('gbk') # self.__OpenSocket("127.0.0.1") # LoggingUtil.printLog("初始化RedRat3客户端") # LoggingUtil.printLog("初始化RedRat4客户端 完成") # 析构函数 def __def__(self): print u'\n\n\n \t\t Close RedRat3 \n\n\n'.encode('gbk') self.__CloseSocket() # 打开连接 def __OpenSocket(self, ip="127.0.0.1"): # print "RedRatHub4.__OpenSocket,ip and port:", ip,self.Port try: status = self.sock.connect((ip, self.Port)) self.socketOpen = True return status != 0 except Exception, e: abnormal_client.sendAbnormal('RCU', str(e)) self.socketOpen = False print u'\n\n\n \t\t Init RedRat3 Error \n\n\n'.encode('gbk') # LoggingUtil.printLog(u"连接RedRatHubCmd.exe服务端失败") print u"连接RedRatHubCmd.exe服务端失败" return False # 关闭链接 def __CloseSocket(self): if self.socketOpen: self.sock.close() self.socketOpen = False else: print("\n\n\n \t\t Socket was to close.\n\n\n") # 发送消息 def __ReadData(self, message): # print "RedRatHub3.__ReadData, message:",message if not self.socketOpen: self.__CloseSocket() self.__OpenSocket() if not self.socketOpen: print("\tRedratHub4 Socket has not been opened. Call 'OpenSocket()' first.") return try: # Send message self.sock.settimeout(100) self.sock.send((message + '\n').encode()) received = "" # Check response. This is either a single line, e.g. "OK\n", or a multi-line response with # '{' and '}' start/end delimiters. count = 0 while True: count += 1 # Receives data received += self.sock.recv(64).decode() if self.__CheckEOM(received): return received if count > 10: return received except Exception, e: abnormal_client.sendAbnormal('RCU', str(e)) # 检查是否到消息末端 def __CheckEOM(self, message): # Multi-line message if ('{' in message): return ('}' in message) # Single line message if ("\n" in message): return True # 添加信号数据集xml文件,相同数据集名称的将会被覆盖 def addSignalDataSet(self, file): self.__ReadData('hubquery="add ir-file",file="' + file + '"') return # 加载信号数据集xml文件,将会删除以前所有加载过的xml文件 def loadSignalDataSet(self, file): self.__ReadData('hubquery="load ir-file",file="' + file + '"') return # 返回遥控设备名称 def getDeviceName(self): devs = self.__ReadData('hubquery="list redrats"') if devs is not None and devs.strip(): devs = devs.split('\n')[1] # print "getDeviceName,devs:",devs index1 = devs.find('] ') + 2 index2 = devs.find('(connected)') - 1 if index2 < 0: index2 = None self.deviceName = devs[index1:index2] return self.deviceName return "" def getCurrentDeviceName(self): if not self.deviceName.strip(): self.getDeviceName() return self.deviceName # 返回遥控键值数据文件名 def getScriptName(self): list = self.__ReadData('hubquery="list datasets"') if list is not None and list.strip(): list = list.lstrip('{\n') list = list.rstrip('\n}\n') return list.split('\n') return "" # 返回遥控键值数据文件中包含的所有按键名称 def getKeyName(self): list = self.__ReadData( 'hubquery="list signals in dataset",dataset="' + self.getScriptName()[0] + '"') if list is not None and list.strip(): list = list.lstrip('{\n') list = list.rstrip('\n}\n') return list.split('\n') return "" # 返回遥控键值对应的键名称 def getKeyNumber(self): list = self.__ReadData( 'hubquery="list signals in dataset",dataset="' + self.getScriptName()[0] + '"') if list is not None and list.strip(): list = list.lstrip('{\n') list = list.rstrip('\n}\n') return list.split('\n') return "" # 发送指定的按键;成功返回0,失败返回失败次数 def sendKey(self, KeyName, Times=1, sleep_time=1): status = 0 if not self.deviceName.strip(): self.getDeviceName() while Times > 0: Times -= 1 try: result = self.__ReadData( 'name="' + self.deviceName + '",dataset="' + self.getScriptName()[0] + '",signal="' + KeyName + '"') print "RedRatHub4.sendKey:%s, result:%s"%(KeyName,result) except Exception, e: print u"执行按键:%s异常!error:%s" % (str(KeyName), str(e)) result = "" time.sleep(sleep_time) if result != "OK": status += 1 return status # 发送指定的按键(多个),所有按钮都发送成功返回0;失败返回失败次数 def sendKeys(self, KeyNames, sleep_time=0.1): print "RedRatHub4:sendKeys:", KeyNames, sleep_time status = 0 for key in KeyNames: result = self.sendKey(key, sleep_time=sleep_time) if result != "OK": status += 1 return status def sendrepeats(self, KeyName, repeats=2): status = 0 if not self.deviceName.strip(): self.getDeviceName() result = self.__ReadData('name="' + self.deviceName + '",dataset="' + self.getScriptName()[ 0] + '",signal="' + KeyName + '",repeats="' + str(repeats) + '"') time.sleep(1) if result != "OK": status += 1 return status def sendduration(self, KeyName, duration=1500, sleep_time=0.1): status = 0 if not self.deviceName.strip(): self.getDeviceName() result = self.__ReadData('name="' + self.deviceName + '",dataset="' + self.getScriptName()[ 0] + '",signal="' + KeyName + '",duration="' + str(duration) + '"') time.sleep(sleep_time) if result != "OK": status += 1 return status def sendrepeats2(self, KeyName, repeats=2, duration=1000): status = 0 if not self.deviceName.strip(): self.getDeviceName() result = self.__ReadData('name="' + self.deviceName + '",dataset="' + self.getScriptName()[ 0] + '",signal="' + KeyName + '",repeats="' + str(repeats) + '",duration="' + str(duration) + '"') time.sleep(1) if result != "OK": status += 1 return status # 关闭socket通讯 def close(self): self.__CloseSocket() return # 单例模块; redrat4_singleton = RedRat4() if __name__ == "__main__": redrat = RedRat4() # 双击效果; keyList = redrat.getKeyName() print "keyList:",keyList # redrat.sendKey('power') # redrat.sendKey('home', 2, 0.1) # redrat.sendrepeats('home', 2)