# -*- coding:utf-8 -*- import time import sys import os import socket from ftplib import FTP_TLS import re # 正则表达式; import shutil class MyFTP: def __init__(self, host, port=21): self.host = host self.port = port self.ftp = FTP_TLS() self.ftp.encoding = 'utf-8' self.log_file = open("log.txt", "a") self.file_list = [] self.dir_list = [] self.cur_dir = '' def login(self, username, password): try: timeout = 60 socket.setdefaulttimeout(timeout) # 0主动模式 1 #被动模式 self.ftp.set_pasv(False) # 打开调试级别2,显示详细信息 # self.ftp.set_debuglevel(2) self.debug_print('开始尝试连接到 %s' % self.host) self.ftp.connect(self.host, self.port) self.debug_print('成功连接到 %s' % self.host) self.debug_print('开始尝试登录到 %s' % self.host) self.ftp.login(username, password) self.ftp.prot_p() self.debug_print('成功登录到 %s' % self.host) self.debug_print(self.ftp.welcome) except Exception as err: self.deal_error("FTP 连接或登录失败 ,错误描述为:%s" % err) pass def is_same_size(self, local_file, remote_file): try: remote_file_size = self.ftp.size(remote_file) except Exception as err: # self.debug_print("is_same_size() 错误描述为:%s" % err) remote_file_size = -1 try: local_file_size = os.path.getsize(local_file) except Exception as err: # self.debug_print("is_same_size() 错误描述为:%s" % err) local_file_size = -1 self.debug_print('local_file_size:%d , remote_file_size:%d' % (local_file_size, remote_file_size)) if remote_file_size == local_file_size: return 1 else: return 0 def download_file(self, local_file, remote_file): """从ftp下载文件 参数: local_file: 本地文件 remote_file: 远程文件 """ self.debug_print("download_file()---> local_path = %s ,remote_path = %s" % (local_file, remote_file)) if self.is_same_size(local_file, remote_file): self.debug_print('%s 文件大小相同,无需下载' % local_file) return else: try: self.debug_print('>>>>>>>>>>>>下载文件 %s ... ...' % local_file) buf_size = 1024 file_handler = open(local_file, 'wb') self.ftp.retrbinary('RETR %s' % remote_file, file_handler.write, buf_size) file_handler.close() except Exception as err: self.debug_print('下载文件出错,出现异常:%s ' % err) return def download_file_tree(self, local_path, remote_path): """从远程目录下载多个文件到本地目录 参数: local_path: 本地路径 remote_path: 远程路径 """ print("download_file_tree()---> local_path = %s ,remote_path = %s" % (local_path, remote_path)) try: self.ftp.cwd(remote_path) except Exception as err: self.debug_print('远程目录%s不存在,继续...' % remote_path + " ,具体错误描述为:%s" % err) return if not os.path.isdir(local_path): self.debug_print('本地目录%s不存在,先创建本地目录' % local_path) os.makedirs(local_path) self.debug_print('切换至目录: %s' % self.ftp.pwd()) self.file_list = [] # 方法回调 self.ftp.dir(self.get_file_list) remote_names = self.file_list self.debug_print('远程目录 列表: %s' % remote_names) for item in remote_names: file_type = item[0] file_name = item[1] local = os.path.join(local_path, file_name) if file_type == 'd': print("download_file_tree()---> 下载目录: %s" % file_name) self.download_file_tree(local, file_name) elif file_type == '-': print("download_file()---> 下载文件: %s" % file_name) self.download_file(local, file_name) self.ftp.cwd("..") self.debug_print('返回上层目录 %s' % self.ftp.pwd()) return True def upload_file(self, local_file, remote_file): """从本地上传文件到ftp 参数: local_path: 本地文件 remote_path: 远程文件 """ if not os.path.isfile(local_file): self.debug_print('%s 不存在' % local_file) return if self.is_same_size(local_file, remote_file): self.debug_print('跳过相等的文件: %s' % local_file) return buf_size = 1024 file_handler = open(local_file, 'rb') self.ftp.storbinary('STOR %s' % remote_file, file_handler, buf_size) file_handler.close() self.debug_print('上传: %s' % local_file + "成功!") def upload_file_tree(self, local_path, remote_path): """从本地上传目录下多个文件到ftp 参数: local_path: 本地路径 remote_path: 远程路径 """ if not os.path.isdir(local_path): self.debug_print('本地目录 %s 不存在' % local_path) return """ 创建服务器目录 """ try: self.ftp.cwd(remote_path) # 切换工作路径 except Exception as e: base_dir, part_path = self.ftp.pwd(), remote_path.split('/') print e,base_dir,part_path for p in part_path[1:]: print "子目录",p base_dir = base_dir + p + '/' # 拼接子目录 try: self.ftp.cwd(base_dir) # 切换到子目录, 不存在则异常 except Exception as e: print('INFO:', e) self.ftp.mkd(base_dir) # 不存在创建当前子目录 self.ftp.cwd(remote_path) self.debug_print('切换至远程目录: %s' % self.ftp.pwd()) local_name_list = os.listdir(local_path) self.debug_print('本地目录list: %s' % local_name_list) #self.debug_print('判断是否有服务器目录: %s' % os.path.isdir()) for local_name in local_name_list: src = os.path.join(local_path, local_name) print("src路径=========="+src) if os.path.isdir(src): try: self.ftp.mkd(local_name) except Exception as err: self.debug_print("目录已存在 %s ,具体错误描述为:%s" % (local_name, err)) self.debug_print("upload_file_tree()---> 上传目录: %s" % local_name) self.debug_print("upload_file_tree()---> 上传src目录: %s" % src) self.upload_file_tree(src, local_name) else: self.debug_print("upload_file_tree()---> 上传文件: %s" % local_name) self.upload_file(src, local_name) self.ftp.cwd("..") def close(self): """ 退出ftp """ self.debug_print("close()---> FTP退出") self.ftp.quit() self.log_file.close() def debug_print(self, s): """ 打印日志 """ self.write_log(s) def deal_error(self, e): """ 处理错误异常 参数: e:异常 """ log_str = '发生错误: %s' % e self.write_log(log_str) sys.exit() def write_log(self, log_str): """ 记录日志 参数: log_str:日志 """ time_now = time.localtime() date_now = time.strftime('%Y-%m-%d', time_now) format_log_str = "%s ---> %s \n " % (date_now, log_str) print(format_log_str) self.log_file.write(format_log_str) def get_file_list(self, line): """ 获取文件列表 参数: line: """ file_arr = self.get_file_name(line) # 去除 . 和 .. if file_arr[1] not in ['.', '..']: self.file_list.append(file_arr) def get_file_name(self, line): """ 获取文件名 参数: line: """ pos = line.rfind(':') while (line[pos] != ' '): pos += 1 while (line[pos] == ' '): pos += 1 file_arr = [line[0], line[pos:]] return file_arr def get_dirs(self): self.get_dir() self.dir_list = [] self.ftp.dir(self.callback_get_dirs) return self.dir_list def callback_get_dirs(self, line): # 正则表达式; # 字串:drwxrwxrwx 1 user group 0 Dec 28 18:35 V8-T851T01-LF1V024-CTS # 以时间冒号为分隔符; p = re.compile(r"(.*):(\d+) (.*)", re.DOTALL) mo = p.search(line) if mo is not None: ftpdir = mo.group(3) if ftpdir != '.' and ftpdir != '..': self.dir_list.append(self.cur_dir + '/' + ftpdir) else: print "正规匹配失败",line def get_files(self, path): filelist = self.ftp.nlst(path) for file in filelist: self.file_list.append(path+'/'+file) return self.file_list def set_dir(self, path): self.ftp.cwd(path) def get_dir(self): self.cur_dir = self.ftp.pwd() return self.cur_dir FTPHOST='10.118.1.85' FTPUSER='你的账号' FTPPWD='你的密码' FTPDIR='/ProjectSoftware/TEST/TV/Regional_Customers/RT2851_MOKA/cts-auth/22Q1/approved' LOCALDIR41=r'F:\22Q1-CTS\41' LOCALDIR51=r'F:\22Q1-CTS\51' # 下载指定ftp路径内的所有的prop文件 def download_prop(ftp_dir, local_dir): ftp = MyFTP(FTPHOST) print ftp.login(FTPUSER, FTPPWD) print ftp.set_dir(FTPDIR) print ftp.get_dirs() if os.path.exists(local_dir): shutil.rmtree(local_dir) if not os.path.exists(local_dir): os.makedirs(local_dir) ftp.file_list = [] for path in ftp.dir_list: ftp.get_files(path) for i in range(ftp.file_list.__len__()-1, -1, -1): path = ftp.file_list[i] (filepath,filename) = os.path.split(path) if filename != 'build.prop': ftp.file_list.remove(path) file_list = [] for i in range(0,ftp.file_list.__len__()): path = ftp.file_list[i] (filepath, filename) = os.path.split(path) file_list.append(local_dir+'\\'+str(i)+filename) ftp.download_file(local_dir+'\\'+str(i)+filename, path) return file_list # 解析指定的prop文件; def parse_prop(path, find_objs): brand='' prop_list = [] if os.path.exists(path) is False: return with open(path, 'r') as f: content = f.read() for obj in find_objs: res=r"\n%s=(.*)\n"%obj print "表达式:",res p = re.compile(res) mo = p.search(content) if mo is not None: prop = mo.group() if prop.find("test-keys"): prop = prop.replace("test-keys", "release-keys") prop=prop.strip('\n') prop_list.append(prop) if obj == 'ro.product.brand': brand=prop.replace(obj+'=','') print "prop_list=",prop_list (filepath,filename) = os.path.split(path) if prop_list.__len__() > 0: prop_file = "%s\\%s.prop" % (filepath, brand.lower()) if os.path.exists(prop_file): os.remove(prop_file) print "prop文件:",prop_file with open(prop_file, 'ab+') as f: for prop in prop_list: f.writelines(prop+'\n') # 移除原文件; os.remove(path) return content.split('\n') # 将待匹配字符串保存在数组中 if __name__ == "__main__": file_list = download_prop(FTPDIR, LOCALDIR51) for file in file_list: parse_prop(file, ['ro.product.model', 'ro.product.brand', 'ro.product.name', 'ro.build.flavor', 'ro.build.date', 'ro.build.fingerprint', 'ro.build.date.utc', 'ro.build.description', 'ro.build.version.incremental'] ) print "FTP-OK"