#include "aos_log.h" #include "aos_define.h" #include "aos_util.h" #include "aos_string.h" #include "aos_status.h" #include "oss_auth.h" #include "oss_util.h" #include "oss_xml.h" #include "oss_api.h" #include "oss_resumable.h" int32_t oss_get_thread_num(oss_resumable_clt_params_t *clt_params) { if ((NULL == clt_params) || (clt_params->thread_num <= 0 || clt_params->thread_num > 1024)) { return 1; } return clt_params->thread_num; } void oss_get_checkpoint_path(oss_resumable_clt_params_t *clt_params, const aos_string_t *filepath, aos_pool_t *pool, aos_string_t *checkpoint_path) { if ((NULL == checkpoint_path) || (NULL == clt_params) || (!clt_params->enable_checkpoint)) { return; } if (aos_is_null_string(&clt_params->checkpoint_path)) { int len = filepath->len + strlen(".cp") + 1; char *buffer = (char *)aos_pcalloc(pool, len); apr_snprintf(buffer, len, "%.*s.cp", filepath->len, filepath->data); aos_str_set(checkpoint_path , buffer); return; } checkpoint_path->data = clt_params->checkpoint_path.data; checkpoint_path->len = clt_params->checkpoint_path.len; } int oss_get_file_info(const aos_string_t *filepath, aos_pool_t *pool, apr_finfo_t *finfo) { apr_status_t s; char buf[256]; apr_file_t *thefile; s = apr_file_open(&thefile, filepath->data, APR_READ, APR_UREAD | APR_GREAD, pool); if (s != APR_SUCCESS) { aos_error_log("apr_file_open failure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf))); return s; } s = apr_file_info_get(finfo, APR_FINFO_NORM, thefile); if (s != APR_SUCCESS) { apr_file_close(thefile); aos_error_log("apr_file_info_get failure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf))); return s; } apr_file_close(thefile); return AOSE_OK; } int oss_does_file_exist(const aos_string_t *filepath, aos_pool_t *pool) { apr_status_t s; apr_file_t *thefile; s = apr_file_open(&thefile, filepath->data, APR_READ, APR_UREAD | APR_GREAD, pool); if (s != APR_SUCCESS) { return AOS_FALSE; } apr_file_close(thefile); return AOS_TRUE; } int oss_open_checkpoint_file(aos_pool_t *pool, aos_string_t *checkpoint_path, oss_checkpoint_t *checkpoint) { apr_status_t s; apr_file_t *thefile; char buf[256]; s = apr_file_open(&thefile, checkpoint_path->data, APR_CREATE | APR_WRITE, APR_UREAD | APR_UWRITE | APR_GREAD, pool); if (s == APR_SUCCESS) { checkpoint->thefile = thefile; } else { aos_error_log("apr_file_info_get failure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf))); } return s; } int oss_get_part_num(int64_t file_size, int64_t part_size) { int64_t num = 0; int64_t left = 0; left = (file_size % part_size == 0) ? 0 : 1; num = file_size / part_size + left; return (int)num; } void oss_build_parts(int64_t file_size, int64_t part_size, oss_checkpoint_part_t *parts) { int i = 0; for (; i * part_size < file_size; i++) { parts[i].index = i; parts[i].offset = i * part_size; parts[i].size = aos_min(part_size, (file_size - i * part_size)); parts[i].completed = AOS_FALSE; } } void oss_build_thread_params(oss_upload_thread_params_t *thr_params, int part_num, aos_pool_t *parent_pool, oss_request_options_t *options, aos_string_t *bucket, aos_string_t *object, aos_string_t *filepath, aos_string_t *upload_id, oss_checkpoint_part_t *parts, oss_part_task_result_t *result) { int i = 0; aos_pool_t *subpool = NULL; oss_config_t *config = NULL; aos_http_controller_t *ctl; for (; i < part_num; i++) { aos_pool_create(&subpool, parent_pool); config = oss_config_create(subpool); aos_str_set(&config->endpoint, options->config->endpoint.data); aos_str_set(&config->access_key_id, options->config->access_key_id.data); aos_str_set(&config->access_key_secret, options->config->access_key_secret.data); config->is_cname = options->config->is_cname; ctl = aos_http_controller_create(subpool, 0); thr_params[i].options.config = config; thr_params[i].options.ctl = ctl; thr_params[i].options.pool = subpool; thr_params[i].bucket = bucket; thr_params[i].object = object; thr_params[i].filepath = filepath; thr_params[i].upload_id = upload_id; thr_params[i].part = parts + i; thr_params[i].result = result + i; thr_params[i].result->part = thr_params[i].part; } } void oss_destroy_thread_pool(oss_upload_thread_params_t *thr_params, int part_num) { int i = 0; for (; i < part_num; i++) { aos_pool_destroy(thr_params[i].options.pool); } } void oss_set_task_tracker(oss_upload_thread_params_t *thr_params, int part_num, apr_uint32_t *launched, apr_uint32_t *failed, apr_uint32_t *completed, apr_queue_t *failed_parts, apr_queue_t *completed_parts) { int i = 0; for (; i < part_num; i++) { thr_params[i].launched = launched; thr_params[i].failed = failed; thr_params[i].completed = completed; thr_params[i].failed_parts = failed_parts; thr_params[i].completed_parts = completed_parts; } } int oss_verify_checkpoint_md5(aos_pool_t *pool, const oss_checkpoint_t *checkpoint) { return AOS_TRUE; } void oss_build_upload_checkpoint(aos_pool_t *pool, oss_checkpoint_t *checkpoint, aos_string_t *file_path, apr_finfo_t *finfo, aos_string_t *upload_id, int64_t part_size) { int i = 0; checkpoint->cp_type = OSS_CP_UPLOAD; aos_str_set(&checkpoint->file_path, aos_pstrdup(pool, file_path)); checkpoint->file_size = finfo->size; checkpoint->file_last_modified = finfo->mtime; aos_str_set(&checkpoint->upload_id, aos_pstrdup(pool, upload_id)); checkpoint->part_size = part_size; for (; i * part_size < finfo->size; i++) { checkpoint->parts[i].index = i; checkpoint->parts[i].offset = i * part_size; checkpoint->parts[i].size = aos_min(part_size, (finfo->size - i * part_size)); checkpoint->parts[i].completed = AOS_FALSE; aos_str_set(&checkpoint->parts[i].etag , ""); } checkpoint->part_num = i; } int oss_dump_checkpoint(aos_pool_t *pool, const oss_checkpoint_t *checkpoint) { char *xml_body = NULL; apr_status_t s; char buf[256]; apr_size_t len; // to xml xml_body = oss_build_checkpoint_xml(pool, checkpoint); if (NULL == xml_body) { return AOSE_OUT_MEMORY; } // truncate to empty s = apr_file_trunc(checkpoint->thefile, 0); if (s != APR_SUCCESS) { aos_error_log("apr_file_write fialure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf))); return AOSE_FILE_TRUNC_ERROR; } // write to file len = strlen(xml_body); s = apr_file_write(checkpoint->thefile, xml_body, &len); if (s != APR_SUCCESS) { aos_error_log("apr_file_write fialure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf))); return AOSE_FILE_WRITE_ERROR; } // flush file s = apr_file_flush(checkpoint->thefile); if (s != APR_SUCCESS) { aos_error_log("apr_file_flush fialure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf))); return AOSE_FILE_FLUSH_ERROR; } return AOSE_OK; } int oss_load_checkpoint(aos_pool_t *pool, const aos_string_t *filepath, oss_checkpoint_t *checkpoint) { apr_status_t s; char buf[256]; apr_size_t len; apr_finfo_t finfo; char *xml_body = NULL; apr_file_t *thefile; // open file s = apr_file_open(&thefile, filepath->data, APR_READ, APR_UREAD | APR_GREAD, pool); if (s != APR_SUCCESS) { aos_error_log("apr_file_open failure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf))); return AOSE_OPEN_FILE_ERROR; } // get file stat s = apr_file_info_get(&finfo, APR_FINFO_NORM, thefile); if (s != APR_SUCCESS) { aos_error_log("apr_file_info_get failure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf))); apr_file_close(thefile); return AOSE_FILE_INFO_ERROR; } xml_body = (char *)aos_palloc(pool, (apr_size_t)(finfo.size + 1)); // read s = apr_file_read_full(thefile, xml_body, (apr_size_t)finfo.size, &len); if (s != APR_SUCCESS) { aos_error_log("apr_file_read_full fialure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf))); apr_file_close(thefile); return AOSE_FILE_READ_ERROR; } apr_file_close(thefile); xml_body[len] = '\0'; // parse return oss_checkpoint_parse_from_body(pool, xml_body, checkpoint); } int oss_is_upload_checkpoint_valid(aos_pool_t *pool, oss_checkpoint_t *checkpoint, apr_finfo_t *finfo) { if (oss_verify_checkpoint_md5(pool, checkpoint) && (checkpoint->file_size == finfo->size) && (checkpoint->file_last_modified == finfo->mtime)) { return AOS_TRUE; } return AOS_FALSE; } void oss_update_checkpoint(aos_pool_t *pool, oss_checkpoint_t *checkpoint, int32_t part_index, aos_string_t *etag) { char *p = NULL; checkpoint->parts[part_index].completed = AOS_TRUE; p = apr_pstrdup(pool, etag->data); aos_str_set(&checkpoint->parts[part_index].etag, p); } void oss_get_checkpoint_undo_parts(oss_checkpoint_t *checkpoint, int *part_num, oss_checkpoint_part_t *parts) { int i = 0; int idx = 0; for (; i < checkpoint->part_num; i++) { if (!checkpoint->parts[i].completed) { parts[idx].index = checkpoint->parts[i].index; parts[idx].offset = checkpoint->parts[i].offset; parts[idx].size = checkpoint->parts[i].size; parts[idx].completed = checkpoint->parts[i].completed; idx++; } } *part_num = idx; } void * APR_THREAD_FUNC upload_part(apr_thread_t *thd, void *data) { aos_status_t *s = NULL; oss_upload_thread_params_t *params = NULL; oss_upload_file_t *upload_file = NULL; aos_table_t *resp_headers = NULL; int part_num; char *etag; params = (oss_upload_thread_params_t *)data; if (apr_atomic_read32(params->failed) > 0) { apr_atomic_inc32(params->launched); return NULL; } part_num = params->part->index + 1; upload_file = oss_create_upload_file(params->options.pool); aos_str_set(&upload_file->filename, params->filepath->data); upload_file->file_pos = params->part->offset; upload_file->file_last = params->part->offset + params->part->size; s = oss_upload_part_from_file(¶ms->options, params->bucket, params->object, params->upload_id, part_num, upload_file, &resp_headers); if (!aos_status_is_ok(s)) { apr_atomic_inc32(params->failed); params->result->s = s; apr_queue_push(params->failed_parts, params->result); return s; } etag = apr_pstrdup(params->options.pool, (char*)apr_table_get(resp_headers, "ETag")); aos_str_set(¶ms->result->etag, etag); apr_atomic_inc32(params->completed); apr_queue_push(params->completed_parts, params->result); return NULL; } aos_status_t *oss_resumable_upload_file_without_cp(oss_request_options_t *options, aos_string_t *bucket, aos_string_t *object, aos_string_t *filepath, aos_table_t *headers, aos_table_t *params, int32_t thread_num, int64_t part_size, apr_finfo_t *finfo, oss_progress_callback progress_callback, aos_table_t **resp_headers, aos_list_t *resp_body) { aos_pool_t *subpool = NULL; aos_pool_t *parent_pool = NULL; aos_status_t *s = NULL; aos_status_t *ret = NULL; aos_list_t completed_part_list; oss_complete_part_content_t *complete_content = NULL; aos_string_t upload_id; oss_checkpoint_part_t *parts; oss_part_task_result_t *results; oss_part_task_result_t *task_res; oss_upload_thread_params_t *thr_params; aos_table_t *cb_headers = NULL; apr_thread_pool_t *thrp; apr_uint32_t launched = 0; apr_uint32_t failed = 0; apr_uint32_t completed = 0; apr_uint32_t total_num = 0; apr_queue_t *failed_parts; apr_queue_t *completed_parts; int64_t consume_bytes = 0; void *task_result; char *part_num_str; char *etag; int part_num = 0; int i = 0; int rv; // prepare parent_pool = options->pool; ret = aos_status_create(parent_pool); part_num = oss_get_part_num(finfo->size, part_size); parts = (oss_checkpoint_part_t *)aos_palloc(parent_pool, sizeof(oss_checkpoint_part_t) * part_num); oss_build_parts(finfo->size, part_size, parts); results = (oss_part_task_result_t *)aos_palloc(parent_pool, sizeof(oss_part_task_result_t) * part_num); thr_params = (oss_upload_thread_params_t *)aos_palloc(parent_pool, sizeof(oss_upload_thread_params_t) * part_num); oss_build_thread_params(thr_params, part_num, parent_pool, options, bucket, object, filepath, &upload_id, parts, results); // init upload aos_pool_create(&subpool, parent_pool); options->pool = subpool; s = oss_init_multipart_upload(options, bucket, object, &upload_id, headers, resp_headers); if (!aos_status_is_ok(s)) { s = aos_status_dup(parent_pool, s); aos_pool_destroy(subpool); options->pool = parent_pool; return s; } aos_str_set(&upload_id, apr_pstrdup(parent_pool, upload_id.data)); options->pool = parent_pool; aos_pool_destroy(subpool); // upload parts rv = apr_thread_pool_create(&thrp, 0, thread_num, parent_pool); if (APR_SUCCESS != rv) { aos_status_set(ret, rv, AOS_CREATE_THREAD_POOL_ERROR_CODE, NULL); return ret; } rv = apr_queue_create(&failed_parts, part_num, parent_pool); if (APR_SUCCESS != rv) { aos_status_set(ret, rv, AOS_CREATE_QUEUE_ERROR_CODE, NULL); return ret; } rv = apr_queue_create(&completed_parts, part_num, parent_pool); if (APR_SUCCESS != rv) { aos_status_set(ret, rv, AOS_CREATE_QUEUE_ERROR_CODE, NULL); return ret; } // launch oss_set_task_tracker(thr_params, part_num, &launched, &failed, &completed, failed_parts, completed_parts); for (i = 0; i < part_num; i++) { apr_thread_pool_push(thrp, upload_part, thr_params + i, 0, NULL); } // wait until all tasks exit total_num = apr_atomic_read32(&launched) + apr_atomic_read32(&failed) + apr_atomic_read32(&completed); for ( ; total_num < (apr_uint32_t)part_num; ) { rv = apr_queue_trypop(completed_parts, &task_result); if (rv == APR_EINTR || rv == APR_EAGAIN) { apr_sleep(1000); } else if(rv == APR_EOF) { break; } else if(rv == APR_SUCCESS) { task_res = (oss_part_task_result_t*)task_result; if (NULL != progress_callback) { consume_bytes += task_res->part->size; progress_callback(consume_bytes, finfo->size); } } total_num = apr_atomic_read32(&launched) + apr_atomic_read32(&failed) + apr_atomic_read32(&completed); } // deal with left successful parts while(APR_SUCCESS == apr_queue_trypop(completed_parts, &task_result)) { task_res = (oss_part_task_result_t*)task_result; if (NULL != progress_callback) { consume_bytes += task_res->part->size; progress_callback(consume_bytes, finfo->size); } } // failed if (apr_atomic_read32(&failed) > 0) { apr_queue_pop(failed_parts, &task_result); task_res = (oss_part_task_result_t*)task_result; s = aos_status_dup(parent_pool, task_res->s); oss_destroy_thread_pool(thr_params, part_num); return s; } // successful aos_pool_create(&subpool, parent_pool); aos_list_init(&completed_part_list); for (i = 0; i < part_num; i++) { complete_content = oss_create_complete_part_content(subpool); part_num_str = apr_psprintf(subpool, "%d", thr_params[i].part->index + 1); aos_str_set(&complete_content->part_number, part_num_str); etag = apr_pstrdup(subpool, thr_params[i].result->etag.data); aos_str_set(&complete_content->etag, etag); aos_list_add_tail(&complete_content->node, &completed_part_list); } oss_destroy_thread_pool(thr_params, part_num); // complete upload options->pool = subpool; if (NULL != headers && NULL != apr_table_get(headers, OSS_CALLBACK)) { cb_headers = aos_table_make(subpool, 2); apr_table_set(cb_headers, OSS_CALLBACK, apr_table_get(headers, OSS_CALLBACK)); if (NULL != apr_table_get(headers, OSS_CALLBACK_VAR)) { apr_table_set(cb_headers, OSS_CALLBACK_VAR, apr_table_get(headers, OSS_CALLBACK_VAR)); } } s = oss_do_complete_multipart_upload(options, bucket, object, &upload_id, &completed_part_list, cb_headers, NULL, resp_headers, resp_body); s = aos_status_dup(parent_pool, s); aos_pool_destroy(subpool); options->pool = parent_pool; return s; } aos_status_t *oss_resumable_upload_file_with_cp(oss_request_options_t *options, aos_string_t *bucket, aos_string_t *object, aos_string_t *filepath, aos_table_t *headers, aos_table_t *params, int32_t thread_num, int64_t part_size, aos_string_t *checkpoint_path, apr_finfo_t *finfo, oss_progress_callback progress_callback, aos_table_t **resp_headers, aos_list_t *resp_body) { aos_pool_t *subpool = NULL; aos_pool_t *parent_pool = NULL; aos_status_t *s = NULL; aos_status_t *ret = NULL; aos_list_t completed_part_list; oss_complete_part_content_t *complete_content = NULL; aos_string_t upload_id; oss_checkpoint_part_t *parts; oss_part_task_result_t *results; oss_part_task_result_t *task_res; oss_upload_thread_params_t *thr_params; aos_table_t *cb_headers = NULL; apr_thread_pool_t *thrp; apr_uint32_t launched = 0; apr_uint32_t failed = 0; apr_uint32_t completed = 0; apr_uint32_t total_num = 0; apr_queue_t *failed_parts; apr_queue_t *completed_parts; oss_checkpoint_t *checkpoint = NULL; int need_init_upload = AOS_TRUE; int has_left_result = AOS_FALSE; int64_t consume_bytes = 0; void *task_result; char *part_num_str; int part_num = 0; int i = 0; int rv; // checkpoint parent_pool = options->pool; ret = aos_status_create(parent_pool); checkpoint = oss_create_checkpoint_content(parent_pool); if(oss_does_file_exist(checkpoint_path, parent_pool)) { if (AOSE_OK == oss_load_checkpoint(parent_pool, checkpoint_path, checkpoint) && oss_is_upload_checkpoint_valid(parent_pool, checkpoint, finfo)) { aos_str_set(&upload_id, checkpoint->upload_id.data); need_init_upload = AOS_FALSE; } else { apr_file_remove(checkpoint_path->data, parent_pool); } } if (need_init_upload) { // init upload aos_pool_create(&subpool, parent_pool); options->pool = subpool; s = oss_init_multipart_upload(options, bucket, object, &upload_id, headers, resp_headers); if (!aos_status_is_ok(s)) { s = aos_status_dup(parent_pool, s); aos_pool_destroy(subpool); options->pool = parent_pool; return s; } aos_str_set(&upload_id, apr_pstrdup(parent_pool, upload_id.data)); options->pool = parent_pool; aos_pool_destroy(subpool); // build checkpoint oss_build_upload_checkpoint(parent_pool, checkpoint, filepath, finfo, &upload_id, part_size); } rv = oss_open_checkpoint_file(parent_pool, checkpoint_path, checkpoint); if (rv != APR_SUCCESS) { aos_status_set(ret, rv, AOS_OPEN_FILE_ERROR_CODE, NULL); return ret; } // prepare ret = aos_status_create(parent_pool); parts = (oss_checkpoint_part_t *)aos_palloc(parent_pool, sizeof(oss_checkpoint_part_t) * (checkpoint->part_num)); oss_get_checkpoint_undo_parts(checkpoint, &part_num, parts); results = (oss_part_task_result_t *)aos_palloc(parent_pool, sizeof(oss_part_task_result_t) * part_num); thr_params = (oss_upload_thread_params_t *)aos_palloc(parent_pool, sizeof(oss_upload_thread_params_t) * part_num); oss_build_thread_params(thr_params, part_num, parent_pool, options, bucket, object, filepath, &upload_id, parts, results); // upload parts rv = apr_thread_pool_create(&thrp, 0, thread_num, parent_pool); if (APR_SUCCESS != rv) { aos_status_set(ret, rv, AOS_CREATE_THREAD_POOL_ERROR_CODE, NULL); return ret; } rv = apr_queue_create(&failed_parts, part_num, parent_pool); if (APR_SUCCESS != rv) { aos_status_set(ret, rv, AOS_CREATE_QUEUE_ERROR_CODE, NULL); return ret; } rv = apr_queue_create(&completed_parts, part_num, parent_pool); if (APR_SUCCESS != rv) { aos_status_set(ret, rv, AOS_CREATE_QUEUE_ERROR_CODE, NULL); return ret; } // launch oss_set_task_tracker(thr_params, part_num, &launched, &failed, &completed, failed_parts, completed_parts); for (i = 0; i < part_num; i++) { apr_thread_pool_push(thrp, upload_part, thr_params + i, 0, NULL); } // wait until all tasks exit total_num = apr_atomic_read32(&launched) + apr_atomic_read32(&failed) + apr_atomic_read32(&completed); for ( ; total_num < (apr_uint32_t)part_num; ) { rv = apr_queue_trypop(completed_parts, &task_result); if (rv == APR_EINTR || rv == APR_EAGAIN) { apr_sleep(1000); } else if(rv == APR_EOF) { break; } else if(rv == APR_SUCCESS) { task_res = (oss_part_task_result_t*)task_result; oss_update_checkpoint(parent_pool, checkpoint, task_res->part->index, &task_res->etag); rv = oss_dump_checkpoint(parent_pool, checkpoint); if (rv != AOSE_OK) { int idx = task_res->part->index; aos_status_set(ret, rv, AOS_WRITE_FILE_ERROR_CODE, NULL); apr_atomic_inc32(&failed); thr_params[idx].result->s = ret; apr_queue_push(failed_parts, thr_params[idx].result); } if (NULL != progress_callback) { consume_bytes += task_res->part->size; progress_callback(consume_bytes, finfo->size); } } total_num = apr_atomic_read32(&launched) + apr_atomic_read32(&failed) + apr_atomic_read32(&completed); } // deal with left successful parts while(APR_SUCCESS == apr_queue_trypop(completed_parts, &task_result)) { task_res = (oss_part_task_result_t*)task_result; oss_update_checkpoint(parent_pool, checkpoint, task_res->part->index, &task_res->etag); consume_bytes += task_res->part->size; has_left_result = AOS_TRUE; } if (has_left_result) { rv = oss_dump_checkpoint(parent_pool, checkpoint); if (rv != AOSE_OK) { aos_status_set(ret, rv, AOS_WRITE_FILE_ERROR_CODE, NULL); return ret; } if (NULL != progress_callback) { progress_callback(consume_bytes, finfo->size); } } apr_file_close(checkpoint->thefile); // failed if (apr_atomic_read32(&failed) > 0) { apr_queue_pop(failed_parts, &task_result); task_res = (oss_part_task_result_t*)task_result; s = aos_status_dup(parent_pool, task_res->s); oss_destroy_thread_pool(thr_params, part_num); return s; } // successful aos_pool_create(&subpool, parent_pool); aos_list_init(&completed_part_list); for (i = 0; i < checkpoint->part_num; i++) { complete_content = oss_create_complete_part_content(subpool); part_num_str = apr_psprintf(subpool, "%d", checkpoint->parts[i].index + 1); aos_str_set(&complete_content->part_number, part_num_str); aos_str_set(&complete_content->etag, checkpoint->parts[i].etag.data); aos_list_add_tail(&complete_content->node, &completed_part_list); } oss_destroy_thread_pool(thr_params, part_num); // complete upload options->pool = subpool; if (NULL != headers && NULL != apr_table_get(headers, OSS_CALLBACK)) { cb_headers = aos_table_make(subpool, 2); apr_table_set(cb_headers, OSS_CALLBACK, apr_table_get(headers, OSS_CALLBACK)); if (NULL != apr_table_get(headers, OSS_CALLBACK_VAR)) { apr_table_set(cb_headers, OSS_CALLBACK_VAR, apr_table_get(headers, OSS_CALLBACK_VAR)); } } s = oss_do_complete_multipart_upload(options, bucket, object, &upload_id, &completed_part_list, cb_headers, NULL, resp_headers, resp_body); s = aos_status_dup(parent_pool, s); aos_pool_destroy(subpool); options->pool = parent_pool; // remove chepoint file apr_file_remove(checkpoint_path->data, parent_pool); return s; } aos_status_t *oss_resumable_upload_file(oss_request_options_t *options, aos_string_t *bucket, aos_string_t *object, aos_string_t *filepath, aos_table_t *headers, aos_table_t *params, oss_resumable_clt_params_t *clt_params, oss_progress_callback progress_callback, aos_table_t **resp_headers, aos_list_t *resp_body) { int32_t thread_num = 0; int64_t part_size = 0; aos_string_t checkpoint_path; aos_pool_t *sub_pool; apr_finfo_t finfo; aos_status_t *s; int res; thread_num = oss_get_thread_num(clt_params); aos_pool_create(&sub_pool, options->pool); res = oss_get_file_info(filepath, sub_pool, &finfo); if (res != AOSE_OK) { aos_error_log("Open read file fail, filename:%s\n", filepath->data); s = aos_status_create(options->pool); aos_file_error_status_set(s, res); aos_pool_destroy(sub_pool); return s; } part_size = clt_params->part_size; oss_get_part_size(finfo.size, &part_size); if (NULL != clt_params && clt_params->enable_checkpoint) { oss_get_checkpoint_path(clt_params, filepath, sub_pool, &checkpoint_path); s = oss_resumable_upload_file_with_cp(options, bucket, object, filepath, headers, params, thread_num, part_size, &checkpoint_path, &finfo, progress_callback, resp_headers, resp_body); } else { s = oss_resumable_upload_file_without_cp(options, bucket, object, filepath, headers, params, thread_num, part_size, &finfo, progress_callback, resp_headers, resp_body); } aos_pool_destroy(sub_pool); return s; }