123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191 |
- # coding: utf-8
- #
- from __future__ import absolute_import, print_function
- import base64
- import io
- import logging
- import re
- import time
- import warnings
- import xml.dom.minidom
- import requests
- import six
- from retry import retry
- from ssat_sdk.uiautomator2.exceptions import (NullPointerExceptionError,
- UiObjectNotFoundError)
- from ssat_sdk.uiautomator2.utils import Exists, U, check_alive, hooks_wrap, intersect
- _INPUT_METHOD_RE = re.compile(r'mCurMethodId=([-_./\w]+)')
- _fail_prompt_enabled = False
- def set_fail_prompt(enable=True):
- """
- When Element click through Exception, Prompt user to decide
- """
- global _fail_prompt_enabled
- _fail_prompt_enabled = enable
- def _failprompt(fn):
- def _inner(self, *args, **kwargs):
- if not _fail_prompt_enabled:
- return fn(self, *args, **kwargs)
- from ssat_sdk.uiautomator2 import messagebox
- try:
- return fn(self, *args, **kwargs)
- except UiObjectNotFoundError as e:
- result = messagebox.retryskipabort(str(e), 30)
- if result == 'retry':
- return _inner(self, *args, **kwargs)
- elif result == 'skip':
- return True
- else:
- raise
- return _inner
- class Session(object):
- __orientation = ( # device orientation
- (0, "natural", "n", 0), (1, "left", "l", 90),
- (2, "upsidedown", "u", 180), (3, "right", "r", 270))
- def __init__(self, server, pkg_name=None, pid=None):
- self.server = server
- self._pkg_name = pkg_name
- self._pid = pid
- self._jsonrpc = server.jsonrpc
- if pid and pkg_name:
- jsonrpc_url = server.path2url('/session/%d:%s/jsonrpc/0' %
- (pid, pkg_name))
- self._jsonrpc = server.setup_jsonrpc(jsonrpc_url)
- # hot fix for session missing shell function
- self.shell = self.server.shell
- def __repr__(self):
- if self._pid and self._pkg_name:
- return "<uiautomator2.Session pid:%d pkgname:%s>" % (
- self._pid, self._pkg_name)
- return super(Session, self).__repr__()
- def __enter__(self):
- return self
- def __exit__(self, exc_type, exc_val, exc_tb):
- self.close()
- def implicitly_wait(self, seconds=None):
- """set default wait timeout
- Args:
- seconds(float): to wait element show up
- """
- if seconds is not None:
- self.server.wait_timeout = seconds
- return self.server.wait_timeout
- def close(self):
- """ close app """
- if self._pkg_name:
- self.server.app_stop(self._pkg_name)
- def running(self):
- """
- Check is session is running. return bool
- """
- if self._pid and self._pkg_name:
- ping_url = self.server.path2url('/session/%d:%s/ping' %
- (self._pid, self._pkg_name))
- return self.server._reqsess.get(ping_url).text.strip() == 'pong'
- # warnings.warn("pid and pkg_name is not set, ping will always return True", Warning, stacklevel=1)
- return True
- @property
- def jsonrpc(self):
- return self._jsonrpc
- @property
- def pos_rel2abs(self):
- size = []
- def convert(x, y):
- assert x >= 0
- assert y >= 0
- if (x < 1 or y < 1) and not size:
- size.extend(
- self.server.window_size()) # size will be [width, height]
- if x < 1:
- x = int(size[0] * x)
- if y < 1:
- y = int(size[1] * y)
- return x, y
- return convert
- def make_toast(self, text, duration=1.0):
- """ Show toast
- Args:
- text (str): text to show
- duration (float): seconds of display
- """
- warnings.warn(
- "Use d.toast.show(text, duration) instead.",
- DeprecationWarning,
- stacklevel=2)
- return self.jsonrpc.makeToast(text, duration * 1000)
- @property
- def toast(self):
- obj = self
- class Toast(object):
- def get_message(self,
- wait_timeout=10,
- cache_timeout=10,
- default=None):
- """
- Args:
- wait_timeout: seconds of max wait time if toast now show right now
- cache_timeout: return immediately if toast showed in recent $cache_timeout
- default: default messsage to return when no toast show up
- Returns:
- None or toast message
- """
- deadline = time.time() + wait_timeout
- while 1:
- message = obj.jsonrpc.getLastToast(cache_timeout * 1000)
- if message:
- return message
- if time.time() > deadline:
- return default
- time.sleep(.5)
- def reset(self):
- return obj.jsonrpc.clearLastToast()
- def show(self, text, duration=1.0):
- return obj.jsonrpc.makeToast(text, duration * 1000)
- return Toast()
- @check_alive
- def set_fastinput_ime(self, enable=True):
- """ Enable of Disable FastInputIME """
- fast_ime = 'com.github.uiautomator/.FastInputIME'
- if enable:
- self.server.shell(['ime', 'enable', fast_ime])
- self.server.shell(['ime', 'set', fast_ime])
- else:
- self.server.shell(['ime', 'disable', fast_ime])
- @check_alive
- def send_keys(self, text):
- """
- Raises:
- EnvironmentError
- """
- try:
- self.wait_fastinput_ime()
- btext = U(text).encode('utf-8')
- base64text = base64.b64encode(btext).decode()
- self.server.shell([
- 'am', 'broadcast', '-a', 'ADB_INPUT_TEXT', '--es', 'text',
- base64text
- ])
- return True
- except EnvironmentError:
- warnings.warn(
- "set FastInputIME failed. use \"d(focused=True).set_text instead\"",
- Warning)
- return self(focused=True).set_text(text)
- # warnings.warn("set FastInputIME failed. use \"adb shell input text\" instead", Warning)
- # self.server.adb_shell("input", "text", text.replace(" ", "%s"))
- @check_alive
- def send_action(self, code):
- """
- Simulate input method edito code
-
- Args:
- code (str or int): input method editor code
-
- Examples:
- send_action("search"), send_action(3)
-
- Refs:
- https://developer.android.com/reference/android/view/inputmethod/EditorInfo
- """
- self.wait_fastinput_ime()
- __alias = {
- "go": 2,
- "search": 3,
- "send": 4,
- "next": 5,
- "done": 6,
- "previous": 7,
- }
- if isinstance(code, six.string_types):
- code = __alias.get(code, code)
- self.server.shell(['am', 'broadcast', '-a', 'ADB_EDITOR_CODE', '--ei', 'code', str(code)])
- @check_alive
- def clear_text(self):
- """ clear text
- Raises:
- EnvironmentError
- """
- try:
- self.wait_fastinput_ime()
- self.server.shell(['am', 'broadcast', '-a', 'ADB_CLEAR_TEXT'])
- except EnvironmentError:
- # for Android simulator
- self(focused=True).clear_text()
- def wait_fastinput_ime(self, timeout=5.0):
- """ wait FastInputIME is ready
- Args:
- timeout(float): maxium wait time
-
- Raises:
- EnvironmentError
- """
- if not self.server.serial: # maybe simulator eg: genymotion, 海马玩模拟器
- raise EnvironmentError("Android simulator is not supported.")
- deadline = time.time() + timeout
- while time.time() < deadline:
- ime_id, shown = self.current_ime()
- if ime_id != "com.github.uiautomator/.FastInputIME":
- self.set_fastinput_ime(True)
- time.sleep(0.5)
- continue
- if shown:
- return True
- time.sleep(0.2)
- raise EnvironmentError("FastInputIME started failed")
- def current_ime(self):
- """ Current input method
- Returns:
- (method_id(str), shown(bool)
- Example output:
- ("com.github.uiautomator/.FastInputIME", True)
- """
- dim, _ = self.server.shell(['dumpsys', 'input_method'])
- m = _INPUT_METHOD_RE.search(dim)
- method_id = None if not m else m.group(1)
- shown = "mInputShown=true" in dim
- return (method_id, shown)
- def tap(self, x, y):
- """
- alias of click
- """
- self.click(x, y)
- @property
- def touch(self):
- """
- ACTION_DOWN: 0 ACTION_MOVE: 2
- touch.down(x, y)
- touch.move(x, y)
- touch.up()
- """
- ACTION_DOWN = 0
- ACTION_MOVE = 2
- ACTION_UP = 1
- obj = self
- class _Touch(object):
- def down(self, x, y):
- obj.jsonrpc.injectInputEvent(ACTION_DOWN, x, y, 0)
- def move(self, x, y):
- obj.jsonrpc.injectInputEvent(ACTION_MOVE, x, y, 0)
- def up(self, x=0, y=0):
- """ ACTION_UP x, y seems no use """
- obj.jsonrpc.injectInputEvent(ACTION_UP, x, y, 0)
- return _Touch()
- def click(self, x, y):
- """
- click position
- """
- x, y = self.pos_rel2abs(x, y)
- self._click(x, y)
- @hooks_wrap
- def _click(self, x, y):
- self.jsonrpc.click(x, y)
- if self.server.click_post_delay: # click code delay
- time.sleep(self.server.click_post_delay)
- def double_click(self, x, y, duration=0.1):
- """
- double click position
- """
- x, y = self.pos_rel2abs(x, y)
- self.touch.down(x, y)
- self.touch.up(x, y)
- time.sleep(duration)
- self.click(x, y) # use click last is for htmlreport
- def long_click(self, x, y, duration=None):
- '''long click at arbitrary coordinates.
- Args:
- duration (float): seconds of pressed
- '''
- if not duration:
- duration = 0.5
- x, y = self.pos_rel2abs(x, y)
- return self._long_click(x, y, duration)
- @hooks_wrap
- def _long_click(self, x, y, duration):
- self.touch.down(x, y)
- # self.touch.move(x, y) # maybe can fix
- time.sleep(duration)
- self.touch.up(x, y)
- return self
- def swipe(self, fx, fy, tx, ty, duration=0.1, steps=None):
- """
- Args:
- fx, fy: from position
- tx, ty: to position
- duration (float): duration
- steps: 1 steps is about 5ms, if set, duration will be ignore
- Documents:
- uiautomator use steps instead of duration
- As the document say: Each step execution is throttled to 5ms per step.
- Links:
- https://developer.android.com/reference/android/support/test/uiautomator/UiDevice.html#swipe%28int,%20int,%20int,%20int,%20int%29
- """
- rel2abs = self.pos_rel2abs
- fx, fy = rel2abs(fx, fy)
- tx, ty = rel2abs(tx, ty)
- if not steps:
- steps = int(duration * 200)
- self._swipe(fx, fy, tx, ty, steps)
- @hooks_wrap
- def _swipe(self, fx, fy, tx, ty, steps):
- return self.jsonrpc.swipe(fx, fy, tx, ty, steps)
- def swipe_points(self, points, duration=0.5):
- """
- Args:
- points: is point array containg at least one point object. eg [[200, 300], [210, 320]]
- duration: duration to inject between two points
- Links:
- https://developer.android.com/reference/android/support/test/uiautomator/UiDevice.html#swipe(android.graphics.Point[], int)
- """
- ppoints = []
- rel2abs = self.pos_rel2abs
- for p in points:
- x, y = rel2abs(p[0], p[1])
- ppoints.append(x)
- ppoints.append(y)
- return self.jsonrpc.swipePoints(ppoints, int(duration * 200))
- def drag(self, sx, sy, ex, ey, duration=0.5):
- '''Swipe from one point to another point.'''
- rel2abs = self.pos_rel2abs
- sx, sy = rel2abs(sx, sy)
- ex, ey = rel2abs(ex, ey)
- return self.jsonrpc.drag(sx, sy, ex, ey, int(duration * 200))
- @retry(
- (IOError, SyntaxError), delay=.5, tries=5, jitter=0.1,
- max_delay=1) # delay .5, .6, .7, .8 ...
- def screenshot(self, filename=None, format='pillow'):
- """
- Image format is JPEG
- Args:
- filename (str): saved filename
- format (string): used when filename is empty. one of "pillow" or "opencv"
- Raises:
- IOError, SyntaxError
- Examples:
- screenshot("saved.jpg")
- screenshot().save("saved.png")
- cv2.imwrite('saved.jpg', screenshot(format='opencv'))
- """
- r = requests.get(self.server.screenshot_uri, timeout=10)
- if filename:
- with open(filename, 'wb') as f:
- f.write(r.content)
- return filename
- elif format == 'pillow':
- from PIL import Image
- buff = io.BytesIO(r.content)
- return Image.open(buff)
- elif format == 'opencv':
- import cv2
- import numpy as np
- nparr = np.fromstring(r.content, np.uint8)
- return cv2.imdecode(nparr, cv2.IMREAD_COLOR)
- elif format == 'raw':
- return r.content
- else:
- raise RuntimeError("Invalid format " + format)
- def dump_hierarchy(self, compressed=False, pretty=False):
- """
- Args:
- shell (bool): use "adb shell uiautomator dump" to get hierarchy
- pretty (bool): format xml
- Same as
- content = self.jsonrpc.dumpWindowHierarchy(compressed, None)
- But through GET /dump/hierarchy will be more robust
- when dumpHierarchy fails, the atx-agent will restart uiautomator again, then retry
- """
- res = self.server._reqsess.get(self.server.path2url("/dump/hierarchy"))
- try:
- res.raise_for_status()
- except requests.HTTPError:
- logging.warning("request error: %s", res.text)
- raise
- content = res.json().get("result")
- if pretty and "\n " not in content:
- xml_text = xml.dom.minidom.parseString(content.encode("utf-8"))
- content = U(xml_text.toprettyxml(indent=' '))
- return content
- def freeze_rotation(self, freeze=True):
- '''freeze or unfreeze the device rotation in current status.'''
- self.jsonrpc.freezeRotation(freeze)
- def press(self, key, meta=None, duration=1.0):
- """
- press key via name or key code. Supported key name includes:
- home, back, left, right, up, down, center, menu, search, enter,
- delete(or del), recent(recent apps), volume_up, volume_down,
- volume_mute, camera, power.
- """
- if isinstance(key, int):
- time.sleep(duration)
- return self.jsonrpc.pressKeyCode(
- key, meta) if meta else self.server.jsonrpc.pressKeyCode(key)
- else:
- time.sleep(duration)
- return self.jsonrpc.pressKey(key)
- def pressKeyList(self, keyList):
- for key in keyList:
- self.press(key)
- def pressKeyTimes(self, key, times=1, duration=1.0):
- for i in range(times):
- self.press(key, duration=duration)
- def screen_on(self):
- self.jsonrpc.wakeUp()
- def screen_off(self):
- self.jsonrpc.sleep()
- @property
- def orientation(self):
- '''
- orienting the devie to left/right or natural.
- left/l: rotation=90 , displayRotation=1
- right/r: rotation=270, displayRotation=3
- natural/n: rotation=0 , displayRotation=0
- upsidedown/u: rotation=180, displayRotation=2
- '''
- return self.__orientation[self.info["displayRotation"]][1]
- def set_orientation(self, value):
- '''setter of orientation property.'''
- for values in self.__orientation:
- if value in values:
- # can not set upside-down until api level 18.
- self.jsonrpc.setOrientation(values[1])
- break
- else:
- raise ValueError("Invalid orientation.")
- @property
- def last_traversed_text(self):
- '''get last traversed text. used in webview for highlighted text.'''
- return self.jsonrpc.getLastTraversedText()
- def clear_traversed_text(self):
- '''clear the last traversed text.'''
- self.jsonrpc.clearLastTraversedText()
- def open_notification(self):
- return self.jsonrpc.openNotification()
- def open_quick_settings(self):
- return self.jsonrpc.openQuickSettings()
- def exists(self, **kwargs):
- return self(**kwargs).exists
- @property
- def xpath(self):
- return self.server.ext_xpath
- def watcher(self, name):
- obj = self
- class Watcher(object):
- def __init__(self):
- self.__selectors = []
- @property
- def triggered(self):
- return obj.server.jsonrpc.hasWatcherTriggered(name)
- def remove(self):
- obj.server.jsonrpc.removeWatcher(name)
- def when(self, **kwargs):
- self.__selectors.append(Selector(**kwargs))
- return self
- def click(self, **kwargs):
- target = Selector(**kwargs) if kwargs else self.__selectors[-1]
- obj.server.jsonrpc.registerClickUiObjectWatcher(
- name, self.__selectors, target)
- def press(self, *keys):
- """
- key (str): on of
- ("home", "back", "left", "right", "up", "down", "center",
- "search", "enter", "delete", "del", "recent", "volume_up",
- "menu", "volume_down", "volume_mute", "camera", "power")
- """
- obj.server.jsonrpc.registerPressKeyskWatcher(
- name, self.__selectors, keys)
- return Watcher()
- @property
- def watchers(self):
- obj = self
- class Watchers(list):
- def __init__(self):
- for watcher in obj.server.jsonrpc.getWatchers():
- self.append(watcher)
- @property
- def triggered(self):
- return obj.server.jsonrpc.hasAnyWatcherTriggered()
- def remove(self, name=None):
- if name:
- obj.server.jsonrpc.removeWatcher(name)
- else:
- for name in self:
- obj.server.jsonrpc.removeWatcher(name)
- def reset(self):
- obj.server.jsonrpc.resetWatcherTriggers()
- return self
- def run(self):
- obj.server.jsonrpc.runWatchers()
- return self
- @property
- def watched(self):
- return obj.server.jsonrpc.hasWatchedOnWindowsChange()
- @watched.setter
- def watched(self, b):
- """
- Args:
- b: boolean
- """
- assert isinstance(b, bool)
- obj.server.jsonrpc.runWatchersOnWindowsChange(b)
- return Watchers()
- @property
- def info(self):
- return self.jsonrpc.deviceInfo()
- def __call__(self, **kwargs):
- return UiObject(self, Selector(**kwargs))
- class Selector(dict):
- """The class is to build parameters for UiSelector passed to Android device.
- """
- __fields = {
- "text": (0x01, None), # MASK_TEXT,
- "textContains": (0x02, None), # MASK_TEXTCONTAINS,
- "textMatches": (0x04, None), # MASK_TEXTMATCHES,
- "textStartsWith": (0x08, None), # MASK_TEXTSTARTSWITH,
- "className": (0x10, None), # MASK_CLASSNAME
- "classNameMatches": (0x20, None), # MASK_CLASSNAMEMATCHES
- "description": (0x40, None), # MASK_DESCRIPTION
- "descriptionContains": (0x80, None), # MASK_DESCRIPTIONCONTAINS
- "descriptionMatches": (0x0100, None), # MASK_DESCRIPTIONMATCHES
- "descriptionStartsWith": (0x0200, None), # MASK_DESCRIPTIONSTARTSWITH
- "checkable": (0x0400, False), # MASK_CHECKABLE
- "checked": (0x0800, False), # MASK_CHECKED
- "clickable": (0x1000, False), # MASK_CLICKABLE
- "longClickable": (0x2000, False), # MASK_LONGCLICKABLE,
- "scrollable": (0x4000, False), # MASK_SCROLLABLE,
- "enabled": (0x8000, False), # MASK_ENABLED,
- "focusable": (0x010000, False), # MASK_FOCUSABLE,
- "focused": (0x020000, False), # MASK_FOCUSED,
- "selected": (0x040000, False), # MASK_SELECTED,
- "packageName": (0x080000, None), # MASK_PACKAGENAME,
- "packageNameMatches": (0x100000, None), # MASK_PACKAGENAMEMATCHES,
- "resourceId": (0x200000, None), # MASK_RESOURCEID,
- "resourceIdMatches": (0x400000, None), # MASK_RESOURCEIDMATCHES,
- "index": (0x800000, 0), # MASK_INDEX,
- "instance": (0x01000000, 0) # MASK_INSTANCE,
- }
- __mask, __childOrSibling, __childOrSiblingSelector = "mask", "childOrSibling", "childOrSiblingSelector"
- def __init__(self, **kwargs):
- super(Selector, self).__setitem__(self.__mask, 0)
- super(Selector, self).__setitem__(self.__childOrSibling, [])
- super(Selector, self).__setitem__(self.__childOrSiblingSelector, [])
- for k in kwargs:
- self[k] = kwargs[k]
- def __str__(self):
- """ remove useless part for easily debugger """
- selector = self.copy()
- selector.pop('mask')
- for key in ('childOrSibling', 'childOrSiblingSelector'):
- if not selector.get(key):
- selector.pop(key)
- args = []
- for (k, v) in selector.items():
- args.append(k + '=' + repr(v))
- return 'Selector [' + ', '.join(args) + ']'
- def __setitem__(self, k, v):
- if k in self.__fields:
- super(Selector, self).__setitem__(U(k), U(v))
- super(Selector, self).__setitem__(
- self.__mask, self[self.__mask] | self.__fields[k][0])
- else:
- raise ReferenceError("%s is not allowed." % k)
- def __delitem__(self, k):
- if k in self.__fields:
- super(Selector, self).__delitem__(k)
- super(Selector, self).__setitem__(
- self.__mask, self[self.__mask] & ~self.__fields[k][0])
- def clone(self):
- kwargs = dict((k, self[k]) for k in self if k not in [
- self.__mask, self.__childOrSibling, self.__childOrSiblingSelector
- ])
- selector = Selector(**kwargs)
- for v in self[self.__childOrSibling]:
- selector[self.__childOrSibling].append(v)
- for s in self[self.__childOrSiblingSelector]:
- selector[self.__childOrSiblingSelector].append(s.clone())
- return selector
- def child(self, **kwargs):
- self[self.__childOrSibling].append("child")
- self[self.__childOrSiblingSelector].append(Selector(**kwargs))
- return self
- def sibling(self, **kwargs):
- self[self.__childOrSibling].append("sibling")
- self[self.__childOrSiblingSelector].append(Selector(**kwargs))
- return self
- def update_instance(self, i):
- # update inside child instance
- if self[self.__childOrSiblingSelector]:
- self[self.__childOrSiblingSelector][-1]['instance'] = i
- else:
- self['instance'] = i
- class UiObject(object):
- def __init__(self, session, selector):
- self.session = session
- self.selector = selector
- self.jsonrpc = session.jsonrpc
- @property
- def wait_timeout(self):
- return self.session.server.wait_timeout
- @property
- def exists(self):
- '''check if the object exists in current window.'''
- return Exists(self)
- @property
- @retry(
- UiObjectNotFoundError, delay=.5, tries=3, jitter=0.1, logger=logging)
- def info(self):
- '''ui object info.'''
- try:
- return self.jsonrpc.objInfo(self.selector)
- except Exception,e:
- print("error:info is none")
- return None
- @_failprompt
- def click(self, timeout=None, offset=None):
- """
- Click UI element.
- Args:
- timeout: seconds wait element show up
- offset: (xoff, yoff) default (0.5, 0.5) -> center
- The click method does the same logic as java uiautomator does.
- 1. waitForExists 2. get VisibleBounds center 3. send click event
- Raises:
- UiObjectNotFoundError
- """
- self.must_wait(timeout=timeout)
- x, y = self.center(offset=offset)
- # ext.htmlreport need to comment bellow code
- # if info['clickable']:
- # return self.jsonrpc.click(self.selector)
- self.session.click(x, y)
- delay = self.session.server.click_post_delay
- if delay:
- time.sleep(delay)
- def bounds(self):
- """
- Returns:
- left_top_x, left_top_y, right_bottom_x, right_bottom_y
- """
- info = self.info
- bounds = info.get('visibleBounds') or info.get("bounds")
- lx, ly, rx, ry = bounds['left'], bounds['top'], bounds['right'], bounds['bottom']
- return (lx, ly, rx, ry)
- def center(self, offset=(0.5, 0.5)):
- """
- Args:
- offset: optional, (x_off, y_off)
- (0, 0) means left-top, (0.5, 0.5) means middle(Default)
- Return:
- center point (x, y)
- """
- lx, ly, rx, ry = self.bounds()
- if offset is None:
- offset = (0.5, 0.5) # default center
- xoff, yoff = offset
- width, height = rx - lx, ry - ly
- x = lx + width * xoff
- y = ly + height * yoff
- return (x, y)
- def click_gone(self, maxretry=10, interval=1.0):
- """
- Click until element is gone
- Args:
- maxretry (int): max click times
- interval (float): sleep time between clicks
- Return:
- Bool if element is gone
- """
- self.click_exists()
- while maxretry > 0:
- time.sleep(interval)
- if not self.exists:
- return True
- self.click_exists()
- maxretry -= 1
- return False
- def click_exists(self, timeout=0):
- try:
- self.click(timeout=timeout)
- return True
- except UiObjectNotFoundError:
- return False
- def long_click(self, duration=None, timeout=None):
- """
- Args:
- duration (float): seconds of pressed
- timeout (float): seconds wait element show up
- """
- # if info['longClickable'] and not duration:
- # return self.jsonrpc.longClick(self.selector)
- self.must_wait(timeout=timeout)
- x, y = self.center()
- return self.session.long_click(x, y, duration)
- def drag_to(self, *args, **kwargs):
- duration = kwargs.pop('duration', 0.5)
- timeout = kwargs.pop('timeout', None)
- self.must_wait(timeout=timeout)
- steps = int(duration * 200)
- if len(args) >= 2 or "x" in kwargs or "y" in kwargs:
- def drag2xy(x, y):
- x, y = self.session.pos_rel2abs(x,
- y) # convert percent position
- return self.jsonrpc.dragTo(self.selector, x, y, steps)
- return drag2xy(*args, **kwargs)
- return self.jsonrpc.dragTo(self.selector, Selector(**kwargs), steps)
- def swipe(self, direction, steps=10):
- """
- Performs the swipe action on the UiObject.
- Swipe from center
- Args:
- direction (str): one of ("left", "right", "up", "down")
- steps (int): move steps, one step is about 5ms
- percent: float between [0, 1]
- Note: percent require API >= 18
- # assert 0 <= percent <= 1
- """
- assert direction in ("left", "right", "up", "down")
- self.must_wait()
- info = self.info
- bounds = info.get('visibleBounds') or info.get("bounds")
- lx, ly, rx, ry = bounds['left'], bounds['top'], bounds['right'], bounds['bottom']
- cx, cy = (lx + rx) // 2, (ly + ry) // 2
- if direction == 'up':
- self.session.swipe(cx, cy, cx, ly, steps=steps)
- elif direction == 'down':
- self.session.swipe(cx, cy, cx, ry - 1, steps=steps)
- elif direction == 'left':
- self.session.swipe(cx, cy, lx, cy, steps=steps)
- elif direction == 'right':
- self.session.swipe(cx, cy, rx - 1, cy, steps=steps)
- # return self.jsonrpc.swipe(self.selector, direction, percent, steps)
- def gesture(self, start1, start2, end1, end2, steps=100):
- '''
- perform two point gesture.
- Usage:
- d().gesture(startPoint1, startPoint2, endPoint1, endPoint2, steps)
- '''
- rel2abs = self.session.pos_rel2abs
- def point(x=0, y=0):
- x, y = rel2abs(x, y)
- return {"x": x, "y": y}
- def ctp(pt):
- return point(*pt) if type(pt) == tuple else pt
- s1, s2, e1, e2 = ctp(start1), ctp(start2), ctp(end1), ctp(end2)
- return self.jsonrpc.gesture(self.selector, s1, s2, e1, e2, steps)
- def pinch_in(self, percent=100, steps=50):
- return self.jsonrpc.pinchIn(self.selector, percent, steps)
- def pinch_out(self, percent=100, steps=50):
- return self.jsonrpc.pinchOut(self.selector, percent, steps)
- def wait(self, exists=True, timeout=None):
- """
- Wait until UI Element exists or gone
- Args:
- timeout (float): wait element timeout
- Example:
- d(text="Clock").wait()
- d(text="Settings").wait("gone") # wait until it's gone
- """
- if timeout is None:
- timeout = self.wait_timeout
- http_wait = timeout + 10
- if exists:
- try:
- return self.jsonrpc.waitForExists(
- self.selector, int(timeout * 1000), http_timeout=http_wait)
- except requests.ReadTimeout as e:
- warnings.warn("waitForExists readTimeout: %s" %
- e, RuntimeWarning)
- return self.exists()
- else:
- try:
- return self.jsonrpc.waitUntilGone(
- self.selector, int(timeout * 1000), http_timeout=http_wait)
- except requests.ReadTimeout as e:
- warnings.warn("waitForExists readTimeout: %s" %
- e, RuntimeWarning)
- return not self.exists()
- def wait_gone(self, timeout=None):
- """ wait until ui gone
- Args:
- timeout (float): wait element gone timeout
- Returns:
- bool if element gone
- """
- timeout = timeout or self.wait_timeout
- return self.wait(exists=False, timeout=timeout)
- def must_wait(self, exists=True, timeout=None):
- """ wait and if not found raise UiObjectNotFoundError """
- if not self.wait(exists, timeout):
- raise UiObjectNotFoundError({'code': -32002, 'method': 'wait'})
- def send_keys(self, text):
- """ alias of set_text """
- return self.set_text(text)
- def set_text(self, text, timeout=None):
- self.must_wait(timeout=timeout)
- if not text:
- return self.jsonrpc.clearTextField(self.selector)
- else:
- return self.jsonrpc.setText(self.selector, text)
- def get_text(self, timeout=None):
- """ get text from field """
- self.must_wait(timeout=timeout)
- return self.jsonrpc.getText(self.selector)
- def clear_text(self, timeout=None):
- self.must_wait(timeout=timeout)
- return self.set_text(None)
- def child(self, **kwargs):
- return UiObject(self.session, self.selector.clone().child(**kwargs))
- def sibling(self, **kwargs):
- return UiObject(self.session, self.selector.clone().sibling(**kwargs))
- child_selector, from_parent = child, sibling
- def child_by_text(self, txt, **kwargs):
- if "allow_scroll_search" in kwargs:
- allow_scroll_search = kwargs.pop("allow_scroll_search")
- name = self.jsonrpc.childByText(self.selector, Selector(**kwargs),
- txt, allow_scroll_search)
- else:
- name = self.jsonrpc.childByText(self.selector, Selector(**kwargs),
- txt)
- return UiObject(self.session, name)
- def child_by_description(self, txt, **kwargs):
- # need test
- if "allow_scroll_search" in kwargs:
- allow_scroll_search = kwargs.pop("allow_scroll_search")
- name = self.jsonrpc.childByDescription(self.selector,
- Selector(**kwargs), txt,
- allow_scroll_search)
- else:
- name = self.jsonrpc.childByDescription(self.selector,
- Selector(**kwargs), txt)
- return UiObject(self.session, name)
- def child_by_instance(self, inst, **kwargs):
- # need test
- return UiObject(self.session,
- self.jsonrpc.childByInstance(self.selector,
- Selector(**kwargs), inst))
- def parent(self):
- # android-uiautomator-server not implemented
- # In UIAutomator, UIObject2 has getParent() method
- # https://developer.android.com/reference/android/support/test/uiautomator/UiObject2.html
- raise NotImplementedError()
- # return UiObject(self.session, self.jsonrpc.getParent(self.selector))
- def __getitem__(self, index):
- """
- Raises:
- IndexError
- """
- if isinstance(self.selector, six.string_types):
- raise IndexError(
- "Index is not supported when UiObject returned by child_by_xxx")
- selector = self.selector.clone()
- selector.update_instance(index)
- return UiObject(self.session, selector)
- @property
- def count(self):
- return self.jsonrpc.count(self.selector)
- def __len__(self):
- return self.count
- def __iter__(self):
- obj, length = self, self.count
- class Iter(object):
- def __init__(self):
- self.index = -1
- def next(self):
- self.index += 1
- if self.index < length:
- return obj[self.index]
- else:
- raise StopIteration()
- __next__ = next
- return Iter()
- def right(self, **kwargs):
- def onrightof(rect1, rect2):
- left, top, right, bottom = intersect(rect1, rect2)
- return rect2["left"] - rect1["right"] if top < bottom else -1
- return self.__view_beside(onrightof, **kwargs)
- def left(self, **kwargs):
- def onleftof(rect1, rect2):
- left, top, right, bottom = intersect(rect1, rect2)
- return rect1["left"] - rect2["right"] if top < bottom else -1
- return self.__view_beside(onleftof, **kwargs)
- def up(self, **kwargs):
- def above(rect1, rect2):
- left, top, right, bottom = intersect(rect1, rect2)
- return rect1["top"] - rect2["bottom"] if left < right else -1
- return self.__view_beside(above, **kwargs)
- def down(self, **kwargs):
- def under(rect1, rect2):
- left, top, right, bottom = intersect(rect1, rect2)
- return rect2["top"] - rect1["bottom"] if left < right else -1
- return self.__view_beside(under, **kwargs)
- def __view_beside(self, onsideof, **kwargs):
- bounds = self.info["bounds"]
- min_dist, found = -1, None
- for ui in UiObject(self.session, Selector(**kwargs)):
- dist = onsideof(bounds, ui.info["bounds"])
- if dist >= 0 and (min_dist < 0 or dist < min_dist):
- min_dist, found = dist, ui
- return found
- @property
- def fling(self):
- """
- Args:
- dimention (str): one of "vert", "vertically", "vertical", "horiz", "horizental", "horizentally"
- action (str): one of "forward", "backward", "toBeginning", "toEnd", "to"
- """
- jsonrpc = self.jsonrpc
- selector = self.selector
- class _Fling(object):
- def __init__(self):
- self.vertical = True
- self.action = 'forward'
- def __getattr__(self, key):
- if key in ["horiz", "horizental", "horizentally"]:
- self.vertical = False
- return self
- if key in ['vert', 'vertically', 'vertical']:
- self.vertical = True
- return self
- if key in [
- "forward", "backward", "toBeginning", "toEnd", "to"
- ]:
- self.action = key
- return self
- raise ValueError("invalid prop %s" % key)
- def __call__(self, max_swipes=500, **kwargs):
- if self.action == "forward":
- return jsonrpc.flingForward(selector, self.vertical)
- elif self.action == "backward":
- return jsonrpc.flingBackward(selector, self.vertical)
- elif self.action == "toBeginning":
- return jsonrpc.flingToBeginning(selector, self.vertical,
- max_swipes)
- elif self.action == "toEnd":
- return jsonrpc.flingToEnd(selector, self.vertical,
- max_swipes)
- return _Fling()
- @property
- def scroll(self):
- """
- Args:
- dimention (str): one of "vert", "vertically", "vertical", "horiz", "horizental", "horizentally"
- action (str): one of "forward", "backward", "toBeginning", "toEnd", "to"
- """
- selector = self.selector
- jsonrpc = self.jsonrpc
- class _Scroll(object):
- def __init__(self):
- self.vertical = True
- self.action = 'forward'
- def __getattr__(self, key):
- if key in ["horiz", "horizental", "horizentally"]:
- self.vertical = False
- return self
- if key in ['vert', 'vertically', 'vertical']:
- self.vertical = True
- return self
- if key in [
- "forward", "backward", "toBeginning", "toEnd", "to"
- ]:
- self.action = key
- return self
- raise ValueError("invalid prop %s" % key)
- def __call__(self, steps=20, max_swipes=500, **kwargs):
- if self.action in ["forward", "backward"]:
- method = jsonrpc.scrollForward if self.action == "forward" else jsonrpc.scrollBackward
- return method(selector, self.vertical, steps)
- elif self.action == "toBeginning":
- return jsonrpc.scrollToBeginning(selector, self.vertical,
- max_swipes, steps)
- elif self.action == "toEnd":
- return jsonrpc.scrollToEnd(selector, self.vertical,
- max_swipes, steps)
- elif self.action == "to":
- return jsonrpc.scrollTo(selector, Selector(**kwargs),
- self.vertical)
- return _Scroll()
|