# -*- coding:utf-8 -*- import os import sys import time # from device_manager import * from ssat_sdk.utils import LoggingUtil from ssat_sdk import sat_environment from ssat_sdk.utils import string_util from ssat_sdk.service.service_manager import ServiceManager from ssat_sdk.device_manage.ub530_manager import UB530Manager import CD750SSDK import UB530SDK UB530 = 'TC-UB530' UB530EX = 'TC-UB530EX' CD750 = 'CD750' class CCardManager(): status = False exe_path = sys.prefix + r"/Tools/CD750/AVerCapSDKDemo.exe" sManager = ServiceManager() DEVICE_NAME = sat_environment.getCCard_Selected() print u"视频采集卡名称:", DEVICE_NAME def __init__(self): if string_util.strcmp(UB530, self.DEVICE_NAME): # Python版本; self.ccard = UB530Manager() elif string_util.strcmp(UB530EX, self.DEVICE_NAME): # VC++版本; CCardManager.exe_path = sys.prefix + r"/Tools/SATHelper/SATHelper.exe" else: CCardManager.exe_path = sys.prefix + r"/Tools/CD750/AVerCapSDKDemo.exe" def start(self): print "CCardManager ssdk" if string_util.strcmp(self.DEVICE_NAME, CD750): self.status = CD750SSDK.StartApp(CCardManager.exe_path) time.sleep(3) if self.status == True: LoggingUtil.printLog("打开CD750 AVerCapSDKDemo成功") # 连接设备 CD750SSDK.ConnectDevice(0) CCardManager.status = True # 开始流操作 time.sleep(1) CD750SSDK.StreamOption(True) else: LoggingUtil.printLog("打开CD750 AVerCapSDKDemo失败") return self.status elif string_util.strcmp(self.DEVICE_NAME, UB530): # Python版本; self.sManager.startServiceThread("ub530") time.sleep(2) return self.sManager.getServiceStatus("ub530") elif string_util.strcmp(self.DEVICE_NAME, UB530EX): # VC++版本; self.status = UB530SDK.StartApp(CCardManager.exe_path) time.sleep(3) if self.status == True: LoggingUtil.printLog("打开UB530 VideoCapture成功") # 连接设备 UB530SDK.ConnectDevice() CCardManager.status = True else: LoggingUtil.printLog("打开UB530 VideoCapture失败") return self.status def isExeRunning(self): if string_util.strcmp(self.DEVICE_NAME, CD750): return CD750SSDK.IsAppRunning(CCardManager.exe_path) elif string_util.strcmp(self.DEVICE_NAME, UB530EX): # VC++版本; return UB530SDK.IsAppRunning(CCardManager.exe_path) elif string_util.strcmp(self.DEVICE_NAME, UB530): # Python版本; return self.sManager.getServiceStatus("ub530") # start app def startApp(self): self.status = CD750SSDK.StartApp(CCardManager.exe_path) return self.status # close app; def stopApp(self): self.status = not CD750SSDK.StopApp(CCardManager.exe_path) return self.status # connect device def connect(self, index=0): return CD750SSDK.ConnectDevice(index) # disconnect device def disconnect(self, index=0): return CD750SSDK.DisconnectDevice(index) # show app def showApp(self): CD750SSDK.ShowApp() # hide app def hideApp(self): CD750SSDK.HideApp() # stream option,开始/停止流操作 def streamOption(self, enable=True): return CD750SSDK.StreamOption(enable) # 截图单张 def captureSingleImage(self, picpath): return CD750SSDK.CaptureSingleImage(picpath, 3, True, 0, 0, 0, 0) # 按张数截图:picpath图片路径,count截图张数 def captureImageByCount(self, count, picpath, prefix="CD750"): return CD750SSDK.CaptureImageByCount(count, picpath, prefix, 3, True, 0, 0, 0, 0) # 按时间截图:times持续时间,单位毫秒; perCount每秒截图的张数; picpath图片保存路径; def captureImageByTime(self, times, perCount, picpath, prefix="CD750"): if string_util.strcmp(CD750, self.DEVICE_NAME): return CD750SSDK.CaptureImageByTime(times, perCount, picpath, prefix, 3, True, 0, 0, 0, 0) elif string_util.strcmp(UB530EX, self.DEVICE_NAME): # VC++版本; return UB530SDK.CaptureImageByTime(times, 0, picpath, prefix, 2) elif string_util.strcmp(UB530, self.DEVICE_NAME): # Python版本; count = 0 runTime = time.time() endWork = False lastIndex = 1 times = times/1000 while True: lastCount = lastIndex + perCount for i in range(lastIndex, lastCount): path = "%s%s_%d.jpg" % (picpath, prefix, i) if self.ccard.takePicture(path) == 0: count += 1 time.sleep(0.2) # 回调响应时间; if (time.time() - runTime >= times): endWork = True break lastIndex += perCount if endWork: break # print '实际生成数量%d,目标生成数量%d'%(count,perCount*times) return True if count == perCount*times else False # 抓单张图; def takePicture(self, picpath): status = -1 if string_util.strcmp(CD750, self.DEVICE_NAME): status = CD750SSDK.CaptureSingleImage(picpath, 3, True, 0, 0, 0, 0) time.sleep(0.5) elif string_util.strcmp(UB530EX, self.DEVICE_NAME): # VC++版本; status = UB530SDK.CaptureSingleImage(picpath, 2) count = 0 while os.path.exists(picpath) is False: time.sleep(0.1) # 等待磁盘完成响应; if count == 5: break count = count + 1 # 超过5次认为截图失败; status = True if count < 5 else False elif string_util.strcmp(UB530, self.DEVICE_NAME): # Python版本; status = self.ccard.takePicture(picpath) if status == 0: status = 1 else: status = 0 time.sleep(1) return status # 返回保存的文件列表; def getCaptureImageList(self, picpath): list = [] if string_util.strcmp(CD750, self.DEVICE_NAME): list = CD750SSDK.GetCaptureImageList(picpath) elif string_util.strcmp(UB530EX, self.DEVICE_NAME): # VC++版本; list = UB530SDK.GetCaptureImageList(picpath) elif string_util.strcmp(UB530, self.DEVICE_NAME): # Python版本; list = [] # 过滤掉多余的'/' newpath = '' for i in range(0, picpath.__len__(), 2): char1 = picpath[i] if i < picpath.__len__()-1: char2 = picpath[i+1] newpath += char1 if char1 == char2 and char1 == '/': continue newpath += char2 else: newpath += char1 picpath = newpath.replace('/', '\\') index = picpath.rfind('\\') # 路径; dir = picpath[:index] # 文件名; name = picpath[index+1:] # 前缀; preFix = name[:name.rfind("_")] for dirpath, dirname, filenames in os.walk(dir): for filepath in filenames: list.append(os.path.join(dirpath, filepath)) # endif list.sort() return list # 停止抓图; def stopCaptureImage(self): CD750SSDK.StopCaptureImage() # 开始录制视频; def startCaptureAudio(self, audiopath): CD750SSDK.StartCaptureAudio(audiopath) # 停止录制视频; def stopCaptureAudio(self): CD750SSDK.StopCaptureAudio() # 异步录制视频; def asyStartCaptureAudio(self, keeptime, audiopath): CD750SSDK.AsyCaptureAudio(keeptime, audiopath) if __name__ == "__main__": cardManager = CCardManager() print cardManager.start() # time.sleep(1) # print u"开始截图" time_start = time.time() cardManager.takePicture("D:\\SAT\\test1.png") time_end = time.time() print('totally cost', time_end-time_start) # print u"按时间截图" # cardManager.captureImageByTime(5000, 5, "D:/SAT/aaa/") # time.sleep(5) # print u"遍历设备" # list = CD750SSDK.GetCaptureImageList("D:/SAT/aaa/") # if list != None: # for file in list: # print file # time.sleep(2) # # 断开连接 # print u"断开设备" # cardManager.disconnect(0) # time.sleep(3) # # 关闭app # print u"关闭程序(宿主程序需要管理员权限)" # cardManager.stopApp()