123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- # coding: utf-8
- #
- # https://github.com/openatx/u2init#rest-api
- import re
- import time
- import requests
- import ssat_sdk.uiautomator2 as u2
- from logzero import logger
- import logging
- logger.setLevel(logging.DEBUG)
- # def oppo_install(self, apk_url):
- # m = hashlib.md5()
- # m.update(apk_url.encode('utf-8'))
- # key = m.hexdigest()[8:16]
- # dst = "/sdcard/atx-" + key + ".apk" # 安装包会在安装完后自动删除
- # d = uiautomator2.connect(self._device_url)
- # print(d.info, dst)
- # d.push_url(apk_url, dst)
- # # d.push("/Users/codeskyblue/workdir/atxcrawler/apks/ApiDemos-debug.apk", dst) # For debug
- # with d.session("com.coloros.filemanager") as s:
- # s(text=u"所有文件").click()
- # s(className="android.widget.ListView").scroll.to(textContains=key)
- # s(textContains=key).click()
- # btn_done = d(className="android.widget.Button", text=u"完成")
- # while not btn_done.exists:
- # s(text="继续安装旧版本").click_exists()
- # s(text="无视风险安装").click_exists()
- # s(text="重新安装").click_exists()
- # # 自动清除安装包和残留
- # if s(resourceId=
- # "com.android.packageinstaller:id/install_confirm_panel"
- # ).exists:
- # # 通过偏移点击<安装>
- # s(resourceId=
- # "com.android.packageinstaller:id/bottom_button_layout"
- # ).click(offset=(0.75, 0.5))
- # btn_done.click()
- # def vivo_install(self, apk_url):
- # print("Vivo detected, open u2 watchers")
- # u = uiautomator2.connect_wifi(self._device_url)
- # u.watcher("AUTO_INSTALL").when(
- # textMatches="好|安装", className="android.widget.Button").click()
- # u.watchers.watched = True
- # self.pm_install(apk_url)
- def install_apk(device_url, apk_url):
- """
- Args:
- device_url: udid, device_ip or serial(when usb connected)
- """
- psurl = pkgserv_addr(device_url)
- _http_install_apk(psurl, apk_url)
- def pkgserv_addr(device_url):
- """
- 根据设备线获取到atxserver的地址,然后再获取到u2init的地址,直接再决定是无线安装还是手动安装
-
- Returns:
- Package API url
- """
- logger.info("device url %s", device_url)
- d = u2.connect(device_url)
- devinfo = d.device_info
- serial = devinfo['serial']
- logger.info("serial %s, udid %s", serial, devinfo['udid'])
- aserver_url = devinfo.get(
- "serverUrl",
- "http://wifiphone.nie.netease.com") # TODO(atx-agent should udpate)
- logger.info("atx-server url %s", aserver_url)
- r = requests.get(
- aserver_url + "/devices/" + devinfo['udid'] + "/info").json()
- pvd = r.get('provider')
- if not pvd:
- logger.info("u2init not connected")
- return "http://" + d.wlan_ip + ":7912/packages"
- pkg_url = 'http://%s:%d/devices/%s/pkgs' % (pvd['ip'], pvd['port'], serial)
- logger.info("package url %s", pkg_url)
- return pkg_url
- def _http_install_apk(pkg_restapi, apk_url):
- """ install apk from u2init """
- resp = requests.post(pkg_restapi, data={"url": apk_url}).json()
- if not resp.get('success'):
- raise RuntimeError(resp.get('description'))
- id = resp['data']['id']
- logger.info("install id %s", id)
- _wait_installed(pkg_restapi + "/" + id)
- def _wait_installed(query_url):
- """ query until install finished """
- while True:
- data = safe_getjson(query_url)
- status = data.get('status')
- logger.debug("%s %s", status, data.get('description'))
- if status in ("success", "failure"):
- break
- time.sleep(1)
- def safe_getjson(url):
- """ get rest api """
- r = requests.get(url).json()
- desc = r.get('description')
- if not r.get('success'):
- raise RuntimeError(desc)
- return r.get('data')
- def main():
- # ins = U2Installer("http://localhost:17000")
- # apk_url = "http://arch.s3.netease.com/hzdev-appci/h35_trunk_RelWithDebInfo_373397.apk" # 1.8G large apk
- apk_url = "https://gohttp.nie.netease.com/tools/apks/qrcodescan-2.6.0-green.apk"
- # command line
- # python -m uiautomator2.cli install 10.240.174.43 http://arch.s3.netease.com/hzdev-appci/h35_trunk_RelWithDebInfo_373397.apk -s http://wifiphone.nie.netease.com
- # psurl = pkgserv_addr("3578298f")
- psurl = pkgserv_addr("10.242.163.69")
- install_apk(psurl, apk_url)
- if __name__ == '__main__':
- main()
|