Skip to content
Snippets Groups Projects
Commit 0400c27f authored by Morris Jette's avatar Morris Jette
Browse files

burst_buffer/cray: Recognize external buffer mods

If a burst buffer is found that was created outside of Slurm then
  add it to Slurm's data, including limits logic.
If a Slurm burst buffer was destroyted outside of Slurm, then
  remove it from Slurm's data, including limits logic.
parent 70be2b4a
No related branches found
No related tags found
No related merge requests found
...@@ -211,7 +211,7 @@ static void _json_parse_pools_object(json_object *jobj, bb_pools_t *ent); ...@@ -211,7 +211,7 @@ static void _json_parse_pools_object(json_object *jobj, bb_pools_t *ent);
static void _json_parse_sessions_object(json_object *jobj, static void _json_parse_sessions_object(json_object *jobj,
bb_sessions_t *ent); bb_sessions_t *ent);
static void _log_script_argv(char **script_argv, char *resp_msg); static void _log_script_argv(char **script_argv, char *resp_msg);
static void _load_state(void); static void _load_state(bool init_config);
static int _open_part_state_file(char **state_file); static int _open_part_state_file(char **state_file);
static int _parse_bb_opts(struct job_descriptor *job_desc, static int _parse_bb_opts(struct job_descriptor *job_desc,
uint64_t *bb_size); uint64_t *bb_size);
...@@ -402,9 +402,9 @@ static void *_bb_agent(void *args) ...@@ -402,9 +402,9 @@ static void *_bb_agent(void *args)
bb_sleep(&bb_state, AGENT_INTERVAL); bb_sleep(&bb_state, AGENT_INTERVAL);
if (bb_state.term_flag) if (bb_state.term_flag)
break; break;
_load_state(false); /* Has own locking */
lock_slurmctld(job_write_lock); lock_slurmctld(job_write_lock);
pthread_mutex_lock(&bb_state.bb_mutex); pthread_mutex_lock(&bb_state.bb_mutex);
_load_state();
_timeout_bb_rec(); _timeout_bb_rec();
pthread_mutex_unlock(&bb_state.bb_mutex); pthread_mutex_unlock(&bb_state.bb_mutex);
unlock_slurmctld(job_write_lock); unlock_slurmctld(job_write_lock);
...@@ -820,7 +820,7 @@ unpack_error: ...@@ -820,7 +820,7 @@ unpack_error:
/* /*
* Determine the current actual burst buffer state. * Determine the current actual burst buffer state.
*/ */
static void _load_state(void) static void _load_state(bool init_config)
{ {
burst_buffer_gres_t *gres_ptr; burst_buffer_gres_t *gres_ptr;
bb_configs_t *configs; bb_configs_t *configs;
...@@ -831,7 +831,6 @@ static void _load_state(void) ...@@ -831,7 +831,6 @@ static void _load_state(void)
int num_configs = 0, num_instances = 0, num_pools = 0, num_sessions = 0; int num_configs = 0, num_instances = 0, num_pools = 0, num_sessions = 0;
int i, j; int i, j;
char *end_ptr = NULL; char *end_ptr = NULL;
static bool first_load = true;
/* /*
* Load the pools information * Load the pools information
...@@ -843,6 +842,7 @@ static void _load_state(void) ...@@ -843,6 +842,7 @@ static void _load_state(void)
return; return;
} }
pthread_mutex_lock(&bb_state.bb_mutex);
if (!bb_state.bb_config.default_pool && (num_pools > 0)) { if (!bb_state.bb_config.default_pool && (num_pools > 0)) {
info("%s: Setting DefaultPool to %s", __func__, pools[0].id); info("%s: Setting DefaultPool to %s", __func__, pools[0].id);
bb_state.bb_config.default_pool = xstrdup(pools[0].id); bb_state.bb_config.default_pool = xstrdup(pools[0].id);
...@@ -862,25 +862,22 @@ static void _load_state(void) ...@@ -862,25 +862,22 @@ static void _load_state(void)
/* Everything else is a generic burst buffer resource */ /* Everything else is a generic burst buffer resource */
bb_state.bb_config.gres_cnt = 0; bb_state.bb_config.gres_cnt = 0;
continue; } else {
bb_state.bb_config.gres_ptr
= xrealloc(bb_state.bb_config.gres_ptr,
sizeof(burst_buffer_gres_t) *
(bb_state.bb_config.gres_cnt + 1));
gres_ptr = bb_state.bb_config.gres_ptr +
bb_state.bb_config.gres_cnt;
bb_state.bb_config.gres_cnt++;
gres_ptr->avail_cnt = pools[i].quantity;
gres_ptr->granularity = pools[i].granularity;
gres_ptr->name = xstrdup(pools[i].id);
gres_ptr->used_cnt = pools[i].quantity - pools[i].free;
} }
bb_state.bb_config.gres_ptr
= xrealloc(bb_state.bb_config.gres_ptr,
sizeof(burst_buffer_gres_t) *
(bb_state.bb_config.gres_cnt + 1));
gres_ptr = bb_state.bb_config.gres_ptr +
bb_state.bb_config.gres_cnt;
bb_state.bb_config.gres_cnt++;
gres_ptr->avail_cnt = pools[i].quantity;
gres_ptr->granularity = pools[i].granularity;
gres_ptr->name = xstrdup(pools[i].id);
gres_ptr->used_cnt = pools[i].quantity - pools[i].free;
} }
pthread_mutex_unlock(&bb_state.bb_mutex);
_bb_free_pools(pools, num_pools); _bb_free_pools(pools, num_pools);
bb_state.last_load_time = time(NULL);
if (!first_load)
return;
/* /*
* Load the instances information * Load the instances information
...@@ -890,9 +887,22 @@ static void _load_state(void) ...@@ -890,9 +887,22 @@ static void _load_state(void)
info("%s: failed to find DataWarp instances", __func__); info("%s: failed to find DataWarp instances", __func__);
} }
sessions = _bb_get_sessions(&num_sessions, &bb_state); sessions = _bb_get_sessions(&num_sessions, &bb_state);
pthread_mutex_lock(&bb_state.bb_mutex);
bb_state.last_load_time = time(NULL);
for (i = 0; i < num_sessions; i++) { for (i = 0; i < num_sessions; i++) {
bb_alloc = bb_alloc_name_rec(&bb_state, if (!init_config) {
sessions[i].token, bb_alloc = bb_find_name_rec(sessions[i].token,
sessions[i].user_id,
&bb_state);
if (bb_alloc) {
bb_alloc->seen_time = bb_state.last_load_time;
continue;
}
error("%s: Unexpected burst buffer %s found",
__func__, sessions[i].token);
}
bb_alloc = bb_alloc_name_rec(&bb_state, sessions[i].token,
sessions[i].user_id); sessions[i].user_id);
if ((sessions[i].token[0] >='0') && if ((sessions[i].token[0] >='0') &&
(sessions[i].token[0] <='9')) { (sessions[i].token[0] <='9')) {
...@@ -901,10 +911,18 @@ static void _load_state(void) ...@@ -901,10 +911,18 @@ static void _load_state(void)
} }
for (j = 0; j < num_instances; j++) for (j = 0; j < num_instances; j++)
bb_alloc->size = instances[j].bytes; bb_alloc->size = instances[j].bytes;
bb_alloc->seen_time = bb_state.last_load_time;
if (!init_config) /* Newly found buffer */
bb_add_user_load(bb_alloc, &bb_state);
} }
pthread_mutex_unlock(&bb_state.bb_mutex);
_bb_free_sessions(sessions, num_sessions); _bb_free_sessions(sessions, num_sessions);
_bb_free_instances(instances, num_instances); _bb_free_instances(instances, num_instances);
if (!init_config)
return;
/* /*
* Load the configurations information * Load the configurations information
*/ */
...@@ -912,13 +930,12 @@ static void _load_state(void) ...@@ -912,13 +930,12 @@ static void _load_state(void)
if (configs == NULL) { if (configs == NULL) {
info("%s: failed to find DataWarp configurations", __func__); info("%s: failed to find DataWarp configurations", __func__);
} }
//FIXME: configurations data is currently unused, is it needed?
_bb_free_configs(configs, num_sessions); _bb_free_configs(configs, num_sessions);
//FIXME: configurations data is currently unused, is it needed?
_recover_limit_state(); _recover_limit_state();
_apply_limits(); _apply_limits();
first_load = false;
return; return;
} }
...@@ -1823,8 +1840,6 @@ static void _timeout_bb_rec(void) ...@@ -1823,8 +1840,6 @@ static void _timeout_bb_rec(void)
bb_pptr = &bb_state.bb_ahash[i]; bb_pptr = &bb_state.bb_ahash[i];
bb_ptr = bb_state.bb_ahash[i]; bb_ptr = bb_state.bb_ahash[i];
while (bb_ptr) { while (bb_ptr) {
//FIXME: Need to add BBS load state logic to track persistent BB limits
bb_ptr->seen_time = bb_state.last_load_time;
if (bb_ptr->seen_time < bb_state.last_load_time) { if (bb_ptr->seen_time < bb_state.last_load_time) {
if (bb_ptr->job_id == 0) { if (bb_ptr->job_id == 0) {
info("%s: Persistent burst buffer %s " info("%s: Persistent burst buffer %s "
...@@ -2257,12 +2272,16 @@ static void _purge_vestigial_bufs(void) ...@@ -2257,12 +2272,16 @@ static void _purge_vestigial_bufs(void)
*/ */
extern int bb_p_load_state(bool init_config) extern int bb_p_load_state(bool init_config)
{ {
pthread_mutex_lock(&bb_state.bb_mutex); if (!init_config)
return SLURM_SUCCESS;
/* In practice the Cray APIs are too slow to run inline on each
* scheduling cycle. Do so on a periodic basis from _bb_agent(). */
if (bb_state.bb_config.debug_flag) if (bb_state.bb_config.debug_flag)
debug("%s: %s", plugin_type, __func__); debug("%s: %s", plugin_type, __func__);
_load_state(); _load_state(init_config); /* Has own locking */
if (init_config) pthread_mutex_lock(&bb_state.bb_mutex);
_purge_vestigial_bufs(); _purge_vestigial_bufs();
pthread_mutex_unlock(&bb_state.bb_mutex); pthread_mutex_unlock(&bb_state.bb_mutex);
return SLURM_SUCCESS; return SLURM_SUCCESS;
...@@ -3386,7 +3405,6 @@ if (0) { //FIXME: Cray bug: API exit code NOT 0 on success as documented ...@@ -3386,7 +3405,6 @@ if (0) { //FIXME: Cray bug: API exit code NOT 0 on success as documented
_reset_buf_state(create_args->user_id, _reset_buf_state(create_args->user_id,
create_args->job_id, create_args->name, create_args->job_id, create_args->name,
BB_STATE_ALLOCATED); BB_STATE_ALLOCATED);
//FIXME: Review logic below
bb_alloc = bb_alloc_name_rec(&bb_state, create_args->name, bb_alloc = bb_alloc_name_rec(&bb_state, create_args->name,
create_args->user_id); create_args->user_id);
bb_alloc->size = create_args->size; bb_alloc->size = create_args->size;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment