| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 | # coding: utf-8## https://github.com/openatx/u2init#rest-apiimport reimport timeimport requestsimport ssat_sdk.uiautomator2 as u2from logzero import loggerimport logginglogger.setLevel(logging.DEBUG)# def oppo_install(self, apk_url):#     m = hashlib.md5()#     m.update(apk_url.encode('utf-8'))#     key = m.hexdigest()[8:16]#     dst = "/sdcard/atx-" + key + ".apk"  # 安装包会在安装完后自动删除#     d = uiautomator2.connect(self._device_url)#     print(d.info, dst)#     d.push_url(apk_url, dst)#     # d.push("/Users/codeskyblue/workdir/atxcrawler/apks/ApiDemos-debug.apk", dst) # For debug#     with d.session("com.coloros.filemanager") as s:#         s(text=u"所有文件").click()#         s(className="android.widget.ListView").scroll.to(textContains=key)#         s(textContains=key).click()#         btn_done = d(className="android.widget.Button", text=u"完成")#         while not btn_done.exists:#             s(text="继续安装旧版本").click_exists()#             s(text="无视风险安装").click_exists()#             s(text="重新安装").click_exists()#             # 自动清除安装包和残留#             if s(resourceId=#                  "com.android.packageinstaller:id/install_confirm_panel"#                  ).exists:#                 # 通过偏移点击<安装>#                 s(resourceId=#                   "com.android.packageinstaller:id/bottom_button_layout"#                   ).click(offset=(0.75, 0.5))#         btn_done.click()#     def vivo_install(self, apk_url):#         print("Vivo detected, open u2 watchers")#         u = uiautomator2.connect_wifi(self._device_url)#         u.watcher("AUTO_INSTALL").when(#             textMatches="好|安装", className="android.widget.Button").click()#         u.watchers.watched = True#         self.pm_install(apk_url)def install_apk(device_url, apk_url):    """    Args:        device_url: udid, device_ip or serial(when usb connected)    """    psurl = pkgserv_addr(device_url)    _http_install_apk(psurl, apk_url)def pkgserv_addr(device_url):    """    根据设备线获取到atxserver的地址,然后再获取到u2init的地址,直接再决定是无线安装还是手动安装        Returns:        Package API url    """    logger.info("device url %s", device_url)    d = u2.connect(device_url)    devinfo = d.device_info    serial = devinfo['serial']    logger.info("serial %s, udid %s", serial, devinfo['udid'])    aserver_url = devinfo.get(        "serverUrl",        "http://wifiphone.nie.netease.com")  # TODO(atx-agent should udpate)    logger.info("atx-server url %s", aserver_url)    r = requests.get(        aserver_url + "/devices/" + devinfo['udid'] + "/info").json()    pvd = r.get('provider')    if not pvd:        logger.info("u2init not connected")        return "http://" + d.wlan_ip + ":7912/packages"    pkg_url = 'http://%s:%d/devices/%s/pkgs' % (pvd['ip'], pvd['port'], serial)    logger.info("package url %s", pkg_url)    return pkg_urldef _http_install_apk(pkg_restapi, apk_url):    """ install apk from u2init """    resp = requests.post(pkg_restapi, data={"url": apk_url}).json()    if not resp.get('success'):        raise RuntimeError(resp.get('description'))    id = resp['data']['id']    logger.info("install id %s", id)    _wait_installed(pkg_restapi + "/" + id)def _wait_installed(query_url):    """ query until install finished """    while True:        data = safe_getjson(query_url)        status = data.get('status')        logger.debug("%s %s", status, data.get('description'))        if status in ("success", "failure"):            break        time.sleep(1)def safe_getjson(url):    """ get rest api """    r = requests.get(url).json()    desc = r.get('description')    if not r.get('success'):        raise RuntimeError(desc)    return r.get('data')def main():    # ins = U2Installer("http://localhost:17000")    # apk_url = "http://arch.s3.netease.com/hzdev-appci/h35_trunk_RelWithDebInfo_373397.apk"  # 1.8G large apk    apk_url = "https://gohttp.nie.netease.com/tools/apks/qrcodescan-2.6.0-green.apk"    # command line    # python -m uiautomator2.cli install 10.240.174.43 http://arch.s3.netease.com/hzdev-appci/h35_trunk_RelWithDebInfo_373397.apk -s http://wifiphone.nie.netease.com    # psurl = pkgserv_addr("3578298f")    psurl = pkgserv_addr("10.242.163.69")    install_apk(psurl, apk_url)if __name__ == '__main__':    main()
 |