runyaml.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. #!/usr/bin/env python
  2. # coding: utf-8
  3. #
  4. """
Run a UI test case described by a YAML file. Example file:

packageName: com.netease.cloudmusic # Optional
activity: .MainActivity # Optional
launch: true # Optional, default true
close: true # Optional, close app when finished
plugins:
  ocr: $ocr-api-url
  perf:
    interval: 1
    debug: true
    filename: perf.csv
steps:
- q: ~hell
- q: ^hello w
- q: =hello world
- q: hello world
- q: foo
  timeout: 10 # seconds
- text: the input text
- code: |
    time.sleep(10) # only "d, time" can be used
watchers: # click when show up
- q: ~跳过
  timeout: 0 # default
  28. """
  29. import argparse
  30. import time
  31. import re
  32. import logging
  33. import uiautomator2 as u2
  34. import logzero
  35. from logzero import logger
  36. class JSONRunner(object):
  37. __alias = {
  38. 'q': 'query',
  39. }
  40. def __init__(self, cnf):
  41. self._cnf = cnf
  42. self._title = cnf.get('title')
  43. self._pkg_name = cnf.get('packageName')
  44. self._activity = cnf.get('activity')
  45. self._launch = cnf.get('launch', True)
  46. self._close = cnf.get('close', True)
  47. self._clear = cnf.get('clear')
  48. self._steps = cnf.get('steps')
  49. self._watchers = cnf.get('watchers', [])
  50. self._udid = cnf.get('udid')
  51. self._plugins = cnf.get('plugins', {})
  52. d = self._d = u2.connect(self._udid)
  53. self.session = None
  54. # plugins
  55. if 'ocr' in self._plugins:
  56. logger.info("load plugin: ocr")
  57. import ssat_sdk.uiautomator2.ext.ocr as ocr
  58. ocr.API = self._plugins['ocr']
  59. logger.info("Use ocr plugin: %s", ocr.API)
  60. u2.plugin_register('ocr', ocr.OCR)
  61. if self._pkg_name and 'perf' in self._plugins:
  62. logger.info("load plugin: perf")
  63. import ssat_sdk.uiautomator2.ext.perf as perf
  64. u2.plugin_register('perf', perf.Perf, self._pkg_name)
  65. perfcnf = self._plugins['perf']
  66. d.ext_perf.interval = perfcnf.get('interval', 1.0)
  67. d.ext_perf.debug = perfcnf.get('debug', True)
  68. d.ext_perf.csv_output = perfcnf.get('filename', 'perf.csv')
  69. def _find_xpath(self, xpath, hierarchy):
  70. el = self.session.xpath(xpath, hierarchy)
  71. if el.exists:
  72. return el.wait().center()
  73. def _find_text(self, text, hierarchy):
  74. elements = self.session.xpath(
  75. '//*[re:match(name(), "\.(TextView|Button|ImageView)$")]',
  76. hierarchy).all()
  77. if text.startswith('~') or text.startswith('^'): # Regexp
  78. pattern = re.compile(text[1:] if text.startswith('~') else text)
  79. for el in elements:
  80. if el.text and pattern.match(el.text):
  81. logger.debug("find match: %s, %s", el.text, el.attrib)
  82. return el.center()
  83. elif text.startswith("="):
  84. text = text[1:]
  85. for el in elements:
  86. if el.text == text:
  87. logger.debug("find exactly match text: %s", el.text)
  88. return el.center()
  89. def _oper_input(self, text):
  90. logger.info("input text: %s", text)
  91. self.session.set_fastinput_ime(True)
  92. self.session.send_keys(text)
  93. self.session.press("enter")
  94. def _run_watchers(self, hierarchy):
  95. for kwargs in self._watchers:
  96. kwargs['timeout'] = kwargs.get('timeout', 0)
  97. if 'q' in kwargs:
  98. kwargs['query'] = kwargs.pop('q')
  99. if self._run_onestep(hierarchy, **kwargs):
  100. logger.info("trigger watcher: %s", kwargs)
  101. def _run_onestep(self,
  102. hierarchy,
  103. action='click',
  104. query=None,
  105. text=None,
  106. code=None,
  107. ocr=None,
  108. timeout=10):
  109. """
  110. Returns:
  111. bool: if step handled
  112. """
  113. if text:
  114. self._oper_input(text)
  115. return True
  116. if ocr:
  117. self._d.ext_ocr(ocr).click(timeout=timeout)
  118. return True
  119. # find element and click
  120. if query:
  121. pos = None
  122. if query.startswith("/"):
  123. pos = self._find_xpath(query, hierarchy)
  124. else:
  125. pos = self._find_text(query, hierarchy)
  126. if pos is None:
  127. if action == 'assertNotExists':
  128. return True
  129. return False
  130. if action == 'click':
  131. logger.info("click: %s", pos)
  132. self.session.click(*pos)
  133. return True
  134. elif action == 'assertExists':
  135. return True
  136. if code:
  137. logger.info("exec: |\n%s", code)
  138. exec(code, {'d': self.session, 'time': time})
  139. return True
  140. # raise NotImplementedError("only support click action")
  141. def _handle_step(self, **kwargs):
  142. retry_cnt = 0
  143. timeout = kwargs.pop('timeout', 10)
  144. deadline = time.time() + timeout
  145. # alias
  146. if 'q' in kwargs:
  147. kwargs['query'] = kwargs.pop('q')
  148. while retry_cnt == 0 or time.time() < deadline:
  149. retry_cnt += 1
  150. hierarchy = self.session.dump_hierarchy()
  151. self._run_watchers(hierarchy) # 处理弹框
  152. if self._run_onestep(hierarchy, **kwargs):
  153. break
  154. logger.debug("process %s, retry %d", kwargs, retry_cnt)
  155. time.sleep(.5)
  156. else:
  157. raise RuntimeError("element not found: %s", kwargs)
  158. def prepare_session(self):
  159. d = self._d
  160. if not self._launch or not self._pkg_name:
  161. self.session = self._d.session(self._pkg_name, attach=True)
  162. return
  163. if self._clear:
  164. d.app_clear(self._pkg_name)
  165. s = None
  166. if self._activity:
  167. d.app_start(self._pkg_name, self._activity, stop=True)
  168. s = d.session(self._pkg_name, attach=True)
  169. else:
  170. s = d.session(self._pkg_name)
  171. self.session = s
  172. def run(self):
  173. logger.info("test begins: %s", self._title)
  174. logger.info("launch app: %s", self._pkg_name)
  175. self.prepare_session()
  176. if 'perf' in self._plugins:
  177. # add report
  178. import easyhtmlreport as htmlreport
  179. hrp = htmlreport.HTMLReport(self._d)
  180. hrp.patch_click()
  181. self._d.ext_perf.csv_output = 'report/perf.csv'
  182. self._d.ext_perf.start()
  183. try:
  184. for step in self._steps:
  185. logger.info("==> %s", step)
  186. self._handle_step(**step)
  187. logger.info("Finished")
  188. finally:
  189. if 'perf' in self._plugins:
  190. self._d.ext_perf.stop()
  191. if self._launch and self._pkg_name and self._close:
  192. self._d.app_stop(self._pkg_name)
  193. def main(filename, debug=False, onlystep=False):
  194. if not debug:
  195. logzero.loglevel(logging.INFO)
  196. import yaml
  197. with open(filename, 'rb') as f:
  198. cnf = yaml.load(f)
  199. if onlystep:
  200. cnf['launch'] = False
  201. tc = JSONRunner(cnf)
  202. tc.run()
  203. # if __name__ == '__main__':
  204. # parser = argparse.ArgumentParser()
  205. # parser.add_argument(
  206. # '-d', '--debug', help='set loglevel to debug', action='store_true')
  207. # parser.add_argument('--step', help='run only step', action='store_true')
  208. # parser.add_argument('yamlfile')
  209. # args = parser.parse_args()
  210. # main(args.yamlfile, args.debug, onlystep=args.step)