#!/usr/bin/env python # -*- coding: utf-8 -*- """ ::Timeout atx-agent:ReverseProxy use http.DefaultTransport. Default Timeout: 30s |-- Dial --|-- TLS handshake --|-- Request --|-- Resp.headers --|-- Respose.body --| |------------------------------ http.Client.Timeout -------------------------------| Refs: - https://golang.org/pkg/net/http/#RoundTripper - http://colobu.com/2016/07/01/the-complete-guide-to-golang-net-http-timeouts """ from __future__ import absolute_import, print_function import functools import hashlib import io import json import os import re import shutil import subprocess import sys import threading import time import warnings from collections import namedtuple from datetime import datetime from subprocess import list2cmdline import humanize import progress.bar import requests import six import six.moves.urllib.parse as urlparse from retry import retry from ssat_sdk.uiautomator2 import adbutils from ssat_sdk.uiautomator2.exceptions import (GatewayError, JsonRpcError, ConnectError, NullObjectExceptionError, NullPointerExceptionError, SessionBrokenError, StaleObjectExceptionError, UiaError, UiAutomationNotConnectedError, UiObjectNotFoundError) from ssat_sdk.uiautomator2.session import Session, set_fail_prompt # noqa: F401 from ssat_sdk.uiautomator2.version import __atx_agent_version__ if six.PY2: FileNotFoundError = OSError DEBUG = False HTTP_TIMEOUT = 60 class _ProgressBar(progress.bar.Bar): message = "progress" suffix = '%(percent)d%% [%(eta_td)s, %(speed)s]' @property def speed(self): return humanize.naturalsize( self.elapsed and self.index / self.elapsed, gnu=True) + '/s' def log_print(s): thread_name = threading.current_thread().getName() print(thread_name + ": " + datetime.now().strftime('%H:%M:%S,%f')[:-3] + " " + s) def _is_wifi_addr(addr): if not addr: return False if re.match(r"^https?://", addr): return True m = re.search(r"(\d+\.\d+\.\d+\.\d+)", addr) if m and m.group(1) != "127.0.0.1": return True return False def connect(addr=None): """ Args: addr (str): uiautomator server address or serial number. default from env-var ANDROID_DEVICE_IP Returns: UIAutomatorServer Raises: ConnectError Example: connect("10.0.0.1:7912") connect("10.0.0.1") # use default 7912 port connect("http://10.0.0.1") connect("http://10.0.0.1:7912") connect("cff1123ea") # adb device serial number """ if not addr or addr == '+': addr = os.getenv('ANDROID_DEVICE_IP') if _is_wifi_addr(addr): return connect_wifi(addr) return connect_usb(addr) def connect_adb_wifi(addr): """ Run adb connect, and then call connect_usb(..) Args: addr: ip+port which can be used for "adb connect" argument Raises: ConnectError """ assert isinstance(addr, six.string_types) subprocess.call([adbutils.adb_path(), "connect", addr]) try: subprocess.call([adbutils.adb_path(), "-s", addr, "wait-for-device"], timeout=2) except subprocess.TimeoutExpired: raise ConnectError("Fail execute", "adb connect " + addr) return connect_usb(addr) def connect_usb(serial=None, healthcheck=True): print("uiautomator2.__init.connect_usb: serial=",serial,";healthcheck=",healthcheck) """ Args: serial (str): android device serial healthcheck (bool): start uiautomator if not ready Returns: UIAutomatorServer Raises: ConnectError """ adb = adbutils.AdbClient() if not serial: device = adb.must_one_device() print("uiautomator2.__init.connect_usb: not serial, device=", device) else: device = adbutils.AdbDevice(adb, serial) # adb = adbutils.Adb(serial) global connect_serial connect_serial = device.serial log_print("connect_serial:%s" % str(connect_serial)) lport = device.forward_port(7912) print("uiautomator2.__init.connect_usb: lport=", lport) d = connect_wifi('127.0.0.1:' + str(lport)) print("uiautomator2.__init.connect_usb: UIAutomatorServer=", d) if healthcheck: if not d.agent_alive: warnings.warn("backend atx-agent is not alive, start again ...", RuntimeWarning) # TODO: /data/local/tmp might not be execuable and atx-agent can be somewhere else device.shell_output("/data/local/tmp/atx-agent", "server", "-d") deadline = time.time() + 3 while time.time() < deadline: if d.alive: break elif not d.alive: warnings.warn("backend uiautomator2 is not alive, start again ...", RuntimeWarning) d.reset_uiautomator() print("uiautomator2.__init.connect_usb: atx-agent is alive and uiautomator app is alive!") return d def connect_wifi(addr=None): """ Args: addr (str) uiautomator server address. Returns: UIAutomatorServer Raises: ConnectError Examples: connect_wifi("10.0.0.1") """ print("uiautomator2.__init.connect_wifi: start") if '://' not in addr: addr = 'http://' + addr if addr.startswith('http://'): u = urlparse.urlparse(addr) host = u.hostname port = u.port or 7912 return UIAutomatorServer(host, port) else: raise ConnectError("address should start with http://") connect_serial = None def check_adb_connect(): if not connect_serial: return True CMD_ADB_CHECK = "adb -s %s shell getprop ro.build.version.sdk" % connect_serial p = subprocess.Popen(CMD_ADB_CHECK, bufsize=128, stdout=subprocess.PIPE, stderr=subprocess.PIPE) outInfo = p.stdout.read() # log_print(u"查询SDK版本号outInfo:%s" % str(outInfo)) errInfo = p.stderr.read() p.kill() if errInfo.__len__() > 0: log_print(u"查询SDK版本号失败,出错信息:%s" % str(errInfo)) return False else: return True def check_adb_connect_type(): CMD_ADB_DEVICES = "adb devices" p = subprocess.Popen(CMD_ADB_DEVICES, bufsize=128, stdout=subprocess.PIPE, stderr=subprocess.PIPE) outInfo = p.stdout.read() if "192.168" in outInfo: return "tcp/ip" else: return "usb" def disconnectAdb(): if not connect_serial: return True # CMD_ADB_DISCONNECT = "adb disconnect %s" % connect_serial CMD_ADB_DISCONNECT = "adb disconnect %s" % str(connect_serial) p = subprocess.Popen(CMD_ADB_DISCONNECT, bufsize=128, stdout=subprocess.PIPE, stderr=subprocess.PIPE) p.kill() def connectAdb(): if not connect_serial: return True CMD_ADB_CONNECT = "adb connect %s" % connect_serial p = subprocess.Popen(CMD_ADB_CONNECT, bufsize=128, stdout=subprocess.PIPE, stderr=subprocess.PIPE) log_print(u"执行adb重连机制CMD_ADB_CONNECT:%s" % str(CMD_ADB_CONNECT)) p.kill() def killAdb(): if not connect_serial: return True CMD_ADB_KILL_Server = "adb kill-server" p = subprocess.Popen(CMD_ADB_KILL_Server, bufsize=128, stdout=subprocess.PIPE, stderr=subprocess.PIPE) p.kill() def confirmAdb(serial=None): # 如果传来的serial不为空,则重新连接传入的serial对应的设备 if serial: global connect_serial connect_serial = serial if not connect_serial: # log_print("序列为空返回!") return True # ret = check_adb_connect() if check_adb_connect(): # log_print("adb已经连接connect_serial:%s" % str(connect_serial)) return True else: log_print(u"connect_serial:%s,Adb 未连接,尝试重连!!!" % str(connect_serial)) count = 0 while True: if count > 30: return False break # print(u"adb重连第count:%s次" % str(count)) log_print(u"adb重连第count:%s次" % str(count)) disconnectAdb() time.sleep(5) if count > 1: log_print(u"adb重连大于2次,仍未连接成功,kill adb server") killAdb() time.sleep(5) # disconnectAdb() # time.sleep(5) if check_adb_connect(): # print(u"adb重连成功count:%s" % str(count)) log_print(u"adb重连成功count:%s" % str(count)) # printLog(u"adb重连成功count:%s"%str(count)) # 重连之后要重新构建连接 return True connectAdb() time.sleep(5) if check_adb_connect(): # print(u"adb重连成功count:%s" % str(count)) log_print(u"adb重连成功count:%s" % str(count)) # printLog(u"adb重连成功count:%s"%str(count)) # 重连之后要重新构建连接 return True break else: log_print(u"adb重连失败count:%s" % str(count)) count = count + 1 time.sleep(1) class TimeoutRequestsSession(requests.Session): def __init__(self): super(TimeoutRequestsSession, self).__init__() # refs: https://stackoverflow.com/questions/33895739/python-requests-cant-load-any-url-remote-end-closed-connection-without-respo adapter = requests.adapters.HTTPAdapter(max_retries=3) self.mount("http://", adapter) self.mount("https://", adapter) def request(self, method, url, **kwargs): if kwargs.get('timeout') is None: kwargs['timeout'] = HTTP_TIMEOUT verbose = hasattr(self, 'debug') and self.debug if verbose: data = kwargs.get('data') or '""' if isinstance(data, dict): data = json.dumps(data) time_start = time.time() print( datetime.now().strftime("%H:%M:%S.%f")[:-3], "$ curl -X {method} -d '{data}' '{url}'".format( method=method, url=url, data=data)) try: resp = super(TimeoutRequestsSession, self).request( method, url, **kwargs) except requests.ConnectionError: raise EnvironmentError( "atx-agent is not running. Fix it with following steps.\n1. Plugin device into computer.\n2. Run command \"python -m uiautomator2 init\"" ) else: if verbose: print( datetime.now().strftime("%H:%M:%S.%f")[:-3], "Response (%d ms) >>>\n" % ( (time.time() - time_start) * 1000) + resp.text.rstrip() + "\n<<< END") return resp def plugin_register(name, plugin, *args, **kwargs): """ Add plugin into UIAutomatorServer Args: name: string plugin: class or function which take d as first parameter Example: def upload_screenshot(d): def inner(): d.screenshot("tmp.jpg") # use requests.post upload tmp.jpg return inner plugin_register("upload_screenshot", save_screenshot) d = u2.connect() d.ext_upload_screenshot() """ UIAutomatorServer.plugins()[name] = (plugin, args, kwargs) def plugin_clear(): UIAutomatorServer.plugins().clear() class UIAutomatorServer(object): __isfrozen = False __plugins = {} def __init__(self, host, port=7912): """ Args: host (str): host address port (int): port number Raises: EnvironmentError """ self._host = host self._port = port self._reqsess = TimeoutRequestsSession( ) # use requests.Session to enable HTTP Keep-Alive self._server_url = 'http://{}:{}'.format(host, port) self._server_jsonrpc_url = self._server_url + "/jsonrpc/0" self._default_session = Session(self, None) self._cached_plugins = {} self.__devinfo = None self._hooks = {} self.platform = None # hot fix for weditor self.ash = AdbShell(self.shell) # the powerful adb shell self.wait_timeout = 20.0 # wait element timeout self.click_post_delay = None # wait after each click self._freeze() # prevent creating new attrs # self._atx_agent_check() def _freeze(self): self.__isfrozen = True @staticmethod def plugins(): return UIAutomatorServer.__plugins def __setattr__(self, key, value): """ Prevent creating new attributes outside __init__ """ if self.__isfrozen and not hasattr(self, key): raise TypeError("Key %s does not exist in class %r" % (key, self)) object.__setattr__(self, key, value) def __str__(self): return 'uiautomator2 object for %s:%d' % (self._host, self._port) def __repr__(self): return str(self) def _atx_agent_check(self): """ check atx-agent health status and version """ try: version = self._reqsess.get( self.path2url('/version'), timeout=5).text if version != __atx_agent_version__: warnings.warn( 'Version dismatch, expect "%s" actually "%s"' % (__atx_agent_version__, version), Warning, stacklevel=2) # Cancel bellow code to make connect() return faster. # launch service to prevent uiautomator killed by Android system # self.adb_shell('am', 'startservice', '-n', 'com.github.uiautomator/.Service') except (requests.ConnectionError, ) as e: raise EnvironmentError( "atx-agent is not responding, need to init device first") @property def debug(self): return hasattr(self._reqsess, 'debug') and self._reqsess.debug @debug.setter def debug(self, value): self._reqsess.debug = bool(value) @property def serial(self): return self.shell(['getprop', 'ro.serialno'])[0].strip() @property def jsonrpc(self): """ Make jsonrpc call easier For example: self.jsonrpc.pressKey("home") """ return self.setup_jsonrpc() def path2url(self, path): return urlparse.urljoin(self._server_url, path) def window_size(self): """ return (width, height) """ info = self._reqsess.get(self.path2url('/info')).json() w, h = info['display']['width'], info['display']['height'] if (w > h) != (self.info["displayRotation"] % 2 == 1): w, h = h, w return w, h def hooks_register(self, func): """ Args: func: should accept 3 args. func_name:string, args:tuple, kwargs:dict """ self._hooks[func] = True def hooks_apply(self, stage, func_name, args=(), kwargs={}, ret=None): """ Args: stage(str): one of "before" or "after" """ for fn in self._hooks.keys(): fn(stage, func_name, args, kwargs, ret) def setup_jsonrpc(self, jsonrpc_url=None): """ Wrap jsonrpc call into object Usage example: self.setup_jsonrpc().pressKey("home") """ if not jsonrpc_url: jsonrpc_url = self._server_jsonrpc_url class JSONRpcWrapper(): def __init__(self, server): self.server = server self.method = None def __getattr__(self, method): self.method = method # jsonrpc function name return self def __call__(self, *args, **kwargs): http_timeout = kwargs.pop('http_timeout', HTTP_TIMEOUT) params = args if args else kwargs return self.server.jsonrpc_retry_call(jsonrpc_url, self.method, params, http_timeout) return JSONRpcWrapper(self) def jsonrpc_retry_call(self, *args, **kwargs): # method, params=[], http_timeout=60): try: return self.jsonrpc_call(*args, **kwargs) except (GatewayError, ): warnings.warn( "uiautomator2 is not reponding, restart uiautomator2 automatically", RuntimeWarning, stacklevel=1) # for XiaoMi, want to recover uiautomator2 must start app:com.github.uiautomator self.reset_uiautomator() return self.jsonrpc_call(*args, **kwargs) except UiAutomationNotConnectedError: warnings.warn( "UiAutomation not connected, restart uiautoamtor", RuntimeWarning, stacklevel=1) self.reset_uiautomator() return self.jsonrpc_call(*args, **kwargs) except (NullObjectExceptionError, NullPointerExceptionError, StaleObjectExceptionError) as e: if args[1] != 'dumpWindowHierarchy': # args[1] method warnings.warn( "uiautomator2 raise exception %s, and run code again" % e, RuntimeWarning, stacklevel=1) time.sleep(1) return self.jsonrpc_call(*args, **kwargs) def jsonrpc_call(self, jsonrpc_url, method, params=[], http_timeout=60): """ jsonrpc2 call Refs: - http://www.jsonrpc.org/specification """ request_start = time.time() data = { "jsonrpc": "2.0", "id": self._jsonrpc_id(method), "method": method, "params": params, } data = json.dumps(data).encode('utf-8') res = self._reqsess.post( jsonrpc_url, # +"?m="+method, #?method is for debug headers={"Content-Type": "application/json"}, timeout=http_timeout, data=data) if DEBUG: print("Shell$ curl -X POST -d '{}' {}".format(data, jsonrpc_url)) print("Output> " + res.text) if res.status_code == 502: raise GatewayError( res, "gateway error, time used %.1fs" % (time.time() - request_start)) if res.status_code == 410: # http status gone: session broken raise SessionBrokenError("app quit or crash", jsonrpc_url, res.text) if res.status_code != 200: raise UiaError(jsonrpc_url, data, res.status_code, res.text, "HTTP Return code is not 200", res.text) jsondata = res.json() error = jsondata.get('error') if not error: return jsondata.get('result') # error happends err = JsonRpcError(error, method) if isinstance( err.data, six.string_types) and 'UiAutomation not connected' in err.data: err.__class__ = UiAutomationNotConnectedError elif err.message: if 'uiautomator.UiObjectNotFoundException' in err.message: err.__class__ = UiObjectNotFoundError elif 'android.support.test.uiautomator.StaleObjectException' in err.message: # StaleObjectException # https://developer.android.com/reference/android/support/test/uiautomator/StaleObjectException.html # A StaleObjectException exception is thrown when a UiObject2 is used after the underlying View has been destroyed. # In this case, it is necessary to call findObject(BySelector) to obtain a new UiObject2 instance. err.__class__ = StaleObjectExceptionError elif 'java.lang.NullObjectException' in err.message: err.__class__ = NullObjectExceptionError elif 'java.lang.NullPointerException' == err.message: err.__class__ = NullPointerExceptionError raise err def _jsonrpc_id(self, method): m = hashlib.md5() m.update(("%s at %f" % (method, time.time())).encode("utf-8")) return m.hexdigest() @property def agent_alive(self): try: r = self._reqsess.get(self.path2url('/version'), timeout=2) return r.status_code == 200 except: return False @property def alive(self): try: r = self._reqsess.get(self.path2url('/ping'), timeout=2) if r.status_code != 200: return False r = self._reqsess.post( self.path2url('/jsonrpc/0'), data=json.dumps({ "jsonrpc": "2.0", "id": 1, "method": "deviceInfo" }), timeout=2) if r.status_code != 200: return False if r.json().get('error'): return False return True except requests.exceptions.ReadTimeout: return False except EnvironmentError: return False def service(self, name): """ Manage service start or stop Example: d.service("uiautomator").start() d.service("uiautomator").stop() """ u2obj = self class _Service(object): def __init__(self, name): self.name = name # FIXME(ssx): support other service: minicap, minitouch assert name == 'uiautomator' def start(self): res = u2obj._reqsess.post(u2obj.path2url('/uiautomator')) res.raise_for_status() def stop(self): res = u2obj._reqsess.delete(u2obj.path2url('/uiautomator')) if res.status_code != 200: warnings.warn(res.text) def running(self): res = u2obj._reqsess.get(u2obj.path2url("/uiautomator")) res.raise_for_status() return res.json().get("running") return _Service(name) @property def uiautomator(self): return self.service("uiautomator") def reset_uiautomator(self): """ Reset uiautomator Raises: RuntimeError """ # self.open_identify() stopUrl = self.path2url('/uiautomator') print("reset_uiautomator:stopUrl=",stopUrl) self._reqsess.delete(stopUrl) # stop uiautomator keeper first # wait = not unlock # should not wait IdentifyActivity open or it will stuck sometimes # self.app_start( # may also stuck here. # 'com.github.uiautomator', # '.MainActivity', # wait=False, # stop=True) time.sleep(.5) # launch atx-agent uiautomator keeper startUrl = self.path2url('/uiautomator') print("reset_uiautomator:startUrl=",startUrl) self._reqsess.post(startUrl) # wait until uiautomator2 service working deadline = time.time() + 120.0 self.shell([ 'am', 'start', '-n', 'com.github.uiautomator/.MainActivity' ]) time.sleep(1) while time.time() < deadline: print( time.strftime("[%Y-%m-%d %H:%M:%S]"), "uiautomator is starting ...") if self.alive: # keyevent BACK if current is com.github.uiautomator # XiaoMi uiautomator will kill the app(com.github.uiautomator) when launch # it is better to start a service to make uiautomator live longer if self.current_app()['package'] != 'com.github.uiautomator': self.shell([ 'am', 'startservice', '-n', 'com.github.uiautomator/.Service' ]) time.sleep(1.5) else: time.sleep(.5) self.shell(['input', 'keyevent', 'BACK']) print("uiautomator back to normal") return True time.sleep(1) self.shell(['input', 'keyevent', 'KEYCODE_VOLUME_UP']) self.shell(['input', 'keyevent', 'KEYCODE_VOLUME_DOWN']) raise RuntimeError( "Uiautomator started failed. Find solutions in https://github.com/openatx/uiautomator2/wiki/Common-issues" ) def healthcheck(self): """ Reset device into health state Raises: RuntimeError """ sh = self.ash if not sh.is_screen_on(): print(time.strftime("[%Y-%m-%d %H:%M:%S]"), "wakeup screen") sh.keyevent("WAKEUP") sh.keyevent("HOME") sh.swipe(0.1, 0.9, 0.9, 0.1) # swipe to unlock sh.keyevent("HOME") sh.keyevent("BACK") self.reset_uiautomator() def app_install(self, url, installing_callback=None, server=None): """ {u'message': u'downloading', "progress": {u'totalSize': 407992690, u'copiedSize': 49152}} Returns: packageName Raises: RuntimeError """ r = self._reqsess.post(self.path2url('/install'), data={'url': url}) if r.status_code != 200: raise RuntimeError("app install error:", r.text) id = r.text.strip() print(time.strftime('%H:%M:%S'), "id:", id) return self._wait_install_finished(id, installing_callback) def _wait_install_finished(self, id, installing_callback): bar = None downloaded = True while True: resp = self._reqsess.get(self.path2url('/install/' + id)) resp.raise_for_status() jdata = resp.json() message = jdata['message'] pg = jdata.get('progress') def notty_print_progress(pg): written = pg['copiedSize'] total = pg['totalSize'] print( time.strftime('%H:%M:%S'), 'downloading %.1f%% [%s/%s]' % (100.0 * written / total, humanize.naturalsize(written, gnu=True), humanize.naturalsize(total, gnu=True))) if message == 'downloading': downloaded = False if pg: # if there is a progress if hasattr(sys.stdout, 'isatty'): if sys.stdout.isatty(): if not bar: bar = _ProgressBar( time.strftime('%H:%M:%S') + ' downloading', max=pg['totalSize']) written = pg['copiedSize'] bar.next(written - bar.index) else: notty_print_progress(pg) else: pass else: print(time.strftime('%H:%M:%S'), "download initialing") else: if not downloaded: downloaded = True if bar: # bar only set in atty bar.next(pg['copiedSize'] - bar.index) if pg else None bar.finish() else: print(time.strftime('%H:%M:%S'), "download 100%") print(time.strftime('%H:%M:%S'), message) if message == 'installing': if callable(installing_callback): installing_callback(self) if message == 'success installed': return jdata.get('packageName') if jdata.get('error'): raise RuntimeError("error", jdata.get('error')) try: time.sleep(1) except KeyboardInterrupt: bar.finish() if bar else None print("keyboard interrupt catched, cancel install id", id) self._reqsess.delete(self.path2url('/install/' + id)) raise def shell(self, cmdargs, stream=False, timeout=60): """ Run adb shell command with arguments and return its output. Require atx-agent >=0.3.3 Args: cmdargs: str or list, example: "ls -l" or ["ls", "-l"] timeout: seconds of command run, works on when stream is False stream: bool used for long running process. Returns: (output, exit_code) when stream is False requests.Response when stream is True, you have to close it after using Raises: RuntimeError For atx-agent is not support return exit code now. When command got something wrong, exit_code is always 1, otherwise exit_code is always 0 """ print("uiautomator2.__init__.shell:cmd=",cmdargs) if isinstance(cmdargs, (list, tuple)): cmdargs = list2cmdline(cmdargs) if stream: return self._reqsess.get( self.path2url("/shell/stream"), params={"command": cmdargs}, stream=True) ret = self._reqsess.post( self.path2url('/shell'), data={ 'command': cmdargs, 'timeout': str(timeout) }, timeout=timeout+10) if ret.status_code != 200: raise RuntimeError( "device agent responds with an error code %d" % ret.status_code, ret.text) resp = ret.json() exit_code = 1 if resp.get('error') else 0 exit_code = resp.get('exitCode', exit_code) shell_response = namedtuple("ShellResponse", ("output", "exit_code")) return shell_response(resp.get('output'), exit_code) def adb_shell(self, *args): """ Example: adb_shell('pwd') adb_shell('ls', '-l') adb_shell('ls -l') Returns: string for stdout merged with stderr, after the entire shell command is completed. """ # print( # "DeprecatedWarning: adb_shell is deprecated, use: output, exit_code = shell(['ls', '-l']) instead" # ) cmdline = args[0] if len(args) == 1 else list2cmdline(args) return self.shell(cmdline)[0] def app_start(self, pkg_name, activity=None, extras={}, wait=True, stop=False, unlock=False): """ Launch application Args: pkg_name (str): package name activity (str): app activity stop (bool): Stop app before starting the activity. (require activity) """ if unlock: self.unlock() if activity: # -D: enable debugging # -W: wait for launch to complete # -S: force stop the target app before starting the activity # --user | current: Specify which user to run as; if not # specified then run as the current user. # -e # --ei # --ez args = [ 'am', 'start', '-a', 'android.intent.action.MAIN', '-c', 'android.intent.category.LAUNCHER' ] if wait: args.append('-W') if stop: args.append('-S') args += ['-n', '{}/{}'.format(pkg_name, activity)] # -e --ez extra_args = [] for k, v in extras.items(): if isinstance(v, bool): extra_args.extend(['--ez', k, 'true' if v else 'false']) elif isinstance(v, int): extra_args.extend(['--ei', k, str(v)]) else: extra_args.extend(['-e', k, v]) args += extra_args # 'am', 'start', '-W', '-n', '{}/{}'.format(pkg_name, activity)) self.shell(args) else: if stop: self.app_stop(pkg_name) self.shell([ 'monkey', '-p', pkg_name, '-c', 'android.intent.category.LAUNCHER', '1' ]) @retry(EnvironmentError, delay=.5, tries=3, jitter=.1) def current_app(self): """ Returns: dict(package, activity, pid?) Raises: EnvironementError For developer: Function reset_uiautomator need this function, so can't use jsonrpc here. """ # Related issue: https://github.com/openatx/uiautomator2/issues/200 # $ adb shell dumpsys window windows # Example output: # mCurrentFocus=Window{41b37570 u0 com.incall.apps.launcher/com.incall.apps.launcher.Launcher} # mFocusedApp=AppWindowToken{422df168 token=Token{422def98 ActivityRecord{422dee38 u0 com.example/.UI.play.PlayActivity t14}}} # Regexp # r'mFocusedApp=.*ActivityRecord{\w+ \w+ (?P.*)/(?P.*) .*' # r'mCurrentFocus=Window{\w+ \w+ (?P.*)/(?P.*)\}') _focusedRE = re.compile( r'mCurrentFocus=Window{.*\s+(?P[^\s]+)/(?P[^\s]+)\}' ) m = _focusedRE.search(self.shell(['dumpsys', 'window', 'windows'])[0]) if m: return dict( package=m.group('package'), activity=m.group('activity')) # try: adb shell dumpsys activity top _activityRE = re.compile( r'ACTIVITY (?P[^\s]+)/(?P[^/\s]+) \w+ pid=(?P\d+)' ) output, _ = self.shell(['dumpsys', 'activity', 'top']) ms = _activityRE.finditer(output) ret = None for m in ms: ret = dict( package=m.group('package'), activity=m.group('activity'), pid=int(m.group('pid'))) if ret: # get last result return ret raise EnvironmentError("Couldn't get focused app") def wait_activity(self, activity, timeout=10): """ wait activity Args: activity (str): name of activity timeout (float): max wait time Returns: bool of activity """ deadline = time.time() + timeout while time.time() < deadline: current_activity = self.current_app().get('activity') if activity == current_activity: return True time.sleep(.5) return False def app_stop(self, pkg_name): """ Stop one application: am force-stop""" self.shell(['am', 'force-stop', pkg_name]) def app_stop_all(self, excludes=[]): """ Stop all third party applications Args: excludes (list): apps that do now want to kill Returns: a list of killed apps """ our_apps = ['com.github.uiautomator', 'com.github.uiautomator.test'] output, _ = self.shell(['pm', 'list', 'packages', '-3']) pkgs = re.findall('package:([^\s]+)', output) process_names = re.findall('([^\s]+)$', self.shell('ps')[0], re.M) kill_pkgs = set(pkgs).intersection(process_names).difference(our_apps + excludes) kill_pkgs = list(kill_pkgs) for pkg_name in kill_pkgs: self.app_stop(pkg_name) return kill_pkgs def app_clear(self, pkg_name): """ Stop and clear app data: pm clear """ self.shell(['pm', 'clear', pkg_name]) def app_uninstall(self, pkg_name): """ Uninstall an app """ self.shell(["pm", "uninstall", pkg_name]) def app_uninstall_all(self, excludes=[], verbose=False): """ Uninstall all apps """ our_apps = ['com.github.uiautomator', 'com.github.uiautomator.test'] output, _ = self.shell(['pm', 'list', 'packages', '-3']) pkgs = re.findall('package:([^\s]+)', output) pkgs = set(pkgs).difference(our_apps + excludes) pkgs = list(pkgs) for pkg_name in pkgs: if verbose: print("uninstalling", pkg_name) self.app_uninstall(pkg_name) return pkgs def unlock(self): """ unlock screen """ self.open_identify() self._default_session.press("home") def open_identify(self, theme='black'): """ Args: theme (str): black or red """ self.shell([ 'am', 'start', '-W', '-n', 'com.github.uiautomator/.IdentifyActivity', '-e', 'theme', theme ]) def _pidof_app(self, pkg_name): """ Return pid of package name """ text = self._reqsess.get(self.path2url('/pidof/' + pkg_name)).text if text.isdigit(): return int(text) def push_url(self, url, dst, mode=0o644): """ Args: url (str): http url address dst (str): destination mode (str): file mode Raises: FileNotFoundError(py3) OSError(py2) """ modestr = oct(mode).replace('o', '') r = self._reqsess.post( self.path2url('/download'), data={ 'url': url, 'filepath': dst, 'mode': modestr }) if r.status_code != 200: raise IOError("push-url", "%s -> %s" % (url, dst), r.text) key = r.text.strip() while 1: r = self._reqsess.get(self.path2url('/download/' + key)) jdata = r.json() message = jdata.get('message') if message == 'downloaded': log_print("downloaded") break elif message == 'downloading': progress = jdata.get('progress') if progress: copied_size = progress.get('copiedSize') total_size = progress.get('totalSize') log_print("{} {} / {}".format( message, humanize.naturalsize(copied_size), humanize.naturalsize(total_size))) else: log_print("downloading") else: log_print("unknown json:" + str(jdata)) raise IOError(message) time.sleep(1) def push(self, src, dst, mode=0o644): """ Args: src (path or fileobj): source file dst (str): destination can be folder or file path Returns: dict object, for example: {"mode": "0660", "size": 63, "target": "/sdcard/ABOUT.rst"} Since chmod may fail in android, the result "mode" may not same with input args(mode) Raises: IOError(if push got something wrong) """ modestr = oct(mode).replace('o', '') pathname = self.path2url('/upload/' + dst.lstrip('/')) if isinstance(src, six.string_types): src = open(src, 'rb') r = self._reqsess.post( pathname, data={'mode': modestr}, files={'file': src}) if r.status_code == 200: return r.json() raise IOError("push", "%s -> %s" % (src, dst), r.text) def pull(self, src, dst): """ Pull file from device to local Raises: FileNotFoundError(py3) OSError(py2) Require atx-agent >= 0.0.9 """ pathname = self.path2url("/raw/" + src.lstrip("/")) r = self._reqsess.get(pathname, stream=True) if r.status_code != 200: raise FileNotFoundError("pull", src, r.text) with open(dst, 'wb') as f: shutil.copyfileobj(r.raw, f) def pull_content(self, src): """ Read remote file content Raises: FileNotFoundError """ pathname = self.path2url("/raw/" + src.lstrip("/")) r = self._reqsess.get(pathname) if r.status_code != 200: raise FileNotFoundError("pull", src, r.text) return r.content @property def screenshot_uri(self): return 'http://%s:%d/screenshot/0' % (self._host, self._port) def screenshot(self, *args, **kwargs): """ Take screenshot of device Returns: PIL.Image """ return self.session().screenshot(*args, **kwargs) @property def device_info(self): if self.__devinfo: return self.__devinfo self.__devinfo = self._reqsess.get(self.path2url('/info')).json() return self.__devinfo def app_info(self, pkg_name): """ Get app info Args: pkg_name (str): package name Return example: { "mainActivity": "com.github.uiautomator.MainActivity", "label": "ATX", "versionName": "1.1.7", "versionCode": 1001007, "size":1760809 } Raises: UiaError """ url = self.path2url('/packages/{0}/info'.format(pkg_name)) resp = self._reqsess.get(url) resp.raise_for_status() resp = resp.json() if not resp.get('success'): raise UiaError(resp.get('description', 'unknown')) return resp.get('data') def app_icon(self, pkg_name): """ Returns: PIL.Image Raises: UiaError """ from PIL import Image url = self.path2url('/packages/{0}/icon'.format(pkg_name)) resp = self._reqsess.get(url) resp.raise_for_status() return Image.open(io.BytesIO(resp.content)) @property def wlan_ip(self): return self._reqsess.get(self.path2url("/wlan/ip")).text.strip() def disable_popups(self, enable=True): """ Automatic click all popups TODO: need fix """ raise NotImplementedError() # self.watcher if enable: self.jsonrpc.setAccessibilityPatterns({ "com.android.packageinstaller": [u"确定", u"安装", u"下一步", u"好", u"允许", u"我知道"], "com.miui.securitycenter": [u"继续安装"], # xiaomi "com.lbe.security.miui": [u"允许"], # xiaomi "android": [u"好", u"安装"], # vivo "com.huawei.systemmanager": [u"立即删除"], # huawei "com.android.systemui": [u"同意"], # 锤子 }) else: self.jsonrpc.setAccessibilityPatterns({}) def session(self, pkg_name=None, attach=False, launch_timeout=None): """ Create a new session Args: pkg_name (str): android package name attach (bool): attach to already running app launch_timeout (int): launch timeout Raises: requests.HTTPError, SessionBrokenError """ if pkg_name is None: return self._default_session if not attach: request_data = {"flags": "-W -S"} if launch_timeout: request_data["timeout"] = str(launch_timeout) resp = self._reqsess.post( self.path2url("/session/" + pkg_name), data=request_data) if resp.status_code == 410: # Gone raise SessionBrokenError(pkg_name, resp.text) resp.raise_for_status() jsondata = resp.json() if not jsondata["success"]: raise SessionBrokenError("app launch failed", jsondata["error"], jsondata["output"]) time.sleep(2.5) # wait launch finished, maybe no need pid = self._pidof_app(pkg_name) if not pid: raise SessionBrokenError(pkg_name) return Session(self, pkg_name, pid) def __getattr__(self, attr): if attr in self._cached_plugins: return self._cached_plugins[attr] if attr.startswith('ext_'): plugin_name = attr[4:] if plugin_name not in self.__plugins: if plugin_name == 'xpath': import ssat_sdk.uiautomator2.ext.xpath as xpath xpath.init() else: raise ValueError( "plugin \"%s\" not registed" % plugin_name) func, args, kwargs = self.__plugins[plugin_name] obj = functools.partial(func, self)(*args, **kwargs) self._cached_plugins[attr] = obj return obj try: return getattr(self._default_session, attr) except AttributeError: raise AttributeError( "'Session or UIAutomatorServer' object has no attribute '%s'" % attr) def __call__(self, **kwargs): return self._default_session(**kwargs) class AdbShell(object): def __init__(self, shellfn): """ Args: shellfn: Shell function """ self.shell = shellfn def wmsize(self): """ get window size Returns: (width, height) """ output, _ = self.shell("wm size") m = re.match(r"Physical size: (\d+)x(\d+)", output) if m: return map(int, m.groups()) raise RuntimeError("Can't parse wm size: " + output) def is_screen_on(self): output, _ = self.shell("dumpsys power") return 'mHoldingDisplaySuspendBlocker=true' in output def keyevent(self, v): """ Args: v: eg home wakeup back """ v = v.upper() self.shell("input keyevent " + v) def _adjust_pos(self, x, y, w=None, h=None): if x < 1: x = x * w if y < 1: y = y * h return (x, y) def swipe(self, x0, y0, x1, y1): w, h = None, None if x0 < 1 or y0 < 1 or x1 < 1 or y1 < 1: w, h = self.wmsize() x0, y0 = self._adjust_pos(x0, y0, w, h) x1, y1 = self._adjust_pos(x1, y1, w, h) self.shell("input swipe %d %d %d %d" % (x0, y0, x1, y1))