123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742 |
- #include "aos_log.h"
- #include "aos_define.h"
- #include "aos_util.h"
- #include "aos_string.h"
- #include "aos_status.h"
- #include "oss_auth.h"
- #include "oss_util.h"
- #include "oss_xml.h"
- #include "oss_api.h"
- #include "oss_resumable.h"
- int32_t oss_get_thread_num(oss_resumable_clt_params_t *clt_params)
- {
- if ((NULL == clt_params) || (clt_params->thread_num <= 0 || clt_params->thread_num > 1024)) {
- return 1;
- }
- return clt_params->thread_num;
- }
- void oss_get_checkpoint_path(oss_resumable_clt_params_t *clt_params, const aos_string_t *filepath,
- aos_pool_t *pool, aos_string_t *checkpoint_path)
- {
- if ((NULL == checkpoint_path) || (NULL == clt_params) || (!clt_params->enable_checkpoint)) {
- return;
- }
- if (aos_is_null_string(&clt_params->checkpoint_path)) {
- int len = filepath->len + strlen(".cp") + 1;
- char *buffer = (char *)aos_pcalloc(pool, len);
- apr_snprintf(buffer, len, "%.*s.cp", filepath->len, filepath->data);
- aos_str_set(checkpoint_path , buffer);
- return;
- }
- checkpoint_path->data = clt_params->checkpoint_path.data;
- checkpoint_path->len = clt_params->checkpoint_path.len;
- }
- int oss_get_file_info(const aos_string_t *filepath, aos_pool_t *pool, apr_finfo_t *finfo)
- {
- apr_status_t s;
- char buf[256];
- apr_file_t *thefile;
- s = apr_file_open(&thefile, filepath->data, APR_READ, APR_UREAD | APR_GREAD, pool);
- if (s != APR_SUCCESS) {
- aos_error_log("apr_file_open failure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
- return s;
- }
- s = apr_file_info_get(finfo, APR_FINFO_NORM, thefile);
- if (s != APR_SUCCESS) {
- apr_file_close(thefile);
- aos_error_log("apr_file_info_get failure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
- return s;
- }
- apr_file_close(thefile);
- return AOSE_OK;
- }
- int oss_does_file_exist(const aos_string_t *filepath, aos_pool_t *pool)
- {
- apr_status_t s;
- apr_file_t *thefile;
- s = apr_file_open(&thefile, filepath->data, APR_READ, APR_UREAD | APR_GREAD, pool);
- if (s != APR_SUCCESS) {
- return AOS_FALSE;
- }
- apr_file_close(thefile);
- return AOS_TRUE;
- }
- int oss_open_checkpoint_file(aos_pool_t *pool, aos_string_t *checkpoint_path, oss_checkpoint_t *checkpoint)
- {
- apr_status_t s;
- apr_file_t *thefile;
- char buf[256];
- s = apr_file_open(&thefile, checkpoint_path->data, APR_CREATE | APR_WRITE, APR_UREAD | APR_UWRITE | APR_GREAD, pool);
- if (s == APR_SUCCESS) {
- checkpoint->thefile = thefile;
- } else {
- aos_error_log("apr_file_info_get failure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
- }
- return s;
- }
- int oss_get_part_num(int64_t file_size, int64_t part_size)
- {
- int64_t num = 0;
- int64_t left = 0;
- left = (file_size % part_size == 0) ? 0 : 1;
- num = file_size / part_size + left;
- return (int)num;
- }
- void oss_build_parts(int64_t file_size, int64_t part_size, oss_checkpoint_part_t *parts)
- {
- int i = 0;
- for (; i * part_size < file_size; i++) {
- parts[i].index = i;
- parts[i].offset = i * part_size;
- parts[i].size = aos_min(part_size, (file_size - i * part_size));
- parts[i].completed = AOS_FALSE;
- }
- }
- void oss_build_thread_params(oss_upload_thread_params_t *thr_params, int part_num,
- aos_pool_t *parent_pool, oss_request_options_t *options,
- aos_string_t *bucket, aos_string_t *object, aos_string_t *filepath,
- aos_string_t *upload_id, oss_checkpoint_part_t *parts,
- oss_part_task_result_t *result)
- {
- int i = 0;
- aos_pool_t *subpool = NULL;
- oss_config_t *config = NULL;
- aos_http_controller_t *ctl;
- for (; i < part_num; i++) {
- aos_pool_create(&subpool, parent_pool);
- config = oss_config_create(subpool);
- aos_str_set(&config->endpoint, options->config->endpoint.data);
- aos_str_set(&config->access_key_id, options->config->access_key_id.data);
- aos_str_set(&config->access_key_secret, options->config->access_key_secret.data);
- config->is_cname = options->config->is_cname;
- ctl = aos_http_controller_create(subpool, 0);
- thr_params[i].options.config = config;
- thr_params[i].options.ctl = ctl;
- thr_params[i].options.pool = subpool;
- thr_params[i].bucket = bucket;
- thr_params[i].object = object;
- thr_params[i].filepath = filepath;
- thr_params[i].upload_id = upload_id;
- thr_params[i].part = parts + i;
- thr_params[i].result = result + i;
- thr_params[i].result->part = thr_params[i].part;
- }
- }
- void oss_destroy_thread_pool(oss_upload_thread_params_t *thr_params, int part_num)
- {
- int i = 0;
- for (; i < part_num; i++) {
- aos_pool_destroy(thr_params[i].options.pool);
- }
- }
- void oss_set_task_tracker(oss_upload_thread_params_t *thr_params, int part_num,
- apr_uint32_t *launched, apr_uint32_t *failed, apr_uint32_t *completed,
- apr_queue_t *failed_parts, apr_queue_t *completed_parts)
- {
- int i = 0;
- for (; i < part_num; i++) {
- thr_params[i].launched = launched;
- thr_params[i].failed = failed;
- thr_params[i].completed = completed;
- thr_params[i].failed_parts = failed_parts;
- thr_params[i].completed_parts = completed_parts;
- }
- }
- int oss_verify_checkpoint_md5(aos_pool_t *pool, const oss_checkpoint_t *checkpoint)
- {
- return AOS_TRUE;
- }
- void oss_build_upload_checkpoint(aos_pool_t *pool, oss_checkpoint_t *checkpoint, aos_string_t *file_path,
- apr_finfo_t *finfo, aos_string_t *upload_id, int64_t part_size)
- {
- int i = 0;
- checkpoint->cp_type = OSS_CP_UPLOAD;
- aos_str_set(&checkpoint->file_path, aos_pstrdup(pool, file_path));
- checkpoint->file_size = finfo->size;
- checkpoint->file_last_modified = finfo->mtime;
- aos_str_set(&checkpoint->upload_id, aos_pstrdup(pool, upload_id));
- checkpoint->part_size = part_size;
- for (; i * part_size < finfo->size; i++) {
- checkpoint->parts[i].index = i;
- checkpoint->parts[i].offset = i * part_size;
- checkpoint->parts[i].size = aos_min(part_size, (finfo->size - i * part_size));
- checkpoint->parts[i].completed = AOS_FALSE;
- aos_str_set(&checkpoint->parts[i].etag , "");
- }
- checkpoint->part_num = i;
- }
- int oss_dump_checkpoint(aos_pool_t *pool, const oss_checkpoint_t *checkpoint)
- {
- char *xml_body = NULL;
- apr_status_t s;
- char buf[256];
- apr_size_t len;
-
- // to xml
- xml_body = oss_build_checkpoint_xml(pool, checkpoint);
- if (NULL == xml_body) {
- return AOSE_OUT_MEMORY;
- }
- // truncate to empty
- s = apr_file_trunc(checkpoint->thefile, 0);
- if (s != APR_SUCCESS) {
- aos_error_log("apr_file_write fialure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
- return AOSE_FILE_TRUNC_ERROR;
- }
-
- // write to file
- len = strlen(xml_body);
- s = apr_file_write(checkpoint->thefile, xml_body, &len);
- if (s != APR_SUCCESS) {
- aos_error_log("apr_file_write fialure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
- return AOSE_FILE_WRITE_ERROR;
- }
- // flush file
- s = apr_file_flush(checkpoint->thefile);
- if (s != APR_SUCCESS) {
- aos_error_log("apr_file_flush fialure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
- return AOSE_FILE_FLUSH_ERROR;
- }
- return AOSE_OK;
- }
- int oss_load_checkpoint(aos_pool_t *pool, const aos_string_t *filepath, oss_checkpoint_t *checkpoint)
- {
- apr_status_t s;
- char buf[256];
- apr_size_t len;
- apr_finfo_t finfo;
- char *xml_body = NULL;
- apr_file_t *thefile;
- // open file
- s = apr_file_open(&thefile, filepath->data, APR_READ, APR_UREAD | APR_GREAD, pool);
- if (s != APR_SUCCESS) {
- aos_error_log("apr_file_open failure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
- return AOSE_OPEN_FILE_ERROR;
- }
- // get file stat
- s = apr_file_info_get(&finfo, APR_FINFO_NORM, thefile);
- if (s != APR_SUCCESS) {
- aos_error_log("apr_file_info_get failure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
- apr_file_close(thefile);
- return AOSE_FILE_INFO_ERROR;
- }
- xml_body = (char *)aos_palloc(pool, (apr_size_t)(finfo.size + 1));
- // read
- s = apr_file_read_full(thefile, xml_body, (apr_size_t)finfo.size, &len);
- if (s != APR_SUCCESS) {
- aos_error_log("apr_file_read_full fialure, code:%d %s.", s, apr_strerror(s, buf, sizeof(buf)));
- apr_file_close(thefile);
- return AOSE_FILE_READ_ERROR;
- }
- apr_file_close(thefile);
- xml_body[len] = '\0';
- // parse
- return oss_checkpoint_parse_from_body(pool, xml_body, checkpoint);
- }
- int oss_is_upload_checkpoint_valid(aos_pool_t *pool, oss_checkpoint_t *checkpoint, apr_finfo_t *finfo)
- {
- if (oss_verify_checkpoint_md5(pool, checkpoint) &&
- (checkpoint->file_size == finfo->size) &&
- (checkpoint->file_last_modified == finfo->mtime)) {
- return AOS_TRUE;
- }
- return AOS_FALSE;
- }
- void oss_update_checkpoint(aos_pool_t *pool, oss_checkpoint_t *checkpoint, int32_t part_index, aos_string_t *etag)
- {
- char *p = NULL;
- checkpoint->parts[part_index].completed = AOS_TRUE;
- p = apr_pstrdup(pool, etag->data);
- aos_str_set(&checkpoint->parts[part_index].etag, p);
- }
- void oss_get_checkpoint_undo_parts(oss_checkpoint_t *checkpoint, int *part_num, oss_checkpoint_part_t *parts)
- {
- int i = 0;
- int idx = 0;
- for (; i < checkpoint->part_num; i++) {
- if (!checkpoint->parts[i].completed) {
- parts[idx].index = checkpoint->parts[i].index;
- parts[idx].offset = checkpoint->parts[i].offset;
- parts[idx].size = checkpoint->parts[i].size;
- parts[idx].completed = checkpoint->parts[i].completed;
- idx++;
- }
- }
- *part_num = idx;
- }
- void * APR_THREAD_FUNC upload_part(apr_thread_t *thd, void *data)
- {
- aos_status_t *s = NULL;
- oss_upload_thread_params_t *params = NULL;
- oss_upload_file_t *upload_file = NULL;
- aos_table_t *resp_headers = NULL;
- int part_num;
- char *etag;
-
- params = (oss_upload_thread_params_t *)data;
- if (apr_atomic_read32(params->failed) > 0) {
- apr_atomic_inc32(params->launched);
- return NULL;
- }
- part_num = params->part->index + 1;
- upload_file = oss_create_upload_file(params->options.pool);
- aos_str_set(&upload_file->filename, params->filepath->data);
- upload_file->file_pos = params->part->offset;
- upload_file->file_last = params->part->offset + params->part->size;
- s = oss_upload_part_from_file(¶ms->options, params->bucket, params->object, params->upload_id,
- part_num, upload_file, &resp_headers);
- if (!aos_status_is_ok(s)) {
- apr_atomic_inc32(params->failed);
- params->result->s = s;
- apr_queue_push(params->failed_parts, params->result);
- return s;
- }
- etag = apr_pstrdup(params->options.pool, (char*)apr_table_get(resp_headers, "ETag"));
- aos_str_set(¶ms->result->etag, etag);
- apr_atomic_inc32(params->completed);
- apr_queue_push(params->completed_parts, params->result);
- return NULL;
- }
- aos_status_t *oss_resumable_upload_file_without_cp(oss_request_options_t *options,
- aos_string_t *bucket,
- aos_string_t *object,
- aos_string_t *filepath,
- aos_table_t *headers,
- aos_table_t *params,
- int32_t thread_num,
- int64_t part_size,
- apr_finfo_t *finfo,
- oss_progress_callback progress_callback,
- aos_table_t **resp_headers,
- aos_list_t *resp_body)
- {
- aos_pool_t *subpool = NULL;
- aos_pool_t *parent_pool = NULL;
- aos_status_t *s = NULL;
- aos_status_t *ret = NULL;
- aos_list_t completed_part_list;
- oss_complete_part_content_t *complete_content = NULL;
- aos_string_t upload_id;
- oss_checkpoint_part_t *parts;
- oss_part_task_result_t *results;
- oss_part_task_result_t *task_res;
- oss_upload_thread_params_t *thr_params;
- aos_table_t *cb_headers = NULL;
- apr_thread_pool_t *thrp;
- apr_uint32_t launched = 0;
- apr_uint32_t failed = 0;
- apr_uint32_t completed = 0;
- apr_uint32_t total_num = 0;
- apr_queue_t *failed_parts;
- apr_queue_t *completed_parts;
- int64_t consume_bytes = 0;
- void *task_result;
- char *part_num_str;
- char *etag;
- int part_num = 0;
- int i = 0;
- int rv;
- // prepare
- parent_pool = options->pool;
- ret = aos_status_create(parent_pool);
- part_num = oss_get_part_num(finfo->size, part_size);
- parts = (oss_checkpoint_part_t *)aos_palloc(parent_pool, sizeof(oss_checkpoint_part_t) * part_num);
- oss_build_parts(finfo->size, part_size, parts);
- results = (oss_part_task_result_t *)aos_palloc(parent_pool, sizeof(oss_part_task_result_t) * part_num);
- thr_params = (oss_upload_thread_params_t *)aos_palloc(parent_pool, sizeof(oss_upload_thread_params_t) * part_num);
- oss_build_thread_params(thr_params, part_num, parent_pool, options, bucket, object, filepath, &upload_id, parts, results);
-
- // init upload
- aos_pool_create(&subpool, parent_pool);
- options->pool = subpool;
- s = oss_init_multipart_upload(options, bucket, object, &upload_id, headers, resp_headers);
- if (!aos_status_is_ok(s)) {
- s = aos_status_dup(parent_pool, s);
- aos_pool_destroy(subpool);
- options->pool = parent_pool;
- return s;
- }
- aos_str_set(&upload_id, apr_pstrdup(parent_pool, upload_id.data));
- options->pool = parent_pool;
- aos_pool_destroy(subpool);
- // upload parts
- rv = apr_thread_pool_create(&thrp, 0, thread_num, parent_pool);
- if (APR_SUCCESS != rv) {
- aos_status_set(ret, rv, AOS_CREATE_THREAD_POOL_ERROR_CODE, NULL);
- return ret;
- }
- rv = apr_queue_create(&failed_parts, part_num, parent_pool);
- if (APR_SUCCESS != rv) {
- aos_status_set(ret, rv, AOS_CREATE_QUEUE_ERROR_CODE, NULL);
- return ret;
- }
- rv = apr_queue_create(&completed_parts, part_num, parent_pool);
- if (APR_SUCCESS != rv) {
- aos_status_set(ret, rv, AOS_CREATE_QUEUE_ERROR_CODE, NULL);
- return ret;
- }
- // launch
- oss_set_task_tracker(thr_params, part_num, &launched, &failed, &completed, failed_parts, completed_parts);
- for (i = 0; i < part_num; i++) {
- apr_thread_pool_push(thrp, upload_part, thr_params + i, 0, NULL);
- }
- // wait until all tasks exit
- total_num = apr_atomic_read32(&launched) + apr_atomic_read32(&failed) + apr_atomic_read32(&completed);
- for ( ; total_num < (apr_uint32_t)part_num; ) {
- rv = apr_queue_trypop(completed_parts, &task_result);
- if (rv == APR_EINTR || rv == APR_EAGAIN) {
- apr_sleep(1000);
- } else if(rv == APR_EOF) {
- break;
- } else if(rv == APR_SUCCESS) {
- task_res = (oss_part_task_result_t*)task_result;
- if (NULL != progress_callback) {
- consume_bytes += task_res->part->size;
- progress_callback(consume_bytes, finfo->size);
- }
- }
- total_num = apr_atomic_read32(&launched) + apr_atomic_read32(&failed) + apr_atomic_read32(&completed);
- }
- // deal with left successful parts
- while(APR_SUCCESS == apr_queue_trypop(completed_parts, &task_result)) {
- task_res = (oss_part_task_result_t*)task_result;
- if (NULL != progress_callback) {
- consume_bytes += task_res->part->size;
- progress_callback(consume_bytes, finfo->size);
- }
- }
- // failed
- if (apr_atomic_read32(&failed) > 0) {
- apr_queue_pop(failed_parts, &task_result);
- task_res = (oss_part_task_result_t*)task_result;
- s = aos_status_dup(parent_pool, task_res->s);
- oss_destroy_thread_pool(thr_params, part_num);
- return s;
- }
- // successful
- aos_pool_create(&subpool, parent_pool);
- aos_list_init(&completed_part_list);
- for (i = 0; i < part_num; i++) {
- complete_content = oss_create_complete_part_content(subpool);
- part_num_str = apr_psprintf(subpool, "%d", thr_params[i].part->index + 1);
- aos_str_set(&complete_content->part_number, part_num_str);
- etag = apr_pstrdup(subpool, thr_params[i].result->etag.data);
- aos_str_set(&complete_content->etag, etag);
- aos_list_add_tail(&complete_content->node, &completed_part_list);
- }
- oss_destroy_thread_pool(thr_params, part_num);
- // complete upload
- options->pool = subpool;
- if (NULL != headers && NULL != apr_table_get(headers, OSS_CALLBACK)) {
- cb_headers = aos_table_make(subpool, 2);
- apr_table_set(cb_headers, OSS_CALLBACK, apr_table_get(headers, OSS_CALLBACK));
- if (NULL != apr_table_get(headers, OSS_CALLBACK_VAR)) {
- apr_table_set(cb_headers, OSS_CALLBACK_VAR, apr_table_get(headers, OSS_CALLBACK_VAR));
- }
- }
- s = oss_do_complete_multipart_upload(options, bucket, object, &upload_id,
- &completed_part_list, cb_headers, NULL, resp_headers, resp_body);
- s = aos_status_dup(parent_pool, s);
- aos_pool_destroy(subpool);
- options->pool = parent_pool;
- return s;
- }
- aos_status_t *oss_resumable_upload_file_with_cp(oss_request_options_t *options,
- aos_string_t *bucket,
- aos_string_t *object,
- aos_string_t *filepath,
- aos_table_t *headers,
- aos_table_t *params,
- int32_t thread_num,
- int64_t part_size,
- aos_string_t *checkpoint_path,
- apr_finfo_t *finfo,
- oss_progress_callback progress_callback,
- aos_table_t **resp_headers,
- aos_list_t *resp_body)
- {
- aos_pool_t *subpool = NULL;
- aos_pool_t *parent_pool = NULL;
- aos_status_t *s = NULL;
- aos_status_t *ret = NULL;
- aos_list_t completed_part_list;
- oss_complete_part_content_t *complete_content = NULL;
- aos_string_t upload_id;
- oss_checkpoint_part_t *parts;
- oss_part_task_result_t *results;
- oss_part_task_result_t *task_res;
- oss_upload_thread_params_t *thr_params;
- aos_table_t *cb_headers = NULL;
- apr_thread_pool_t *thrp;
- apr_uint32_t launched = 0;
- apr_uint32_t failed = 0;
- apr_uint32_t completed = 0;
- apr_uint32_t total_num = 0;
- apr_queue_t *failed_parts;
- apr_queue_t *completed_parts;
- oss_checkpoint_t *checkpoint = NULL;
- int need_init_upload = AOS_TRUE;
- int has_left_result = AOS_FALSE;
- int64_t consume_bytes = 0;
- void *task_result;
- char *part_num_str;
- int part_num = 0;
- int i = 0;
- int rv;
- // checkpoint
- parent_pool = options->pool;
- ret = aos_status_create(parent_pool);
- checkpoint = oss_create_checkpoint_content(parent_pool);
- if(oss_does_file_exist(checkpoint_path, parent_pool)) {
- if (AOSE_OK == oss_load_checkpoint(parent_pool, checkpoint_path, checkpoint) &&
- oss_is_upload_checkpoint_valid(parent_pool, checkpoint, finfo)) {
- aos_str_set(&upload_id, checkpoint->upload_id.data);
- need_init_upload = AOS_FALSE;
- } else {
- apr_file_remove(checkpoint_path->data, parent_pool);
- }
- }
- if (need_init_upload) {
- // init upload
- aos_pool_create(&subpool, parent_pool);
- options->pool = subpool;
- s = oss_init_multipart_upload(options, bucket, object, &upload_id, headers, resp_headers);
- if (!aos_status_is_ok(s)) {
- s = aos_status_dup(parent_pool, s);
- aos_pool_destroy(subpool);
- options->pool = parent_pool;
- return s;
- }
- aos_str_set(&upload_id, apr_pstrdup(parent_pool, upload_id.data));
- options->pool = parent_pool;
- aos_pool_destroy(subpool);
- // build checkpoint
- oss_build_upload_checkpoint(parent_pool, checkpoint, filepath, finfo, &upload_id, part_size);
- }
- rv = oss_open_checkpoint_file(parent_pool, checkpoint_path, checkpoint);
- if (rv != APR_SUCCESS) {
- aos_status_set(ret, rv, AOS_OPEN_FILE_ERROR_CODE, NULL);
- return ret;
- }
- // prepare
- ret = aos_status_create(parent_pool);
- parts = (oss_checkpoint_part_t *)aos_palloc(parent_pool, sizeof(oss_checkpoint_part_t) * (checkpoint->part_num));
- oss_get_checkpoint_undo_parts(checkpoint, &part_num, parts);
- results = (oss_part_task_result_t *)aos_palloc(parent_pool, sizeof(oss_part_task_result_t) * part_num);
- thr_params = (oss_upload_thread_params_t *)aos_palloc(parent_pool, sizeof(oss_upload_thread_params_t) * part_num);
- oss_build_thread_params(thr_params, part_num, parent_pool, options, bucket, object, filepath, &upload_id, parts, results);
- // upload parts
- rv = apr_thread_pool_create(&thrp, 0, thread_num, parent_pool);
- if (APR_SUCCESS != rv) {
- aos_status_set(ret, rv, AOS_CREATE_THREAD_POOL_ERROR_CODE, NULL);
- return ret;
- }
- rv = apr_queue_create(&failed_parts, part_num, parent_pool);
- if (APR_SUCCESS != rv) {
- aos_status_set(ret, rv, AOS_CREATE_QUEUE_ERROR_CODE, NULL);
- return ret;
- }
- rv = apr_queue_create(&completed_parts, part_num, parent_pool);
- if (APR_SUCCESS != rv) {
- aos_status_set(ret, rv, AOS_CREATE_QUEUE_ERROR_CODE, NULL);
- return ret;
- }
- // launch
- oss_set_task_tracker(thr_params, part_num, &launched, &failed, &completed, failed_parts, completed_parts);
- for (i = 0; i < part_num; i++) {
- apr_thread_pool_push(thrp, upload_part, thr_params + i, 0, NULL);
- }
- // wait until all tasks exit
- total_num = apr_atomic_read32(&launched) + apr_atomic_read32(&failed) + apr_atomic_read32(&completed);
- for ( ; total_num < (apr_uint32_t)part_num; ) {
- rv = apr_queue_trypop(completed_parts, &task_result);
- if (rv == APR_EINTR || rv == APR_EAGAIN) {
- apr_sleep(1000);
- } else if(rv == APR_EOF) {
- break;
- } else if(rv == APR_SUCCESS) {
- task_res = (oss_part_task_result_t*)task_result;
- oss_update_checkpoint(parent_pool, checkpoint, task_res->part->index, &task_res->etag);
- rv = oss_dump_checkpoint(parent_pool, checkpoint);
- if (rv != AOSE_OK) {
- int idx = task_res->part->index;
- aos_status_set(ret, rv, AOS_WRITE_FILE_ERROR_CODE, NULL);
- apr_atomic_inc32(&failed);
- thr_params[idx].result->s = ret;
- apr_queue_push(failed_parts, thr_params[idx].result);
- }
- if (NULL != progress_callback) {
- consume_bytes += task_res->part->size;
- progress_callback(consume_bytes, finfo->size);
- }
- }
- total_num = apr_atomic_read32(&launched) + apr_atomic_read32(&failed) + apr_atomic_read32(&completed);
- }
- // deal with left successful parts
- while(APR_SUCCESS == apr_queue_trypop(completed_parts, &task_result)) {
- task_res = (oss_part_task_result_t*)task_result;
- oss_update_checkpoint(parent_pool, checkpoint, task_res->part->index, &task_res->etag);
- consume_bytes += task_res->part->size;
- has_left_result = AOS_TRUE;
- }
- if (has_left_result) {
- rv = oss_dump_checkpoint(parent_pool, checkpoint);
- if (rv != AOSE_OK) {
- aos_status_set(ret, rv, AOS_WRITE_FILE_ERROR_CODE, NULL);
- return ret;
- }
- if (NULL != progress_callback) {
- progress_callback(consume_bytes, finfo->size);
- }
- }
- apr_file_close(checkpoint->thefile);
- // failed
- if (apr_atomic_read32(&failed) > 0) {
- apr_queue_pop(failed_parts, &task_result);
- task_res = (oss_part_task_result_t*)task_result;
- s = aos_status_dup(parent_pool, task_res->s);
- oss_destroy_thread_pool(thr_params, part_num);
- return s;
- }
-
- // successful
- aos_pool_create(&subpool, parent_pool);
- aos_list_init(&completed_part_list);
- for (i = 0; i < checkpoint->part_num; i++) {
- complete_content = oss_create_complete_part_content(subpool);
- part_num_str = apr_psprintf(subpool, "%d", checkpoint->parts[i].index + 1);
- aos_str_set(&complete_content->part_number, part_num_str);
- aos_str_set(&complete_content->etag, checkpoint->parts[i].etag.data);
- aos_list_add_tail(&complete_content->node, &completed_part_list);
- }
- oss_destroy_thread_pool(thr_params, part_num);
- // complete upload
- options->pool = subpool;
- if (NULL != headers && NULL != apr_table_get(headers, OSS_CALLBACK)) {
- cb_headers = aos_table_make(subpool, 2);
- apr_table_set(cb_headers, OSS_CALLBACK, apr_table_get(headers, OSS_CALLBACK));
- if (NULL != apr_table_get(headers, OSS_CALLBACK_VAR)) {
- apr_table_set(cb_headers, OSS_CALLBACK_VAR, apr_table_get(headers, OSS_CALLBACK_VAR));
- }
- }
- s = oss_do_complete_multipart_upload(options, bucket, object, &upload_id,
- &completed_part_list, cb_headers, NULL, resp_headers, resp_body);
- s = aos_status_dup(parent_pool, s);
- aos_pool_destroy(subpool);
- options->pool = parent_pool;
- // remove chepoint file
- apr_file_remove(checkpoint_path->data, parent_pool);
-
- return s;
- }
- aos_status_t *oss_resumable_upload_file(oss_request_options_t *options,
- aos_string_t *bucket,
- aos_string_t *object,
- aos_string_t *filepath,
- aos_table_t *headers,
- aos_table_t *params,
- oss_resumable_clt_params_t *clt_params,
- oss_progress_callback progress_callback,
- aos_table_t **resp_headers,
- aos_list_t *resp_body)
- {
- int32_t thread_num = 0;
- int64_t part_size = 0;
- aos_string_t checkpoint_path;
- aos_pool_t *sub_pool;
- apr_finfo_t finfo;
- aos_status_t *s;
- int res;
- thread_num = oss_get_thread_num(clt_params);
- aos_pool_create(&sub_pool, options->pool);
- res = oss_get_file_info(filepath, sub_pool, &finfo);
- if (res != AOSE_OK) {
- aos_error_log("Open read file fail, filename:%s\n", filepath->data);
- s = aos_status_create(options->pool);
- aos_file_error_status_set(s, res);
- aos_pool_destroy(sub_pool);
- return s;
- }
- part_size = clt_params->part_size;
- oss_get_part_size(finfo.size, &part_size);
- if (NULL != clt_params && clt_params->enable_checkpoint) {
- oss_get_checkpoint_path(clt_params, filepath, sub_pool, &checkpoint_path);
- s = oss_resumable_upload_file_with_cp(options, bucket, object, filepath, headers, params, thread_num,
- part_size, &checkpoint_path, &finfo, progress_callback, resp_headers, resp_body);
- } else {
- s = oss_resumable_upload_file_without_cp(options, bucket, object, filepath, headers, params, thread_num,
- part_size, &finfo, progress_callback, resp_headers, resp_body);
- }
- aos_pool_destroy(sub_pool);
- return s;
- }
|