| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356 | 
							- #!/usr/bin/env python
 
- # -*- coding: utf-8 -*-
 
- """
 
- ::Timeout
 
- atx-agent:ReverseProxy use http.DefaultTransport. Default Timeout: 30s
 
- |-- Dial --|-- TLS handshake --|-- Request --|-- Resp.headers --|-- Response.body --|
 
- |------------------------------- http.Client.Timeout -------------------------------|
 
- Refs:
 
-     - https://golang.org/pkg/net/http/#RoundTripper
 
-     - http://colobu.com/2016/07/01/the-complete-guide-to-golang-net-http-timeouts
 
- """
 
- from __future__ import absolute_import, print_function
 
- import functools
 
- import hashlib
 
- import io
 
- import json
 
- import os
 
- import re
 
- import shutil
 
- import subprocess
 
- import sys
 
- import threading
 
- import time
 
- import warnings
 
- from collections import namedtuple
 
- from datetime import datetime
 
- from subprocess import list2cmdline
 
- import humanize
 
- import progress.bar
 
- import requests
 
- import six
 
- import six.moves.urllib.parse as urlparse
 
- from retry import retry
 
- from ssat_sdk.uiautomator2 import adbutils
 
- from ssat_sdk.uiautomator2.exceptions import (GatewayError, JsonRpcError, ConnectError,
 
-                                      NullObjectExceptionError,
 
-                                      NullPointerExceptionError,
 
-                                      SessionBrokenError,
 
-                                      StaleObjectExceptionError, UiaError,
 
-                                      UiAutomationNotConnectedError,
 
-                                      UiObjectNotFoundError)
 
- from ssat_sdk.uiautomator2.session import Session, set_fail_prompt  # noqa: F401
 
- from ssat_sdk.uiautomator2.version import __atx_agent_version__
 
- if six.PY2:
 
-     FileNotFoundError = OSError
 
- DEBUG = False
 
- HTTP_TIMEOUT = 60
 
class _ProgressBar(progress.bar.Bar):
    """Progress bar whose suffix shows percent, ETA and transfer speed."""

    message = "progress"
    suffix = '%(percent)d%% [%(eta_td)s, %(speed)s]'

    @property
    def speed(self):
        """Return ``index / elapsed`` rendered as a GNU-style size per second."""
        # A falsy ``elapsed`` is passed through unchanged so that no
        # division by zero can happen.
        rate = self.elapsed
        if rate:
            rate = self.index / self.elapsed
        return humanize.naturalsize(rate, gnu=True) + '/s'
 
def log_print(s):
    """Print ``s`` prefixed with the current thread name and a millisecond timestamp."""
    thread_name = threading.current_thread().name
    # '%f' yields microseconds; dropping the last three digits leaves milliseconds.
    stamp = datetime.now().strftime('%H:%M:%S,%f')[:-3]
    print("".join((thread_name, ": ", stamp, " ", s)))
 
- def _is_wifi_addr(addr):
 
-     if not addr:
 
-         return False
 
-     if re.match(r"^https?://", addr):
 
-         return True
 
-     m = re.search(r"(\d+\.\d+\.\d+\.\d+)", addr)
 
-     if m and m.group(1) != "127.0.0.1":
 
-         return True
 
-     return False
 
def connect(addr=None):
    """
    Connect to a device by network address or by adb serial.

    Args:
        addr (str): uiautomator server address or serial number. When empty
            or '+', the value of env-var ANDROID_DEVICE_IP is used instead.

    Returns:
        UIAutomatorServer

    Raises:
        ConnectError

    Example:
        connect("10.0.0.1:7912")
        connect("10.0.0.1") # use default 7912 port
        connect("http://10.0.0.1")
        connect("http://10.0.0.1:7912")
        connect("cff1123ea")  # adb device serial number
    """
    target = addr
    if not target or target == '+':
        target = os.getenv('ANDROID_DEVICE_IP')
    # Network-looking addresses go straight to the HTTP endpoint; anything
    # else is treated as an adb serial.
    connector = connect_wifi if _is_wifi_addr(target) else connect_usb
    return connector(target)
 
def connect_adb_wifi(addr):
    """
    Run "adb connect <addr>", wait briefly for the device, then connect_usb(addr).

    Args:
        addr: ip+port which can be used for "adb connect" argument

    Raises:
        ConnectError: "adb wait-for-device" did not finish within 2 seconds
    """
    assert isinstance(addr, six.string_types)

    connect_cmd = [adbutils.adb_path(), "connect", addr]
    subprocess.call(connect_cmd)

    wait_cmd = [adbutils.adb_path(), "-s", addr, "wait-for-device"]
    try:
        subprocess.call(wait_cmd, timeout=2)
    except subprocess.TimeoutExpired:
        raise ConnectError("Fail execute", "adb connect " + addr)
    return connect_usb(addr)
 
def connect_usb(serial=None, healthcheck=True):
    """
    Connect through adb: forward device port 7912 and talk to it on localhost.

    Args:
        serial (str): android device serial; when empty the device is chosen
            by ``AdbClient.must_one_device()``
        healthcheck (bool): start atx-agent / uiautomator if not ready

    Returns:
        UIAutomatorServer

    Raises:
        ConnectError
    """
    # The module-level serial is remembered for the adb reconnect helpers.
    global connect_serial

    print("uiautomator2.__init.connect_usb: serial=", serial, ";healthcheck=", healthcheck)
    adb = adbutils.AdbClient()
    if not serial:
        device = adb.must_one_device()
        print("uiautomator2.__init.connect_usb: not serial, device=", device)
    else:
        device = adbutils.AdbDevice(adb, serial)

    connect_serial = device.serial
    log_print("connect_serial:%s" % str(connect_serial))

    lport = device.forward_port(7912)
    print("uiautomator2.__init.connect_usb: lport=", lport)
    d = connect_wifi('127.0.0.1:' + str(lport))
    print("uiautomator2.__init.connect_usb: UIAutomatorServer=", d)

    if healthcheck:
        if not d.agent_alive:
            warnings.warn("backend atx-agent is not alive, start again ...",
                          RuntimeWarning)
            # TODO: /data/local/tmp might not be executable and atx-agent can be somewhere else
            device.shell_output("/data/local/tmp/atx-agent", "server", "-d")
            # Give the freshly started agent up to 3 seconds to answer.
            deadline = time.time() + 3
            while time.time() < deadline:
                if d.alive:
                    break
                # Short pause so a fast-failing probe does not spin the CPU.
                time.sleep(0.2)
        elif not d.alive:
            warnings.warn("backend uiautomator2 is not alive, start again ...",
                          RuntimeWarning)
            d.reset_uiautomator()
        print("uiautomator2.__init.connect_usb: atx-agent is alive and uiautomator app is alive!")
    return d
 
def connect_wifi(addr=None):
    """
    Build a UIAutomatorServer for an atx-agent HTTP endpoint.

    Args:
        addr (str) uiautomator server address. "http://" is prepended when no
            scheme is given; the port defaults to 7912.

    Returns:
        UIAutomatorServer

    Raises:
        ConnectError: addr is empty or uses a scheme other than http://

    Examples:
        connect_wifi("10.0.0.1")
    """
    print("uiautomator2.__init.connect_wifi: start")
    if not addr:
        # Fail with the documented error instead of a TypeError on None.
        raise ConnectError("address should not be empty")
    if '://' not in addr:
        addr = 'http://' + addr
    if not addr.startswith('http://'):
        raise ConnectError("address should start with http://")

    u = urlparse.urlparse(addr)
    host = u.hostname
    port = u.port or 7912
    return UIAutomatorServer(host, port)
 
- connect_serial = None
 
def check_adb_connect():
    """Return True when adb can query the remembered device without error output.

    Runs ``adb -s <connect_serial> shell getprop ro.build.version.sdk``.
    Anything written to stderr is logged and counts as "not connected".
    When no serial has been remembered the check is skipped and True is returned.
    """
    if not connect_serial:
        return True
    # An argument list works on both POSIX and Windows; a single command
    # string without shell=True is only accepted on Windows.
    cmd = ["adb", "-s", str(connect_serial), "shell", "getprop",
           "ro.build.version.sdk"]
    p = subprocess.Popen(cmd, bufsize=128, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    # communicate() drains both pipes together and reaps the process.
    _, errInfo = p.communicate()
    if errInfo:
        log_print(u"查询SDK版本号失败,出错信息:%s" % str(errInfo))
        return False
    return True
 
def check_adb_connect_type():
    """Return "tcp/ip" when ``adb devices`` lists a 192.168.x address, else "usb"."""
    p = subprocess.Popen(["adb", "devices"], bufsize=128,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # communicate() drains both pipes and reaps the process.
    outInfo, _ = p.communicate()
    # Popen yields bytes; decode so the substring test also works on Python 3.
    if isinstance(outInfo, bytes):
        outInfo = outInfo.decode("utf-8", "replace")
    if "192.168" in outInfo:
        return "tcp/ip"
    return "usb"
 
def disconnectAdb():
    """Run ``adb disconnect <connect_serial>`` and wait for it to finish.

    Returns True without doing anything when no serial has been remembered;
    otherwise returns None.
    """
    if not connect_serial:
        return True
    cmd = ["adb", "disconnect", str(connect_serial)]
    p = subprocess.Popen(cmd, bufsize=128, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    try:
        # Wait instead of killing right away, otherwise the command may be
        # terminated before it has done anything. Its output is a line or two,
        # far below the pipe buffer size.
        p.wait()
    finally:
        p.stdout.close()
        p.stderr.close()
 
def connectAdb():
    """Run ``adb connect <connect_serial>`` and wait for it to finish.

    Returns True without doing anything when no serial has been remembered;
    otherwise returns None.
    """
    if not connect_serial:
        return True
    CMD_ADB_CONNECT = "adb connect %s" % connect_serial
    p = subprocess.Popen(["adb", "connect", str(connect_serial)], bufsize=128,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    log_print(u"执行adb重连机制CMD_ADB_CONNECT:%s" % str(CMD_ADB_CONNECT))
    try:
        # Wait instead of killing right away, otherwise the command may be
        # terminated before it has done anything.
        p.wait()
    finally:
        p.stdout.close()
        p.stderr.close()
 
def killAdb():
    """Run ``adb kill-server`` and wait for it to finish.

    Returns True without doing anything when no serial has been remembered;
    otherwise returns None.
    """
    if not connect_serial:
        return True
    p = subprocess.Popen(["adb", "kill-server"], bufsize=128,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        # Wait instead of killing right away, otherwise the command may be
        # terminated before it has done anything.
        p.wait()
    finally:
        p.stdout.close()
        p.stderr.close()
 
def confirmAdb(serial=None):
    """Make sure adb can reach the remembered device, reconnecting if needed.

    Args:
        serial: when given, replaces the remembered module-level serial first.

    Returns:
        True when no serial is known or the device answers (possibly after
        reconnecting); False after 31 failed reconnect rounds.
    """
    global connect_serial
    # A non-empty serial switches the target device.
    if serial:
        connect_serial = serial
    if not connect_serial:
        return True
    if check_adb_connect():
        return True

    log_print(u"connect_serial:%s,Adb 未连接,尝试重连!!!" % str(connect_serial))
    for count in range(31):
        log_print(u"adb重连第count:%s次" % str(count))
        disconnectAdb()
        time.sleep(5)
        # From the third round on, also restart the adb server.
        if count > 1:
            log_print(u"adb重连大于2次,仍未连接成功,kill adb server")
            killAdb()
            time.sleep(5)
        if check_adb_connect():
            log_print(u"adb重连成功count:%s" % str(count))
            return True
        connectAdb()
        time.sleep(5)
        if check_adb_connect():
            log_print(u"adb重连成功count:%s" % str(count))
            return True
        log_print(u"adb重连失败count:%s" % str(count))
        time.sleep(1)
    return False
 
class TimeoutRequestsSession(requests.Session):
    """requests.Session with retrying adapters, a default timeout and optional tracing.

    Setting a truthy ``debug`` attribute on the instance makes every request
    print a curl-like command line and the response text.
    """

    def __init__(self):
        super(TimeoutRequestsSession, self).__init__()
        # refs: https://stackoverflow.com/questions/33895739/python-requests-cant-load-any-url-remote-end-closed-connection-without-respo
        retrying = requests.adapters.HTTPAdapter(max_retries=3)
        for scheme in ("http://", "https://"):
            self.mount(scheme, retrying)

    def request(self, method, url, **kwargs):
        """Send the request, applying HTTP_TIMEOUT when no timeout is given.

        Raises:
            EnvironmentError: the underlying request raised requests.ConnectionError
        """
        if kwargs.get('timeout') is None:
            kwargs['timeout'] = HTTP_TIMEOUT

        verbose = getattr(self, 'debug', False)
        if verbose:
            payload = kwargs.get('data') or '""'
            if isinstance(payload, dict):
                payload = json.dumps(payload)
            time_start = time.time()
            print(
                datetime.now().strftime("%H:%M:%S.%f")[:-3],
                "$ curl -X {method} -d '{data}' '{url}'".format(
                    method=method, url=url, data=payload))

        try:
            resp = super(TimeoutRequestsSession, self).request(
                method, url, **kwargs)
        except requests.ConnectionError:
            raise EnvironmentError(
                "atx-agent is not running. Fix it with following steps.\n1. Plugin device into computer.\n2. Run command \"python -m uiautomator2 init\""
            )

        if verbose:
            elapsed_ms = (time.time() - time_start) * 1000
            print(
                datetime.now().strftime("%H:%M:%S.%f")[:-3],
                "Response (%d ms) >>>\n" % elapsed_ms +
                resp.text.rstrip() + "\n<<< END")
        return resp
 
def plugin_register(name, plugin, *args, **kwargs):
    """
    Add plugin into UIAutomatorServer

    Args:
        name: string
        plugin: class or function which take d as first parameter
        *args, **kwargs: stored alongside the plugin in the registry

    Example:
        def upload_screenshot(d):
            def inner():
                d.screenshot("tmp.jpg")
                # use requests.post upload tmp.jpg
            return inner

        plugin_register("upload_screenshot", upload_screenshot)

        d = u2.connect()
        d.ext_upload_screenshot()
    """
    registry = UIAutomatorServer.plugins()
    registry[name] = (plugin, args, kwargs)
 
def plugin_clear():
    """Remove every entry from the UIAutomatorServer plugin registry."""
    registry = UIAutomatorServer.plugins()
    registry.clear()
 
- class UIAutomatorServer(object):
 
-     __isfrozen = False
 
-     __plugins = {}
 
-     def __init__(self, host, port=7912):
 
-         """
 
-         Args:
 
-             host (str): host address
 
-             port (int): port number
 
-         Raises:
 
-             EnvironmentError
 
-         """
 
-         self._host = host
 
-         self._port = port
 
-         self._reqsess = TimeoutRequestsSession(
 
-         )  # use requests.Session to enable HTTP Keep-Alive
 
-         self._server_url = 'http://{}:{}'.format(host, port)
 
-         self._server_jsonrpc_url = self._server_url + "/jsonrpc/0"
 
-         self._default_session = Session(self, None)
 
-         self._cached_plugins = {}
 
-         self.__devinfo = None
 
-         self._hooks = {}
 
-         self.platform = None  # hot fix for weditor
 
-         self.ash = AdbShell(self.shell)  # the powerful adb shell
 
-         self.wait_timeout = 20.0  # wait element timeout
 
-         self.click_post_delay = None  # wait after each click
 
-         self._freeze()  # prevent creating new attrs
 
-         # self._atx_agent_check()
 
-     def _freeze(self):
 
-         self.__isfrozen = True
 
-     @staticmethod
 
-     def plugins():
 
-         return UIAutomatorServer.__plugins
 
-     def __setattr__(self, key, value):
 
-         """ Prevent creating new attributes outside __init__ """
 
-         if self.__isfrozen and not hasattr(self, key):
 
-             raise TypeError("Key %s does not exist in class %r" % (key, self))
 
-         object.__setattr__(self, key, value)
 
-     def __str__(self):
 
-         return 'uiautomator2 object for %s:%d' % (self._host, self._port)
 
-     def __repr__(self):
 
-         return str(self)
 
-     def _atx_agent_check(self):
 
-         """ check atx-agent health status and version """
 
-         try:
 
-             version = self._reqsess.get(
 
-                 self.path2url('/version'), timeout=5).text
 
-             if version != __atx_agent_version__:
 
-                 warnings.warn(
 
-                     'Version dismatch, expect "%s" actually "%s"' %
 
-                     (__atx_agent_version__, version),
 
-                     Warning,
 
-                     stacklevel=2)
 
-             # Cancel bellow code to make connect() return faster.
 
-             # launch service to prevent uiautomator killed by Android system
 
-             # self.adb_shell('am', 'startservice', '-n', 'com.github.uiautomator/.Service')
 
-         except (requests.ConnectionError, ) as e:
 
-             raise EnvironmentError(
 
-                 "atx-agent is not responding, need to init device first")
 
-     @property
 
-     def debug(self):
 
-         return hasattr(self._reqsess, 'debug') and self._reqsess.debug
 
-     @debug.setter
 
-     def debug(self, value):
 
-         self._reqsess.debug = bool(value)
 
-     @property
 
-     def serial(self):
 
-         return self.shell(['getprop', 'ro.serialno'])[0].strip()
 
-     @property
 
-     def jsonrpc(self):
 
-         """
 
-         Make jsonrpc call easier
 
-         For example:
 
-             self.jsonrpc.pressKey("home")
 
-         """
 
-         return self.setup_jsonrpc()
 
-     def path2url(self, path):
 
-         return urlparse.urljoin(self._server_url, path)
 
-     def window_size(self):
 
-         """ return (width, height) """
 
-         info = self._reqsess.get(self.path2url('/info')).json()
 
-         w, h = info['display']['width'], info['display']['height']
 
-         if (w > h) != (self.info["displayRotation"] % 2 == 1):
 
-             w, h = h, w
 
-         return w, h
 
-     def hooks_register(self, func):
 
-         """
 
-         Args:
 
-             func: should accept 3 args. func_name:string, args:tuple, kwargs:dict
 
-         """
 
-         self._hooks[func] = True
 
-     def hooks_apply(self, stage, func_name, args=(), kwargs={}, ret=None):
 
-         """
 
-         Args:
 
-             stage(str): one of "before" or "after"
 
-         """
 
-         for fn in self._hooks.keys():
 
-             fn(stage, func_name, args, kwargs, ret)
 
-     def setup_jsonrpc(self, jsonrpc_url=None):
 
-         """
 
-         Wrap jsonrpc call into object
 
-         Usage example:
 
-             self.setup_jsonrpc().pressKey("home")
 
-         """
 
-         if not jsonrpc_url:
 
-             jsonrpc_url = self._server_jsonrpc_url
 
-         class JSONRpcWrapper():
 
-             def __init__(self, server):
 
-                 self.server = server
 
-                 self.method = None
 
-             def __getattr__(self, method):
 
-                 self.method = method  # jsonrpc function name
 
-                 return self
 
-             def __call__(self, *args, **kwargs):
 
-                 http_timeout = kwargs.pop('http_timeout', HTTP_TIMEOUT)
 
-                 params = args if args else kwargs
 
-                 return self.server.jsonrpc_retry_call(jsonrpc_url, self.method,
 
-                                                       params, http_timeout)
 
-         return JSONRpcWrapper(self)
 
-     def jsonrpc_retry_call(self, *args,
 
-                            **kwargs):  # method, params=[], http_timeout=60):
 
-         try:
 
-             return self.jsonrpc_call(*args, **kwargs)
 
-         except (GatewayError, ):
 
-             warnings.warn(
 
-                 "uiautomator2 is not reponding, restart uiautomator2 automatically",
 
-                 RuntimeWarning,
 
-                 stacklevel=1)
 
-             # for XiaoMi, want to recover uiautomator2 must start app:com.github.uiautomator
 
-             self.reset_uiautomator()
 
-             return self.jsonrpc_call(*args, **kwargs)
 
-         except UiAutomationNotConnectedError:
 
-             warnings.warn(
 
-                 "UiAutomation not connected, restart uiautoamtor",
 
-                 RuntimeWarning,
 
-                 stacklevel=1)
 
-             self.reset_uiautomator()
 
-             return self.jsonrpc_call(*args, **kwargs)
 
-         except (NullObjectExceptionError, NullPointerExceptionError,
 
-                 StaleObjectExceptionError) as e:
 
-             if args[1] != 'dumpWindowHierarchy':  # args[1] method
 
-                 warnings.warn(
 
-                     "uiautomator2 raise exception %s, and run code again" % e,
 
-                     RuntimeWarning,
 
-                     stacklevel=1)
 
-             time.sleep(1)
 
-             return self.jsonrpc_call(*args, **kwargs)
 
-     def jsonrpc_call(self, jsonrpc_url, method, params=[], http_timeout=60):
 
-         """ jsonrpc2 call
 
-         Refs:
 
-             - http://www.jsonrpc.org/specification
 
-         """
 
-         request_start = time.time()
 
-         data = {
 
-             "jsonrpc": "2.0",
 
-             "id": self._jsonrpc_id(method),
 
-             "method": method,
 
-             "params": params,
 
-         }
 
-         data = json.dumps(data).encode('utf-8')
 
-         res = self._reqsess.post(
 
-             jsonrpc_url,  # +"?m="+method, #?method is for debug
 
-             headers={"Content-Type": "application/json"},
 
-             timeout=http_timeout,
 
-             data=data)
 
-         if DEBUG:
 
-             print("Shell$ curl -X POST -d '{}' {}".format(data, jsonrpc_url))
 
-             print("Output> " + res.text)
 
-         if res.status_code == 502:
 
-             raise GatewayError(
 
-                 res, "gateway error, time used %.1fs" %
 
-                 (time.time() - request_start))
 
-         if res.status_code == 410:  # http status gone: session broken
 
-             raise SessionBrokenError("app quit or crash", jsonrpc_url,
 
-                                      res.text)
 
-         if res.status_code != 200:
 
-             raise UiaError(jsonrpc_url, data, res.status_code, res.text,
 
-                            "HTTP Return code is not 200", res.text)
 
-         jsondata = res.json()
 
-         error = jsondata.get('error')
 
-         if not error:
 
-             return jsondata.get('result')
 
-         # error happends
 
-         err = JsonRpcError(error, method)
 
-         if isinstance(
 
-                 err.data,
 
-                 six.string_types) and 'UiAutomation not connected' in err.data:
 
-             err.__class__ = UiAutomationNotConnectedError
 
-         elif err.message:
 
-             if 'uiautomator.UiObjectNotFoundException' in err.message:
 
-                 err.__class__ = UiObjectNotFoundError
 
-             elif 'android.support.test.uiautomator.StaleObjectException' in err.message:
 
-                 # StaleObjectException
 
-                 # https://developer.android.com/reference/android/support/test/uiautomator/StaleObjectException.html
 
-                 # A StaleObjectException exception is thrown when a UiObject2 is used after the underlying View has been destroyed.
 
-                 # In this case, it is necessary to call findObject(BySelector) to obtain a new UiObject2 instance.
 
-                 err.__class__ = StaleObjectExceptionError
 
-             elif 'java.lang.NullObjectException' in err.message:
 
-                 err.__class__ = NullObjectExceptionError
 
-             elif 'java.lang.NullPointerException' == err.message:
 
-                 err.__class__ = NullPointerExceptionError
 
-         raise err
 
-     def _jsonrpc_id(self, method):
 
-         m = hashlib.md5()
 
-         m.update(("%s at %f" % (method, time.time())).encode("utf-8"))
 
-         return m.hexdigest()
 
-     @property
 
-     def agent_alive(self):
 
-         try:
 
-             r = self._reqsess.get(self.path2url('/version'), timeout=2)
 
-             return r.status_code == 200
 
-         except:
 
-             return False
 
-     @property
 
-     def alive(self):
 
-         try:
 
-             r = self._reqsess.get(self.path2url('/ping'), timeout=2)
 
-             if r.status_code != 200:
 
-                 return False
 
-             r = self._reqsess.post(
 
-                 self.path2url('/jsonrpc/0'),
 
-                 data=json.dumps({
 
-                     "jsonrpc": "2.0",
 
-                     "id": 1,
 
-                     "method": "deviceInfo"
 
-                 }),
 
-                 timeout=2)
 
-             if r.status_code != 200:
 
-                 return False
 
-             if r.json().get('error'):
 
-                 return False
 
-             return True
 
-         except requests.exceptions.ReadTimeout:
 
-             return False
 
-         except EnvironmentError:
 
-             return False
 
-     def service(self, name):
 
-         """ Manage service start or stop
 
-         Example:
 
-             d.service("uiautomator").start()
 
-             d.service("uiautomator").stop()
 
-         """
 
-         u2obj = self
 
-         class _Service(object):
 
-             def __init__(self, name):
 
-                 self.name = name
 
-                 # FIXME(ssx): support other service: minicap, minitouch
 
-                 assert name == 'uiautomator'
 
-             def start(self):
 
-                 res = u2obj._reqsess.post(u2obj.path2url('/uiautomator'))
 
-                 res.raise_for_status()
 
-             def stop(self):
 
-                 res = u2obj._reqsess.delete(u2obj.path2url('/uiautomator'))
 
-                 if res.status_code != 200:
 
-                     warnings.warn(res.text)
 
-             
 
-             def running(self):
 
-                 res = u2obj._reqsess.get(u2obj.path2url("/uiautomator"))
 
-                 res.raise_for_status()
 
-                 return res.json().get("running")
 
-         return _Service(name)
 
-     
 
-     @property
 
-     def uiautomator(self):
 
-         return self.service("uiautomator")
 
-     def reset_uiautomator(self):
 
-         """
 
-         Reset uiautomator
 
-         Raises:
 
-             RuntimeError
 
-         """
 
-         # self.open_identify()
 
-         stopUrl = self.path2url('/uiautomator')
 
-         print("reset_uiautomator:stopUrl=",stopUrl)
 
-         self._reqsess.delete(stopUrl)  # stop uiautomator keeper first
 
-         # wait = not unlock  # should not wait IdentifyActivity open or it will stuck sometimes
 
-         # self.app_start(  # may also stuck here.
 
-         #     'com.github.uiautomator',
 
-         #     '.MainActivity',
 
-         #     wait=False,
 
-         #     stop=True)
 
-         time.sleep(.5)
 
-         # launch atx-agent uiautomator keeper
 
-         startUrl = self.path2url('/uiautomator')
 
-         print("reset_uiautomator:startUrl=",startUrl)
 
-         self._reqsess.post(startUrl)
 
-         # wait until uiautomator2 service working
 
-         deadline = time.time() + 120.0
 
-         self.shell([
 
-             'am', 'start', '-n',
 
-             'com.github.uiautomator/.MainActivity'
 
-         ])
 
-         time.sleep(1)
 
-         while time.time() < deadline:
 
-             print(
 
-                 time.strftime("[%Y-%m-%d %H:%M:%S]"),
 
-                 "uiautomator is starting ...")
 
-             if self.alive:
 
-                 # keyevent BACK if current is com.github.uiautomator
 
-                 # XiaoMi uiautomator will kill the app(com.github.uiautomator) when launch
 
-                 #   it is better to start a service to make uiautomator live longer
 
-                 if self.current_app()['package'] != 'com.github.uiautomator':
 
-                     self.shell([
 
-                         'am', 'startservice', '-n',
 
-                         'com.github.uiautomator/.Service'
 
-                     ])
 
-                     time.sleep(1.5)
 
-                 else:
 
-                     time.sleep(.5)
 
-                     self.shell(['input', 'keyevent', 'BACK'])
 
-                 print("uiautomator back to normal")
 
-                 return True
 
-             time.sleep(1)
 
-             self.shell(['input', 'keyevent', 'KEYCODE_VOLUME_UP'])
 
-             self.shell(['input', 'keyevent', 'KEYCODE_VOLUME_DOWN'])
 
-         raise RuntimeError(
 
-             "Uiautomator started failed. Find solutions in https://github.com/openatx/uiautomator2/wiki/Common-issues"
 
-         )
 
-     def healthcheck(self):
 
-         """
 
-         Reset device into health state
 
-         Raises:
 
-             RuntimeError
 
-         """
 
-         sh = self.ash
 
-         if not sh.is_screen_on():
 
-             print(time.strftime("[%Y-%m-%d %H:%M:%S]"), "wakeup screen")
 
-             sh.keyevent("WAKEUP")
 
-             sh.keyevent("HOME")
 
-             sh.swipe(0.1, 0.9, 0.9, 0.1)  # swipe to unlock
 
-         sh.keyevent("HOME")
 
-         sh.keyevent("BACK")
 
-         self.reset_uiautomator()
 
-     def app_install(self, url, installing_callback=None, server=None):
 
-         """
 
-         {u'message': u'downloading', "progress": {u'totalSize': 407992690, u'copiedSize': 49152}}
 
-         Returns:
 
-             packageName
 
-         Raises:
 
-             RuntimeError
 
-         """
 
-         r = self._reqsess.post(self.path2url('/install'), data={'url': url})
 
-         if r.status_code != 200:
 
-             raise RuntimeError("app install error:", r.text)
 
-         id = r.text.strip()
 
-         print(time.strftime('%H:%M:%S'), "id:", id)
 
-         return self._wait_install_finished(id, installing_callback)
 
-     def _wait_install_finished(self, id, installing_callback):
 
-         bar = None
 
-         downloaded = True
 
-         while True:
 
-             resp = self._reqsess.get(self.path2url('/install/' + id))
 
-             resp.raise_for_status()
 
-             jdata = resp.json()
 
-             message = jdata['message']
 
-             pg = jdata.get('progress')
 
-             def notty_print_progress(pg):
 
-                 written = pg['copiedSize']
 
-                 total = pg['totalSize']
 
-                 print(
 
-                     time.strftime('%H:%M:%S'), 'downloading %.1f%% [%s/%s]' %
 
-                     (100.0 * written / total,
 
-                      humanize.naturalsize(written, gnu=True),
 
-                      humanize.naturalsize(total, gnu=True)))
 
-             if message == 'downloading':
 
-                 downloaded = False
 
-                 if pg:  # if there is a progress
 
-                     if hasattr(sys.stdout, 'isatty'):
 
-                         if sys.stdout.isatty():
 
-                             if not bar:
 
-                                 bar = _ProgressBar(
 
-                                     time.strftime('%H:%M:%S') + ' downloading',
 
-                                     max=pg['totalSize'])
 
-                             written = pg['copiedSize']
 
-                             bar.next(written - bar.index)
 
-                         else:
 
-                             notty_print_progress(pg)
 
-                     else:
 
-                         pass
 
-                 else:
 
-                     print(time.strftime('%H:%M:%S'), "download initialing")
 
-             else:
 
-                 if not downloaded:
 
-                     downloaded = True
 
-                     if bar:  # bar only set in atty
 
-                         bar.next(pg['copiedSize'] - bar.index) if pg else None
 
-                         bar.finish()
 
-                     else:
 
-                         print(time.strftime('%H:%M:%S'), "download 100%")
 
-                 print(time.strftime('%H:%M:%S'), message)
 
-             if message == 'installing':
 
-                 if callable(installing_callback):
 
-                     installing_callback(self)
 
-             if message == 'success installed':
 
-                 return jdata.get('packageName')
 
-             if jdata.get('error'):
 
-                 raise RuntimeError("error", jdata.get('error'))
 
-             try:
 
-                 time.sleep(1)
 
-             except KeyboardInterrupt:
 
-                 bar.finish() if bar else None
 
-                 print("keyboard interrupt catched, cancel install id", id)
 
-                 self._reqsess.delete(self.path2url('/install/' + id))
 
-                 raise
 
-     def shell(self, cmdargs, stream=False, timeout=60):
 
-         """
 
-         Run adb shell command with arguments and return its output. Require atx-agent >=0.3.3
 
-         Args:
 
-             cmdargs: str or list, example: "ls -l" or ["ls", "-l"]
 
-             timeout: seconds of command run, works on when stream is False
 
-             stream: bool used for long running process.
 
-         Returns:
 
-             (output, exit_code) when stream is False
 
-             requests.Response when stream is True, you have to close it after using
 
-         Raises:
 
-             RuntimeError
 
-         For atx-agent is not support return exit code now.
 
-         When command got something wrong, exit_code is always 1, otherwise exit_code is always 0
 
-         """
 
-         print("uiautomator2.__init__.shell:cmd=",cmdargs)
 
-         if isinstance(cmdargs, (list, tuple)):
 
-             cmdargs = list2cmdline(cmdargs)
 
-         if stream:
 
-             return self._reqsess.get(
 
-                 self.path2url("/shell/stream"),
 
-                 params={"command": cmdargs},
 
-                 stream=True)
 
-         ret = self._reqsess.post(
 
-             self.path2url('/shell'),
 
-             data={
 
-                 'command': cmdargs,
 
-                 'timeout': str(timeout)
 
-             }, timeout=timeout+10)
 
-         if ret.status_code != 200:
 
-             raise RuntimeError(
 
-                 "device agent responds with an error code %d" %
 
-                 ret.status_code, ret.text)
 
-         resp = ret.json()
 
-         exit_code = 1 if resp.get('error') else 0
 
-         exit_code = resp.get('exitCode', exit_code)
 
-         shell_response = namedtuple("ShellResponse", ("output", "exit_code"))
 
-         return shell_response(resp.get('output'), exit_code)
 
-     def adb_shell(self, *args):
 
-         """
 
-         Example:
 
-             adb_shell('pwd')
 
-             adb_shell('ls', '-l')
 
-             adb_shell('ls -l')
 
-         Returns:
 
-             string for stdout merged with stderr, after the entire shell command is completed.
 
-         """
 
-         # print(
 
-         #     "DeprecatedWarning: adb_shell is deprecated, use: output, exit_code = shell(['ls', '-l']) instead"
 
-         # )
 
-         cmdline = args[0] if len(args) == 1 else list2cmdline(args)
 
-         return self.shell(cmdline)[0]
 
-     def app_start(self,
 
-                   pkg_name,
 
-                   activity=None,
 
-                   extras={},
 
-                   wait=True,
 
-                   stop=False,
 
-                   unlock=False):
 
-         """ Launch application
 
-         Args:
 
-             pkg_name (str): package name
 
-             activity (str): app activity
 
-             stop (bool): Stop app before starting the activity. (require activity)
 
-         """
 
-         if unlock:
 
-             self.unlock()
 
-         if activity:
 
-             # -D: enable debugging
 
-             # -W: wait for launch to complete
 
-             # -S: force stop the target app before starting the activity
 
-             # --user <USER_ID> | current: Specify which user to run as; if not
 
-             #    specified then run as the current user.
 
-             # -e <EXTRA_KEY> <EXTRA_STRING_VALUE>
 
-             # --ei <EXTRA_KEY> <EXTRA_INT_VALUE>
 
-             # --ez <EXTRA_KEY> <EXTRA_BOOLEAN_VALUE>
 
-             args = [
 
-                 'am', 'start', '-a', 'android.intent.action.MAIN', '-c',
 
-                 'android.intent.category.LAUNCHER'
 
-             ]
 
-             if wait:
 
-                 args.append('-W')
 
-             if stop:
 
-                 args.append('-S')
 
-             args += ['-n', '{}/{}'.format(pkg_name, activity)]
 
-             # -e --ez
 
-             extra_args = []
 
-             for k, v in extras.items():
 
-                 if isinstance(v, bool):
 
-                     extra_args.extend(['--ez', k, 'true' if v else 'false'])
 
-                 elif isinstance(v, int):
 
-                     extra_args.extend(['--ei', k, str(v)])
 
-                 else:
 
-                     extra_args.extend(['-e', k, v])
 
-             args += extra_args
 
-             # 'am', 'start', '-W', '-n', '{}/{}'.format(pkg_name, activity))
 
-             self.shell(args)
 
-         else:
 
-             if stop:
 
-                 self.app_stop(pkg_name)
 
-             self.shell([
 
-                 'monkey', '-p', pkg_name, '-c',
 
-                 'android.intent.category.LAUNCHER', '1'
 
-             ])
 
-     @retry(EnvironmentError, delay=.5, tries=3, jitter=.1)
 
-     def current_app(self):
 
-         """
 
-         Returns:
 
-             dict(package, activity, pid?)
 
-         Raises:
 
-             EnvironementError
 
-         For developer:
 
-             Function reset_uiautomator need this function, so can't use jsonrpc here.
 
-         """
 
-         # Related issue: https://github.com/openatx/uiautomator2/issues/200
 
-         # $ adb shell dumpsys window windows
 
-         # Example output:
 
-         #   mCurrentFocus=Window{41b37570 u0 com.incall.apps.launcher/com.incall.apps.launcher.Launcher}
 
-         #   mFocusedApp=AppWindowToken{422df168 token=Token{422def98 ActivityRecord{422dee38 u0 com.example/.UI.play.PlayActivity t14}}}
 
-         # Regexp
 
-         #   r'mFocusedApp=.*ActivityRecord{\w+ \w+ (?P<package>.*)/(?P<activity>.*) .*'
 
-         #   r'mCurrentFocus=Window{\w+ \w+ (?P<package>.*)/(?P<activity>.*)\}')
 
-         _focusedRE = re.compile(
 
-             r'mCurrentFocus=Window{.*\s+(?P<package>[^\s]+)/(?P<activity>[^\s]+)\}'
 
-         )
 
-         m = _focusedRE.search(self.shell(['dumpsys', 'window', 'windows'])[0])
 
-         if m:
 
-             return dict(
 
-                 package=m.group('package'), activity=m.group('activity'))
 
-         # try: adb shell dumpsys activity top
 
-         _activityRE = re.compile(
 
-             r'ACTIVITY (?P<package>[^\s]+)/(?P<activity>[^/\s]+) \w+ pid=(?P<pid>\d+)'
 
-         )
 
-         output, _ = self.shell(['dumpsys', 'activity', 'top'])
 
-         ms = _activityRE.finditer(output)
 
-         ret = None
 
-         for m in ms:
 
-             ret = dict(
 
-                 package=m.group('package'),
 
-                 activity=m.group('activity'),
 
-                 pid=int(m.group('pid')))
 
-         if ret:  # get last result
 
-             return ret
 
-         raise EnvironmentError("Couldn't get focused app")
 
-     def wait_activity(self, activity, timeout=10):
 
-         """ wait activity
 
-         Args:
 
-             activity (str): name of activity
 
-             timeout (float): max wait time
 
-         Returns:
 
-             bool of activity
 
-         """
 
-         deadline = time.time() + timeout
 
-         while time.time() < deadline:
 
-             current_activity = self.current_app().get('activity')
 
-             if activity == current_activity:
 
-                 return True
 
-             time.sleep(.5)
 
-         return False
 
-     def app_stop(self, pkg_name):
 
-         """ Stop one application: am force-stop"""
 
-         self.shell(['am', 'force-stop', pkg_name])
 
-     def app_stop_all(self, excludes=[]):
 
-         """ Stop all third party applications
 
-         Args:
 
-             excludes (list): apps that do now want to kill
 
-         Returns:
 
-             a list of killed apps
 
-         """
 
-         our_apps = ['com.github.uiautomator', 'com.github.uiautomator.test']
 
-         output, _ = self.shell(['pm', 'list', 'packages', '-3'])
 
-         pkgs = re.findall('package:([^\s]+)', output)
 
-         process_names = re.findall('([^\s]+)$', self.shell('ps')[0], re.M)
 
-         kill_pkgs = set(pkgs).intersection(process_names).difference(our_apps +
 
-                                                                      excludes)
 
-         kill_pkgs = list(kill_pkgs)
 
-         for pkg_name in kill_pkgs:
 
-             self.app_stop(pkg_name)
 
-         return kill_pkgs
 
-     def app_clear(self, pkg_name):
 
-         """ Stop and clear app data: pm clear """
 
-         self.shell(['pm', 'clear', pkg_name])
 
-     def app_uninstall(self, pkg_name):
 
-         """ Uninstall an app """
 
-         self.shell(["pm", "uninstall", pkg_name])
 
-     def app_uninstall_all(self, excludes=[], verbose=False):
 
-         """ Uninstall all apps """
 
-         our_apps = ['com.github.uiautomator', 'com.github.uiautomator.test']
 
-         output, _ = self.shell(['pm', 'list', 'packages', '-3'])
 
-         pkgs = re.findall('package:([^\s]+)', output)
 
-         pkgs = set(pkgs).difference(our_apps + excludes)
 
-         pkgs = list(pkgs)
 
-         for pkg_name in pkgs:
 
-             if verbose:
 
-                 print("uninstalling", pkg_name)
 
-             self.app_uninstall(pkg_name)
 
-         return pkgs
 
-     def unlock(self):
 
-         """ unlock screen """
 
-         self.open_identify()
 
-         self._default_session.press("home")
 
-     def open_identify(self, theme='black'):
 
-         """
 
-         Args:
 
-             theme (str): black or red
 
-         """
 
-         self.shell([
 
-             'am', 'start', '-W', '-n',
 
-             'com.github.uiautomator/.IdentifyActivity', '-e', 'theme', theme
 
-         ])
 
-     def _pidof_app(self, pkg_name):
 
-         """
 
-         Return pid of package name
 
-         """
 
-         text = self._reqsess.get(self.path2url('/pidof/' + pkg_name)).text
 
-         if text.isdigit():
 
-             return int(text)
 
-     def push_url(self, url, dst, mode=0o644):
 
-         """
 
-         Args:
 
-             url (str): http url address
 
-             dst (str): destination
 
-             mode (str): file mode
 
-         Raises:
 
-             FileNotFoundError(py3) OSError(py2)
 
-         """
 
-         modestr = oct(mode).replace('o', '')
 
-         r = self._reqsess.post(
 
-             self.path2url('/download'),
 
-             data={
 
-                 'url': url,
 
-                 'filepath': dst,
 
-                 'mode': modestr
 
-             })
 
-         if r.status_code != 200:
 
-             raise IOError("push-url", "%s -> %s" % (url, dst), r.text)
 
-         key = r.text.strip()
 
-         while 1:
 
-             r = self._reqsess.get(self.path2url('/download/' + key))
 
-             jdata = r.json()
 
-             message = jdata.get('message')
 
-             if message == 'downloaded':
 
-                 log_print("downloaded")
 
-                 break
 
-             elif message == 'downloading':
 
-                 progress = jdata.get('progress')
 
-                 if progress:
 
-                     copied_size = progress.get('copiedSize')
 
-                     total_size = progress.get('totalSize')
 
-                     log_print("{} {} / {}".format(
 
-                         message, humanize.naturalsize(copied_size),
 
-                         humanize.naturalsize(total_size)))
 
-                 else:
 
-                     log_print("downloading")
 
-             else:
 
-                 log_print("unknown json:" + str(jdata))
 
-                 raise IOError(message)
 
-             time.sleep(1)
 
-     def push(self, src, dst, mode=0o644):
 
-         """
 
-         Args:
 
-             src (path or fileobj): source file
 
-             dst (str): destination can be folder or file path
 
-         Returns:
 
-             dict object, for example:
 
-                 {"mode": "0660", "size": 63, "target": "/sdcard/ABOUT.rst"}
 
-             Since chmod may fail in android, the result "mode" may not same with input args(mode)
 
-         Raises:
 
-             IOError(if push got something wrong)
 
-         """
 
-         modestr = oct(mode).replace('o', '')
 
-         pathname = self.path2url('/upload/' + dst.lstrip('/'))
 
-         if isinstance(src, six.string_types):
 
-             src = open(src, 'rb')
 
-         r = self._reqsess.post(
 
-             pathname, data={'mode': modestr}, files={'file': src})
 
-         if r.status_code == 200:
 
-             return r.json()
 
-         raise IOError("push", "%s -> %s" % (src, dst), r.text)
 
-     def pull(self, src, dst):
 
-         """
 
-         Pull file from device to local
 
-         Raises:
 
-             FileNotFoundError(py3) OSError(py2)
 
-         Require atx-agent >= 0.0.9
 
-         """
 
-         pathname = self.path2url("/raw/" + src.lstrip("/"))
 
-         r = self._reqsess.get(pathname, stream=True)
 
-         if r.status_code != 200:
 
-             raise FileNotFoundError("pull", src, r.text)
 
-         with open(dst, 'wb') as f:
 
-             shutil.copyfileobj(r.raw, f)
 
-         
 
-     def pull_content(self, src):
 
-         """
 
-         Read remote file content
 
-         Raises:
 
-             FileNotFoundError
 
-         """
 
-         pathname = self.path2url("/raw/" + src.lstrip("/"))
 
-         r = self._reqsess.get(pathname)
 
-         if r.status_code != 200:
 
-             raise FileNotFoundError("pull", src, r.text)
 
-         return r.content
 
-     @property
 
-     def screenshot_uri(self):
 
-         return 'http://%s:%d/screenshot/0' % (self._host, self._port)
 
-     
 
-     def screenshot(self, *args, **kwargs):
 
-         """
 
-         Take screenshot of device
 
-         Returns:
 
-             PIL.Image
 
-         """
 
-         return self.session().screenshot(*args, **kwargs)
 
-     @property
 
-     def device_info(self):
 
-         if self.__devinfo:
 
-             return self.__devinfo
 
-         self.__devinfo = self._reqsess.get(self.path2url('/info')).json()
 
-         return self.__devinfo
 
-     def app_info(self, pkg_name):
 
-         """
 
-         Get app info
 
-         Args:
 
-             pkg_name (str): package name
 
-         Return example:
 
-             {
 
-                 "mainActivity": "com.github.uiautomator.MainActivity",
 
-                 "label": "ATX",
 
-                 "versionName": "1.1.7",
 
-                 "versionCode": 1001007,
 
-                 "size":1760809
 
-             }
 
-         Raises:
 
-             UiaError
 
-         """
 
-         url = self.path2url('/packages/{0}/info'.format(pkg_name))
 
-         resp = self._reqsess.get(url)
 
-         resp.raise_for_status()
 
-         resp = resp.json()
 
-         if not resp.get('success'):
 
-             raise UiaError(resp.get('description', 'unknown'))
 
-         return resp.get('data')
 
-     def app_icon(self, pkg_name):
 
-         """
 
-         Returns:
 
-             PIL.Image
 
-         Raises:
 
-             UiaError
 
-         """
 
-         from PIL import Image
 
-         url = self.path2url('/packages/{0}/icon'.format(pkg_name))
 
-         resp = self._reqsess.get(url)
 
-         resp.raise_for_status()
 
-         return Image.open(io.BytesIO(resp.content))
 
-     @property
 
-     def wlan_ip(self):
 
-         return self._reqsess.get(self.path2url("/wlan/ip")).text.strip()
 
-     def disable_popups(self, enable=True):
 
-         """
 
-         Automatic click all popups
 
-         TODO: need fix
 
-         """
 
-         raise NotImplementedError()
 
-         # self.watcher
 
-         if enable:
 
-             self.jsonrpc.setAccessibilityPatterns({
 
-                 "com.android.packageinstaller":
 
-                 [u"确定", u"安装", u"下一步", u"好", u"允许", u"我知道"],
 
-                 "com.miui.securitycenter": [u"继续安装"],  # xiaomi
 
-                 "com.lbe.security.miui": [u"允许"],  # xiaomi
 
-                 "android": [u"好", u"安装"],  # vivo
 
-                 "com.huawei.systemmanager": [u"立即删除"],  # huawei
 
-                 "com.android.systemui": [u"同意"],  # 锤子
 
-             })
 
-         else:
 
-             self.jsonrpc.setAccessibilityPatterns({})
 
-     def session(self, pkg_name=None, attach=False, launch_timeout=None):
 
-         """
 
-         Create a new session
 
-         Args:
 
-             pkg_name (str): android package name
 
-             attach (bool): attach to already running app
 
-             launch_timeout (int): launch timeout
 
-         Raises:
 
-             requests.HTTPError, SessionBrokenError
 
-         """
 
-         if pkg_name is None:
 
-             return self._default_session
 
-         if not attach:
 
-             request_data = {"flags": "-W -S"}
 
-             if launch_timeout:
 
-                 request_data["timeout"] = str(launch_timeout)
 
-             resp = self._reqsess.post(
 
-                 self.path2url("/session/" + pkg_name), data=request_data)
 
-             if resp.status_code == 410:  # Gone
 
-                 raise SessionBrokenError(pkg_name, resp.text)
 
-             resp.raise_for_status()
 
-             jsondata = resp.json()
 
-             if not jsondata["success"]:
 
-                 raise SessionBrokenError("app launch failed",
 
-                                          jsondata["error"], jsondata["output"])
 
-             time.sleep(2.5)  # wait launch finished, maybe no need
 
-         pid = self._pidof_app(pkg_name)
 
-         if not pid:
 
-             raise SessionBrokenError(pkg_name)
 
-         return Session(self, pkg_name, pid)
 
-     def __getattr__(self, attr):
 
-         if attr in self._cached_plugins:
 
-             return self._cached_plugins[attr]
 
-         if attr.startswith('ext_'):
 
-             plugin_name = attr[4:]
 
-             if plugin_name not in self.__plugins:
 
-                 if plugin_name == 'xpath':
 
-                     import ssat_sdk.uiautomator2.ext.xpath as xpath
 
-                     xpath.init()
 
-                 else:
 
-                     raise ValueError(
 
-                         "plugin \"%s\" not registed" % plugin_name)
 
-             func, args, kwargs = self.__plugins[plugin_name]
 
-             obj = functools.partial(func, self)(*args, **kwargs)
 
-             self._cached_plugins[attr] = obj
 
-             return obj
 
-         try:
 
-             return getattr(self._default_session, attr)
 
-         except AttributeError:
 
-             raise AttributeError(
 
-                 "'Session or UIAutomatorServer' object has no attribute '%s'" %
 
-                 attr)
 
-     def __call__(self, **kwargs):
 
-         return self._default_session(**kwargs)
 
class AdbShell(object):
    """ Small helpers built on top of a "run shell command" function. """

    def __init__(self, shellfn):
        """
        Args:
            shellfn: Shell function; called with one command string and
                expected to return an (output, exit_code) pair
        """
        self.shell = shellfn

    def wmsize(self):
        """ get window size

        Returns:
            (width, height) as a tuple of ints

        Raises:
            RuntimeError: the output of ``wm size`` does not start with
                "Physical size: <w>x<h>"
        """
        output, _ = self.shell("wm size")
        m = re.match(r"Physical size: (\d+)x(\d+)", output)
        if m:
            # A real tuple: map() would be a one-shot iterator on Python 3.
            return tuple(int(n) for n in m.groups())
        raise RuntimeError("Can't parse wm size: " + output)

    def is_screen_on(self):
        """ True when ``dumpsys power`` reports mHoldingDisplaySuspendBlocker=true. """
        output, _ = self.shell("dumpsys power")
        return 'mHoldingDisplaySuspendBlocker=true' in output

    def keyevent(self, v):
        """
        Send a key event through ``input keyevent``.

        Args:
            v: eg home wakeup back (upper-cased before sending)
        """
        v = v.upper()
        self.shell("input keyevent " + v)

    def _adjust_pos(self, x, y, w=None, h=None):
        """ Scale coordinates below 1 by the width / height; keep the rest. """
        if x < 1:
            x = x * w
        if y < 1:
            y = y * h
        return (x, y)

    def swipe(self, x0, y0, x1, y1):
        """
        Swipe from (x0, y0) to (x1, y1) through ``input swipe``.

        Coordinates below 1 are treated as fractions of the window size,
        which is only queried when at least one such coordinate is given.
        """
        w, h = None, None
        if x0 < 1 or y0 < 1 or x1 < 1 or y1 < 1:
            w, h = self.wmsize()
        x0, y0 = self._adjust_pos(x0, y0, w, h)
        x1, y1 = self._adjust_pos(x1, y1, w, h)
        self.shell("input swipe %d %d %d %d" % (x0, y0, x1, y1))
 
 
  |