123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- # -*- coding:utf-8 -*-
- import os
- import sys
- import time
- # from device_manager import *
- from ssat_sdk.utils import LoggingUtil
- from ssat_sdk import sat_environment
- from ssat_sdk.utils import string_util
- from ssat_sdk.service.service_manager import ServiceManager
- from ssat_sdk.device_manage.ub530_manager import UB530Manager
- import CD750SSDK
- import UB530SDK
- UB530 = 'TC-UB530'
- UB530EX = 'TC-UB530EX'
- CD750 = 'CD750'
- class CCardManager():
- status = False
- exe_path = sys.prefix + r"/Tools/CD750/AVerCapSDKDemo.exe"
- sManager = ServiceManager()
- DEVICE_NAME = sat_environment.getCCard_Selected()
- print u"视频采集卡名称:", DEVICE_NAME
- def __init__(self):
- if string_util.strcmp(UB530, self.DEVICE_NAME): # Python版本;
- self.ccard = UB530Manager()
- elif string_util.strcmp(UB530EX, self.DEVICE_NAME): # VC++版本;
- CCardManager.exe_path = sys.prefix + r"/Tools/SATHelper/SATHelper.exe"
- else:
- CCardManager.exe_path = sys.prefix + r"/Tools/CD750/AVerCapSDKDemo.exe"
- def start(self):
- print "CCardManager ssdk"
- if string_util.strcmp(self.DEVICE_NAME, CD750):
- self.status = CD750SSDK.StartApp(CCardManager.exe_path)
- time.sleep(3)
- if self.status == True:
- LoggingUtil.printLog("打开CD750 AVerCapSDKDemo成功")
- # 连接设备
- CD750SSDK.ConnectDevice(0)
- CCardManager.status = True
- # 开始流操作
- time.sleep(1)
- CD750SSDK.StreamOption(True)
- else:
- LoggingUtil.printLog("打开CD750 AVerCapSDKDemo失败")
- return self.status
- elif string_util.strcmp(self.DEVICE_NAME, UB530): # Python版本;
- self.sManager.startServiceThread("ub530")
- time.sleep(2)
- return self.sManager.getServiceStatus("ub530")
- elif string_util.strcmp(self.DEVICE_NAME, UB530EX): # VC++版本;
- self.status = UB530SDK.StartApp(CCardManager.exe_path)
- time.sleep(3)
- if self.status == True:
- LoggingUtil.printLog("打开UB530 VideoCapture成功")
- # 连接设备
- UB530SDK.ConnectDevice()
- CCardManager.status = True
- else:
- LoggingUtil.printLog("打开UB530 VideoCapture失败")
- return self.status
- def isExeRunning(self):
- if string_util.strcmp(self.DEVICE_NAME, CD750):
- return CD750SSDK.IsAppRunning(CCardManager.exe_path)
- elif string_util.strcmp(self.DEVICE_NAME, UB530EX): # VC++版本;
- return UB530SDK.IsAppRunning(CCardManager.exe_path)
- elif string_util.strcmp(self.DEVICE_NAME, UB530): # Python版本;
- return self.sManager.getServiceStatus("ub530")
- # start app
- def startApp(self):
- self.status = CD750SSDK.StartApp(CCardManager.exe_path)
- return self.status
- # close app;
- def stopApp(self):
- self.status = not CD750SSDK.StopApp(CCardManager.exe_path)
- return self.status
- # connect device
- def connect(self, index=0):
- return CD750SSDK.ConnectDevice(index)
- # disconnect device
- def disconnect(self, index=0):
- return CD750SSDK.DisconnectDevice(index)
- # show app
- def showApp(self):
- CD750SSDK.ShowApp()
- # hide app
- def hideApp(self):
- CD750SSDK.HideApp()
- # stream option,开始/停止流操作
- def streamOption(self, enable=True):
- return CD750SSDK.StreamOption(enable)
- # 截图单张
- def captureSingleImage(self, picpath):
- return CD750SSDK.CaptureSingleImage(picpath, 3, True, 0, 0, 0, 0)
- # 按张数截图:picpath图片路径,count截图张数
- def captureImageByCount(self, count, picpath, prefix="CD750"):
- return CD750SSDK.CaptureImageByCount(count, picpath, prefix, 3, True, 0, 0, 0, 0)
- # 按时间截图:times持续时间,单位毫秒; perCount每秒截图的张数; picpath图片保存路径;
- def captureImageByTime(self, times, perCount, picpath, prefix="CD750"):
- if string_util.strcmp(CD750, self.DEVICE_NAME):
- return CD750SSDK.CaptureImageByTime(times, perCount, picpath, prefix, 3, True, 0, 0, 0, 0)
- elif string_util.strcmp(UB530EX, self.DEVICE_NAME): # VC++版本;
- return UB530SDK.CaptureImageByTime(times, 0, picpath, prefix, 2)
- elif string_util.strcmp(UB530, self.DEVICE_NAME): # Python版本;
- count = 0
- runTime = time.time()
- endWork = False
- lastIndex = 1
- times = times/1000
- while True:
- lastCount = lastIndex + perCount
- for i in range(lastIndex, lastCount):
- path = "%s%s_%d.jpg" % (picpath, prefix, i)
- if self.ccard.takePicture(path) == 0:
- count += 1
- time.sleep(0.2) # 回调响应时间;
- if (time.time() - runTime >= times):
- endWork = True
- break
- lastIndex += perCount
- if endWork:
- break
- # print '实际生成数量%d,目标生成数量%d'%(count,perCount*times)
- return True if count == perCount*times else False
- # 抓单张图;
- def takePicture(self, picpath):
- status = -1
- if string_util.strcmp(CD750, self.DEVICE_NAME):
- status = CD750SSDK.CaptureSingleImage(picpath, 3, True, 0, 0, 0, 0)
- time.sleep(0.5)
- elif string_util.strcmp(UB530EX, self.DEVICE_NAME): # VC++版本;
- status = UB530SDK.CaptureSingleImage(picpath, 2)
- count = 0
- while os.path.exists(picpath) is False:
- time.sleep(0.1) # 等待磁盘完成响应;
- if count == 5:
- break
- count = count + 1
- # 超过5次认为截图失败;
- status = True if count < 5 else False
- elif string_util.strcmp(UB530, self.DEVICE_NAME): # Python版本;
- status = self.ccard.takePicture(picpath)
- if status == 0:
- status = 1
- else:
- status = 0
- time.sleep(1)
- return status
- # 返回保存的文件列表;
- def getCaptureImageList(self, picpath):
- list = []
- if string_util.strcmp(CD750, self.DEVICE_NAME):
- list = CD750SSDK.GetCaptureImageList(picpath)
- elif string_util.strcmp(UB530EX, self.DEVICE_NAME): # VC++版本;
- list = UB530SDK.GetCaptureImageList(picpath)
- elif string_util.strcmp(UB530, self.DEVICE_NAME): # Python版本;
- list = []
- # 过滤掉多余的'/'
- newpath = ''
- for i in range(0, picpath.__len__(), 2):
- char1 = picpath[i]
- if i < picpath.__len__()-1:
- char2 = picpath[i+1]
- newpath += char1
- if char1 == char2 and char1 == '/':
- continue
- newpath += char2
- else:
- newpath += char1
- picpath = newpath.replace('/', '\\')
- index = picpath.rfind('\\')
- # 路径;
- dir = picpath[:index]
- # 文件名;
- name = picpath[index+1:]
- # 前缀;
- preFix = name[:name.rfind("_")]
- for dirpath, dirname, filenames in os.walk(dir):
- for filepath in filenames:
- list.append(os.path.join(dirpath, filepath))
- # endif
- list.sort()
- return list
- # 停止抓图;
- def stopCaptureImage(self):
- CD750SSDK.StopCaptureImage()
- # 开始录制视频;
- def startCaptureAudio(self, audiopath):
- CD750SSDK.StartCaptureAudio(audiopath)
- # 停止录制视频;
- def stopCaptureAudio(self):
- CD750SSDK.StopCaptureAudio()
- # 异步录制视频;
- def asyStartCaptureAudio(self, keeptime, audiopath):
- CD750SSDK.AsyCaptureAudio(keeptime, audiopath)
- if __name__ == "__main__":
- cardManager = CCardManager()
- print cardManager.start()
- # time.sleep(1)
- # print u"开始截图"
- time_start = time.time()
- cardManager.takePicture("D:\\SAT\\test1.png")
- time_end = time.time()
- print('totally cost', time_end-time_start)
- # print u"按时间截图"
- # cardManager.captureImageByTime(5000, 5, "D:/SAT/aaa/")
- # time.sleep(5)
- # print u"遍历设备"
- # list = CD750SSDK.GetCaptureImageList("D:/SAT/aaa/")
- # if list != None:
- # for file in list:
- # print file
- # time.sleep(2)
- # # 断开连接
- # print u"断开设备"
- # cardManager.disconnect(0)
- # time.sleep(3)
- # # 关闭app
- # print u"关闭程序(宿主程序需要管理员权限)"
- # cardManager.stopApp()
|