oss_resumable.c 28 KB


  1. #include "aos_log.h"
  2. #include "aos_define.h"
  3. #include "aos_util.h"
  4. #include "aos_string.h"
  5. #include "aos_status.h"
  6. #include "oss_auth.h"
  7. #include "oss_util.h"
  8. #include "oss_xml.h"
  9. #include "oss_api.h"
  10. #include "oss_resumable.h"
  11. int32_t oss_get_thread_num(oss_resumable_clt_params_t *clt_params)
  12. {
  13. if ((NULL == clt_params) || (clt_params->thread_num <= 0 || clt_params->thread_num > 1024)) {
  14. return 1;
  15. }
  16. return clt_params->thread_num;
  17. }
  18. void oss_get_checkpoint_path(oss_resumable_clt_params_t *clt_params, const aos_string_t *filepath,
  19. aos_pool_t *pool, aos_string_t *checkpoint_path)
  20. {
  21. if ((NULL == checkpoint_path) || (NULL == clt_params) || (!clt_params->enable_checkpoint)) {
  22. return;
  23. }
  24. if (aos_is_null_string(&clt_params->checkpoint_path)) {
  25. int len = filepath->len + strlen(".cp") + 1;
  26. char *buffer = (char *)aos_pcalloc(pool, len);
  27. apr_snprintf(buffer, len, "%.*s.cp", filepath->len, filepath->data);
  28. aos_str_set(checkpoint_path , buffer);
  29. return;
  30. }
  31. checkpoint_path->data = clt_params->checkpoint_path.data;
  32. checkpoint_path->len = clt_params->checkpoint_path.len;
  33. }
  34. int oss_get_file_info(const aos_string_t *filepath, aos_pool_t *pool, apr_finfo_t *finfo)
  35. {
  36. apr_status_t s;
  37. char buf[256];
  38. apr_file_t *thefile;
  39. s = apr_file_open(&thefile, filepath->data, APR_READ, APR_UREAD | APR_GREAD, pool);
  40. if (s != APR_SUCCESS) {
  41. aos_error_log("apr_file_open failure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
  42. return s;
  43. }
  44. s = apr_file_info_get(finfo, APR_FINFO_NORM, thefile);
  45. if (s != APR_SUCCESS) {
  46. apr_file_close(thefile);
  47. aos_error_log("apr_file_info_get failure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
  48. return s;
  49. }
  50. apr_file_close(thefile);
  51. return AOSE_OK;
  52. }
  53. int oss_does_file_exist(const aos_string_t *filepath, aos_pool_t *pool)
  54. {
  55. apr_status_t s;
  56. apr_file_t *thefile;
  57. s = apr_file_open(&thefile, filepath->data, APR_READ, APR_UREAD | APR_GREAD, pool);
  58. if (s != APR_SUCCESS) {
  59. return AOS_FALSE;
  60. }
  61. apr_file_close(thefile);
  62. return AOS_TRUE;
  63. }
  64. int oss_open_checkpoint_file(aos_pool_t *pool, aos_string_t *checkpoint_path, oss_checkpoint_t *checkpoint)
  65. {
  66. apr_status_t s;
  67. apr_file_t *thefile;
  68. char buf[256];
  69. s = apr_file_open(&thefile, checkpoint_path->data, APR_CREATE | APR_WRITE, APR_UREAD | APR_UWRITE | APR_GREAD, pool);
  70. if (s == APR_SUCCESS) {
  71. checkpoint->thefile = thefile;
  72. } else {
  73. aos_error_log("apr_file_info_get failure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
  74. }
  75. return s;
  76. }
  77. int oss_get_part_num(int64_t file_size, int64_t part_size)
  78. {
  79. int64_t num = 0;
  80. int64_t left = 0;
  81. left = (file_size % part_size == 0) ? 0 : 1;
  82. num = file_size / part_size + left;
  83. return (int)num;
  84. }
  85. void oss_build_parts(int64_t file_size, int64_t part_size, oss_checkpoint_part_t *parts)
  86. {
  87. int i = 0;
  88. for (; i * part_size < file_size; i++) {
  89. parts[i].index = i;
  90. parts[i].offset = i * part_size;
  91. parts[i].size = aos_min(part_size, (file_size - i * part_size));
  92. parts[i].completed = AOS_FALSE;
  93. }
  94. }
  95. void oss_build_thread_params(oss_upload_thread_params_t *thr_params, int part_num,
  96. aos_pool_t *parent_pool, oss_request_options_t *options,
  97. aos_string_t *bucket, aos_string_t *object, aos_string_t *filepath,
  98. aos_string_t *upload_id, oss_checkpoint_part_t *parts,
  99. oss_part_task_result_t *result)
  100. {
  101. int i = 0;
  102. aos_pool_t *subpool = NULL;
  103. oss_config_t *config = NULL;
  104. aos_http_controller_t *ctl;
  105. for (; i < part_num; i++) {
  106. aos_pool_create(&subpool, parent_pool);
  107. config = oss_config_create(subpool);
  108. aos_str_set(&config->endpoint, options->config->endpoint.data);
  109. aos_str_set(&config->access_key_id, options->config->access_key_id.data);
  110. aos_str_set(&config->access_key_secret, options->config->access_key_secret.data);
  111. config->is_cname = options->config->is_cname;
  112. ctl = aos_http_controller_create(subpool, 0);
  113. thr_params[i].options.config = config;
  114. thr_params[i].options.ctl = ctl;
  115. thr_params[i].options.pool = subpool;
  116. thr_params[i].bucket = bucket;
  117. thr_params[i].object = object;
  118. thr_params[i].filepath = filepath;
  119. thr_params[i].upload_id = upload_id;
  120. thr_params[i].part = parts + i;
  121. thr_params[i].result = result + i;
  122. thr_params[i].result->part = thr_params[i].part;
  123. }
  124. }
  125. void oss_destroy_thread_pool(oss_upload_thread_params_t *thr_params, int part_num)
  126. {
  127. int i = 0;
  128. for (; i < part_num; i++) {
  129. aos_pool_destroy(thr_params[i].options.pool);
  130. }
  131. }
  132. void oss_set_task_tracker(oss_upload_thread_params_t *thr_params, int part_num,
  133. apr_uint32_t *launched, apr_uint32_t *failed, apr_uint32_t *completed,
  134. apr_queue_t *failed_parts, apr_queue_t *completed_parts)
  135. {
  136. int i = 0;
  137. for (; i < part_num; i++) {
  138. thr_params[i].launched = launched;
  139. thr_params[i].failed = failed;
  140. thr_params[i].completed = completed;
  141. thr_params[i].failed_parts = failed_parts;
  142. thr_params[i].completed_parts = completed_parts;
  143. }
  144. }
  145. int oss_verify_checkpoint_md5(aos_pool_t *pool, const oss_checkpoint_t *checkpoint)
  146. {
  147. return AOS_TRUE;
  148. }
  149. void oss_build_upload_checkpoint(aos_pool_t *pool, oss_checkpoint_t *checkpoint, aos_string_t *file_path,
  150. apr_finfo_t *finfo, aos_string_t *upload_id, int64_t part_size)
  151. {
  152. int i = 0;
  153. checkpoint->cp_type = OSS_CP_UPLOAD;
  154. aos_str_set(&checkpoint->file_path, aos_pstrdup(pool, file_path));
  155. checkpoint->file_size = finfo->size;
  156. checkpoint->file_last_modified = finfo->mtime;
  157. aos_str_set(&checkpoint->upload_id, aos_pstrdup(pool, upload_id));
  158. checkpoint->part_size = part_size;
  159. for (; i * part_size < finfo->size; i++) {
  160. checkpoint->parts[i].index = i;
  161. checkpoint->parts[i].offset = i * part_size;
  162. checkpoint->parts[i].size = aos_min(part_size, (finfo->size - i * part_size));
  163. checkpoint->parts[i].completed = AOS_FALSE;
  164. aos_str_set(&checkpoint->parts[i].etag , "");
  165. }
  166. checkpoint->part_num = i;
  167. }
  168. int oss_dump_checkpoint(aos_pool_t *pool, const oss_checkpoint_t *checkpoint)
  169. {
  170. char *xml_body = NULL;
  171. apr_status_t s;
  172. char buf[256];
  173. apr_size_t len;
  174. // to xml
  175. xml_body = oss_build_checkpoint_xml(pool, checkpoint);
  176. if (NULL == xml_body) {
  177. return AOSE_OUT_MEMORY;
  178. }
  179. // truncate to empty
  180. s = apr_file_trunc(checkpoint->thefile, 0);
  181. if (s != APR_SUCCESS) {
  182. aos_error_log("apr_file_write fialure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
  183. return AOSE_FILE_TRUNC_ERROR;
  184. }
  185. // write to file
  186. len = strlen(xml_body);
  187. s = apr_file_write(checkpoint->thefile, xml_body, &len);
  188. if (s != APR_SUCCESS) {
  189. aos_error_log("apr_file_write fialure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
  190. return AOSE_FILE_WRITE_ERROR;
  191. }
  192. // flush file
  193. s = apr_file_flush(checkpoint->thefile);
  194. if (s != APR_SUCCESS) {
  195. aos_error_log("apr_file_flush fialure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
  196. return AOSE_FILE_FLUSH_ERROR;
  197. }
  198. return AOSE_OK;
  199. }
  200. int oss_load_checkpoint(aos_pool_t *pool, const aos_string_t *filepath, oss_checkpoint_t *checkpoint)
  201. {
  202. apr_status_t s;
  203. char buf[256];
  204. apr_size_t len;
  205. apr_finfo_t finfo;
  206. char *xml_body = NULL;
  207. apr_file_t *thefile;
  208. // open file
  209. s = apr_file_open(&thefile, filepath->data, APR_READ, APR_UREAD | APR_GREAD, pool);
  210. if (s != APR_SUCCESS) {
  211. aos_error_log("apr_file_open failure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
  212. return AOSE_OPEN_FILE_ERROR;
  213. }
  214. // get file stat
  215. s = apr_file_info_get(&finfo, APR_FINFO_NORM, thefile);
  216. if (s != APR_SUCCESS) {
  217. aos_error_log("apr_file_info_get failure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
  218. apr_file_close(thefile);
  219. return AOSE_FILE_INFO_ERROR;
  220. }
  221. xml_body = (char *)aos_palloc(pool, (apr_size_t)(finfo.size + 1));
  222. // read
  223. s = apr_file_read_full(thefile, xml_body, (apr_size_t)finfo.size, &len);
  224. if (s != APR_SUCCESS) {
  225. aos_error_log("apr_file_read_full fialure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
  226. apr_file_close(thefile);
  227. return AOSE_FILE_READ_ERROR;
  228. }
  229. apr_file_close(thefile);
  230. xml_body[len] = '\0';
  231. // parse
  232. return oss_checkpoint_parse_from_body(pool, xml_body, checkpoint);
  233. }
  234. int oss_is_upload_checkpoint_valid(aos_pool_t *pool, oss_checkpoint_t *checkpoint, apr_finfo_t *finfo)
  235. {
  236. if (oss_verify_checkpoint_md5(pool, checkpoint) &&
  237. (checkpoint->file_size == finfo->size) &&
  238. (checkpoint->file_last_modified == finfo->mtime)) {
  239. return AOS_TRUE;
  240. }
  241. return AOS_FALSE;
  242. }
  243. void oss_update_checkpoint(aos_pool_t *pool, oss_checkpoint_t *checkpoint, int32_t part_index, aos_string_t *etag)
  244. {
  245. char *p = NULL;
  246. checkpoint->parts[part_index].completed = AOS_TRUE;
  247. p = apr_pstrdup(pool, etag->data);
  248. aos_str_set(&checkpoint->parts[part_index].etag, p);
  249. }
  250. void oss_get_checkpoint_undo_parts(oss_checkpoint_t *checkpoint, int *part_num, oss_checkpoint_part_t *parts)
  251. {
  252. int i = 0;
  253. int idx = 0;
  254. for (; i < checkpoint->part_num; i++) {
  255. if (!checkpoint->parts[i].completed) {
  256. parts[idx].index = checkpoint->parts[i].index;
  257. parts[idx].offset = checkpoint->parts[i].offset;
  258. parts[idx].size = checkpoint->parts[i].size;
  259. parts[idx].completed = checkpoint->parts[i].completed;
  260. idx++;
  261. }
  262. }
  263. *part_num = idx;
  264. }
  265. void * APR_THREAD_FUNC upload_part(apr_thread_t *thd, void *data)
  266. {
  267. aos_status_t *s = NULL;
  268. oss_upload_thread_params_t *params = NULL;
  269. oss_upload_file_t *upload_file = NULL;
  270. aos_table_t *resp_headers = NULL;
  271. int part_num;
  272. char *etag;
  273. params = (oss_upload_thread_params_t *)data;
  274. if (apr_atomic_read32(params->failed) > 0) {
  275. apr_atomic_inc32(params->launched);
  276. return NULL;
  277. }
  278. part_num = params->part->index + 1;
  279. upload_file = oss_create_upload_file(params->options.pool);
  280. aos_str_set(&upload_file->filename, params->filepath->data);
  281. upload_file->file_pos = params->part->offset;
  282. upload_file->file_last = params->part->offset + params->part->size;
  283. s = oss_upload_part_from_file(&params->options, params->bucket, params->object, params->upload_id,
  284. part_num, upload_file, &resp_headers);
  285. if (!aos_status_is_ok(s)) {
  286. apr_atomic_inc32(params->failed);
  287. params->result->s = s;
  288. apr_queue_push(params->failed_parts, params->result);
  289. return s;
  290. }
  291. etag = apr_pstrdup(params->options.pool, (char*)apr_table_get(resp_headers, "ETag"));
  292. aos_str_set(&params->result->etag, etag);
  293. apr_atomic_inc32(params->completed);
  294. apr_queue_push(params->completed_parts, params->result);
  295. return NULL;
  296. }
  297. aos_status_t *oss_resumable_upload_file_without_cp(oss_request_options_t *options,
  298. aos_string_t *bucket,
  299. aos_string_t *object,
  300. aos_string_t *filepath,
  301. aos_table_t *headers,
  302. aos_table_t *params,
  303. int32_t thread_num,
  304. int64_t part_size,
  305. apr_finfo_t *finfo,
  306. oss_progress_callback progress_callback,
  307. aos_table_t **resp_headers,
  308. aos_list_t *resp_body)
  309. {
  310. aos_pool_t *subpool = NULL;
  311. aos_pool_t *parent_pool = NULL;
  312. aos_status_t *s = NULL;
  313. aos_status_t *ret = NULL;
  314. aos_list_t completed_part_list;
  315. oss_complete_part_content_t *complete_content = NULL;
  316. aos_string_t upload_id;
  317. oss_checkpoint_part_t *parts;
  318. oss_part_task_result_t *results;
  319. oss_part_task_result_t *task_res;
  320. oss_upload_thread_params_t *thr_params;
  321. aos_table_t *cb_headers = NULL;
  322. apr_thread_pool_t *thrp;
  323. apr_uint32_t launched = 0;
  324. apr_uint32_t failed = 0;
  325. apr_uint32_t completed = 0;
  326. apr_uint32_t total_num = 0;
  327. apr_queue_t *failed_parts;
  328. apr_queue_t *completed_parts;
  329. int64_t consume_bytes = 0;
  330. void *task_result;
  331. char *part_num_str;
  332. char *etag;
  333. int part_num = 0;
  334. int i = 0;
  335. int rv;
  336. // prepare
  337. parent_pool = options->pool;
  338. ret = aos_status_create(parent_pool);
  339. part_num = oss_get_part_num(finfo->size, part_size);
  340. parts = (oss_checkpoint_part_t *)aos_palloc(parent_pool, sizeof(oss_checkpoint_part_t) * part_num);
  341. oss_build_parts(finfo->size, part_size, parts);
  342. results = (oss_part_task_result_t *)aos_palloc(parent_pool, sizeof(oss_part_task_result_t) * part_num);
  343. thr_params = (oss_upload_thread_params_t *)aos_palloc(parent_pool, sizeof(oss_upload_thread_params_t) * part_num);
  344. oss_build_thread_params(thr_params, part_num, parent_pool, options, bucket, object, filepath, &upload_id, parts, results);
  345. // init upload
  346. aos_pool_create(&subpool, parent_pool);
  347. options->pool = subpool;
  348. s = oss_init_multipart_upload(options, bucket, object, &upload_id, headers, resp_headers);
  349. if (!aos_status_is_ok(s)) {
  350. s = aos_status_dup(parent_pool, s);
  351. aos_pool_destroy(subpool);
  352. options->pool = parent_pool;
  353. return s;
  354. }
  355. aos_str_set(&upload_id, apr_pstrdup(parent_pool, upload_id.data));
  356. options->pool = parent_pool;
  357. aos_pool_destroy(subpool);
  358. // upload parts
  359. rv = apr_thread_pool_create(&thrp, 0, thread_num, parent_pool);
  360. if (APR_SUCCESS != rv) {
  361. aos_status_set(ret, rv, AOS_CREATE_THREAD_POOL_ERROR_CODE, NULL);
  362. return ret;
  363. }
  364. rv = apr_queue_create(&failed_parts, part_num, parent_pool);
  365. if (APR_SUCCESS != rv) {
  366. aos_status_set(ret, rv, AOS_CREATE_QUEUE_ERROR_CODE, NULL);
  367. return ret;
  368. }
  369. rv = apr_queue_create(&completed_parts, part_num, parent_pool);
  370. if (APR_SUCCESS != rv) {
  371. aos_status_set(ret, rv, AOS_CREATE_QUEUE_ERROR_CODE, NULL);
  372. return ret;
  373. }
  374. // launch
  375. oss_set_task_tracker(thr_params, part_num, &launched, &failed, &completed, failed_parts, completed_parts);
  376. for (i = 0; i < part_num; i++) {
  377. apr_thread_pool_push(thrp, upload_part, thr_params + i, 0, NULL);
  378. }
  379. // wait until all tasks exit
  380. total_num = apr_atomic_read32(&launched) + apr_atomic_read32(&failed) + apr_atomic_read32(&completed);
  381. for ( ; total_num < (apr_uint32_t)part_num; ) {
  382. rv = apr_queue_trypop(completed_parts, &task_result);
  383. if (rv == APR_EINTR || rv == APR_EAGAIN) {
  384. apr_sleep(1000);
  385. } else if(rv == APR_EOF) {
  386. break;
  387. } else if(rv == APR_SUCCESS) {
  388. task_res = (oss_part_task_result_t*)task_result;
  389. if (NULL != progress_callback) {
  390. consume_bytes += task_res->part->size;
  391. progress_callback(consume_bytes, finfo->size);
  392. }
  393. }
  394. total_num = apr_atomic_read32(&launched) + apr_atomic_read32(&failed) + apr_atomic_read32(&completed);
  395. }
  396. // deal with left successful parts
  397. while(APR_SUCCESS == apr_queue_trypop(completed_parts, &task_result)) {
  398. task_res = (oss_part_task_result_t*)task_result;
  399. if (NULL != progress_callback) {
  400. consume_bytes += task_res->part->size;
  401. progress_callback(consume_bytes, finfo->size);
  402. }
  403. }
  404. // failed
  405. if (apr_atomic_read32(&failed) > 0) {
  406. apr_queue_pop(failed_parts, &task_result);
  407. task_res = (oss_part_task_result_t*)task_result;
  408. s = aos_status_dup(parent_pool, task_res->s);
  409. oss_destroy_thread_pool(thr_params, part_num);
  410. return s;
  411. }
  412. // successful
  413. aos_pool_create(&subpool, parent_pool);
  414. aos_list_init(&completed_part_list);
  415. for (i = 0; i < part_num; i++) {
  416. complete_content = oss_create_complete_part_content(subpool);
  417. part_num_str = apr_psprintf(subpool, "%d", thr_params[i].part->index + 1);
  418. aos_str_set(&complete_content->part_number, part_num_str);
  419. etag = apr_pstrdup(subpool, thr_params[i].result->etag.data);
  420. aos_str_set(&complete_content->etag, etag);
  421. aos_list_add_tail(&complete_content->node, &completed_part_list);
  422. }
  423. oss_destroy_thread_pool(thr_params, part_num);
  424. // complete upload
  425. options->pool = subpool;
  426. if (NULL != headers && NULL != apr_table_get(headers, OSS_CALLBACK)) {
  427. cb_headers = aos_table_make(subpool, 2);
  428. apr_table_set(cb_headers, OSS_CALLBACK, apr_table_get(headers, OSS_CALLBACK));
  429. if (NULL != apr_table_get(headers, OSS_CALLBACK_VAR)) {
  430. apr_table_set(cb_headers, OSS_CALLBACK_VAR, apr_table_get(headers, OSS_CALLBACK_VAR));
  431. }
  432. }
  433. s = oss_do_complete_multipart_upload(options, bucket, object, &upload_id,
  434. &completed_part_list, cb_headers, NULL, resp_headers, resp_body);
  435. s = aos_status_dup(parent_pool, s);
  436. aos_pool_destroy(subpool);
  437. options->pool = parent_pool;
  438. return s;
  439. }
  440. aos_status_t *oss_resumable_upload_file_with_cp(oss_request_options_t *options,
  441. aos_string_t *bucket,
  442. aos_string_t *object,
  443. aos_string_t *filepath,
  444. aos_table_t *headers,
  445. aos_table_t *params,
  446. int32_t thread_num,
  447. int64_t part_size,
  448. aos_string_t *checkpoint_path,
  449. apr_finfo_t *finfo,
  450. oss_progress_callback progress_callback,
  451. aos_table_t **resp_headers,
  452. aos_list_t *resp_body)
  453. {
  454. aos_pool_t *subpool = NULL;
  455. aos_pool_t *parent_pool = NULL;
  456. aos_status_t *s = NULL;
  457. aos_status_t *ret = NULL;
  458. aos_list_t completed_part_list;
  459. oss_complete_part_content_t *complete_content = NULL;
  460. aos_string_t upload_id;
  461. oss_checkpoint_part_t *parts;
  462. oss_part_task_result_t *results;
  463. oss_part_task_result_t *task_res;
  464. oss_upload_thread_params_t *thr_params;
  465. aos_table_t *cb_headers = NULL;
  466. apr_thread_pool_t *thrp;
  467. apr_uint32_t launched = 0;
  468. apr_uint32_t failed = 0;
  469. apr_uint32_t completed = 0;
  470. apr_uint32_t total_num = 0;
  471. apr_queue_t *failed_parts;
  472. apr_queue_t *completed_parts;
  473. oss_checkpoint_t *checkpoint = NULL;
  474. int need_init_upload = AOS_TRUE;
  475. int has_left_result = AOS_FALSE;
  476. int64_t consume_bytes = 0;
  477. void *task_result;
  478. char *part_num_str;
  479. int part_num = 0;
  480. int i = 0;
  481. int rv;
  482. // checkpoint
  483. parent_pool = options->pool;
  484. ret = aos_status_create(parent_pool);
  485. checkpoint = oss_create_checkpoint_content(parent_pool);
  486. if(oss_does_file_exist(checkpoint_path, parent_pool)) {
  487. if (AOSE_OK == oss_load_checkpoint(parent_pool, checkpoint_path, checkpoint) &&
  488. oss_is_upload_checkpoint_valid(parent_pool, checkpoint, finfo)) {
  489. aos_str_set(&upload_id, checkpoint->upload_id.data);
  490. need_init_upload = AOS_FALSE;
  491. } else {
  492. apr_file_remove(checkpoint_path->data, parent_pool);
  493. }
  494. }
  495. if (need_init_upload) {
  496. // init upload
  497. aos_pool_create(&subpool, parent_pool);
  498. options->pool = subpool;
  499. s = oss_init_multipart_upload(options, bucket, object, &upload_id, headers, resp_headers);
  500. if (!aos_status_is_ok(s)) {
  501. s = aos_status_dup(parent_pool, s);
  502. aos_pool_destroy(subpool);
  503. options->pool = parent_pool;
  504. return s;
  505. }
  506. aos_str_set(&upload_id, apr_pstrdup(parent_pool, upload_id.data));
  507. options->pool = parent_pool;
  508. aos_pool_destroy(subpool);
  509. // build checkpoint
  510. oss_build_upload_checkpoint(parent_pool, checkpoint, filepath, finfo, &upload_id, part_size);
  511. }
  512. rv = oss_open_checkpoint_file(parent_pool, checkpoint_path, checkpoint);
  513. if (rv != APR_SUCCESS) {
  514. aos_status_set(ret, rv, AOS_OPEN_FILE_ERROR_CODE, NULL);
  515. return ret;
  516. }
  517. // prepare
  518. ret = aos_status_create(parent_pool);
  519. parts = (oss_checkpoint_part_t *)aos_palloc(parent_pool, sizeof(oss_checkpoint_part_t) * (checkpoint->part_num));
  520. oss_get_checkpoint_undo_parts(checkpoint, &part_num, parts);
  521. results = (oss_part_task_result_t *)aos_palloc(parent_pool, sizeof(oss_part_task_result_t) * part_num);
  522. thr_params = (oss_upload_thread_params_t *)aos_palloc(parent_pool, sizeof(oss_upload_thread_params_t) * part_num);
  523. oss_build_thread_params(thr_params, part_num, parent_pool, options, bucket, object, filepath, &upload_id, parts, results);
  524. // upload parts
  525. rv = apr_thread_pool_create(&thrp, 0, thread_num, parent_pool);
  526. if (APR_SUCCESS != rv) {
  527. aos_status_set(ret, rv, AOS_CREATE_THREAD_POOL_ERROR_CODE, NULL);
  528. return ret;
  529. }
  530. rv = apr_queue_create(&failed_parts, part_num, parent_pool);
  531. if (APR_SUCCESS != rv) {
  532. aos_status_set(ret, rv, AOS_CREATE_QUEUE_ERROR_CODE, NULL);
  533. return ret;
  534. }
  535. rv = apr_queue_create(&completed_parts, part_num, parent_pool);
  536. if (APR_SUCCESS != rv) {
  537. aos_status_set(ret, rv, AOS_CREATE_QUEUE_ERROR_CODE, NULL);
  538. return ret;
  539. }
  540. // launch
  541. oss_set_task_tracker(thr_params, part_num, &launched, &failed, &completed, failed_parts, completed_parts);
  542. for (i = 0; i < part_num; i++) {
  543. apr_thread_pool_push(thrp, upload_part, thr_params + i, 0, NULL);
  544. }
  545. // wait until all tasks exit
  546. total_num = apr_atomic_read32(&launched) + apr_atomic_read32(&failed) + apr_atomic_read32(&completed);
  547. for ( ; total_num < (apr_uint32_t)part_num; ) {
  548. rv = apr_queue_trypop(completed_parts, &task_result);
  549. if (rv == APR_EINTR || rv == APR_EAGAIN) {
  550. apr_sleep(1000);
  551. } else if(rv == APR_EOF) {
  552. break;
  553. } else if(rv == APR_SUCCESS) {
  554. task_res = (oss_part_task_result_t*)task_result;
  555. oss_update_checkpoint(parent_pool, checkpoint, task_res->part->index, &task_res->etag);
  556. rv = oss_dump_checkpoint(parent_pool, checkpoint);
  557. if (rv != AOSE_OK) {
  558. int idx = task_res->part->index;
  559. aos_status_set(ret, rv, AOS_WRITE_FILE_ERROR_CODE, NULL);
  560. apr_atomic_inc32(&failed);
  561. thr_params[idx].result->s = ret;
  562. apr_queue_push(failed_parts, thr_params[idx].result);
  563. }
  564. if (NULL != progress_callback) {
  565. consume_bytes += task_res->part->size;
  566. progress_callback(consume_bytes, finfo->size);
  567. }
  568. }
  569. total_num = apr_atomic_read32(&launched) + apr_atomic_read32(&failed) + apr_atomic_read32(&completed);
  570. }
  571. // deal with left successful parts
  572. while(APR_SUCCESS == apr_queue_trypop(completed_parts, &task_result)) {
  573. task_res = (oss_part_task_result_t*)task_result;
  574. oss_update_checkpoint(parent_pool, checkpoint, task_res->part->index, &task_res->etag);
  575. consume_bytes += task_res->part->size;
  576. has_left_result = AOS_TRUE;
  577. }
  578. if (has_left_result) {
  579. rv = oss_dump_checkpoint(parent_pool, checkpoint);
  580. if (rv != AOSE_OK) {
  581. aos_status_set(ret, rv, AOS_WRITE_FILE_ERROR_CODE, NULL);
  582. return ret;
  583. }
  584. if (NULL != progress_callback) {
  585. progress_callback(consume_bytes, finfo->size);
  586. }
  587. }
  588. apr_file_close(checkpoint->thefile);
  589. // failed
  590. if (apr_atomic_read32(&failed) > 0) {
  591. apr_queue_pop(failed_parts, &task_result);
  592. task_res = (oss_part_task_result_t*)task_result;
  593. s = aos_status_dup(parent_pool, task_res->s);
  594. oss_destroy_thread_pool(thr_params, part_num);
  595. return s;
  596. }
  597. // successful
  598. aos_pool_create(&subpool, parent_pool);
  599. aos_list_init(&completed_part_list);
  600. for (i = 0; i < checkpoint->part_num; i++) {
  601. complete_content = oss_create_complete_part_content(subpool);
  602. part_num_str = apr_psprintf(subpool, "%d", checkpoint->parts[i].index + 1);
  603. aos_str_set(&complete_content->part_number, part_num_str);
  604. aos_str_set(&complete_content->etag, checkpoint->parts[i].etag.data);
  605. aos_list_add_tail(&complete_content->node, &completed_part_list);
  606. }
  607. oss_destroy_thread_pool(thr_params, part_num);
  608. // complete upload
  609. options->pool = subpool;
  610. if (NULL != headers && NULL != apr_table_get(headers, OSS_CALLBACK)) {
  611. cb_headers = aos_table_make(subpool, 2);
  612. apr_table_set(cb_headers, OSS_CALLBACK, apr_table_get(headers, OSS_CALLBACK));
  613. if (NULL != apr_table_get(headers, OSS_CALLBACK_VAR)) {
  614. apr_table_set(cb_headers, OSS_CALLBACK_VAR, apr_table_get(headers, OSS_CALLBACK_VAR));
  615. }
  616. }
  617. s = oss_do_complete_multipart_upload(options, bucket, object, &upload_id,
  618. &completed_part_list, cb_headers, NULL, resp_headers, resp_body);
  619. s = aos_status_dup(parent_pool, s);
  620. aos_pool_destroy(subpool);
  621. options->pool = parent_pool;
  622. // remove chepoint file
  623. apr_file_remove(checkpoint_path->data, parent_pool);
  624. return s;
  625. }
  626. aos_status_t *oss_resumable_upload_file(oss_request_options_t *options,
  627. aos_string_t *bucket,
  628. aos_string_t *object,
  629. aos_string_t *filepath,
  630. aos_table_t *headers,
  631. aos_table_t *params,
  632. oss_resumable_clt_params_t *clt_params,
  633. oss_progress_callback progress_callback,
  634. aos_table_t **resp_headers,
  635. aos_list_t *resp_body)
  636. {
  637. int32_t thread_num = 0;
  638. int64_t part_size = 0;
  639. aos_string_t checkpoint_path;
  640. aos_pool_t *sub_pool;
  641. apr_finfo_t finfo;
  642. aos_status_t *s;
  643. int res;
  644. thread_num = oss_get_thread_num(clt_params);
  645. aos_pool_create(&sub_pool, options->pool);
  646. res = oss_get_file_info(filepath, sub_pool, &finfo);
  647. if (res != AOSE_OK) {
  648. aos_error_log("Open read file fail, filename:%s\n", filepath->data);
  649. s = aos_status_create(options->pool);
  650. aos_file_error_status_set(s, res);
  651. aos_pool_destroy(sub_pool);
  652. return s;
  653. }
  654. part_size = clt_params->part_size;
  655. oss_get_part_size(finfo.size, &part_size);
  656. if (NULL != clt_params && clt_params->enable_checkpoint) {
  657. oss_get_checkpoint_path(clt_params, filepath, sub_pool, &checkpoint_path);
  658. s = oss_resumable_upload_file_with_cp(options, bucket, object, filepath, headers, params, thread_num,
  659. part_size, &checkpoint_path, &finfo, progress_callback, resp_headers, resp_body);
  660. } else {
  661. s = oss_resumable_upload_file_without_cp(options, bucket, object, filepath, headers, params, thread_num,
  662. part_size, &finfo, progress_callback, resp_headers, resp_body);
  663. }
  664. aos_pool_destroy(sub_pool);
  665. return s;
  666. }