diff --git a/src/plugins/burst_buffer/cray/burst_buffer_cray.c b/src/plugins/burst_buffer/cray/burst_buffer_cray.c index c6250271a14e8dfa14f0b9eaa5658ff44b955730..5f904bea4f0a786dce71500166a7cb0913d082d7 100644 --- a/src/plugins/burst_buffer/cray/burst_buffer_cray.c +++ b/src/plugins/burst_buffer/cray/burst_buffer_cray.c @@ -211,7 +211,7 @@ static void _json_parse_pools_object(json_object *jobj, bb_pools_t *ent); static void _json_parse_sessions_object(json_object *jobj, bb_sessions_t *ent); static void _log_script_argv(char **script_argv, char *resp_msg); -static void _load_state(void); +static void _load_state(bool init_config); static int _open_part_state_file(char **state_file); static int _parse_bb_opts(struct job_descriptor *job_desc, uint64_t *bb_size); @@ -402,9 +402,9 @@ static void *_bb_agent(void *args) bb_sleep(&bb_state, AGENT_INTERVAL); if (bb_state.term_flag) break; + _load_state(false); /* Has own locking */ lock_slurmctld(job_write_lock); pthread_mutex_lock(&bb_state.bb_mutex); - _load_state(); _timeout_bb_rec(); pthread_mutex_unlock(&bb_state.bb_mutex); unlock_slurmctld(job_write_lock); @@ -820,7 +820,7 @@ unpack_error: /* * Determine the current actual burst buffer state. */ -static void _load_state(void) +static void _load_state(bool init_config) { burst_buffer_gres_t *gres_ptr; bb_configs_t *configs; @@ -831,7 +831,6 @@ static void _load_state(void) int num_configs = 0, num_instances = 0, num_pools = 0, num_sessions = 0; int i, j; char *end_ptr = NULL; - static bool first_load = true; /* * Load the pools information @@ -843,6 +842,7 @@ static void _load_state(void) return; } + pthread_mutex_lock(&bb_state.bb_mutex); if (!bb_state.bb_config.default_pool && (num_pools > 0)) { info("%s: Setting DefaultPool to %s", __func__, pools[0].id); bb_state.bb_config.default_pool = xstrdup(pools[0].id); @@ -862,25 +862,22 @@ static void _load_state(void) /* Everything else is a generic burst buffer resource */ bb_state.bb_config.gres_cnt = 0; - continue; + } else { + bb_state.bb_config.gres_ptr + = xrealloc(bb_state.bb_config.gres_ptr, + sizeof(burst_buffer_gres_t) * + (bb_state.bb_config.gres_cnt + 1)); + gres_ptr = bb_state.bb_config.gres_ptr + + bb_state.bb_config.gres_cnt; + bb_state.bb_config.gres_cnt++; + gres_ptr->avail_cnt = pools[i].quantity; + gres_ptr->granularity = pools[i].granularity; + gres_ptr->name = xstrdup(pools[i].id); + gres_ptr->used_cnt = pools[i].quantity - pools[i].free; } - - bb_state.bb_config.gres_ptr - = xrealloc(bb_state.bb_config.gres_ptr, - sizeof(burst_buffer_gres_t) * - (bb_state.bb_config.gres_cnt + 1)); - gres_ptr = bb_state.bb_config.gres_ptr + - bb_state.bb_config.gres_cnt; - bb_state.bb_config.gres_cnt++; - gres_ptr->avail_cnt = pools[i].quantity; - gres_ptr->granularity = pools[i].granularity; - gres_ptr->name = xstrdup(pools[i].id); - gres_ptr->used_cnt = pools[i].quantity - pools[i].free; } + pthread_mutex_unlock(&bb_state.bb_mutex); _bb_free_pools(pools, num_pools); - bb_state.last_load_time = time(NULL); - if (!first_load) - return; /* * Load the instances information @@ -890,9 +887,22 @@ static void _load_state(void) info("%s: failed to find DataWarp instances", __func__); } sessions = _bb_get_sessions(&num_sessions, &bb_state); + pthread_mutex_lock(&bb_state.bb_mutex); + bb_state.last_load_time = time(NULL); for (i = 0; i < num_sessions; i++) { - bb_alloc = bb_alloc_name_rec(&bb_state, - sessions[i].token, + if (!init_config) { + bb_alloc = bb_find_name_rec(sessions[i].token, + sessions[i].user_id, + &bb_state); + if (bb_alloc) { + bb_alloc->seen_time = bb_state.last_load_time; + continue; + } + error("%s: Unexpected burst buffer %s found", + __func__, sessions[i].token); + } + + bb_alloc = bb_alloc_name_rec(&bb_state, sessions[i].token, sessions[i].user_id); if ((sessions[i].token[0] >='0') && (sessions[i].token[0] <='9')) { @@ -901,10 +911,18 @@ static void _load_state(void) } for (j = 0; j < num_instances; j++) bb_alloc->size = instances[j].bytes; + bb_alloc->seen_time = bb_state.last_load_time; + + if (!init_config) /* Newly found buffer */ + bb_add_user_load(bb_alloc, &bb_state); } + pthread_mutex_unlock(&bb_state.bb_mutex); _bb_free_sessions(sessions, num_sessions); _bb_free_instances(instances, num_instances); + if (!init_config) + return; + /* * Load the configurations information */ @@ -912,13 +930,12 @@ static void _load_state(void) if (configs == NULL) { info("%s: failed to find DataWarp configurations", __func__); } -//FIXME: configurations data is currently unused, is it needed? _bb_free_configs(configs, num_sessions); +//FIXME: configurations data is currently unused, is it needed? _recover_limit_state(); _apply_limits(); - first_load = false; return; } @@ -1823,8 +1840,6 @@ static void _timeout_bb_rec(void) bb_pptr = &bb_state.bb_ahash[i]; bb_ptr = bb_state.bb_ahash[i]; while (bb_ptr) { -//FIXME: Need to add BBS load state logic to track persistent BB limits -bb_ptr->seen_time = bb_state.last_load_time; if (bb_ptr->seen_time < bb_state.last_load_time) { if (bb_ptr->job_id == 0) { info("%s: Persistent burst buffer %s " @@ -2257,12 +2272,16 @@ static void _purge_vestigial_bufs(void) */ extern int bb_p_load_state(bool init_config) { - pthread_mutex_lock(&bb_state.bb_mutex); + if (!init_config) + return SLURM_SUCCESS; + + /* In practice the Cray APIs are too slow to run inline on each + * scheduling cycle. Do so on a periodic basis from _bb_agent(). */ if (bb_state.bb_config.debug_flag) debug("%s: %s", plugin_type, __func__); - _load_state(); - if (init_config) - _purge_vestigial_bufs(); + _load_state(init_config); /* Has own locking */ + pthread_mutex_lock(&bb_state.bb_mutex); + _purge_vestigial_bufs(); pthread_mutex_unlock(&bb_state.bb_mutex); return SLURM_SUCCESS; @@ -3386,7 +3405,6 @@ if (0) { //FIXME: Cray bug: API exit code NOT 0 on success as documented _reset_buf_state(create_args->user_id, create_args->job_id, create_args->name, BB_STATE_ALLOCATED); -//FIXME: Review logic below bb_alloc = bb_alloc_name_rec(&bb_state, create_args->name, create_args->user_id); bb_alloc->size = create_args->size;