oss_resumable.c 44 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179
  1. #include "aos_log.h"
  2. #include "aos_define.h"
  3. #include "aos_util.h"
  4. #include "aos_string.h"
  5. #include "aos_status.h"
  6. #include "aos_crc64.h"
  7. #include "oss_auth.h"
  8. #include "oss_util.h"
  9. #include "oss_xml.h"
  10. #include "oss_api.h"
  11. #include "oss_resumable.h"
  12. int32_t oss_get_thread_num(oss_resumable_clt_params_t *clt_params)
  13. {
  14. if ((NULL == clt_params) || (clt_params->thread_num <= 0 || clt_params->thread_num > 1024)) {
  15. return 1;
  16. }
  17. return clt_params->thread_num;
  18. }
  19. void oss_get_upload_checkpoint_path(oss_resumable_clt_params_t *clt_params, const aos_string_t *filepath,
  20. aos_pool_t *pool, aos_string_t *checkpoint_path)
  21. {
  22. if ((NULL == checkpoint_path) || (NULL == clt_params) || (!clt_params->enable_checkpoint)) {
  23. return;
  24. }
  25. if (aos_is_null_string(&clt_params->checkpoint_path)) {
  26. int len = filepath->len + strlen(".ucp") + 1;
  27. char *buffer = (char *)aos_pcalloc(pool, len);
  28. apr_snprintf(buffer, len, "%.*s.ucp", filepath->len, filepath->data);
  29. aos_str_set(checkpoint_path , buffer);
  30. return;
  31. }
  32. checkpoint_path->data = clt_params->checkpoint_path.data;
  33. checkpoint_path->len = clt_params->checkpoint_path.len;
  34. }
  35. void oss_get_download_checkpoint_path(oss_resumable_clt_params_t *clt_params, const aos_string_t *filepath,
  36. aos_pool_t *pool, aos_string_t *checkpoint_path)
  37. {
  38. if ((NULL == checkpoint_path) || (NULL == clt_params) || (!clt_params->enable_checkpoint)) {
  39. return;
  40. }
  41. if (aos_is_null_string(&clt_params->checkpoint_path)) {
  42. int len = filepath->len + strlen(".dcp") + 1;
  43. char *buffer = (char *)aos_pcalloc(pool, len);
  44. apr_snprintf(buffer, len, "%.*s.dcp", filepath->len, filepath->data);
  45. aos_str_set(checkpoint_path , buffer);
  46. return;
  47. }
  48. checkpoint_path->data = clt_params->checkpoint_path.data;
  49. checkpoint_path->len = clt_params->checkpoint_path.len;
  50. }
  51. int oss_get_file_info(const aos_string_t *filepath, aos_pool_t *pool, apr_finfo_t *finfo)
  52. {
  53. apr_status_t s;
  54. char buf[256];
  55. apr_file_t *thefile;
  56. s = apr_file_open(&thefile, filepath->data, APR_READ, APR_UREAD | APR_GREAD, pool);
  57. if (s != APR_SUCCESS) {
  58. aos_error_log("apr_file_open failure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
  59. return s;
  60. }
  61. s = apr_file_info_get(finfo, APR_FINFO_NORM, thefile);
  62. if (s != APR_SUCCESS) {
  63. apr_file_close(thefile);
  64. aos_error_log("apr_file_info_get failure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
  65. return s;
  66. }
  67. apr_file_close(thefile);
  68. return AOSE_OK;
  69. }
  70. int oss_does_file_exist(const aos_string_t *filepath, aos_pool_t *pool)
  71. {
  72. apr_status_t s;
  73. apr_file_t *thefile;
  74. s = apr_file_open(&thefile, filepath->data, APR_READ, APR_UREAD | APR_GREAD, pool);
  75. if (s != APR_SUCCESS) {
  76. return AOS_FALSE;
  77. }
  78. apr_file_close(thefile);
  79. return AOS_TRUE;
  80. }
  81. int oss_open_checkpoint_file(aos_pool_t *pool, aos_string_t *checkpoint_path, oss_checkpoint_t *checkpoint)
  82. {
  83. apr_status_t s;
  84. apr_file_t *thefile;
  85. char buf[256];
  86. s = apr_file_open(&thefile, checkpoint_path->data, APR_CREATE | APR_WRITE, APR_UREAD | APR_UWRITE | APR_GREAD, pool);
  87. if (s == APR_SUCCESS) {
  88. checkpoint->thefile = thefile;
  89. } else {
  90. aos_error_log("apr_file_info_get failure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
  91. }
  92. return s;
  93. }
  94. int oss_get_part_num(int64_t file_size, int64_t part_size)
  95. {
  96. int64_t num = 0;
  97. int64_t left = 0;
  98. left = (file_size % part_size == 0) ? 0 : 1;
  99. num = file_size / part_size + left;
  100. return (int)num;
  101. }
  102. void oss_build_parts(int64_t file_size, int64_t part_size, oss_checkpoint_part_t *parts)
  103. {
  104. int i = 0;
  105. for (; i * part_size < file_size; i++) {
  106. parts[i].index = i;
  107. parts[i].offset = i * part_size;
  108. parts[i].size = aos_min(part_size, (file_size - i * part_size));
  109. parts[i].completed = AOS_FALSE;
  110. }
  111. }
  112. void oss_build_thread_params(oss_thread_params_t *thd_params, int part_num,
  113. aos_pool_t *parent_pool, oss_request_options_t *options,
  114. aos_string_t *bucket, aos_string_t *object, aos_string_t *filepath,
  115. aos_string_t *upload_id, oss_checkpoint_part_t *parts,
  116. oss_part_task_result_t *result)
  117. {
  118. int i = 0;
  119. aos_pool_t *subpool = NULL;
  120. oss_config_t *config = NULL;
  121. aos_http_controller_t *ctl;
  122. for (; i < part_num; i++) {
  123. aos_pool_create(&subpool, parent_pool);
  124. config = oss_config_create(subpool);
  125. aos_str_set(&config->endpoint, options->config->endpoint.data);
  126. aos_str_set(&config->access_key_id, options->config->access_key_id.data);
  127. aos_str_set(&config->access_key_secret, options->config->access_key_secret.data);
  128. config->is_cname = options->config->is_cname;
  129. ctl = aos_http_controller_create(subpool, 0);
  130. thd_params[i].options.config = config;
  131. thd_params[i].options.ctl = ctl;
  132. thd_params[i].options.pool = subpool;
  133. thd_params[i].bucket = bucket;
  134. thd_params[i].object = object;
  135. thd_params[i].filepath = filepath;
  136. thd_params[i].upload_id = upload_id;
  137. thd_params[i].part = parts + i;
  138. thd_params[i].result = result + i;
  139. thd_params[i].result->part = thd_params[i].part;
  140. }
  141. }
  142. void oss_destroy_thread_pool(oss_thread_params_t *thd_params, int part_num)
  143. {
  144. int i = 0;
  145. for (; i < part_num; i++) {
  146. aos_pool_destroy(thd_params[i].options.pool);
  147. }
  148. }
  149. void oss_set_task_tracker(oss_thread_params_t *thd_params, int part_num,
  150. apr_uint32_t *launched, apr_uint32_t *failed, apr_uint32_t *completed,
  151. apr_queue_t *failed_parts, apr_queue_t *completed_parts)
  152. {
  153. int i = 0;
  154. for (; i < part_num; i++) {
  155. thd_params[i].launched = launched;
  156. thd_params[i].failed = failed;
  157. thd_params[i].completed = completed;
  158. thd_params[i].failed_parts = failed_parts;
  159. thd_params[i].completed_parts = completed_parts;
  160. }
  161. }
  162. int oss_verify_checkpoint_md5(aos_pool_t *pool, const oss_checkpoint_t *checkpoint)
  163. {
  164. return AOS_TRUE;
  165. }
  166. void oss_build_upload_checkpoint(aos_pool_t *pool, oss_checkpoint_t *checkpoint, aos_string_t *file_path,
  167. apr_finfo_t *finfo, aos_string_t *upload_id, int64_t part_size)
  168. {
  169. int i = 0;
  170. checkpoint->cp_type = OSS_CP_UPLOAD;
  171. aos_str_set(&checkpoint->file_path, aos_pstrdup(pool, file_path));
  172. checkpoint->file_size = finfo->size;
  173. checkpoint->file_last_modified = finfo->mtime;
  174. aos_str_set(&checkpoint->upload_id, aos_pstrdup(pool, upload_id));
  175. checkpoint->part_size = part_size;
  176. for (; i * part_size < finfo->size; i++) {
  177. checkpoint->parts[i].index = i;
  178. checkpoint->parts[i].offset = i * part_size;
  179. checkpoint->parts[i].size = aos_min(part_size, (finfo->size - i * part_size));
  180. checkpoint->parts[i].completed = AOS_FALSE;
  181. aos_str_set(&checkpoint->parts[i].etag , "");
  182. }
  183. checkpoint->part_num = i;
  184. }
  185. void oss_build_download_checkpoint(aos_pool_t *pool, oss_checkpoint_t *checkpoint, aos_string_t *file_path,
  186. const char *object_name, int64_t object_size, const char *object_last_modified,
  187. const char *object_etag, int64_t part_size)
  188. {
  189. int i = 0;
  190. checkpoint->cp_type = OSS_CP_DOWNLOAD;
  191. checkpoint->thefile = NULL;
  192. aos_str_set(&checkpoint->file_path, aos_pstrdup(pool, file_path));
  193. aos_str_set(&checkpoint->object_name, object_name);
  194. checkpoint->object_size = object_size;
  195. aos_str_set(&checkpoint->object_last_modified, object_last_modified);
  196. aos_str_set(&checkpoint->object_etag, object_etag);
  197. checkpoint->part_size = part_size;
  198. for (; i * part_size < object_size; i++) {
  199. checkpoint->parts[i].index = i;
  200. checkpoint->parts[i].offset = i * part_size;
  201. checkpoint->parts[i].size = aos_min(part_size, (object_size - i * part_size));
  202. checkpoint->parts[i].completed = AOS_FALSE;
  203. aos_str_set(&checkpoint->parts[i].etag , "");
  204. }
  205. checkpoint->part_num = i;
  206. }
  207. int oss_dump_checkpoint(aos_pool_t *pool, const oss_checkpoint_t *checkpoint)
  208. {
  209. char *xml_body = NULL;
  210. apr_status_t s;
  211. char buf[256];
  212. apr_size_t len;
  213. // to xml
  214. xml_body = oss_build_checkpoint_xml(pool, checkpoint);
  215. if (NULL == xml_body) {
  216. return AOSE_OUT_MEMORY;
  217. }
  218. // truncate to empty
  219. s = apr_file_trunc(checkpoint->thefile, 0);
  220. if (s != APR_SUCCESS) {
  221. aos_error_log("apr_file_write failure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
  222. return AOSE_FILE_TRUNC_ERROR;
  223. }
  224. // write to file
  225. len = strlen(xml_body);
  226. s = apr_file_write(checkpoint->thefile, xml_body, &len);
  227. if (s != APR_SUCCESS) {
  228. aos_error_log("apr_file_write failure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
  229. return AOSE_FILE_WRITE_ERROR;
  230. }
  231. // flush file
  232. s = apr_file_flush(checkpoint->thefile);
  233. if (s != APR_SUCCESS) {
  234. aos_error_log("apr_file_flush failure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
  235. return AOSE_FILE_FLUSH_ERROR;
  236. }
  237. return AOSE_OK;
  238. }
  239. int oss_load_checkpoint(aos_pool_t *pool, const aos_string_t *filepath, oss_checkpoint_t *checkpoint)
  240. {
  241. apr_status_t s;
  242. char buf[256];
  243. apr_size_t len;
  244. apr_finfo_t finfo;
  245. char *xml_body = NULL;
  246. apr_file_t *thefile;
  247. // open file
  248. s = apr_file_open(&thefile, filepath->data, APR_READ, APR_UREAD | APR_GREAD, pool);
  249. if (s != APR_SUCCESS) {
  250. aos_error_log("apr_file_open failure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
  251. return AOSE_OPEN_FILE_ERROR;
  252. }
  253. // get file stat
  254. s = apr_file_info_get(&finfo, APR_FINFO_NORM, thefile);
  255. if (s != APR_SUCCESS) {
  256. aos_error_log("apr_file_info_get failure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
  257. apr_file_close(thefile);
  258. return AOSE_FILE_INFO_ERROR;
  259. }
  260. xml_body = (char *)aos_palloc(pool, (apr_size_t)(finfo.size + 1));
  261. // read
  262. s = apr_file_read_full(thefile, xml_body, (apr_size_t)finfo.size, &len);
  263. if (s != APR_SUCCESS) {
  264. aos_error_log("apr_file_read_full failure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
  265. apr_file_close(thefile);
  266. return AOSE_FILE_READ_ERROR;
  267. }
  268. apr_file_close(thefile);
  269. xml_body[len] = '\0';
  270. // parse
  271. return oss_checkpoint_parse_from_body(pool, xml_body, checkpoint);
  272. }
  273. int oss_is_upload_checkpoint_valid(aos_pool_t *pool, oss_checkpoint_t *checkpoint, apr_finfo_t *finfo)
  274. {
  275. if (oss_verify_checkpoint_md5(pool, checkpoint) &&
  276. (checkpoint->cp_type == OSS_CP_UPLOAD) &&
  277. (checkpoint->file_size == finfo->size) &&
  278. (checkpoint->file_last_modified == finfo->mtime)) {
  279. return AOS_TRUE;
  280. }
  281. return AOS_FALSE;
  282. }
  283. int oss_is_download_checkpoint_valid(aos_pool_t *pool,
  284. oss_checkpoint_t *checkpoint, const char *object_name,
  285. int64_t object_size, const char *object_last_modified,
  286. const char *object_etag)
  287. {
  288. if (oss_verify_checkpoint_md5(pool, checkpoint) &&
  289. (checkpoint->cp_type == OSS_CP_DOWNLOAD) &&
  290. (checkpoint->object_size == object_size) &&
  291. !strcmp(checkpoint->object_last_modified.data, object_last_modified) &&
  292. !strcasecmp(checkpoint->object_etag.data, object_etag)) {
  293. return AOS_TRUE;
  294. }
  295. return AOS_FALSE;
  296. }
  297. void oss_update_checkpoint(aos_pool_t *pool, oss_checkpoint_t *checkpoint,
  298. int32_t part_index, aos_string_t *etag, uint64_t crc64)
  299. {
  300. char *p = NULL;
  301. checkpoint->parts[part_index].completed = AOS_TRUE;
  302. p = apr_pstrdup(pool, etag->data);
  303. aos_str_set(&checkpoint->parts[part_index].etag, p);
  304. checkpoint->parts[part_index].crc64 = crc64;
  305. }
  306. void oss_get_checkpoint_todo_parts(oss_checkpoint_t *checkpoint, int *part_num, oss_checkpoint_part_t *parts)
  307. {
  308. int i = 0;
  309. int idx = 0;
  310. for (; i < checkpoint->part_num; i++) {
  311. if (!checkpoint->parts[i].completed) {
  312. parts[idx].index = checkpoint->parts[i].index;
  313. parts[idx].offset = checkpoint->parts[i].offset;
  314. parts[idx].size = checkpoint->parts[i].size;
  315. parts[idx].completed = checkpoint->parts[i].completed;
  316. parts[idx].crc64 = checkpoint->parts[i].crc64;
  317. idx++;
  318. }
  319. }
  320. *part_num = idx;
  321. }
  322. void * APR_THREAD_FUNC upload_part(apr_thread_t *thd, void *data)
  323. {
  324. aos_status_t *s = NULL;
  325. oss_thread_params_t *params = NULL;
  326. oss_upload_file_t *upload_file = NULL;
  327. aos_table_t *resp_headers = NULL;
  328. int part_num;
  329. char *etag;
  330. params = (oss_thread_params_t *)data;
  331. if (apr_atomic_read32(params->failed) > 0) {
  332. apr_atomic_inc32(params->launched);
  333. return NULL;
  334. }
  335. part_num = params->part->index + 1;
  336. upload_file = oss_create_upload_file(params->options.pool);
  337. aos_str_set(&upload_file->filename, params->filepath->data);
  338. upload_file->file_pos = params->part->offset;
  339. upload_file->file_last = params->part->offset + params->part->size;
  340. s = oss_upload_part_from_file(&params->options, params->bucket, params->object, params->upload_id,
  341. part_num, upload_file, &resp_headers);
  342. if (!aos_status_is_ok(s)) {
  343. apr_atomic_inc32(params->failed);
  344. params->result->s = s;
  345. apr_queue_push(params->failed_parts, params->result);
  346. return s;
  347. }
  348. etag = apr_pstrdup(params->options.pool, (char*)apr_table_get(resp_headers, "ETag"));
  349. aos_str_set(&params->result->etag, etag);
  350. apr_atomic_inc32(params->completed);
  351. apr_queue_push(params->completed_parts, params->result);
  352. return NULL;
  353. }
  354. aos_status_t *oss_resumable_upload_file_without_cp(oss_request_options_t *options,
  355. aos_string_t *bucket,
  356. aos_string_t *object,
  357. aos_string_t *filepath,
  358. aos_table_t *headers,
  359. aos_table_t *params,
  360. int32_t thread_num,
  361. int64_t part_size,
  362. apr_finfo_t *finfo,
  363. oss_progress_callback progress_callback,
  364. aos_table_t **resp_headers,
  365. aos_list_t *resp_body)
  366. {
  367. aos_pool_t *subpool = NULL;
  368. aos_pool_t *parent_pool = NULL;
  369. aos_status_t *s = NULL;
  370. aos_status_t *ret = NULL;
  371. aos_list_t completed_part_list;
  372. oss_complete_part_content_t *complete_content = NULL;
  373. aos_string_t upload_id;
  374. oss_checkpoint_part_t *parts;
  375. oss_part_task_result_t *results;
  376. oss_part_task_result_t *task_res;
  377. oss_thread_params_t *thd_params;
  378. aos_table_t *cb_headers = NULL;
  379. apr_thread_pool_t *thdp;
  380. apr_uint32_t launched = 0;
  381. apr_uint32_t failed = 0;
  382. apr_uint32_t completed = 0;
  383. apr_uint32_t total_num = 0;
  384. apr_queue_t *failed_parts;
  385. apr_queue_t *completed_parts;
  386. int64_t consume_bytes = 0;
  387. void *task_result;
  388. char *part_num_str;
  389. char *etag;
  390. int part_num = 0;
  391. int i = 0;
  392. int rv;
  393. // prepare
  394. parent_pool = options->pool;
  395. ret = aos_status_create(parent_pool);
  396. part_num = oss_get_part_num(finfo->size, part_size);
  397. parts = (oss_checkpoint_part_t *)aos_palloc(parent_pool, sizeof(oss_checkpoint_part_t) * part_num);
  398. oss_build_parts(finfo->size, part_size, parts);
  399. results = (oss_part_task_result_t *)aos_palloc(parent_pool, sizeof(oss_part_task_result_t) * part_num);
  400. thd_params = (oss_thread_params_t *)aos_palloc(parent_pool, sizeof(oss_thread_params_t) * part_num);
  401. oss_build_thread_params(thd_params, part_num, parent_pool, options, bucket, object, filepath, &upload_id, parts, results);
  402. // init upload
  403. aos_pool_create(&subpool, parent_pool);
  404. options->pool = subpool;
  405. s = oss_init_multipart_upload(options, bucket, object, &upload_id, headers, resp_headers);
  406. if (!aos_status_is_ok(s)) {
  407. s = aos_status_dup(parent_pool, s);
  408. aos_pool_destroy(subpool);
  409. options->pool = parent_pool;
  410. return s;
  411. }
  412. aos_str_set(&upload_id, apr_pstrdup(parent_pool, upload_id.data));
  413. options->pool = parent_pool;
  414. aos_pool_destroy(subpool);
  415. // upload parts
  416. rv = apr_thread_pool_create(&thdp, 0, thread_num, parent_pool);
  417. if (APR_SUCCESS != rv) {
  418. aos_status_set(ret, rv, AOS_CREATE_THREAD_POOL_ERROR_CODE, NULL);
  419. return ret;
  420. }
  421. rv = apr_queue_create(&failed_parts, part_num, parent_pool);
  422. if (APR_SUCCESS != rv) {
  423. aos_status_set(ret, rv, AOS_CREATE_QUEUE_ERROR_CODE, NULL);
  424. return ret;
  425. }
  426. rv = apr_queue_create(&completed_parts, part_num, parent_pool);
  427. if (APR_SUCCESS != rv) {
  428. aos_status_set(ret, rv, AOS_CREATE_QUEUE_ERROR_CODE, NULL);
  429. return ret;
  430. }
  431. // launch
  432. oss_set_task_tracker(thd_params, part_num, &launched, &failed, &completed, failed_parts, completed_parts);
  433. for (i = 0; i < part_num; i++) {
  434. apr_thread_pool_push(thdp, upload_part, thd_params + i, 0, NULL);
  435. }
  436. // wait until all tasks exit
  437. total_num = apr_atomic_read32(&launched) + apr_atomic_read32(&failed) + apr_atomic_read32(&completed);
  438. for ( ; total_num < (apr_uint32_t)part_num; ) {
  439. rv = apr_queue_trypop(completed_parts, &task_result);
  440. if (rv == APR_EINTR || rv == APR_EAGAIN) {
  441. apr_sleep(1000);
  442. } else if(rv == APR_EOF) {
  443. break;
  444. } else if(rv == APR_SUCCESS) {
  445. task_res = (oss_part_task_result_t*)task_result;
  446. if (NULL != progress_callback) {
  447. consume_bytes += task_res->part->size;
  448. progress_callback(consume_bytes, finfo->size);
  449. }
  450. }
  451. total_num = apr_atomic_read32(&launched) + apr_atomic_read32(&failed) + apr_atomic_read32(&completed);
  452. }
  453. // deal with left successful parts
  454. while(APR_SUCCESS == apr_queue_trypop(completed_parts, &task_result)) {
  455. task_res = (oss_part_task_result_t*)task_result;
  456. if (NULL != progress_callback) {
  457. consume_bytes += task_res->part->size;
  458. progress_callback(consume_bytes, finfo->size);
  459. }
  460. }
  461. // failed
  462. if (apr_atomic_read32(&failed) > 0) {
  463. apr_queue_pop(failed_parts, &task_result);
  464. task_res = (oss_part_task_result_t*)task_result;
  465. s = aos_status_dup(parent_pool, task_res->s);
  466. oss_destroy_thread_pool(thd_params, part_num);
  467. return s;
  468. }
  469. // successful
  470. aos_pool_create(&subpool, parent_pool);
  471. aos_list_init(&completed_part_list);
  472. for (i = 0; i < part_num; i++) {
  473. complete_content = oss_create_complete_part_content(subpool);
  474. part_num_str = apr_psprintf(subpool, "%d", thd_params[i].part->index + 1);
  475. aos_str_set(&complete_content->part_number, part_num_str);
  476. etag = apr_pstrdup(subpool, thd_params[i].result->etag.data);
  477. aos_str_set(&complete_content->etag, etag);
  478. aos_list_add_tail(&complete_content->node, &completed_part_list);
  479. }
  480. oss_destroy_thread_pool(thd_params, part_num);
  481. // complete upload
  482. options->pool = subpool;
  483. if (NULL != headers && NULL != apr_table_get(headers, OSS_CALLBACK)) {
  484. cb_headers = aos_table_make(subpool, 2);
  485. apr_table_set(cb_headers, OSS_CALLBACK, apr_table_get(headers, OSS_CALLBACK));
  486. if (NULL != apr_table_get(headers, OSS_CALLBACK_VAR)) {
  487. apr_table_set(cb_headers, OSS_CALLBACK_VAR, apr_table_get(headers, OSS_CALLBACK_VAR));
  488. }
  489. }
  490. s = oss_do_complete_multipart_upload(options, bucket, object, &upload_id,
  491. &completed_part_list, cb_headers, NULL, resp_headers, resp_body);
  492. s = aos_status_dup(parent_pool, s);
  493. aos_pool_destroy(subpool);
  494. options->pool = parent_pool;
  495. return s;
  496. }
  497. aos_status_t *oss_resumable_upload_file_with_cp(oss_request_options_t *options,
  498. aos_string_t *bucket,
  499. aos_string_t *object,
  500. aos_string_t *filepath,
  501. aos_table_t *headers,
  502. aos_table_t *params,
  503. int32_t thread_num,
  504. int64_t part_size,
  505. aos_string_t *checkpoint_path,
  506. apr_finfo_t *finfo,
  507. oss_progress_callback progress_callback,
  508. aos_table_t **resp_headers,
  509. aos_list_t *resp_body)
  510. {
  511. aos_pool_t *subpool = NULL;
  512. aos_pool_t *parent_pool = NULL;
  513. aos_status_t *s = NULL;
  514. aos_status_t *ret = NULL;
  515. aos_list_t completed_part_list;
  516. oss_complete_part_content_t *complete_content = NULL;
  517. aos_string_t upload_id;
  518. oss_checkpoint_part_t *parts;
  519. oss_part_task_result_t *results;
  520. oss_part_task_result_t *task_res;
  521. oss_thread_params_t *thd_params;
  522. aos_table_t *cb_headers = NULL;
  523. apr_thread_pool_t *thdp;
  524. apr_uint32_t launched = 0;
  525. apr_uint32_t failed = 0;
  526. apr_uint32_t completed = 0;
  527. apr_uint32_t total_num = 0;
  528. apr_queue_t *failed_parts;
  529. apr_queue_t *completed_parts;
  530. oss_checkpoint_t *checkpoint = NULL;
  531. int need_init_upload = AOS_TRUE;
  532. int has_left_result = AOS_FALSE;
  533. int64_t consume_bytes = 0;
  534. void *task_result;
  535. char *part_num_str;
  536. int part_num = 0;
  537. int i = 0;
  538. int rv;
  539. // checkpoint
  540. parent_pool = options->pool;
  541. ret = aos_status_create(parent_pool);
  542. checkpoint = oss_create_checkpoint_content(parent_pool);
  543. if(oss_does_file_exist(checkpoint_path, parent_pool)) {
  544. if (AOSE_OK == oss_load_checkpoint(parent_pool, checkpoint_path, checkpoint) &&
  545. oss_is_upload_checkpoint_valid(parent_pool, checkpoint, finfo)) {
  546. aos_str_set(&upload_id, checkpoint->upload_id.data);
  547. need_init_upload = AOS_FALSE;
  548. } else {
  549. apr_file_remove(checkpoint_path->data, parent_pool);
  550. }
  551. }
  552. if (need_init_upload) {
  553. // init upload
  554. aos_pool_create(&subpool, parent_pool);
  555. options->pool = subpool;
  556. s = oss_init_multipart_upload(options, bucket, object, &upload_id, headers, resp_headers);
  557. if (!aos_status_is_ok(s)) {
  558. s = aos_status_dup(parent_pool, s);
  559. aos_pool_destroy(subpool);
  560. options->pool = parent_pool;
  561. return s;
  562. }
  563. aos_str_set(&upload_id, apr_pstrdup(parent_pool, upload_id.data));
  564. options->pool = parent_pool;
  565. aos_pool_destroy(subpool);
  566. // build checkpoint
  567. oss_build_upload_checkpoint(parent_pool, checkpoint, filepath, finfo, &upload_id, part_size);
  568. }
  569. rv = oss_open_checkpoint_file(parent_pool, checkpoint_path, checkpoint);
  570. if (rv != APR_SUCCESS) {
  571. aos_status_set(ret, rv, AOS_OPEN_FILE_ERROR_CODE, NULL);
  572. return ret;
  573. }
  574. // prepare
  575. ret = aos_status_create(parent_pool);
  576. parts = (oss_checkpoint_part_t *)aos_palloc(parent_pool, sizeof(oss_checkpoint_part_t) * (checkpoint->part_num));
  577. oss_get_checkpoint_todo_parts(checkpoint, &part_num, parts);
  578. results = (oss_part_task_result_t *)aos_palloc(parent_pool, sizeof(oss_part_task_result_t) * part_num);
  579. thd_params = (oss_thread_params_t *)aos_palloc(parent_pool, sizeof(oss_thread_params_t) * part_num);
  580. oss_build_thread_params(thd_params, part_num, parent_pool, options, bucket, object, filepath, &upload_id, parts, results);
  581. // upload parts
  582. rv = apr_thread_pool_create(&thdp, 0, thread_num, parent_pool);
  583. if (APR_SUCCESS != rv) {
  584. aos_status_set(ret, rv, AOS_CREATE_THREAD_POOL_ERROR_CODE, NULL);
  585. return ret;
  586. }
  587. rv = apr_queue_create(&failed_parts, part_num, parent_pool);
  588. if (APR_SUCCESS != rv) {
  589. aos_status_set(ret, rv, AOS_CREATE_QUEUE_ERROR_CODE, NULL);
  590. return ret;
  591. }
  592. rv = apr_queue_create(&completed_parts, part_num, parent_pool);
  593. if (APR_SUCCESS != rv) {
  594. aos_status_set(ret, rv, AOS_CREATE_QUEUE_ERROR_CODE, NULL);
  595. return ret;
  596. }
  597. // launch
  598. oss_set_task_tracker(thd_params, part_num, &launched, &failed, &completed, failed_parts, completed_parts);
  599. for (i = 0; i < part_num; i++) {
  600. apr_thread_pool_push(thdp, upload_part, thd_params + i, 0, NULL);
  601. }
  602. // wait until all tasks exit
  603. total_num = apr_atomic_read32(&launched) + apr_atomic_read32(&failed) + apr_atomic_read32(&completed);
  604. for ( ; total_num < (apr_uint32_t)part_num; ) {
  605. rv = apr_queue_trypop(completed_parts, &task_result);
  606. if (rv == APR_EINTR || rv == APR_EAGAIN) {
  607. apr_sleep(1000);
  608. } else if(rv == APR_EOF) {
  609. break;
  610. } else if(rv == APR_SUCCESS) {
  611. task_res = (oss_part_task_result_t*)task_result;
  612. oss_update_checkpoint(parent_pool, checkpoint, task_res->part->index, &task_res->etag, 0);
  613. rv = oss_dump_checkpoint(parent_pool, checkpoint);
  614. if (rv != AOSE_OK) {
  615. int idx = task_res->part->index;
  616. aos_status_set(ret, rv, AOS_WRITE_FILE_ERROR_CODE, NULL);
  617. apr_atomic_inc32(&failed);
  618. thd_params[idx].result->s = ret;
  619. apr_queue_push(failed_parts, thd_params[idx].result);
  620. }
  621. if (NULL != progress_callback) {
  622. consume_bytes += task_res->part->size;
  623. progress_callback(consume_bytes, finfo->size);
  624. }
  625. }
  626. total_num = apr_atomic_read32(&launched) + apr_atomic_read32(&failed) + apr_atomic_read32(&completed);
  627. }
  628. // deal with left successful parts
  629. while(APR_SUCCESS == apr_queue_trypop(completed_parts, &task_result)) {
  630. task_res = (oss_part_task_result_t*)task_result;
  631. oss_update_checkpoint(parent_pool, checkpoint, task_res->part->index, &task_res->etag, 0);
  632. consume_bytes += task_res->part->size;
  633. has_left_result = AOS_TRUE;
  634. }
  635. if (has_left_result) {
  636. rv = oss_dump_checkpoint(parent_pool, checkpoint);
  637. if (rv != AOSE_OK) {
  638. aos_status_set(ret, rv, AOS_WRITE_FILE_ERROR_CODE, NULL);
  639. return ret;
  640. }
  641. if (NULL != progress_callback) {
  642. progress_callback(consume_bytes, finfo->size);
  643. }
  644. }
  645. apr_file_close(checkpoint->thefile);
  646. // failed
  647. if (apr_atomic_read32(&failed) > 0) {
  648. apr_queue_pop(failed_parts, &task_result);
  649. task_res = (oss_part_task_result_t*)task_result;
  650. s = aos_status_dup(parent_pool, task_res->s);
  651. oss_destroy_thread_pool(thd_params, part_num);
  652. return s;
  653. }
  654. // successful
  655. aos_pool_create(&subpool, parent_pool);
  656. aos_list_init(&completed_part_list);
  657. for (i = 0; i < checkpoint->part_num; i++) {
  658. complete_content = oss_create_complete_part_content(subpool);
  659. part_num_str = apr_psprintf(subpool, "%d", checkpoint->parts[i].index + 1);
  660. aos_str_set(&complete_content->part_number, part_num_str);
  661. aos_str_set(&complete_content->etag, checkpoint->parts[i].etag.data);
  662. aos_list_add_tail(&complete_content->node, &completed_part_list);
  663. }
  664. oss_destroy_thread_pool(thd_params, part_num);
  665. // complete upload
  666. options->pool = subpool;
  667. if (NULL != headers && NULL != apr_table_get(headers, OSS_CALLBACK)) {
  668. cb_headers = aos_table_make(subpool, 2);
  669. apr_table_set(cb_headers, OSS_CALLBACK, apr_table_get(headers, OSS_CALLBACK));
  670. if (NULL != apr_table_get(headers, OSS_CALLBACK_VAR)) {
  671. apr_table_set(cb_headers, OSS_CALLBACK_VAR, apr_table_get(headers, OSS_CALLBACK_VAR));
  672. }
  673. }
  674. s = oss_do_complete_multipart_upload(options, bucket, object, &upload_id,
  675. &completed_part_list, cb_headers, NULL, resp_headers, resp_body);
  676. s = aos_status_dup(parent_pool, s);
  677. aos_pool_destroy(subpool);
  678. options->pool = parent_pool;
  679. // remove chepoint file
  680. apr_file_remove(checkpoint_path->data, parent_pool);
  681. return s;
  682. }
  683. aos_status_t *oss_resumable_upload_file(oss_request_options_t *options,
  684. aos_string_t *bucket,
  685. aos_string_t *object,
  686. aos_string_t *filepath,
  687. aos_table_t *headers,
  688. aos_table_t *params,
  689. oss_resumable_clt_params_t *clt_params,
  690. oss_progress_callback progress_callback,
  691. aos_table_t **resp_headers,
  692. aos_list_t *resp_body)
  693. {
  694. int32_t thread_num = 0;
  695. int64_t part_size = 0;
  696. aos_string_t checkpoint_path;
  697. aos_pool_t *sub_pool;
  698. apr_finfo_t finfo;
  699. aos_status_t *s;
  700. int res;
  701. thread_num = oss_get_thread_num(clt_params);
  702. aos_pool_create(&sub_pool, options->pool);
  703. res = oss_get_file_info(filepath, sub_pool, &finfo);
  704. if (res != AOSE_OK) {
  705. aos_error_log("Open read file fail, filename:%s\n", filepath->data);
  706. s = aos_status_create(options->pool);
  707. aos_file_error_status_set(s, res);
  708. aos_pool_destroy(sub_pool);
  709. return s;
  710. }
  711. part_size = clt_params->part_size;
  712. oss_get_part_size(finfo.size, &part_size);
  713. if (NULL != clt_params && clt_params->enable_checkpoint) {
  714. oss_get_upload_checkpoint_path(clt_params, filepath, sub_pool, &checkpoint_path);
  715. s = oss_resumable_upload_file_with_cp(options, bucket, object, filepath, headers, params, thread_num,
  716. part_size, &checkpoint_path, &finfo, progress_callback, resp_headers, resp_body);
  717. } else {
  718. s = oss_resumable_upload_file_without_cp(options, bucket, object, filepath, headers, params, thread_num,
  719. part_size, &finfo, progress_callback, resp_headers, resp_body);
  720. }
  721. aos_pool_destroy(sub_pool);
  722. return s;
  723. }
  724. static void download_part(oss_request_options_t *options,
  725. const aos_string_t *bucket,
  726. const aos_string_t *object,
  727. oss_checkpoint_part_t *part,
  728. const aos_string_t *filepath,
  729. oss_part_task_result_t *result)
  730. {
  731. aos_status_t *s = NULL;
  732. aos_table_t *resp_headers = NULL;
  733. int rv;
  734. aos_http_request_t *req = NULL;
  735. aos_http_response_t *resp = NULL;
  736. aos_table_t *headers = NULL;
  737. aos_table_t *query_params = NULL;
  738. aos_file_buf_t *fb = NULL;
  739. apr_off_t offset = 0;
  740. headers = aos_table_create_if_null(options, headers, 0);
  741. query_params = aos_table_create_if_null(options, query_params, 0);
  742. oss_headers_add_range(options->pool, headers, part->offset, part->size);
  743. fb = aos_create_file_buf(options->pool);
  744. if ((rv = aos_open_file_for_write_notrunc(options->pool,
  745. filepath->data, fb)) != AOSE_OK) {
  746. aos_error_log("Open write file fail, filename:%s\n", filepath->data);
  747. result->s = aos_status_create(options->pool);
  748. aos_file_error_status_set(result->s, rv);
  749. return;
  750. }
  751. offset = part->offset;
  752. apr_file_seek(fb->file, APR_SET, &offset);
  753. oss_init_object_request(options, bucket, object, HTTP_GET,
  754. &req, query_params, headers, NULL, 0, &resp);
  755. oss_init_read_response_body_to_fb(fb, filepath, resp);
  756. s = oss_process_request(options, req, resp);
  757. if (!aos_status_is_ok(s)) {
  758. result->s = s;
  759. } else {
  760. // success
  761. const char *etag;
  762. oss_fill_read_response_header(resp, &resp_headers);
  763. etag = apr_table_get(resp_headers, "Etag");
  764. if (etag) {
  765. aos_str_set(&result->etag, apr_pstrdup(options->pool, etag));
  766. }
  767. result->crc64 = resp->crc64;
  768. result->s = s;
  769. }
  770. apr_file_close(fb->file);
  771. return;
  772. }
  773. void *APR_THREAD_FUNC download_part_thread(apr_thread_t *thd, void *data)
  774. {
  775. apr_queue_t *task_queue = (apr_queue_t *)data;
  776. while (1) {
  777. apr_status_t status;
  778. oss_thread_params_t *params;
  779. status = apr_queue_trypop(task_queue, (void **)&params);
  780. if (status != APR_SUCCESS)
  781. break;
  782. if (apr_atomic_read32(params->failed) > 0) {
  783. // skip unstarted parts if parts failure happened
  784. apr_queue_push(params->task_result_queue, NULL);
  785. } else {
  786. download_part(&params->options,
  787. params->bucket, params->object,
  788. params->part, params->filepath,
  789. params->result);
  790. apr_queue_push(params->task_result_queue, params->result);
  791. }
  792. }
  793. return NULL;
  794. }
  795. aos_status_t *oss_resumable_download_file_internal(oss_request_options_t *options,
  796. aos_string_t *bucket,
  797. aos_string_t *object,
  798. aos_string_t *filepath,
  799. aos_table_t *headers,
  800. aos_table_t *params,
  801. int32_t thread_num,
  802. int64_t part_size,
  803. aos_string_t *checkpoint_path,
  804. oss_progress_callback progress_callback,
  805. aos_table_t **resp_headers)
  806. {
  807. aos_status_t *s = NULL;
  808. int need_init_download = AOS_TRUE;
  809. oss_checkpoint_t *checkpoint = NULL;
  810. aos_table_t *head_resp_headers = NULL;
  811. aos_string_t tmp_filename;
  812. int i = 0;
  813. int part_num = 0;
  814. int64_t object_size = 0;
  815. const char *object_size_str = NULL;
  816. const char *object_last_modified = NULL;
  817. const char *object_etag = NULL;
  818. const char *crc64_str = NULL;
  819. oss_checkpoint_part_t *parts;
  820. oss_part_task_result_t *results;
  821. oss_part_task_result_t *task_res;
  822. oss_thread_params_t *thd_params;
  823. apr_uint32_t failed = 0;
  824. apr_queue_t *failed_parts;
  825. apr_queue_t *completed_parts;
  826. apr_queue_t *task_queue;
  827. apr_queue_t *task_result_queue;
  828. int64_t consume_bytes = 0;
  829. aos_file_buf_t *fb = NULL;
  830. apr_thread_t **thd_ids = NULL;
  831. int rv = AOSE_OK;
  832. oss_get_temporary_file_name(options->pool, filepath, &tmp_filename);
  833. // head object info
  834. s = oss_head_object(options, bucket, object, headers, &head_resp_headers);
  835. *resp_headers = head_resp_headers;
  836. if (!aos_status_is_ok(s))
  837. return s;
  838. object_last_modified = apr_table_get(head_resp_headers, "Last-Modified");
  839. object_etag = apr_table_get(head_resp_headers, "ETag");
  840. object_size_str = apr_table_get(head_resp_headers, OSS_CONTENT_LENGTH);
  841. crc64_str = apr_table_get(head_resp_headers, OSS_HASH_CRC64_ECMA);
  842. if (!object_last_modified || !object_etag || !object_size_str) {
  843. // Invalid http response header
  844. s = aos_status_create(options->pool);
  845. aos_status_set(s, AOSE_INTERNAL_ERROR, AOS_SERVER_ERROR_CODE, "Unexpected response header");
  846. return s;
  847. }
  848. object_size = aos_atoi64(object_size_str);
  849. // ensure part_num will not exceed OSS_MAX_PART_NUM
  850. if (part_size * OSS_MAX_PART_NUM < object_size) {
  851. part_size = (object_size + OSS_MAX_PART_NUM - 1) / OSS_MAX_PART_NUM;
  852. aos_warn_log("Part number larger than max limit, "
  853. "part size Changed to:%" APR_INT64_T_FMT "\n",
  854. part_size);
  855. }
  856. need_init_download = AOS_TRUE;
  857. checkpoint = oss_create_checkpoint_content(options->pool);
  858. if (checkpoint_path) {
  859. do {
  860. apr_finfo_t tmp_finfo;
  861. if (!oss_does_file_exist(checkpoint_path, options->pool))
  862. break;
  863. if (AOSE_OK != oss_load_checkpoint(options->pool, checkpoint_path, checkpoint))
  864. break;
  865. if (!oss_is_download_checkpoint_valid(options->pool, checkpoint, object->data,
  866. object_size, object_last_modified, object_etag))
  867. break;
  868. if (!oss_does_file_exist(&tmp_filename, options->pool))
  869. break;
  870. if (apr_stat(&tmp_finfo, tmp_filename.data, APR_FINFO_SIZE,
  871. options->pool) != APR_SUCCESS ||
  872. object_size != tmp_finfo.size)
  873. break;
  874. need_init_download = AOS_FALSE;
  875. } while (0);
  876. }
  877. if (need_init_download) {
  878. aos_debug_log("need init download\n");
  879. // build checkpoint
  880. oss_build_download_checkpoint(options->pool, checkpoint, filepath, object->data,
  881. object_size, object_last_modified, object_etag, part_size);
  882. }
  883. if (checkpoint_path) {
  884. if ((rv = oss_open_checkpoint_file(options->pool, checkpoint_path, checkpoint)) != APR_SUCCESS) {
  885. s = aos_status_create(options->pool);
  886. aos_status_set(s, rv, AOS_OPEN_FILE_ERROR_CODE, NULL);
  887. return s;
  888. }
  889. }
  890. // Open and truncate the tmp file.
  891. fb = aos_create_file_buf(options->pool);
  892. if ((rv = aos_open_file_for_write_notrunc(options->pool, tmp_filename.data, fb)) != AOSE_OK) {
  893. if (checkpoint->thefile)
  894. apr_file_close(checkpoint->thefile);
  895. aos_error_log("Open write file fail, filename:%s\n", tmp_filename.data);
  896. aos_file_error_status_set(s, rv);
  897. return s;
  898. }
  899. apr_file_trunc(fb->file, object_size);
  900. apr_file_close(fb->file);
  901. parts = (oss_checkpoint_part_t *)aos_palloc(options->pool, sizeof(*parts) * checkpoint->part_num);
  902. oss_get_checkpoint_todo_parts(checkpoint, &part_num, parts);
  903. results = (oss_part_task_result_t *)aos_palloc(options->pool, sizeof(*results) * part_num);
  904. thd_params = (oss_thread_params_t *)aos_palloc(options->pool, sizeof(*thd_params) * part_num);
  905. oss_build_thread_params(thd_params, part_num, options->pool, options, bucket, object, &tmp_filename, NULL, parts, results);
  906. thd_ids = (apr_thread_t **)aos_palloc(options->pool, sizeof(*thd_ids) * thread_num);
  907. aos_debug_log("object_size: %" APR_INT64_T_FMT ", total parts: %d, parts to download: %d\n",
  908. object_size, checkpoint->part_num, part_num);
  909. if ((rv = apr_queue_create(&failed_parts, part_num, options->pool)) != APR_SUCCESS ||
  910. (rv = apr_queue_create(&completed_parts, part_num, options->pool)) != APR_SUCCESS ||
  911. (rv = apr_queue_create(&task_queue, part_num, options->pool)) != APR_SUCCESS ||
  912. (rv = apr_queue_create(&task_result_queue, part_num, options->pool)) != APR_SUCCESS) {
  913. if (checkpoint->thefile)
  914. apr_file_close(checkpoint->thefile);
  915. s = aos_status_create(options->pool);
  916. aos_status_set(s, rv, AOS_CREATE_QUEUE_ERROR_CODE, NULL);
  917. return s;
  918. }
  919. // launch
  920. for (i = 0; i < part_num; i++) {
  921. thd_params[i].failed = &failed;
  922. thd_params[i].task_result_queue = task_result_queue;
  923. apr_queue_push(task_queue, &thd_params[i]);
  924. }
  925. // download parts
  926. for (i = 0; i < thread_num; i++) {
  927. apr_thread_create(&thd_ids[i], NULL, download_part_thread, (void *)task_queue, options->pool);
  928. }
  929. // wait until all tasks exit
  930. for (i = 0 ; i < part_num; i++) {
  931. rv = apr_queue_pop(task_result_queue, (void **)&task_res);
  932. if (task_res && aos_status_is_ok(task_res->s) &&
  933. !strcasecmp(object_etag, task_res->etag.data)) {
  934. // completed part
  935. oss_update_checkpoint(options->pool, checkpoint, task_res->part->index,
  936. &task_res->etag, task_res->crc64);
  937. if (checkpoint->thefile) {
  938. if ((rv = oss_dump_checkpoint(options->pool, checkpoint)) != AOSE_OK) {
  939. aos_warn_log("failed to persist checkpoint file %s: %d\n",
  940. checkpoint_path->data, rv);
  941. }
  942. }
  943. apr_queue_push(completed_parts, task_res);
  944. if (progress_callback) {
  945. consume_bytes += task_res->part->size;
  946. progress_callback(consume_bytes, object_size);
  947. }
  948. } else if (task_res) {
  949. // failed parts
  950. apr_atomic_inc32(&failed);
  951. apr_queue_push(failed_parts, task_res);
  952. } else {
  953. // skipped parts
  954. }
  955. }
  956. if (checkpoint->thefile) {
  957. apr_file_close(checkpoint->thefile);
  958. checkpoint->thefile = NULL;
  959. }
  960. aos_debug_log("completed: %u, failed: %u, skipped: %u\n",
  961. apr_queue_size(completed_parts), apr_queue_size(failed_parts),
  962. part_num - apr_queue_size(completed_parts) - apr_queue_size(failed_parts));
  963. if (apr_atomic_read32(&failed) > 0) {
  964. // any parts failure
  965. apr_queue_pop(failed_parts, (void **)&task_res);
  966. s = aos_status_dup(options->pool, task_res->s);
  967. } else {
  968. // complete download for all parts
  969. rv = AOSE_OK;
  970. if (is_enable_crc(options) && crc64_str) {
  971. uint64_t iter_crc64 = 0;
  972. for (i = 0; i < checkpoint->part_num; i++) {
  973. iter_crc64 = aos_crc64_combine(iter_crc64, checkpoint->parts[i].crc64,
  974. checkpoint->parts[i].size);
  975. }
  976. if ((rv = oss_check_crc_consistent(iter_crc64, head_resp_headers, s)) != AOSE_OK) {
  977. if (checkpoint_path) {
  978. // checkpoint file should be removed here, otherwise retry downloads will
  979. // always be skipped and failed here in crc64 check
  980. if (apr_file_remove(checkpoint_path->data, options->pool) != APR_SUCCESS) {
  981. aos_warn_log("Failed to remove checkpoint file %s\n",
  982. checkpoint_path->data);
  983. }
  984. }
  985. apr_file_remove(tmp_filename.data, options->pool);
  986. }
  987. }
  988. if (rv == AOSE_OK) {
  989. if (apr_file_rename(tmp_filename.data, filepath->data, options->pool) != APR_SUCCESS) {
  990. s = aos_status_create(options->pool);
  991. aos_status_set(s, rv, AOS_RENAME_FILE_ERROR_CODE, NULL);
  992. } else {
  993. if (checkpoint_path) {
  994. apr_file_remove(checkpoint_path->data, options->pool);
  995. }
  996. }
  997. }
  998. }
  999. for (i = 0; i < thread_num; i++) {
  1000. apr_status_t retval;
  1001. apr_thread_join(&retval, thd_ids[i]);
  1002. }
  1003. oss_destroy_thread_pool(thd_params, part_num);
  1004. return s;
  1005. }
  1006. aos_status_t *oss_resumable_download_file(oss_request_options_t *options,
  1007. aos_string_t *bucket,
  1008. aos_string_t *object,
  1009. aos_string_t *filepath,
  1010. aos_table_t *headers,
  1011. aos_table_t *params,
  1012. oss_resumable_clt_params_t *clt_params,
  1013. oss_progress_callback progress_callback,
  1014. aos_table_t **resp_headers)
  1015. {
  1016. int32_t thread_num = 0;
  1017. int64_t part_size = 0;
  1018. aos_string_t checkpoint_path;
  1019. aos_pool_t *sub_pool;
  1020. aos_status_t *s;
  1021. thread_num = oss_get_thread_num(clt_params);
  1022. aos_pool_create(&sub_pool, options->pool);
  1023. part_size = clt_params->part_size;
  1024. if (NULL != clt_params && clt_params->enable_checkpoint) {
  1025. oss_get_download_checkpoint_path(clt_params, filepath, sub_pool, &checkpoint_path);
  1026. s = oss_resumable_download_file_internal(options, bucket, object, filepath, headers, params, thread_num,
  1027. part_size, &checkpoint_path, progress_callback, resp_headers);
  1028. } else {
  1029. s = oss_resumable_download_file_internal(options, bucket, object, filepath, headers, params, thread_num,
  1030. part_size, NULL, progress_callback, resp_headers);
  1031. }
  1032. aos_pool_destroy(sub_pool);
  1033. return s;
  1034. }