diff --git a/doc/man/man5/burst_buffer.conf.5 b/doc/man/man5/burst_buffer.conf.5 index 19e788f46b957f0e35401d6708004d8109bba74e..ff18d6e035d23765772e46a24071c269b7f4cdd9 100644 --- a/doc/man/man5/burst_buffer.conf.5 +++ b/doc/man/man5/burst_buffer.conf.5 @@ -1,4 +1,4 @@ -.TH "burst_buffer.conf" "5" "November 2014" "burst_buffer.conf 14.11" "Slurm configuration file" +.TH "burst_buffer.conf" "5" "December 2014" "burst_buffer.conf 15.08" "Slurm configuration file" .SH "NAME" burst_buffer.conf \- Slurm configuration file for burst buffer management. @@ -12,6 +12,11 @@ DEFAULT_SLURM_CONF parameter or at execution time by setting the SLURM_CONF environment variable. The file will always be located in the same directory as the \fBslurm.conf\fP file. +In order to support multiple configuration files for multiple burst buffer +plugins, the configuration file may alternatively be given a name containing +the plugin name. +For example, if "burst_buffer.conf" is not found, the burst_buffer/generic +configuration could be read from a file named "burst_buffer_generic.conf". .LP Parameter names are case insensitive. Any text following a "#" in the configuration file is treated @@ -63,6 +68,13 @@ This option will adjust a job's nice value, so user specified nice values for a job will be replaced by this value. he \fBPrioBoostAlloc\fR value will override the \fBPrioBoostUse\fR value. +.TP +\fBPrivateData\fR +If set to "true" then users will only be able to view burst buffers they can +use. +Slurm administrators will still be able to view all burst buffers. +By default, users can view all burst buffers. + .TP \fBStageInTimeout\fR If the stage in of files for a job takes more than this number of seconds, @@ -119,6 +131,8 @@ By default there is no job allocation size limit. 
.br AllowUsers=alan:brenda .br +PrivateData=true +.br # .br JobSizeLimit=20GB # Applies to each job diff --git a/slurm/slurm.h.in b/slurm/slurm.h.in index b213510bd27101d831fd5dccd42055c5a355f26b..7df6f61ac99b9690cf75c4cfa6ac9a630dd7e01f 100644 --- a/slurm/slurm.h.in +++ b/slurm/slurm.h.in @@ -4289,6 +4289,7 @@ typedef struct { char *get_sys_state; uint32_t job_size_limit; /* In GB or NO_VAL */ char *name; /* Plugin name */ + uint16_t private_data; /* limit access to data */ uint32_t prio_boost_alloc; /* job priority boost after staged in */ uint32_t prio_boost_use; /* job priority boost on use */ uint32_t stage_in_timeout; /* Seconds or zero */ diff --git a/src/api/burst_buffer_info.c b/src/api/burst_buffer_info.c index 2f7a256188645bb915f47d5e972118ec1c4cb580..de357c1722ae047e7377d6107975111381982940 100644 --- a/src/api/burst_buffer_info.c +++ b/src/api/burst_buffer_info.c @@ -277,7 +277,7 @@ extern void slurm_print_burst_buffer_record(FILE *out, /****** Line 6 ******/ snprintf(tmp_line, sizeof(tmp_line), - "GetSysState=%s", burst_buffer_ptr->get_sys_state); + "PrivateData=%u ", burst_buffer_ptr->private_data); xstrcat(out_buf, tmp_line); if (one_liner) xstrcat(out_buf, " "); @@ -286,7 +286,7 @@ extern void slurm_print_burst_buffer_record(FILE *out, /****** Line 7 ******/ snprintf(tmp_line, sizeof(tmp_line), - "StartStageIn=%s", burst_buffer_ptr->start_stage_in); + "GetSysState=%s", burst_buffer_ptr->get_sys_state); xstrcat(out_buf, tmp_line); if (one_liner) xstrcat(out_buf, " "); @@ -295,7 +295,7 @@ extern void slurm_print_burst_buffer_record(FILE *out, /****** Line 8 ******/ snprintf(tmp_line, sizeof(tmp_line), - "StartStageIn=%s", burst_buffer_ptr->start_stage_out); + "StartStageIn=%s", burst_buffer_ptr->start_stage_in); xstrcat(out_buf, tmp_line); if (one_liner) xstrcat(out_buf, " "); @@ -304,7 +304,7 @@ extern void slurm_print_burst_buffer_record(FILE *out, /****** Line 9 ******/ snprintf(tmp_line, sizeof(tmp_line), - "StopStageIn=%s", 
burst_buffer_ptr->stop_stage_in); + "StartStageOut=%s", burst_buffer_ptr->start_stage_out); xstrcat(out_buf, tmp_line); if (one_liner) xstrcat(out_buf, " "); @@ -312,6 +312,15 @@ extern void slurm_print_burst_buffer_record(FILE *out, xstrcat(out_buf, "\n "); /****** Line 10 ******/ + snprintf(tmp_line, sizeof(tmp_line), + "StopStageIn=%s", burst_buffer_ptr->stop_stage_in); + xstrcat(out_buf, tmp_line); + if (one_liner) + xstrcat(out_buf, " "); + else + xstrcat(out_buf, "\n "); + + /****** Line 11 ******/ snprintf(tmp_line, sizeof(tmp_line), - "StopStageIn=%s", burst_buffer_ptr->stop_stage_out); + "StopStageOut=%s", burst_buffer_ptr->stop_stage_out); xstrcat(out_buf, tmp_line); diff --git a/src/common/slurm_protocol_pack.c b/src/common/slurm_protocol_pack.c index dc22d9d1596d8d82e8d87fd0cdbeae4212b337ac..a77c2199c30c3f842ace72053719b9077a3d5595 100644 --- a/src/common/slurm_protocol_pack.c +++ b/src/common/slurm_protocol_pack.c @@ -9312,14 +9312,13 @@ static int _unpack_burst_buffer_info_msg( i < bb_msg_ptr->record_count; i++, bb_info_ptr++) { safe_unpackstr_xmalloc(&bb_info_ptr->name, &uint32_tmp, buffer); safe_unpack32(&bb_info_ptr->record_count, buffer); - safe_unpack32(&bb_info_ptr->total_space, buffer); - safe_unpack32(&bb_info_ptr->used_space, buffer); safe_unpackstr_xmalloc(&bb_info_ptr->allow_users, &uint32_tmp, buffer); safe_unpackstr_xmalloc(&bb_info_ptr->deny_users, &uint32_tmp, buffer); safe_unpackstr_xmalloc(&bb_info_ptr->get_sys_state, &uint32_tmp, buffer); + safe_unpack16(&bb_info_ptr->private_data, buffer); safe_unpackstr_xmalloc(&bb_info_ptr->start_stage_in, &uint32_tmp, buffer); safe_unpackstr_xmalloc(&bb_info_ptr->start_stage_out, @@ -9333,6 +9332,8 @@ static int _unpack_burst_buffer_info_msg( safe_unpack32(&bb_info_ptr->prio_boost_use, buffer); safe_unpack32(&bb_info_ptr->stage_in_timeout, buffer); safe_unpack32(&bb_info_ptr->stage_out_timeout, buffer); + safe_unpack32(&bb_info_ptr->total_space, buffer); + safe_unpack32(&bb_info_ptr->used_space, buffer); safe_unpack32(&bb_info_ptr->user_size_limit, 
buffer); bb_info_ptr->burst_buffer_resv_ptr = xmalloc(sizeof(burst_buffer_resv_t) * diff --git a/src/plugins/burst_buffer/common/burst_buffer_common.c b/src/plugins/burst_buffer/common/burst_buffer_common.c index f9d0a79aefb99a2192a9dbdb2701e22acc202749..3ee07dc39f3cbacbfb5b8e889c148054dfaad731 100644 --- a/src/plugins/burst_buffer/common/burst_buffer_common.c +++ b/src/plugins/burst_buffer/common/burst_buffer_common.c @@ -1,5 +1,11 @@ /*****************************************************************************\ * burst_buffer_common.c - Common logic for managing burst_buffers + * + * NOTE: These functions are designed so they can be used by multiple burst + * buffer plugins at the same time (e.g. you might provide users access to + * both burst_buffer/cray and burst_buffer/generic on the same system), so + * the state information is largely in the individual plugin and passed as + * a pointer argument to these functions. ***************************************************************************** * Copyright (C) 2014 SchedMD LLC. 
* Written by Morris Jette <jette@schedmd.com> @@ -38,9 +44,11 @@ # include "config.h" #endif -#define _GNU_SOURCE /* For POLLRDHUP */ +#include <fcntl.h> #include <poll.h> #include <stdlib.h> +#include <sys/stat.h> +#include <sys/types.h> #include <unistd.h> #include "slurm/slurm.h" @@ -280,10 +288,11 @@ extern void bb_remove_user_load(bb_alloc_t *bb_ptr, bb_state_t *state_ptr) } /* Load and process configuration parameters */ -extern void bb_load_config(bb_state_t *state_ptr) +extern void bb_load_config(bb_state_t *state_ptr, char *type) { s_p_hashtbl_t *bb_hashtbl = NULL; char *bb_conf, *tmp = NULL, *value; + int fd; static s_p_options_t bb_options[] = { {"AllowUsers", S_P_STRING}, {"DenyUsers", S_P_STRING}, @@ -291,6 +300,7 @@ extern void bb_load_config(bb_state_t *state_ptr) {"JobSizeLimit", S_P_STRING}, {"PrioBoostAlloc", S_P_UINT32}, {"PrioBoostUse", S_P_UINT32}, + {"PrivateData", S_P_STRING}, {"StageInTimeout", S_P_UINT32}, {"StageOutTimeout", S_P_UINT32}, {"StartStageIn", S_P_STRING}, @@ -305,7 +315,25 @@ extern void bb_load_config(bb_state_t *state_ptr) if (slurm_get_debug_flags() & DEBUG_FLAG_BURST_BUF) state_ptr->bb_config.debug_flag = true; + /* First look for "burst_buffer.conf" then with "type" field, + * for example "burst_buffer_cray.conf" */ bb_conf = get_extra_conf_path("burst_buffer.conf"); + fd = open(bb_conf, 0); + if (fd >= 0) { + close(fd); + } else { + char *new_path = NULL; + xfree(bb_conf); + xstrfmtcat(new_path, "burst_buffer_%s.conf", type); + bb_conf = get_extra_conf_path(new_path); + fd = open(bb_conf, 0); + if (fd < 0) { + fatal("%s: Unable to find configuration file %s or " + "burst_buffer.conf", __func__, new_path); + } + xfree(new_path); + } + bb_hashtbl = s_p_hashtbl_create(bb_options); if (s_p_parse_file(bb_hashtbl, NULL, bb_conf, false) == SLURM_ERROR) { fatal("%s: something wrong with opening/reading %s: %m", @@ -341,6 +369,13 @@ extern void bb_load_config(bb_state_t *state_ptr) __func__, NICE_OFFSET); 
state_ptr->bb_config.prio_boost_use = NICE_OFFSET; } + if (s_p_get_string(&tmp, "PrivateData", bb_hashtbl)) { + if (!strcasecmp(tmp, "true") || + !strcasecmp(tmp, "yes") || + !strcasecmp(tmp, "1")) + state_ptr->bb_config.private_data = 1; + xfree(tmp); + } s_p_get_uint32(&state_ptr->bb_config.stage_in_timeout, "StageInTimeout", bb_hashtbl); s_p_get_uint32(&state_ptr->bb_config.stage_out_timeout, @@ -396,7 +431,7 @@ extern void bb_load_config(bb_state_t *state_ptr) } /* Pack individual burst buffer records into a buffer */ -extern int bb_pack_bufs(bb_alloc_t **bb_hash, Buf buffer, +extern int bb_pack_bufs(uid_t uid, bb_alloc_t **bb_hash, Buf buffer, uint16_t protocol_version) { int i, rec_count = 0; @@ -408,15 +443,17 @@ extern int bb_pack_bufs(bb_alloc_t **bb_hash, Buf buffer, for (i = 0; i < BB_HASH_SIZE; i++) { bb_next = bb_hash[i]; while (bb_next) { - pack32(bb_next->array_job_id, buffer); - pack32(bb_next->array_task_id, buffer); - pack32(bb_next->job_id, buffer); - packstr(bb_next->name, buffer); - pack32(bb_next->size, buffer); - pack16(bb_next->state, buffer); - pack_time(bb_next->state_time, buffer); - pack32(bb_next->user_id, buffer); - rec_count++; + if ((uid == 0) || (uid == bb_next->user_id)) { + pack32(bb_next->array_job_id, buffer); + pack32(bb_next->array_task_id, buffer); + pack32(bb_next->job_id, buffer); + packstr(bb_next->name, buffer); + pack32(bb_next->size, buffer); + pack16(bb_next->state, buffer); + pack_time(bb_next->state_time, buffer); + pack32(bb_next->user_id, buffer); + rec_count++; + } bb_next = bb_next->next; } } @@ -424,13 +461,16 @@ extern int bb_pack_bufs(bb_alloc_t **bb_hash, Buf buffer, return rec_count; } -/* Pack configuration parameters into a buffer */ -extern void bb_pack_config(bb_config_t *config_ptr, Buf buffer, - uint16_t protocol_version) +/* Pack state and configuration parameters into a buffer */ +extern void bb_pack_state(bb_state_t *state_ptr, Buf buffer, + uint16_t protocol_version) { + bb_config_t *config_ptr = 
&state_ptr->bb_config; + packstr(config_ptr->allow_users_str, buffer); packstr(config_ptr->deny_users_str, buffer); packstr(config_ptr->get_sys_state, buffer); + pack16(config_ptr->private_data, buffer); packstr(config_ptr->start_stage_in, buffer); packstr(config_ptr->start_stage_out, buffer); packstr(config_ptr->stop_stage_in, buffer); @@ -440,9 +480,12 @@ extern void bb_pack_config(bb_config_t *config_ptr, Buf buffer, pack32(config_ptr->prio_boost_use, buffer); pack32(config_ptr->stage_in_timeout, buffer); pack32(config_ptr->stage_out_timeout,buffer); + pack32(state_ptr->total_space, buffer); + pack32(state_ptr->used_space, buffer); pack32(config_ptr->user_size_limit, buffer); } + /* Translate a burst buffer size specification in string form to numeric form, * recognizing various sufficies (MB, GB, TB, PB, and Nodes). */ extern uint32_t bb_get_size_num(char *tok) @@ -501,3 +544,152 @@ extern int bb_preempt_queue_sort(void *x, void *y) return 1; return 0; }; + +/* For each burst buffer record, set the use_time to the time at which its + * use is expected to begin (i.e. 
each job's expected start time) */ +extern void bb_set_use_time(bb_state_t *state_ptr) +{ + struct job_record *job_ptr; + bb_alloc_t *bb_ptr = NULL; + time_t now = time(NULL); + int i; + + state_ptr->next_end_time = now + 60 * 60; /* Start estimate now+1hour */ + for (i = 0; i < BB_HASH_SIZE; i++) { + bb_ptr = state_ptr->bb_hash[i]; + while (bb_ptr) { + if (bb_ptr->job_id && + ((bb_ptr->state == BB_STATE_STAGING_IN) || + (bb_ptr->state == BB_STATE_STAGED_IN))) { + job_ptr = find_job_record(bb_ptr->job_id); + if (!job_ptr) { + error("%s: job %u with allocated burst " + "buffers not found", + __func__, bb_ptr->job_id); + bb_ptr->use_time = now + 24 * 60 * 60; + } else if (job_ptr->start_time) { + bb_ptr->end_time = job_ptr->end_time; + bb_ptr->use_time = job_ptr->start_time; + } else { + /* Unknown start time */ + bb_ptr->use_time = now + 60 * 60; + } + } else if (bb_ptr->job_id) { + job_ptr = find_job_record(bb_ptr->job_id); + if (job_ptr) + bb_ptr->end_time = job_ptr->end_time; + } else { + bb_ptr->use_time = now; + } + if (bb_ptr->end_time && bb_ptr->size) { + if (bb_ptr->end_time <= now) + state_ptr->next_end_time = now; + else if (state_ptr->next_end_time > + bb_ptr->end_time) { + state_ptr->next_end_time = + bb_ptr->end_time; + } + } + bb_ptr = bb_ptr->next; + } + } +} + +/* Sleep function, also handles termination signal */ +extern void bb_sleep(bb_state_t *state_ptr, int add_secs) +{ + struct timespec ts = {0, 0}; + struct timeval tv = {0, 0}; + + if (gettimeofday(&tv, NULL)) { /* Some error */ + sleep(1); + return; + } + + ts.tv_sec = tv.tv_sec + add_secs; + ts.tv_nsec = tv.tv_usec * 1000; + pthread_mutex_lock(&state_ptr->term_mutex); + if (!state_ptr->term_flag) { + pthread_cond_timedwait(&state_ptr->term_cond, + &state_ptr->term_mutex, &ts); + } + pthread_mutex_unlock(&state_ptr->term_mutex); +} + + +/* Allocate a named burst buffer record for a specific user. + * Return a pointer to that record. 
*/ +extern bb_alloc_t *bb_alloc_name_rec(bb_state_t *state_ptr, char *name, + uint32_t user_id) +{ + bb_alloc_t *bb_ptr = NULL; + int i; + + xassert(state_ptr->bb_hash); + bb_ptr = xmalloc(sizeof(bb_alloc_t)); + i = user_id % BB_HASH_SIZE; + bb_ptr->next = state_ptr->bb_hash[i]; + state_ptr->bb_hash[i] = bb_ptr; + bb_ptr->name = xstrdup(name); + bb_ptr->state = BB_STATE_ALLOCATED; + bb_ptr->state_time = time(NULL); + bb_ptr->seen_time = time(NULL); + bb_ptr->user_id = user_id; + + return bb_ptr; +} + +/* Allocate a per-job burst buffer record for a specific job. + * Return a pointer to that record. */ +extern bb_alloc_t *bb_alloc_job_rec(bb_state_t *state_ptr, + struct job_record *job_ptr, + uint32_t bb_size) +{ + bb_alloc_t *bb_ptr = NULL; + int i; + + xassert(state_ptr->bb_hash); + xassert(job_ptr); + bb_ptr = xmalloc(sizeof(bb_alloc_t)); + bb_ptr->array_job_id = job_ptr->array_job_id; + bb_ptr->array_task_id = job_ptr->array_task_id; + bb_ptr->job_id = job_ptr->job_id; + i = job_ptr->user_id % BB_HASH_SIZE; + bb_ptr->next = state_ptr->bb_hash[i]; + state_ptr->bb_hash[i] = bb_ptr; + bb_ptr->size = bb_size; + bb_ptr->state = BB_STATE_ALLOCATED; + bb_ptr->state_time = time(NULL); + bb_ptr->seen_time = time(NULL); + bb_ptr->user_id = job_ptr->user_id; + + return bb_ptr; +} + +/* Allocate a burst buffer record for a job and increase the job priority + * if so configured. 
*/ +extern bb_alloc_t *bb_alloc_job(bb_state_t *state_ptr, + struct job_record *job_ptr, uint32_t bb_size) +{ + bb_alloc_t *bb_ptr; + uint16_t new_nice; + + if (state_ptr->bb_config.prio_boost_use && job_ptr && job_ptr->details){ + new_nice = (NICE_OFFSET - state_ptr->bb_config.prio_boost_use); + if (new_nice < job_ptr->details->nice) { + int64_t new_prio = job_ptr->priority; + new_prio += job_ptr->details->nice; + new_prio -= new_nice; + job_ptr->priority = new_prio; + job_ptr->details->nice = new_nice; + info("%s: Uses burst buffer, reset priority to %u " + "for job_id %u", __func__, + job_ptr->priority, job_ptr->job_id); + } + } + + bb_ptr = bb_alloc_job_rec(state_ptr, job_ptr, bb_size); + bb_add_user_load(bb_ptr, state_ptr); + + return bb_ptr; +} diff --git a/src/plugins/burst_buffer/common/burst_buffer_common.h b/src/plugins/burst_buffer/common/burst_buffer_common.h index 13eef68057b4c6c3005bf48e98ad5dadcd3c94ac..0c298b1066a340de29145e9066fa5f669baf8a03 100644 --- a/src/plugins/burst_buffer/common/burst_buffer_common.h +++ b/src/plugins/burst_buffer/common/burst_buffer_common.h @@ -1,5 +1,11 @@ /*****************************************************************************\ * burst_buffer_common.h - Common header for managing burst_buffers + * + * NOTE: These functions are designed so they can be used by multiple burst + * buffer plugins at the same time (e.g. you might provide users access to + * both burst_buffer/cray and burst_buffer/generic on the same system), so + * the state information is largely in the individual plugin and passed as + * a pointer argument to these functions. ***************************************************************************** * Copyright (C) 2014 SchedMD LLC. 
* Written by Morris Jette <jette@schedmd.com> @@ -57,6 +63,7 @@ typedef struct bb_config { uint32_t job_size_limit; uint32_t prio_boost_alloc; uint32_t prio_boost_use; + uint16_t private_data; uint32_t stage_in_timeout; uint32_t stage_out_timeout; char *start_stage_in; @@ -122,6 +129,22 @@ extern void bb_add_user_load(bb_alloc_t *bb_ptr, bb_state_t *state_ptr); /* Allocate burst buffer hash tables */ extern void bb_alloc_cache(bb_state_t *state_ptr); +/* Allocate a per-job burst buffer record for a specific job. + * Return a pointer to that record. */ +extern bb_alloc_t *bb_alloc_job_rec(bb_state_t *state_ptr, + struct job_record *job_ptr, + uint32_t bb_size); + +/* Allocate a burst buffer record for a job and increase the job priority + * if so configured. */ +extern bb_alloc_t *bb_alloc_job(bb_state_t *state_ptr, + struct job_record *job_ptr, uint32_t bb_size); + +/* Allocate a named burst buffer record for a specific user. + * Return a pointer to that record. */ +extern bb_alloc_t *bb_alloc_name_rec(bb_state_t *state_ptr, char *name, + uint32_t user_id); + /* Clear all cached burst buffer records, freeing all memory. 
*/ extern void bb_clear_cache(bb_state_t *state_ptr); @@ -146,15 +169,15 @@ extern void bb_job_queue_del(void *x); extern int bb_job_queue_sort(void *x, void *y); /* Load and process configuration parameters */ -extern void bb_load_config(bb_state_t *state_ptr); +extern void bb_load_config(bb_state_t *state_ptr, char *type); /* Pack individual burst buffer records into a buffer */ -extern int bb_pack_bufs(bb_alloc_t **bb_hash, Buf buffer, +extern int bb_pack_bufs(uid_t uid, bb_alloc_t **bb_hash, Buf buffer, uint16_t protocol_version); -/* Pack configuration parameters into a buffer */ -extern void bb_pack_config(bb_config_t *config_ptr, Buf buffer, - uint16_t protocol_version); +/* Pack state and configuration parameters into a buffer */ +extern void bb_pack_state(bb_state_t *state_ptr, Buf buffer, + uint16_t protocol_version); /* Sort preempt_bb_recs in order of DECREASING use_time */ extern int bb_preempt_queue_sort(void *x, void *y); @@ -162,4 +185,11 @@ extern int bb_preempt_queue_sort(void *x, void *y); /* Remove a burst buffer allocation from a user's load */ extern void bb_remove_user_load(bb_alloc_t *bb_ptr, bb_state_t *state_ptr); +/* For each burst buffer record, set the use_time to the time at which its + * use is expected to begin (i.e. 
each job's expected start time) */ +extern void bb_set_use_time(bb_state_t *state_ptr); + +/* Sleep function, also handles termination signal */ +extern void bb_sleep(bb_state_t *state_ptr, int add_secs); + #endif /* __BURST_BUFFER_COMMON_H__ */ diff --git a/src/plugins/burst_buffer/cray/burst_buffer_cray.c b/src/plugins/burst_buffer/cray/burst_buffer_cray.c index 3e5abc8fecbc4eb25c9b8ec183f1543207c56ff6..fd85adbc01a157231f8f07f7ecb8d346ca0684dd 100644 --- a/src/plugins/burst_buffer/cray/burst_buffer_cray.c +++ b/src/plugins/burst_buffer/cray/burst_buffer_cray.c @@ -93,13 +93,26 @@ const uint32_t plugin_version = 100; * easily use common functions from multiple burst buffer plugins */ static bb_state_t bb_state; +static void _alloc_job_bb(struct job_record *job_ptr, uint32_t bb_size); static void * _bb_agent(void *args); static uint32_t _get_bb_size(struct job_record *job_ptr); static void _load_state(uint32_t job_id); -static void _my_sleep(int add_secs); +static void _start_stage_in(uint32_t job_id); +static void _start_stage_out(uint32_t job_id); +static void _stop_stage_in(uint32_t job_id); +static void _stop_stage_out(uint32_t job_id); static int _test_size_limit(struct job_record *job_ptr,uint32_t add_space); static void _timeout_bb_rec(void); +static void _alloc_job_bb(struct job_record *job_ptr, uint32_t bb_size) +{ + (void) bb_alloc_job(&bb_state, job_ptr, bb_size); + + if (bb_state.bb_config.debug_flag) + info("%s: start stage-in job_id:%u", __func__, job_ptr->job_id); + _start_stage_in(job_ptr->job_id); +} + /* Perform periodic background activities */ static void *_bb_agent(void *args) { @@ -108,7 +121,7 @@ static void *_bb_agent(void *args) NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK }; while (!bb_state.term_flag) { - _my_sleep(AGENT_INTERVAL); + bb_sleep(&bb_state, AGENT_INTERVAL); if (bb_state.term_flag) break; lock_slurmctld(job_write_lock); @@ -167,27 +180,6 @@ static void _stop_stage_out(uint32_t job_id) // FIXME: Need Cray interface here... 
} -/* Local sleep function, handles termination signal */ -static void _my_sleep(int add_secs) -{ - struct timespec ts = {0, 0}; - struct timeval tv = {0, 0}; - - if (gettimeofday(&tv, NULL)) { /* Some error */ - sleep(1); - return; - } - - ts.tv_sec = tv.tv_sec + add_secs; - ts.tv_nsec = tv.tv_usec * 1000; - pthread_mutex_lock(&bb_state.term_mutex); - if (!bb_state.term_flag) { - pthread_cond_timedwait(&bb_state.term_cond, - &bb_state.term_mutex, &ts); - } - pthread_mutex_unlock(&bb_state.term_mutex); -} - /* Test if a job can be allocated a burst buffer. * This may preempt currently active stage-in for higher priority jobs. * @@ -225,7 +217,8 @@ static int _test_size_limit(struct job_record *job_ptr, uint32_t add_space) add_user_space_needed = tmp_u + tmp_j - lim_u; } - add_total_space_needed = bb_state.used_space + add_space - bb_state.total_space; + add_total_space_needed = bb_state.used_space + add_space - + bb_state.total_space; if ((add_total_space_needed <= 0) && (add_user_space_needed <= 0)) return 0; @@ -392,7 +385,7 @@ extern int init(void) pthread_attr_t attr; pthread_mutex_lock(&bb_state.bb_mutex); - bb_load_config(&bb_state); + bb_load_config(&bb_state, "cray"); if (bb_state.bb_config.debug_flag) info("%s: %s", plugin_type, __func__); bb_alloc_cache(&bb_state); @@ -459,7 +452,7 @@ extern int bb_p_reconfig(void) pthread_mutex_lock(&bb_state.bb_mutex); if (bb_state.bb_config.debug_flag) info("%s: %s", plugin_type, __func__); - bb_load_config(&bb_state); + bb_load_config(&bb_state, "cray"); pthread_mutex_unlock(&bb_state.bb_mutex); return SLURM_SUCCESS; @@ -471,7 +464,7 @@ extern int bb_p_reconfig(void) * * Returns a SLURM errno. 
*/ -extern int bb_p_state_pack(Buf buffer, uint16_t protocol_version) +extern int bb_p_state_pack(uid_t uid, Buf buffer, uint16_t protocol_version) { uint32_t rec_count = 0; int eof, offset; @@ -482,11 +475,10 @@ extern int bb_p_state_pack(Buf buffer, uint16_t protocol_version) packstr((char *)plugin_type, buffer); /* Remove "const" qualifier */ offset = get_buf_offset(buffer); pack32(rec_count, buffer); - pack32(bb_state.total_space, buffer); - pack32(bb_state.used_space, buffer); - bb_pack_config(&bb_state.bb_config, buffer, protocol_version); -//FIXME: Add private data option - rec_count = bb_pack_bufs(bb_state.bb_hash, buffer, protocol_version); + bb_pack_state(&bb_state, buffer, protocol_version); + if (bb_state.bb_config.private_data == 0) + uid = 0; /* User can see all data */ + rec_count = bb_pack_bufs(uid, bb_state.bb_hash,buffer,protocol_version); if (rec_count != 0) { eof = get_buf_offset(buffer); set_buf_offset(buffer, offset); @@ -519,7 +511,7 @@ extern int bb_p_job_validate(struct job_descriptor *job_desc, info("%s: script:%s", __func__, job_desc->script); } -//FIXME: Add function callout to set job_desc->burst_buffer based upon job_desc->script +//FIXME: Add Cray API callout to set job_desc->burst_buffer based upon job_desc->script if (job_desc->burst_buffer) { key = strstr(job_desc->burst_buffer, "size="); if (key) @@ -652,7 +644,7 @@ extern int bb_p_job_try_stage_in(List job_queue) list_sort(job_candidates, bb_job_queue_sort); pthread_mutex_lock(&bb_state.bb_mutex); -//FIXME _set_bb_use_time(bb_state.bb_hash); + bb_set_use_time(&bb_state); job_iter = list_iterator_create(job_candidates); while ((job_rec = list_next(job_iter))) { job_ptr = job_rec->job_ptr; @@ -666,8 +658,8 @@ extern int bb_p_job_try_stage_in(List job_queue) continue; else if (rc == 2) break; -_start_stage_in(job_ptr->job_id); -//FIXME _alloc_job_bb(job_ptr, bb_size); + + _alloc_job_bb(job_ptr, bb_size); } list_iterator_destroy(job_iter); pthread_mutex_unlock(&bb_state.bb_mutex); @@ 
-708,8 +700,7 @@ extern int bb_p_job_test_stage_in(struct job_record *job_ptr, bool test_only) rc = -1; if ((test_only == false) && (_test_size_limit(job_ptr, bb_size) == 0)) -_start_stage_in(job_ptr->job_id); -//FIXME _alloc_job_bb(job_ptr, bb_size); + _alloc_job_bb(job_ptr, bb_size); } else { if (bb_ptr->state < BB_STATE_STAGED_IN) _load_state(job_ptr->job_id); diff --git a/src/plugins/burst_buffer/generic/burst_buffer_generic.c b/src/plugins/burst_buffer/generic/burst_buffer_generic.c index 8e965529c48f9d5d57069891736e160ac970c35f..9272588e19401a281ee04119cd5acf54f2ab766d 100644 --- a/src/plugins/burst_buffer/generic/burst_buffer_generic.c +++ b/src/plugins/burst_buffer/generic/burst_buffer_generic.c @@ -94,9 +94,6 @@ const uint32_t plugin_version = 100; static bb_state_t bb_state; /* Local function defintions */ -static bb_alloc_t *_alloc_bb_job_rec(struct job_record *job_ptr, - uint32_t bb_size); -static bb_alloc_t *_alloc_bb_name_rec(char *name, uint32_t user_id); static void _alloc_job_bb(struct job_record *job_ptr, uint32_t bb_size); static void * _bb_agent(void *args); static char ** _build_stage_args(char *cmd, char *opt, @@ -105,13 +102,11 @@ static void _destroy_job_info(void *data); static bb_alloc_t *_find_bb_name_rec(char *name, uint32_t user_id); static uint32_t _get_bb_size(struct job_record *job_ptr); static void _load_state(uint32_t job_id); -static void _my_sleep(int usec); static int _parse_job_info(void **dest, slurm_parser_enum_t type, const char *key, const char *value, const char *line, char **leftover); static char * _run_script(char *script_type, char *script_path, char **script_argv, int max_wait); -static void _set_bb_use_time(void); static void _stop_stage_in(uint32_t job_id); static void _stop_stage_out(uint32_t job_id); static void _test_config(void); @@ -148,53 +143,6 @@ static uint32_t _get_bb_size(struct job_record *job_ptr) return bb_size_u; } -/* Allocate a per-job burst buffer record for a specific job. 
- * Return a pointer to that record. */ -static bb_alloc_t *_alloc_bb_job_rec(struct job_record *job_ptr, - uint32_t bb_size) -{ - bb_alloc_t *bb_ptr = NULL; - int i; - - xassert(bb_state.bb_hash); - xassert(job_ptr); - bb_ptr = xmalloc(sizeof(bb_alloc_t)); - bb_ptr->array_job_id = job_ptr->array_job_id; - bb_ptr->array_task_id = job_ptr->array_task_id; - bb_ptr->job_id = job_ptr->job_id; - i = job_ptr->user_id % BB_HASH_SIZE; - bb_ptr->next = bb_state.bb_hash[i]; - bb_state.bb_hash[i] = bb_ptr; - bb_ptr->size = bb_size; - bb_ptr->state = BB_STATE_ALLOCATED; - bb_ptr->state_time = time(NULL); - bb_ptr->seen_time = time(NULL); - bb_ptr->user_id = job_ptr->user_id; - - return bb_ptr; -} - -/* Allocate a named burst buffer record for a specific user. - * Return a pointer to that record. */ -static bb_alloc_t *_alloc_bb_name_rec(char *name, uint32_t user_id) -{ - bb_alloc_t *bb_ptr = NULL; - int i; - - xassert(bb_state.bb_hash); - bb_ptr = xmalloc(sizeof(bb_alloc_t)); - i = user_id % BB_HASH_SIZE; - bb_ptr->next = bb_state.bb_hash[i]; - bb_state.bb_hash[i] = bb_ptr; - bb_ptr->name = xstrdup(name); - bb_ptr->state = BB_STATE_ALLOCATED; - bb_ptr->state_time = time(NULL); - bb_ptr->seen_time = time(NULL); - bb_ptr->user_id = user_id; - - return bb_ptr; -} - static char **_build_stage_args(char *cmd, char *opt, struct job_record *job_ptr, uint32_t bb_size) { @@ -406,55 +354,6 @@ static void _timeout_bb_rec(void) } } -/* For each burst buffer record, set the use_time to the time at which its - * use is expected to begin (i.e. 
the job's expected start time) */ -static void _set_bb_use_time(void) -{ - struct job_record *job_ptr; - bb_alloc_t *bb_ptr = NULL; - time_t now = time(NULL); - int i; - - bb_state.next_end_time = now + 60 * 60; /* Start estimate one hour in future */ - for (i = 0; i < BB_HASH_SIZE; i++) { - bb_ptr = bb_state.bb_hash[i]; - while (bb_ptr) { - if (bb_ptr->job_id && - ((bb_ptr->state == BB_STATE_STAGING_IN) || - (bb_ptr->state == BB_STATE_STAGED_IN))) { - job_ptr = find_job_record(bb_ptr->job_id); - if (!job_ptr) { - error("%s: job %u with allocated burst " - "buffers not found", - __func__, bb_ptr->job_id); - _stop_stage_out(bb_ptr->job_id); - bb_ptr->cancelled = true; - bb_ptr->end_time = 0; - } else if (job_ptr->start_time) { - bb_ptr->end_time = job_ptr->end_time; - bb_ptr->use_time = job_ptr->start_time; - } else { - /* Unknown start time */ - bb_ptr->use_time = now + 60 + 60; - } - } else if (bb_ptr->job_id) { - job_ptr = find_job_record(bb_ptr->job_id); - if (job_ptr) - bb_ptr->end_time = job_ptr->end_time; - } else { - bb_ptr->use_time = now; - } - if (bb_ptr->end_time && bb_ptr->size) { - if (bb_ptr->end_time <= now) - bb_state.next_end_time = now; - else if (bb_state.next_end_time > bb_ptr->end_time) - bb_state.next_end_time = bb_ptr->end_time; - } - bb_ptr = bb_ptr->next; - } - } -} - /* Test if a job can be allocated a burst buffer. * This may preempt currently active stage-in for higher priority jobs. 
* @@ -493,7 +392,8 @@ static int _test_size_limit(struct job_record *job_ptr, uint32_t add_space) add_user_space_needed = tmp_u + tmp_j - lim_u; } - add_total_space_needed = bb_state.used_space + add_space - bb_state.total_space; + add_total_space_needed = bb_state.used_space + add_space - + bb_state.total_space; if ((add_total_space_needed <= 0) && (add_user_space_needed <= 0)) return 0; @@ -718,16 +618,38 @@ static int _parse_job_info(void **dest, slurm_parser_enum_t type, if (job_id) { job_ptr = find_job_record(job_id); if (!job_ptr && (state == BB_STATE_STAGED_OUT)) { - error("%s: Vestigial buffer for completed job %u purged", - plugin_type, job_id); + struct job_record job_rec; + job_rec.job_id = job_id; + job_rec.user_id = user_id; + bb_ptr = bb_find_job_rec(&job_rec, bb_state.bb_hash); _stop_stage_out(job_id); /* Purge buffer */ + if (bb_ptr) { + bb_ptr->cancelled = true; + bb_ptr->end_time = 0; + } else { + /* Slurm knows nothing about this job, + * may be result of slurmctld cold start */ + error("%s: Vestigial buffer for purged job %u", + plugin_type, job_id); + } return SLURM_SUCCESS; } else if (!job_ptr && ((state == BB_STATE_STAGING_IN) || (state == BB_STATE_STAGED_IN))) { - error("%s: Vestigial buffer for unstarted job %u purged", - plugin_type, job_id); + struct job_record job_rec; + job_rec.job_id = job_id; + job_rec.user_id = user_id; + bb_ptr = bb_find_job_rec(&job_rec, bb_state.bb_hash); _stop_stage_in(job_id); /* Purge buffer */ + if (bb_ptr) { + bb_ptr->cancelled = true; + bb_ptr->end_time = 0; + } else { + /* Slurm knows nothing about this job, + * may be result of slurmctld cold start */ + error("%s: Vestigial buffer for purged job %u", + plugin_type, job_id); + } return SLURM_SUCCESS; } else if (!job_ptr) { error("%s: Vestigial buffer for job ID %u. 
" @@ -739,15 +661,16 @@ static int _parse_job_info(void **dest, slurm_parser_enum_t type, name = tmp_name; } if (job_ptr) { - if ((bb_ptr = bb_find_job_rec(job_ptr, bb_state.bb_hash)) == NULL) { - bb_ptr = _alloc_bb_job_rec(job_ptr, - _get_bb_size(job_ptr)); + bb_ptr = bb_find_job_rec(job_ptr, bb_state.bb_hash); + if (bb_ptr == NULL) { + bb_ptr = bb_alloc_job_rec(&bb_state, job_ptr, + _get_bb_size(job_ptr)); bb_ptr->state = state; - /* bb_ptr->state_time set in _alloc_bb_job_rec() */ + /* bb_ptr->state_time set in bb_alloc_job_rec() */ } } else { if ((bb_ptr = _find_bb_name_rec(name, user_id)) == NULL) { - bb_ptr = _alloc_bb_name_rec(name, user_id); + bb_ptr = bb_alloc_name_rec(&bb_state, name, user_id); bb_ptr->size = size; bb_ptr->state = state; bb_add_user_load(bb_ptr, &bb_state); @@ -852,8 +775,8 @@ static void _load_state(uint32_t job_id) script_args[2] = NULL; } START_TIMER; - resp = _run_script("GetSysState", bb_state.bb_config.get_sys_state, script_args, - 10); + resp = _run_script("GetSysState", bb_state.bb_config.get_sys_state, + script_args, 10); if (resp == NULL) return; END_TIMER; @@ -871,8 +794,11 @@ static void _load_state(uint32_t job_id) if (s_p_get_string(&tmp, "TotalSize", state_hashtbl)) { bb_state.total_space = bb_get_size_num(tmp); xfree(tmp); - if (bb_state.bb_config.debug_flag && (bb_state.total_space != last_total_space)) - info("%s: total_space:%u", __func__, bb_state.total_space); + if (bb_state.bb_config.debug_flag && + (bb_state.total_space != last_total_space)) { + info("%s: total_space:%u", __func__, + bb_state.total_space); + } last_total_space = bb_state.total_space; } else if (job_id == 0) { error("%s: GetSysState failed to respond with TotalSize", @@ -881,27 +807,6 @@ static void _load_state(uint32_t job_id) s_p_hashtbl_destroy(state_hashtbl); } -/* Local sleep function, handles termination signal */ -static void _my_sleep(int add_secs) -{ - struct timespec ts = {0, 0}; - struct timeval tv = {0, 0}; - - if (gettimeofday(&tv, 
NULL)) { /* Some error */ - sleep(1); - return; - } - - ts.tv_sec = tv.tv_sec + add_secs; - ts.tv_nsec = tv.tv_usec * 1000; - pthread_mutex_lock(&bb_state.term_mutex); - if (!bb_state.term_flag) { - pthread_cond_timedwait(&bb_state.term_cond, - &bb_state.term_mutex, &ts); - } - pthread_mutex_unlock(&bb_state.term_mutex); -} - /* Perform periodic background activities */ static void *_bb_agent(void *args) { @@ -910,7 +815,7 @@ static void *_bb_agent(void *args) NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK }; while (!bb_state.term_flag) { - _my_sleep(AGENT_INTERVAL); + bb_sleep(&bb_state, AGENT_INTERVAL); if (bb_state.term_flag) break; lock_slurmctld(job_write_lock); @@ -936,7 +841,7 @@ extern int init(void) pthread_mutex_init(&bb_state.term_mutex, NULL); pthread_mutex_lock(&bb_state.bb_mutex); - bb_load_config(&bb_state); + bb_load_config(&bb_state, "generic"); _test_config(); if (bb_state.bb_config.debug_flag) info("%s: %s", plugin_type, __func__); @@ -1004,7 +909,7 @@ extern int bb_p_reconfig(void) pthread_mutex_lock(&bb_state.bb_mutex); if (bb_state.bb_config.debug_flag) info("%s: %s", plugin_type, __func__); - bb_load_config(&bb_state); + bb_load_config(&bb_state, "generic"); _test_config(); pthread_mutex_unlock(&bb_state.bb_mutex); @@ -1017,7 +922,7 @@ extern int bb_p_reconfig(void) * * Returns a SLURM errno. 
*/ -extern int bb_p_state_pack(Buf buffer, uint16_t protocol_version) +extern int bb_p_state_pack(uid_t uid, Buf buffer, uint16_t protocol_version) { uint32_t rec_count = 0; int eof, offset; @@ -1028,10 +933,10 @@ extern int bb_p_state_pack(Buf buffer, uint16_t protocol_version) packstr((char *)plugin_type, buffer); /* Remove "const" qualifier */ offset = get_buf_offset(buffer); pack32(rec_count, buffer); - pack32(bb_state.total_space, buffer); - pack32(bb_state.used_space, buffer); - bb_pack_config(&bb_state.bb_config, buffer, protocol_version); - rec_count = bb_pack_bufs(bb_state.bb_hash, buffer, protocol_version); + bb_pack_state(&bb_state, buffer, protocol_version); + if (bb_state.bb_config.private_data == 0) + uid = 0; /* User can see all data */ + rec_count = bb_pack_bufs(uid, bb_state.bb_hash,buffer,protocol_version); if (rec_count != 0) { eof = get_buf_offset(buffer); set_buf_offset(buffer, offset); @@ -1162,22 +1067,7 @@ static void _alloc_job_bb(struct job_record *job_ptr, uint32_t bb_size) bb_alloc_t *bb_ptr; int i; - if (bb_state.bb_config.prio_boost_use && job_ptr && job_ptr->details) { - uint16_t new_nice = (NICE_OFFSET - bb_state.bb_config.prio_boost_use); - if (new_nice < job_ptr->details->nice) { - int64_t new_prio = job_ptr->priority; - new_prio += job_ptr->details->nice; - new_prio -= new_nice; - job_ptr->priority = new_prio; - job_ptr->details->nice = new_nice; - info("%s: Uses burst buffer, reset priority to %u " - "for job_id %u", __func__, - job_ptr->priority, job_ptr->job_id); - } - } - - bb_ptr = _alloc_bb_job_rec(job_ptr, bb_size); - bb_add_user_load(bb_ptr, &bb_state); + bb_ptr = bb_alloc_job(&bb_state, job_ptr, bb_size); if (bb_state.bb_config.debug_flag) info("%s: start stage-in job_id:%u", __func__, job_ptr->job_id); @@ -1186,7 +1076,8 @@ static void _alloc_job_bb(struct job_record *job_ptr, uint32_t bb_size) if (script_argv) { bb_ptr->state = BB_STATE_STAGING_IN; bb_ptr->state_time = time(NULL); - resp = _run_script("StartStageIn", 
bb_state.bb_config.start_stage_in, + resp = _run_script("StartStageIn", + bb_state.bb_config.start_stage_in, script_argv, -1); if (resp) { error("%s: StartStageIn: %s", __func__, resp); @@ -1244,7 +1135,7 @@ extern int bb_p_job_try_stage_in(List job_queue) list_sort(job_candidates, bb_job_queue_sort); pthread_mutex_lock(&bb_state.bb_mutex); - _set_bb_use_time(); + bb_set_use_time(&bb_state); job_iter = list_iterator_create(job_candidates); while ((job_rec = list_next(job_iter))) { job_ptr = job_rec->job_ptr; diff --git a/src/slurmctld/burst_buffer.c b/src/slurmctld/burst_buffer.c index 7a5a04b68d8a583408fe001d92096c091a8bdc7e..714e79347ca9a3f4ff8fb1236f32e041953e9f3b 100644 --- a/src/slurmctld/burst_buffer.c +++ b/src/slurmctld/burst_buffer.c @@ -76,7 +76,8 @@ typedef struct slurm_bb_ops { int (*load_state) (bool init_config); - int (*state_pack) (Buf buffer, uint16_t protocol_version); + int (*state_pack) (uid_t uid, Buf buffer, + uint16_t protocol_version); int (*reconfig) (void); int (*job_validate) (struct job_descriptor *job_desc, uid_t submit_uid); @@ -239,7 +240,7 @@ extern int bb_g_load_state(bool init_config) * * Returns a SLURM errno. 
*/ -extern int bb_g_state_pack(Buf buffer, uint16_t protocol_version) +extern int bb_g_state_pack(uid_t uid, Buf buffer, uint16_t protocol_version) { DEF_TIMERS; int i, rc, rc2; @@ -253,7 +254,7 @@ extern int bb_g_state_pack(Buf buffer, uint16_t protocol_version) slurm_mutex_lock(&g_context_lock); for (i = 0; i < g_context_cnt; i++) { last_offset = get_buf_offset(buffer); - rc2 = (*(ops[i].state_pack))(buffer, protocol_version); + rc2 = (*(ops[i].state_pack))(uid, buffer, protocol_version); if (last_offset != get_buf_offset(buffer)) rec_count++; rc = MAX(rc, rc2); diff --git a/src/slurmctld/burst_buffer.h b/src/slurmctld/burst_buffer.h index 90c00e98680bbc351ddca5f1ac1d245eb7c9061a..1546eef02f88c116d68d4d3ddf614e5ce2ba991d 100644 --- a/src/slurmctld/burst_buffer.h +++ b/src/slurmctld/burst_buffer.h @@ -78,7 +78,7 @@ extern int bb_g_load_state(bool init_config); * * Returns a SLURM errno. */ -extern int bb_g_state_pack(Buf buffer, uint16_t protocol_version); +extern int bb_g_state_pack(uid_t uid, Buf buffer, uint16_t protocol_version); /* * Note configuration may have changed. Handle changes in BurstBufferParameters. diff --git a/src/slurmctld/proc_req.c b/src/slurmctld/proc_req.c index 01c56f0fad74e2b11d31a0ef030dd4eae65df0b9..d8e73b2191b392a2eb6d367700fdee2a23c5baed 100644 --- a/src/slurmctld/proc_req.c +++ b/src/slurmctld/proc_req.c @@ -3934,7 +3934,9 @@ static void _slurm_rpc_burst_buffer_info(slurm_msg_t * msg) debug2("Processing RPC: REQUEST_BURST_BUFFER_INFO from uid=%d", uid); buffer = init_buf(BUF_SIZE); - error_code = bb_g_state_pack(buffer, msg->protocol_version); + if (validate_super_user(uid)) + uid = 0; + error_code = bb_g_state_pack(uid, buffer, msg->protocol_version); END_TIMER2(__func__); if (error_code) {