install.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. # coding: utf-8
  2. #
  3. # https://github.com/openatx/u2init#rest-api
  4. import re
  5. import time
  6. import requests
  7. import ssat_sdk.uiautomator2 as u2
  8. from logzero import logger
  9. import logging
  10. logger.setLevel(logging.DEBUG)
  11. # def oppo_install(self, apk_url):
  12. # m = hashlib.md5()
  13. # m.update(apk_url.encode('utf-8'))
  14. # key = m.hexdigest()[8:16]
  15. # dst = "/sdcard/atx-" + key + ".apk" # 安装包会在安装完后自动删除
  16. # d = uiautomator2.connect(self._device_url)
  17. # print(d.info, dst)
  18. # d.push_url(apk_url, dst)
  19. # # d.push("/Users/codeskyblue/workdir/atxcrawler/apks/ApiDemos-debug.apk", dst) # For debug
  20. # with d.session("com.coloros.filemanager") as s:
  21. # s(text=u"所有文件").click()
  22. # s(className="android.widget.ListView").scroll.to(textContains=key)
  23. # s(textContains=key).click()
  24. # btn_done = d(className="android.widget.Button", text=u"完成")
  25. # while not btn_done.exists:
  26. # s(text="继续安装旧版本").click_exists()
  27. # s(text="无视风险安装").click_exists()
  28. # s(text="重新安装").click_exists()
  29. # # 自动清除安装包和残留
  30. # if s(resourceId=
  31. # "com.android.packageinstaller:id/install_confirm_panel"
  32. # ).exists:
  33. # # 通过偏移点击<安装>
  34. # s(resourceId=
  35. # "com.android.packageinstaller:id/bottom_button_layout"
  36. # ).click(offset=(0.75, 0.5))
  37. # btn_done.click()
  38. # def vivo_install(self, apk_url):
  39. # print("Vivo detected, open u2 watchers")
  40. # u = uiautomator2.connect_wifi(self._device_url)
  41. # u.watcher("AUTO_INSTALL").when(
  42. # textMatches="好|安装", className="android.widget.Button").click()
  43. # u.watchers.watched = True
  44. # self.pm_install(apk_url)
  45. def install_apk(device_url, apk_url):
  46. """
  47. Args:
  48. device_url: udid, device_ip or serial(when usb connected)
  49. """
  50. psurl = pkgserv_addr(device_url)
  51. _http_install_apk(psurl, apk_url)
  52. def pkgserv_addr(device_url):
  53. """
  54. 根据设备线获取到atxserver的地址,然后再获取到u2init的地址,直接再决定是无线安装还是手动安装
  55. Returns:
  56. Package API url
  57. """
  58. logger.info("device url %s", device_url)
  59. d = u2.connect(device_url)
  60. devinfo = d.device_info
  61. serial = devinfo['serial']
  62. logger.info("serial %s, udid %s", serial, devinfo['udid'])
  63. aserver_url = devinfo.get(
  64. "serverUrl",
  65. "http://wifiphone.nie.netease.com") # TODO(atx-agent should udpate)
  66. logger.info("atx-server url %s", aserver_url)
  67. r = requests.get(
  68. aserver_url + "/devices/" + devinfo['udid'] + "/info").json()
  69. pvd = r.get('provider')
  70. if not pvd:
  71. logger.info("u2init not connected")
  72. return "http://" + d.wlan_ip + ":7912/packages"
  73. pkg_url = 'http://%s:%d/devices/%s/pkgs' % (pvd['ip'], pvd['port'], serial)
  74. logger.info("package url %s", pkg_url)
  75. return pkg_url
  76. def _http_install_apk(pkg_restapi, apk_url):
  77. """ install apk from u2init """
  78. resp = requests.post(pkg_restapi, data={"url": apk_url}).json()
  79. if not resp.get('success'):
  80. raise RuntimeError(resp.get('description'))
  81. id = resp['data']['id']
  82. logger.info("install id %s", id)
  83. _wait_installed(pkg_restapi + "/" + id)
  84. def _wait_installed(query_url):
  85. """ query until install finished """
  86. while True:
  87. data = safe_getjson(query_url)
  88. status = data.get('status')
  89. logger.debug("%s %s", status, data.get('description'))
  90. if status in ("success", "failure"):
  91. break
  92. time.sleep(1)
  93. def safe_getjson(url):
  94. """ get rest api """
  95. r = requests.get(url).json()
  96. desc = r.get('description')
  97. if not r.get('success'):
  98. raise RuntimeError(desc)
  99. return r.get('data')
  100. def main():
  101. # ins = U2Installer("http://localhost:17000")
  102. # apk_url = "http://arch.s3.netease.com/hzdev-appci/h35_trunk_RelWithDebInfo_373397.apk" # 1.8G large apk
  103. apk_url = "https://gohttp.nie.netease.com/tools/apks/qrcodescan-2.6.0-green.apk"
  104. # command line
  105. # python -m uiautomator2.cli install 10.240.174.43 http://arch.s3.netease.com/hzdev-appci/h35_trunk_RelWithDebInfo_373397.apk -s http://wifiphone.nie.netease.com
  106. # psurl = pkgserv_addr("3578298f")
  107. psurl = pkgserv_addr("10.242.163.69")
  108. install_apk(psurl, apk_url)
  109. if __name__ == '__main__':
  110. main()