Skip to content
Snippets Groups Projects
Commit 28debbeb authored by Morris Jette's avatar Morris Jette
Browse files

burst_buffer/cray

Do not reload state periodically, as doing so breaks our usage tracking until
  the plugin is integrated with actual Cray commands and race conditions are addressed
decrement usage counters when burst buffer teardown completes
Correct burst buffer string parsing for gres
Preempt allocated buffers for higher priority jobs based upon GRES needs
parent 7d19601a
No related branches found
No related tags found
No related merge requests found
...@@ -105,11 +105,8 @@ typedef struct bb_user { ...@@ -105,11 +105,8 @@ typedef struct bb_user {
typedef struct { typedef struct {
char * name; /* Generic burst buffer resource, e.g. "nodes" */ char * name; /* Generic burst buffer resource, e.g. "nodes" */
uint32_t count; /* Count of required resources */ uint32_t count; /* Count of required resources */
uint32_t add_cnt; /* Count of additional resources required,
* temporary value used for scheduling logic */
uint32_t avail_cnt; /* Count of additional resources available,
* temporary value used for scheduling logic */
} bb_gres_t; } bb_gres_t;
typedef struct { typedef struct {
uint32_t gres_cnt; /* number of records in gres_ptr */ uint32_t gres_cnt; /* number of records in gres_ptr */
bb_gres_t *gres_ptr; bb_gres_t *gres_ptr;
......
...@@ -122,6 +122,12 @@ typedef struct { ...@@ -122,6 +122,12 @@ typedef struct {
} }
stage_args_t; stage_args_t;
/* Per-GRES bookkeeping record, used for scheduling */
typedef struct {
	/* BB GRES name, e.g. "nodes" */
	char *name;
	/* Additional GRES required */
	uint32_t add_cnt;
	/* Additional GRES available */
	uint32_t avail_cnt;
} needed_gres_t;
static int _alloc_job_bb(struct job_record *job_ptr, bb_job_t *bb_spec); static int _alloc_job_bb(struct job_record *job_ptr, bb_job_t *bb_spec);
static void * _bb_agent(void *args); static void * _bb_agent(void *args);
static bb_entry_t *_bb_entry_get(int *num_ent, bb_state_t *state_ptr); static bb_entry_t *_bb_entry_get(int *num_ent, bb_state_t *state_ptr);
...@@ -424,6 +430,10 @@ static void _load_state(void) ...@@ -424,6 +430,10 @@ static void _load_state(void)
bb_entry_t *ents; bb_entry_t *ents;
int num_ents; int num_ents;
int i; int i;
static bool first_load = true;
//FIXME: Need logic to handle resource allocation/free in progress
if (!first_load) return;
first_load = false;
bb_state.last_load_time = time(NULL); bb_state.last_load_time = time(NULL);
ents = _bb_entry_get(&num_ents, &bb_state); ents = _bb_entry_get(&num_ents, &bb_state);
...@@ -432,7 +442,7 @@ static void _load_state(void) ...@@ -432,7 +442,7 @@ static void _load_state(void)
__func__); __func__);
return; return;
} }
//FIXME: Need logic to handle resource allocation/free in progress
for (i = 0; i < num_ents; i++) { for (i = 0; i < num_ents; i++) {
/* ID: "bytes" */ /* ID: "bytes" */
if (strcmp(ents[i].id, "bytes") == 0) { if (strcmp(ents[i].id, "bytes") == 0) {
...@@ -559,7 +569,7 @@ static int _queue_stage_in(struct job_record *job_ptr) ...@@ -559,7 +569,7 @@ static int _queue_stage_in(struct job_record *job_ptr)
} else { } else {
tok = strstr(job_ptr->burst_buffer, "SLURM_GRES="); tok = strstr(job_ptr->burst_buffer, "SLURM_GRES=");
if (tok) { if (tok) {
tok = strstr(tok, "nodes="); tok = strstr(tok, "nodes:");
if (tok) { if (tok) {
i = atoi(tok + 6); i = atoi(tok + 6);
xstrfmtcat(capacity, "nodes:%u", i); xstrfmtcat(capacity, "nodes:%u", i);
...@@ -924,7 +934,7 @@ static void *_start_teardown(void *x) ...@@ -924,7 +934,7 @@ static void *_start_teardown(void *x)
{ {
stage_args_t *teardown_args; stage_args_t *teardown_args;
char *bbs_teardown_path, **teardown_argv, *resp_msg = NULL; char *bbs_teardown_path, **teardown_argv, *resp_msg = NULL;
int i, status = 0, timeout; int i, j, status = 0, timeout;
struct job_record *job_ptr; struct job_record *job_ptr;
bb_alloc_t *bb_ptr = NULL; bb_alloc_t *bb_ptr = NULL;
DEF_TIMERS; DEF_TIMERS;
...@@ -960,6 +970,30 @@ static void *_start_teardown(void *x) ...@@ -960,6 +970,30 @@ static void *_start_teardown(void *x)
bb_ptr->end_time = 0; bb_ptr->end_time = 0;
bb_ptr->state = BB_STATE_COMPLETE; bb_ptr->state = BB_STATE_COMPLETE;
bb_ptr->state_time = time(NULL); bb_ptr->state_time = time(NULL);
if (bb_ptr->size > bb_state.used_space) {
error("%s: space underflow", __func__);
bb_state.used_space = 0;
} else {
bb_state.used_space -= bb_ptr->size;
}
for (i = 0; i < bb_ptr->gres_cnt; i++) {
for (j = 0; j < bb_state.bb_config.gres_cnt; j++) {
if (strcmp(bb_ptr->gres_ptr[i].name,
bb_state.bb_config.gres_ptr[j].name))
continue;
if (bb_ptr->gres_ptr[i].used_cnt >
bb_state.bb_config.gres_ptr[j].used_cnt) {
error("%s: gres %s counter underflow",
__func__,
bb_state.bb_config.gres_ptr[j].name);
bb_state.bb_config.gres_ptr[j].used_cnt=0;
} else {
bb_state.bb_config.gres_ptr[j].used_cnt-=
bb_ptr->gres_ptr[i].used_cnt;
}
break;
}
}
} else { } else {
error("%s: unable to find bb record for job %u", error("%s: unable to find bb record for job %u",
__func__, teardown_args->job_id); __func__, teardown_args->job_id);
...@@ -975,6 +1009,18 @@ static void *_start_teardown(void *x) ...@@ -975,6 +1009,18 @@ static void *_start_teardown(void *x)
return NULL; return NULL;
} }
/*
 * Release an array of needed_gres_t records together with the name string
 * owned by each element.
 * IN needed_gres_ptr - array to release, may be NULL (no-op)
 * IN gres_cnt - number of elements in needed_gres_ptr
 */
static void _free_needed_gres_struct(needed_gres_t *needed_gres_ptr,
				     int gres_cnt)
{
	int i;

	if (needed_gres_ptr == NULL)
		return;

	/* Index by i: needed_gres_ptr->name only ever names element zero,
	 * which would leave the names of all later elements allocated */
	for (i = 0; i < gres_cnt; i++)
		xfree(needed_gres_ptr[i].name);
	xfree(needed_gres_ptr);
}
/* Test if a job can be allocated a burst buffer. /* Test if a job can be allocated a burst buffer.
* This may preempt currently active stage-in for higher priority jobs. * This may preempt currently active stage-in for higher priority jobs.
* *
...@@ -985,6 +1031,9 @@ static void *_start_teardown(void *x) ...@@ -985,6 +1031,9 @@ static void *_start_teardown(void *x)
*/ */
static int _test_size_limit(struct job_record *job_ptr, bb_job_t *bb_spec) static int _test_size_limit(struct job_record *job_ptr, bb_job_t *bb_spec)
{ {
needed_gres_t *needed_gres_ptr = NULL;
int add_total_gres_needed = 0, add_total_gres_avail = 0;
struct preempt_bb_recs *preempt_ptr = NULL; struct preempt_bb_recs *preempt_ptr = NULL;
List preempt_list; List preempt_list;
ListIterator preempt_iter; ListIterator preempt_iter;
...@@ -992,10 +1041,9 @@ static int _test_size_limit(struct job_record *job_ptr, bb_job_t *bb_spec) ...@@ -992,10 +1041,9 @@ static int _test_size_limit(struct job_record *job_ptr, bb_job_t *bb_spec)
uint32_t tmp_f, tmp_g, tmp_u, tmp_j, lim_u, add_space; uint32_t tmp_f, tmp_g, tmp_u, tmp_j, lim_u, add_space;
int add_total_space_needed = 0, add_user_space_needed = 0; int add_total_space_needed = 0, add_user_space_needed = 0;
int add_total_space_avail = 0, add_user_space_avail = 0; int add_total_space_avail = 0, add_user_space_avail = 0;
int add_gres_needed = 0;
time_t now = time(NULL); time_t now = time(NULL);
bb_alloc_t *bb_ptr = NULL; bb_alloc_t *bb_ptr = NULL;
int i, j; int d, i, j, k;
xassert(bb_spec); xassert(bb_spec);
add_space = bb_spec->total_size; add_space = bb_spec->total_size;
...@@ -1016,12 +1064,16 @@ static int _test_size_limit(struct job_record *job_ptr, bb_job_t *bb_spec) ...@@ -1016,12 +1064,16 @@ static int _test_size_limit(struct job_record *job_ptr, bb_job_t *bb_spec)
tmp_u = user_ptr->size; tmp_u = user_ptr->size;
tmp_j = add_space; tmp_j = add_space;
lim_u = bb_state.bb_config.user_size_limit; lim_u = bb_state.bb_config.user_size_limit;
if (tmp_u + tmp_j > lim_u)
add_user_space_needed = tmp_u + tmp_j - lim_u; add_user_space_needed = tmp_u + tmp_j - lim_u;
} }
add_total_space_needed = bb_state.used_space + add_space - if (bb_state.used_space + add_space > bb_state.total_space) {
bb_state.total_space; add_total_space_needed = bb_state.used_space + add_space -
bb_state.total_space;
}
needed_gres_ptr = xmalloc(sizeof(needed_gres_t) * bb_spec->gres_cnt);
for (i = 0; i < bb_spec->gres_cnt; i++) { for (i = 0; i < bb_spec->gres_cnt; i++) {
needed_gres_ptr[i].name = xstrdup(bb_spec->gres_ptr[i].name);
for (j = 0; j < bb_state.bb_config.gres_cnt; j++) { for (j = 0; j < bb_state.bb_config.gres_cnt; j++) {
if (strcmp(bb_spec->gres_ptr[i].name, if (strcmp(bb_spec->gres_ptr[i].name,
bb_state.bb_config.gres_ptr[j].name)) bb_state.bb_config.gres_ptr[j].name))
...@@ -1034,33 +1086,33 @@ static int _test_size_limit(struct job_record *job_ptr, bb_job_t *bb_spec) ...@@ -1034,33 +1086,33 @@ static int _test_size_limit(struct job_record *job_ptr, bb_job_t *bb_spec)
bb_spec->gres_ptr[i].count = tmp_g; bb_spec->gres_ptr[i].count = tmp_g;
if (tmp_g > bb_state.bb_config.gres_ptr[j].avail_cnt) { if (tmp_g > bb_state.bb_config.gres_ptr[j].avail_cnt) {
debug("%s: job %u requests more %s GRES than" debug("%s: job %u requests more %s GRES than"
"confingured", "configured",
__func__, job_ptr->job_id, __func__, job_ptr->job_id,
bb_spec->gres_ptr[i].name); bb_spec->gres_ptr[i].name);
_free_needed_gres_struct(needed_gres_ptr,
bb_spec->gres_cnt);
return 1; return 1;
} }
tmp_f = bb_state.bb_config.gres_ptr[j].avail_cnt - tmp_f = bb_state.bb_config.gres_ptr[j].avail_cnt -
bb_state.bb_config.gres_ptr[j].used_cnt; bb_state.bb_config.gres_ptr[j].used_cnt;
if (tmp_g > tmp_f) if (tmp_g > tmp_f)
bb_spec->gres_ptr[i].add_cnt = tmp_g - tmp_f; needed_gres_ptr[i].add_cnt = tmp_g - tmp_f;
else add_total_gres_needed += needed_gres_ptr[i].add_cnt;
bb_spec->gres_ptr[i].add_cnt = 0;
add_gres_needed += bb_spec->gres_ptr[i].add_cnt;
bb_spec->gres_ptr[i].avail_cnt = 0;
break; break;
} }
if (j >= bb_state.bb_config.gres_cnt) { if (j >= bb_state.bb_config.gres_cnt) {
debug("%s: job %u requests %s GRES which are undefined", debug("%s: job %u requests %s GRES which are undefined",
__func__, job_ptr->job_id, __func__, job_ptr->job_id,
bb_spec->gres_ptr[i].name); bb_spec->gres_ptr[i].name);
_free_needed_gres_struct(needed_gres_ptr,
bb_spec->gres_cnt);
return 1; return 1;
} }
} }
if ((add_total_space_needed <= 0) && if ((add_total_space_needed <= 0) &&
(add_user_space_needed <= 0) && (add_gres_needed <= 0)) (add_user_space_needed <= 0) && (add_total_gres_needed <= 0))
return 0; return 0;
//FIXME: More work needed below to identify jobs which can release GRES and tear them down
/* Identify candidate burst buffers to revoke for higher priority job */ /* Identify candidate burst buffers to revoke for higher priority job */
preempt_list = list_create(bb_job_queue_del); preempt_list = list_create(bb_job_queue_del);
for (i = 0; i < BB_HASH_SIZE; i++) { for (i = 0; i < BB_HASH_SIZE; i++) {
...@@ -1077,40 +1129,82 @@ static int _test_size_limit(struct job_record *job_ptr, bb_job_t *bb_spec) ...@@ -1077,40 +1129,82 @@ static int _test_size_limit(struct job_record *job_ptr, bb_job_t *bb_spec)
preempt_ptr->use_time = bb_ptr->use_time; preempt_ptr->use_time = bb_ptr->use_time;
preempt_ptr->user_id = bb_ptr->user_id; preempt_ptr->user_id = bb_ptr->user_id;
list_push(preempt_list, preempt_ptr); list_push(preempt_list, preempt_ptr);
add_total_space_avail += bb_ptr->size; add_total_space_avail += bb_ptr->size;
if (bb_ptr->user_id == job_ptr->user_id); if (bb_ptr->user_id == job_ptr->user_id);
add_user_space_avail += bb_ptr->size; add_user_space_avail += bb_ptr->size;
if (add_total_gres_needed<add_total_gres_avail)
j = bb_ptr->gres_cnt;
else
j = 0;
for ( ; j < bb_ptr->gres_cnt; j++) {
d = needed_gres_ptr[j].add_cnt -
needed_gres_ptr[j].avail_cnt;
if (d <= 0)
continue;
for (k = 0; k < bb_spec->gres_cnt; k++){
if (strcmp(needed_gres_ptr[j].name,
bb_spec->gres_ptr[k].name))
continue;
if (bb_spec->gres_ptr[k].count <
d) {
d = bb_spec->
gres_ptr[k].count;
}
add_total_gres_avail += d;
needed_gres_ptr[j].avail_cnt+=d;
}
}
} }
bb_ptr = bb_ptr->next; bb_ptr = bb_ptr->next;
} }
} }
if ((add_total_space_avail >= add_total_space_needed) && if ((add_total_space_avail >= add_total_space_needed) &&
(add_user_space_avail >= add_user_space_needed)) { (add_user_space_avail >= add_user_space_needed) &&
(add_total_gres_avail >= add_total_gres_needed)) {
list_sort(preempt_list, bb_preempt_queue_sort); list_sort(preempt_list, bb_preempt_queue_sort);
preempt_iter = list_iterator_create(preempt_list); preempt_iter = list_iterator_create(preempt_list);
while ((preempt_ptr = list_next(preempt_iter)) && while ((preempt_ptr = list_next(preempt_iter)) &&
(add_total_space_needed || add_user_space_needed || (add_total_space_needed || add_user_space_needed ||
add_gres_needed)) { add_total_gres_needed)) {
bool do_preempt = false;
if (add_user_space_needed && if (add_user_space_needed &&
(preempt_ptr->user_id == job_ptr->user_id)) { (preempt_ptr->user_id == job_ptr->user_id)) {
preempt_ptr->bb_ptr->cancelled = true; do_preempt = true;
preempt_ptr->bb_ptr->end_time = 0;
preempt_ptr->bb_ptr->state = BB_STATE_TEARDOWN;
preempt_ptr->bb_ptr->state_time = time(NULL);
_queue_teardown(preempt_ptr->job_id, true);
if (bb_state.bb_config.debug_flag) {
info("%s: %s: Preempting stage-in of "
"job %u for job %u", plugin_type,
__func__, preempt_ptr->job_id,
job_ptr->job_id);
}
add_user_space_needed -= preempt_ptr->size; add_user_space_needed -= preempt_ptr->size;
add_total_space_needed -= preempt_ptr->size; add_total_space_needed -= preempt_ptr->size;
continue;
} }
if ((add_total_space_needed > add_user_space_needed) && if ((add_total_space_needed > add_user_space_needed) &&
(preempt_ptr->user_id != job_ptr->user_id)) { (preempt_ptr->user_id != job_ptr->user_id)) {
do_preempt = true;
add_total_space_needed -= preempt_ptr->size;
continue;
}
if (add_total_gres_needed) {
for (j = 0; j < bb_spec->gres_cnt; j++) {
d = needed_gres_ptr[j].add_cnt;
if (d <= 0)
continue;
for (k = 0;
k < preempt_ptr->bb_ptr->gres_cnt;
k++) {
if (strcmp(needed_gres_ptr[j].name,
preempt_ptr->bb_ptr->
gres_ptr[k].name))
continue;
if (preempt_ptr->bb_ptr->
gres_ptr[k].used_cnt < d) {
d = preempt_ptr->bb_ptr->
gres_ptr[k].used_cnt;
}
add_total_gres_needed -= d;
needed_gres_ptr[j].add_cnt -= d;
do_preempt = true;
}
}
}
if (do_preempt) {
preempt_ptr->bb_ptr->cancelled = true; preempt_ptr->bb_ptr->cancelled = true;
preempt_ptr->bb_ptr->end_time = 0; preempt_ptr->bb_ptr->end_time = 0;
preempt_ptr->bb_ptr->state = BB_STATE_TEARDOWN; preempt_ptr->bb_ptr->state = BB_STATE_TEARDOWN;
...@@ -1122,13 +1216,12 @@ static int _test_size_limit(struct job_record *job_ptr, bb_job_t *bb_spec) ...@@ -1122,13 +1216,12 @@ static int _test_size_limit(struct job_record *job_ptr, bb_job_t *bb_spec)
__func__, preempt_ptr->job_id, __func__, preempt_ptr->job_id,
job_ptr->job_id); job_ptr->job_id);
} }
add_total_space_needed -= preempt_ptr->size;
} }
} }
list_iterator_destroy(preempt_iter); list_iterator_destroy(preempt_iter);
} }
list_destroy(preempt_list); list_destroy(preempt_list);
_free_needed_gres_struct(needed_gres_ptr, bb_spec->gres_cnt);
return 2; return 2;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment