# -*- coding:utf-8 -*- import os import sys import time import datetime import socket import hashlib # md5验证; from ssh2.session import Session from ssh2.sftp import LIBSSH2_FXF_READ, LIBSSH2_SFTP_S_IRUSR class baseSSH2: def __init__(self): self.__user = "" self.__pwd = "" self.__host = "" self.__port = 22 ''' 函数:创建tcp sock连接; 参数:无 返回:成功创建tcp连接返回sock,否则返回None 注意:创建成功后的sock,在外部使用完后必须调用sock.close()释放; ''' def __create_sock(self): # 创建tcp连接; try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((self.__host, int(self.__port))) # 没有返回值; return sock except Exception, e: print u'创建ssh2 socket失败', e return None ''' 函数:初始化ssh2所需参数; 参数:user、pwd、host、port 返回:无 ''' def init_ssh2(self, user, pwd, host, port=22): self.__host = host self.__port = port self.__user = user self.__pwd = pwd ''' 函数:执行命令; 参数:cmd 返回:执行成功,返回True及执行结果; ''' def execute_cmd(self, cmd): print u"ssh2.execute_cmd=",cmd sock = self.__create_sock() if sock is None: return False, "" # 创建ssh2会话连接; session = Session() # 关联tcp socket; if session.handshake(sock) is None: sock.close() print u'建立ssh2 会话失败' return False, "" # 以用户+密码方式建立认证连接; if session.userauth_password(self.__user, self.__pwd) is None: sock.close() print u'登录ssh2 会话失败' return False, "" strdata = '' # 创建会话通道; channel = session.open_session() channel.execute(cmd) size, data = channel.read() strdata = data while size > 0: size, data = channel.read() strdata += data channel.close() # print("Exit status: %s" % channel.get_exit_status()) sock.close() return True if channel.get_exit_status() == 0 else False, strdata ''' 函数:下载文件; 参数:ftp_path要下载的文件路径, local_path要保存的文件路径; 返回:执行成功,返回True; ''' def sftp_download(self, ftp_path, local_path): sock = self.__create_sock() if sock is None: return False # 创建ssh2会话连接; session = Session() # 关联tcp socket; if session.handshake(sock) is None: sock.close() print u'建立ssh2 会话失败' return False # 以用户+密码方式建立认证连接; if session.userauth_password(self.__user, self.__pwd) is None: sock.close() print u'登录ssh2 会话失败' return False sftp = session.sftp_init() # now = datetime.time() # print("Starting read for remote file %s" % ftp_path) try: with sftp.open(ftp_path, LIBSSH2_FXF_READ, LIBSSH2_SFTP_S_IRUSR) as fh, open(local_path, 'wb+') as lh: for size, data in fh: lh.write(data) # print 'download size=',size lh.close() # print("Finished file read in %s" % (datetime.time() - now)) sock.close() return True except Exception, e: print u'下载失败:',e return False ''' 函数:计算文件md5值; 参数:file要计算的文件路径; 返回:执行成功返回md5值,否则返回None; ''' def get_md5_sum(self, sftp_file): cmd = 'md5sum %s' % sftp_file boolean, data = self.execute_cmd(cmd) if boolean is False: return None # 分组; str_list = data.split(' ') if str_list.__len__() != 2: return None return str_list[0] ''' 函数:下载文件,并验证md5是否正确; 参数:ftp_path要下载的文件路径, local_path要保存的文件路径; 返回:执行成功,返回True; ''' def sftp_download_md5(self, ftp_path, local_path): # 先计算md5值; sftp_md5 = self.get_md5_sum(ftp_path) if sftp_md5 is None: print 'sftp_md5值空' return False print "SFTP MD5=", sftp_md5 # 下载文件后计算; if self.sftp_download(ftp_path, local_path) is True: local_md5 = "" md5_local = hashlib.md5() time.sleep(5) file_data = [] if os.path.exists(local_path): with open(local_path, mode='rb') as f: while True: data = f.read(8192) if not data: break md5_local.update(data) local_md5 = md5_local.hexdigest() print u"本地MD5=", local_md5 return True if sftp_md5 == local_md5 else False print u'下载文件失败' return False if __name__ == "__main__": host = "10.201.251.254" user = "wjf" pwd = "wjf2019" myssh2 = baseSSH2() myssh2.init_ssh2(user, pwd, host) # cmd = "md5sum /home/RT2841_2851_dailybuild/DailyBuild_RT2851_0509/signed-ota_rt2851_update.zip" # bolean, data = myssh2.execute_cmd(cmd) # print data.split(' ')[0] # print u'MD5值=', myssh2.get_md5_sum( # "/home/RT2841_2851_dailybuild/DailyBuild_RT2851_0509/signed-ota_rt2851_update.zip") # myssh2.sftp_download("rt2851/Buildimg/V8-T841T01-LF1V001/Images/USB/build.prop", "D:\\sat\\build.prop") # myssh2.sftp_download("/home/RT2841_2851_dailybuild/DailyBuild_RT2851_0509/signed-ota_rt2851_update.zip", "D:\\sat\\a.img") if myssh2.sftp_download_md5("/home/RT2841_2851_dailybuild/DailyBuild_RT2851_0509/signed-ota_rt2851_update.zip", "D:\\sat\\b.img"): print u"下载文件成功"