diff --git a/src/slurmctld/agent.c b/src/slurmctld/agent.c
index 1d2ec4c447844c61078359ddccad18afb011219a..c87c89cd1f69f14118919757076ce29b5787ea4c 100644
--- a/src/slurmctld/agent.c
+++ b/src/slurmctld/agent.c
@@ -100,8 +100,10 @@ typedef struct task_info {
 	void *msg_args_ptr;		/* pointer to RPC data to be used */
 } task_info_t;
 
+static void alarm_handler(int dummy);
 static void *thread_per_node_rpc (void *args);
 static void *wdog (void *args);
+static void xsignal(int signal, void (*handler)(int));
 
 /*
  * agent - party responsible for transmitting an common RPC in parallel across a set
@@ -312,6 +314,15 @@ wdog (void *args)
 		xfree (slurm_addr);
 #endif
 	}
+#if AGENT_IS_THREAD
+	/* Update last_response on responding nodes */
+	lock_slurmctld (node_write_lock);
+	for (i = 0; i < agent_ptr->thread_count; i++) {
+		if (thread_ptr[i].state == DSH_DONE)
+			node_did_resp (thread_ptr[i].node_name);
+	}
+	unlock_slurmctld (node_write_lock);
+#endif
 
 	if (max_delay)
 		debug ("agent maximum delay %d seconds", max_delay);
@@ -324,7 +335,6 @@
 static void *
 thread_per_node_rpc (void *args)
 {
-	sigset_t set;
 	int msg_size ;
 	int rc ;
 	slurm_fd sockfd ;
@@ -334,14 +344,16 @@ thread_per_node_rpc (void *args)
 	task_info_t *task_ptr = (task_info_t *) args;
 	thd_t *thread_ptr = task_ptr->thread_struct_ptr;
 	state_t thread_state = DSH_FAILED;
+	sigset_t set;
 
-	/* accept SIGALRM */
+	/* set up SIGALRM handler */
 	if (sigemptyset (&set))
 		error ("sigemptyset error: %m");
 	if (sigaddset (&set, SIGALRM))
 		error ("sigaddset error on SIGALRM: %m");
 	if (sigprocmask (SIG_UNBLOCK, &set, NULL) != 0)
-		error ("sigprocmask error: %m");
+		fatal ("sigprocmask error: %m");
+	xsignal(SIGALRM, alarm_handler);
 
 	if (args == NULL)
 		fatal ("thread_per_node_rpc has NULL argument");
@@ -414,3 +426,27 @@ cleanup:
 	return (void *) NULL;
 }
 
+/*
+ * Emulate signal() but with BSD semantics (i.e. don't restore signal to
+ * SIGDFL prior to executing handler).
+ */
+static void xsignal(int signal, void (*handler)(int))
+{
+	struct sigaction sa, old_sa;
+
+	sa.sa_handler = handler;
+	sigemptyset(&sa.sa_mask);
+	sigaddset(&sa.sa_mask, signal);
+	sa.sa_flags = 0;
+	sigaction(signal, &sa, &old_sa);
+}
+
+/*
+ * SIGALRM handler.  This is just a stub because we are really interested
+ * in interrupting connect() in k4cmd/rcmd or select() in rsh() below and
+ * causing them to return EINTR.
+ */
+static void alarm_handler(int dummy)
+{
+}
+
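The xsignal()/alarm_handler() pair above exists so that an alarm() can break the agent's RPC threads out of a hung connect() or select() by making the call fail with EINTR. Below is a minimal standalone sketch of that mechanism; it is not part of the patch, and the pipe read() merely stands in for the blocking RPC call.

/*
 * Sketch only: install a do-nothing SIGALRM handler through sigaction()
 * without SA_RESTART, so a pending alarm() interrupts the blocking read()
 * and it returns -1 with errno == EINTR.
 */
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

static void alarm_stub(int signo)
{
	(void) signo;	/* nothing to do; we only want the EINTR side effect */
}

int main(void)
{
	struct sigaction sa;
	int fds[2];
	char buf[1];

	memset(&sa, 0, sizeof(sa));
	sa.sa_handler = alarm_stub;
	sigemptyset(&sa.sa_mask);
	sa.sa_flags = 0;		/* no SA_RESTART: let the syscall fail with EINTR */
	if (sigaction(SIGALRM, &sa, NULL) < 0)
		return 1;

	if (pipe(fds) < 0)
		return 1;

	alarm(2);			/* fire SIGALRM in two seconds */
	if (read(fds[0], buf, 1) < 0 && errno == EINTR)
		printf("blocking call interrupted by SIGALRM as intended\n");
	return 0;
}
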
diff --git a/src/slurmctld/controller.c b/src/slurmctld/controller.c
index 2287684302425d448687122af4ee7f50e6afd510..c1ccaea64408d9233ea0f0d569ecfbb845bbab8a 100644
--- a/src/slurmctld/controller.c
+++ b/src/slurmctld/controller.c
@@ -118,6 +118,7 @@ main (int argc, char *argv[])
 	int error_code;
 	char node_name[MAX_NAME_LEN];
 	pthread_attr_t thread_attr_sig, thread_attr_rpc;
+	sigset_t set;
 
 	/*
 	 * Establish initial configuration
@@ -155,7 +156,15 @@ main (int argc, char *argv[])
 	/* init ssl job credential stuff */
 	slurm_ssl_init ( ) ;
 	slurm_init_signer ( &sign_ctx, slurmctld_conf.job_credential_private_key ) ;
-	
+
+	/* Block SIGALRM everywhere except where explicitly enabled */
+	if (sigemptyset (&set))
+		error ("sigemptyset error: %m");
+	if (sigaddset (&set, SIGALRM))
+		error ("sigaddset error on SIGALRM: %m");
+	if (sigprocmask (SIG_BLOCK, &set, NULL) != 0)
+		fatal ("sigprocmask error: %m");
+
 	/*
 	 * create attached thread signal handling
 	 */
@@ -481,7 +490,7 @@ report_locks_set ( void )
 	lock_count = strlen (config) + strlen (job) +
 		     strlen (node) + strlen (partition);
 	if (lock_count > 0)
-		error ("The following locks were left set config:%s, job:%s, node:%s, part:%s",
+		error ("The following locks were left set config:%s, job:%s, node:%s, partition:%s",
 			config, job, node, partition);
 	return lock_count;
 }
@@ -1559,12 +1568,12 @@
 void 
 slurm_rpc_node_registration ( slurm_msg_t * msg )
 {
 	/* init */
-	int error_code = 0, i;
 	clock_t start_time;
 	slurm_node_registration_status_msg_t * node_reg_stat_msg = ( slurm_node_registration_status_msg_t * ) msg-> data ;
-	/* Locks: Write node */
-	slurmctld_lock_t node_write_lock = { NO_LOCK, NO_LOCK, WRITE_LOCK, NO_LOCK };
+	/* Locks: Write job and node */
+	slurmctld_lock_t job_write_lock = { NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };
 #ifdef	HAVE_AUTHD
 	uid_t uid = 0;
 #endif
@@ -1580,16 +1589,17 @@ slurm_rpc_node_registration ( slurm_msg_t * msg )
 #endif
 	if (error_code == 0) {
 		/* do RPC call */
-		lock_slurmctld (node_write_lock);
+		lock_slurmctld (job_write_lock);
 		error_code = validate_node_specs (
 			node_reg_stat_msg -> node_name ,
 			node_reg_stat_msg -> cpus ,
 			node_reg_stat_msg -> real_memory_size ,
 			node_reg_stat_msg -> temporary_disk_space ) ;
-		unlock_slurmctld (node_write_lock);
-		for (i=0; i<node_reg_stat_msg->job_count; i++) {
-			debug ("Register with job_id %u", node_reg_stat_msg->job_id[i]);
-		}
+		validate_jobs_on_node (
+			node_reg_stat_msg -> node_name ,
+			node_reg_stat_msg -> job_count ,
+			node_reg_stat_msg -> job_id ) ;
+		unlock_slurmctld (job_write_lock);
 	}
 
 	/* return result */
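Blocking SIGALRM in main() before any threads are spawned means every thread inherits a mask with the signal blocked; only the agent RPC threads, which call sigprocmask(SIG_UNBLOCK, ...) themselves (see agent.c above), will ever take it. The standalone sketch below illustrates that pattern; it is not part of the patch, and worker() is a hypothetical stand-in for thread_per_node_rpc().

/*
 * Sketch only: SIGALRM blocked process-wide in main(), then re-enabled and
 * handled inside the one thread that actually wants it.
 */
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

static void alarm_stub(int signo)
{
	(void) signo;
}

static void *worker(void *arg)
{
	struct sigaction sa;
	sigset_t set;

	(void) arg;
	memset(&sa, 0, sizeof(sa));
	sa.sa_handler = alarm_stub;
	sigemptyset(&sa.sa_mask);
	sigaction(SIGALRM, &sa, NULL);

	/* this thread opts back in to SIGALRM */
	sigemptyset(&set);
	sigaddset(&set, SIGALRM);
	pthread_sigmask(SIG_UNBLOCK, &set, NULL);

	alarm(1);
	pause();	/* returns once SIGALRM is delivered to this thread */
	printf("worker thread received SIGALRM\n");
	return NULL;
}

int main(void)
{
	sigset_t set;
	pthread_t tid;

	/* block SIGALRM before any thread is created; children inherit the mask */
	sigemptyset(&set);
	sigaddset(&set, SIGALRM);
	pthread_sigmask(SIG_BLOCK, &set, NULL);

	pthread_create(&tid, NULL, worker, NULL);
	pthread_join(tid, NULL);
	return 0;
}
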
diff --git a/src/slurmctld/job_mgr.c b/src/slurmctld/job_mgr.c
index 76020b6a1426a9ff23458a7610f026fcf1396b9b..8d9880de63e337d02325db821bf29a3790c06209 100644
--- a/src/slurmctld/job_mgr.c
+++ b/src/slurmctld/job_mgr.c
@@ -361,6 +361,7 @@ dump_job_details_state (struct job_details *detail_ptr, void **buf_ptr, int *buf
 	pack32  ((uint32_t) detail_ptr->num_nodes, buf_ptr, buf_len);
 	pack16  ((uint16_t) detail_ptr->shared, buf_ptr, buf_len);
 	pack16  ((uint16_t) detail_ptr->contiguous, buf_ptr, buf_len);
+	pack16  ((uint16_t) detail_ptr->kill_on_node_fail, buf_ptr, buf_len);
 
 	pack32  ((uint32_t) detail_ptr->min_procs, buf_ptr, buf_len);
 	pack32  ((uint32_t) detail_ptr->min_memory, buf_ptr, buf_len);
@@ -463,7 +464,7 @@ load_job_state ( void )
 	uint16_t job_state, next_step_id, details;
 	char *nodes = NULL, *partition = NULL, *name = NULL;
 	uint32_t num_procs, num_nodes, min_procs, min_memory, min_tmp_disk, submit_time;
-	uint16_t shared, contiguous, name_len;
+	uint16_t shared, contiguous, kill_on_node_fail, name_len;
 	char *req_nodes = NULL, *features = NULL;
 	char *stderr = NULL, *stdin = NULL, *stdout = NULL, *work_dir = NULL;
 	slurm_job_credential_t *credential_ptr = NULL;
@@ -535,6 +536,7 @@ load_job_state ( void )
 	safe_unpack32 (&num_nodes, &buf_ptr, &buffer_size);
 	safe_unpack16 (&shared, &buf_ptr, &buffer_size);
 	safe_unpack16 (&contiguous, &buf_ptr, &buffer_size);
+	safe_unpack16 (&kill_on_node_fail, &buf_ptr, &buffer_size);
 
 	safe_unpack32 (&min_procs, &buf_ptr, &buffer_size);
 	safe_unpack32 (&min_memory, &buf_ptr, &buffer_size);
@@ -612,6 +614,8 @@ load_job_state ( void )
 		job_ptr->details->num_nodes = num_nodes;
 		job_ptr->details->shared = shared;
 		job_ptr->details->contiguous = contiguous;
+		job_ptr->details->kill_on_node_fail = kill_on_node_fail;
+		job_ptr->details->kill_on_node_fail = 1;
 		job_ptr->details->min_procs = min_procs;
 		job_ptr->details->min_memory = min_memory;
 		job_ptr->details->min_tmp_disk = min_tmp_disk;
@@ -757,13 +761,86 @@ find_job_record(uint32_t job_id)
 	return NULL;
 }
 
+/* find_running_job_by_node_name - Given a node name, return a pointer to any
+ *	job currently running on that node */
+struct job_record *
+find_running_job_by_node_name (char *node_name)
+{
+	ListIterator job_record_iterator;
+	struct job_record *job_record_point;
+	struct node_record *node_record_point;
+	int bit_position;
+
+	node_record_point = find_node_record (node_name);
+	if (node_record_point == NULL)	/* No such node */
+		return NULL;
+	bit_position = node_record_point - node_record_table_ptr;
+
+	job_record_iterator = list_iterator_create (job_list);
+	while ((job_record_point = (struct job_record *) list_next (job_record_iterator))) {
+		if ( (job_record_point->job_state != JOB_STAGE_IN) &&
+		     (job_record_point->job_state != JOB_RUNNING) &&
+		     (job_record_point->job_state != JOB_STAGE_OUT) )
+			continue;	/* job not active */
+		if (bit_test (job_record_point->node_bitmap, bit_position))
+			break;		/* found job here */
+	}
+	list_iterator_destroy (job_record_iterator);
+
+	return job_record_point;
+}
+
+/* kill_running_job_by_node_name - Given a node name, deallocate that job
+ *	from the node or kill it */
+int
+kill_running_job_by_node_name (char *node_name)
+{
+	ListIterator job_record_iterator;
+	struct job_record *job_record_point;
+	struct node_record *node_record_point;
+	int bit_position;
+	int job_count = 0;
+
+	node_record_point = find_node_record (node_name);
+	if (node_record_point == NULL)	/* No such node */
+		return 0;
+	bit_position = node_record_point - node_record_table_ptr;
+
+	job_record_iterator = list_iterator_create (job_list);
+	while ((job_record_point = (struct job_record *) list_next (job_record_iterator))) {
+		if ( (job_record_point->job_state != JOB_STAGE_IN) &&
+		     (job_record_point->job_state != JOB_RUNNING) &&
+		     (job_record_point->job_state != JOB_STAGE_OUT) )
+			continue;	/* job not active */
+		if (bit_test (job_record_point->node_bitmap, bit_position) == 0)
+			continue;	/* job not on this node */
+
+		error ("Running job_id %u vanished from node %s",
+		       job_record_point->job_id, node_name);
+		job_count++;
+		if ( (job_record_point->details == NULL) ||
+		     (job_record_point->details->kill_on_node_fail)) {
+			last_job_update = time (NULL);
+			job_record_point->job_state = JOB_NODE_FAIL;
+			job_record_point->end_time = time(NULL);
+			deallocate_nodes (job_record_point);
+			delete_job_details(job_record_point);
+		}
+	}
+	list_iterator_destroy (job_record_iterator);
+
+	return job_count;
+}
+
+
 /* dump_job_desc - dump the incoming job submit request message */
 void
 dump_job_desc(job_desc_msg_t * job_specs)
 {
 	long job_id, min_procs, min_memory, min_tmp_disk, num_procs;
-	long num_nodes, time_limit, priority, contiguous, shared;
+	long num_nodes, time_limit, priority, contiguous;
+	long kill_on_node_fail, shared;
 
 	if (job_specs == NULL)
 		return;
@@ -786,13 +863,16 @@ dump_job_desc(job_desc_msg_t * job_specs)
 	time_limit = (job_specs->time_limit != NO_VAL) ? job_specs->time_limit : -1 ;
 	priority = (job_specs->priority != NO_VAL) ? job_specs->priority : -1 ;
-	contiguous = (job_specs->contiguous != (uint16_t) NO_VAL) ? job_specs->contiguous : -1 ;
+	contiguous = (job_specs->contiguous != (uint16_t) NO_VAL) ?
+			job_specs->contiguous : -1 ;
+	kill_on_node_fail = (job_specs->kill_on_node_fail != (uint16_t) NO_VAL) ?
+			job_specs->kill_on_node_fail : -1 ;
 	shared = (job_specs->shared != (uint16_t) NO_VAL) ? job_specs->shared : -1 ;
 	debug3("   time_limit=%ld priority=%ld contiguous=%ld shared=%ld",
 	       time_limit, priority, contiguous, shared);
-	debug3("   script=\"%s\"",
-	       job_specs->script);
+	debug3("   kill_on_node_fail=%ld script=\"%s\"",
+	       kill_on_node_fail, job_specs->script);
 
 	if (job_specs->env_size == 1)
 		debug3("   environment=\"%s\"", job_specs->environment[0]);
@@ -1409,6 +1489,8 @@ copy_job_desc_to_job_record ( job_desc_msg_t * job_desc ,
 	detail_ptr->shared = job_desc->shared;
 	if (job_desc->contiguous != NO_VAL)
 		detail_ptr->contiguous = job_desc->contiguous;
+	if (job_desc->kill_on_node_fail != NO_VAL)
+		detail_ptr->kill_on_node_fail = job_desc->kill_on_node_fail;
 	if (job_desc->min_procs != NO_VAL)
 		detail_ptr->min_procs = job_desc->min_procs;
 	if (job_desc->min_memory != NO_VAL)
@@ -1557,7 +1639,8 @@ job_time_limit (void)
 		if ((job_ptr->job_state == JOB_PENDING) ||
 		    (job_ptr->job_state == JOB_FAILED) ||
 		    (job_ptr->job_state == JOB_COMPLETE) ||
-		    (job_ptr->job_state == JOB_TIMEOUT))
+		    (job_ptr->job_state == JOB_TIMEOUT) ||
+		    (job_ptr->job_state == JOB_NODE_FAIL))
 			continue;
 		last_job_update = now;
 		info ("Time limit exhausted for job_id %u, terminated", job_ptr->job_id);
@@ -1595,6 +1678,8 @@ validate_job_desc ( job_desc_msg_t * job_desc_msg , int allocate )
 	}
 	if (job_desc_msg->contiguous == NO_VAL)
 		job_desc_msg->contiguous = 0 ;
+	if (job_desc_msg->kill_on_node_fail == NO_VAL)
+		job_desc_msg->kill_on_node_fail = 1 ;
 	if (job_desc_msg->shared == NO_VAL)
 		job_desc_msg->shared = 0 ;
 
@@ -2167,6 +2252,12 @@ update_job (job_desc_msg_t * job_specs, uid_t uid)
 		}
 	}
 
+	if (job_specs -> kill_on_node_fail != (uint16_t) NO_VAL && detail_ptr) {
+		detail_ptr -> kill_on_node_fail = job_specs -> kill_on_node_fail;
+		info ("update_job: setting kill_on_node_fail to %u for job_id %u",
+		      job_specs -> kill_on_node_fail, job_specs -> job_id);
+	}
+
 	if (job_specs -> features && detail_ptr) {
 		if ( super_user ) {
 			if (detail_ptr -> features)
@@ -2235,3 +2326,48 @@ update_job (job_desc_msg_t * job_specs, uid_t uid)
 
 	return error_code;
 }
+
+/* validate_jobs_on_node - validate that any jobs that should be on the node are
+ *	actually running, if not clean up the job records and/or node records,
+ *	call this function after validate_node_specs() sets the node state properly */
+void
+validate_jobs_on_node ( char *node_name, uint32_t job_count, uint32_t *job_id_ptr)
+{
+	int i;
+	struct node_record *node_ptr;
+	struct job_record *job_ptr;
+
+	node_ptr = find_node_record (node_name);
+	if (node_ptr == NULL)
+		return;
+
+	/* If no job is running here, ensure none are assigned to this node */
+	if (job_count == 0) {
+		(void) kill_running_job_by_node_name (node_name);
+		return;
+	}
+
+	/* Ensure that jobs which are running are really supposed to be there */
+	for (i=0; i<job_count; i++) {
+		job_ptr = find_job_record (job_id_ptr[i]);
+		if (job_ptr == NULL) {
+			error ("Orphan job_id %u reported on node %s",
+			       job_id_ptr[i], node_name);
+			continue;
+		}
+
+		if ( (job_ptr->job_state == JOB_STAGE_IN) ||
+		     (job_ptr->job_state == JOB_RUNNING) ||
+		     (job_ptr->job_state == JOB_STAGE_OUT) ) {
+			debug ("Registered job_id %u on node %s ",
+			       job_id_ptr[i], node_name);
+			continue;	/* All is well */
+		}
+		error ("Registered job_id %u in state %s on node %s ",
+		       job_id_ptr[i],
+		       job_state_string (job_ptr->job_state), node_name);
+		job_ptr->job_state = JOB_NODE_FAIL;
+/* FIXME: Add code to kill job on this node */
+	}
+	return;
+}
diff --git a/src/slurmctld/node_mgr.c b/src/slurmctld/node_mgr.c
index 480ff7cca14532ad672db97ffec3c9531df78d0d..a354ac3fc4aa263a9e246c01ecbce3e98920d530 100644
--- a/src/slurmctld/node_mgr.c
+++ b/src/slurmctld/node_mgr.c
@@ -1073,6 +1073,25 @@ validate_node_specs (char *node_name, uint32_t cpus,
 	return error_code;
 }
 
+/* node_did_resp - record that the specified node is responding */
+void
+node_did_resp (char *name)
+{
+	struct node_record *node_ptr;
+	int i;
+
+	node_ptr = find_node_record (name);
+	if (node_ptr == NULL) {
+		error ("node_did_resp unable to find node %s", name);
+		return;
+	}
+
+	i = node_ptr - node_record_table_ptr;
+	last_node_update = time (NULL);
+	node_record_table_ptr[i].last_response = time (NULL);
+	return;
+}
+
 /* node_not_resp - record that the specified node is not responding */
 void
 node_not_resp (char *name)
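node_did_resp() converts the pointer returned by find_node_record() into a table index by pointer subtraction (node_ptr - node_record_table_ptr) and then stamps last_response. The toy program below illustrates the same idiom; it is not part of the patch, and the names node_tbl and find_node are hypothetical stand-ins for the real SLURM tables.

/*
 * Sketch only: look a record up by name, recover its array index by
 * pointer difference, and update it in place.
 */
#include <stdio.h>
#include <string.h>
#include <time.h>

struct node {
	char   name[16];
	time_t last_response;
};

static struct node node_tbl[] = {
	{ "linux01", 0 },
	{ "linux02", 0 },
	{ "linux03", 0 },
};

static struct node *find_node(const char *name)
{
	size_t i;

	for (i = 0; i < sizeof(node_tbl) / sizeof(node_tbl[0]); i++) {
		if (strcmp(node_tbl[i].name, name) == 0)
			return &node_tbl[i];
	}
	return NULL;
}

int main(void)
{
	struct node *ptr = find_node("linux02");

	if (ptr != NULL) {
		long idx = ptr - node_tbl;	/* pointer difference gives the index */

		node_tbl[idx].last_response = time(NULL);
		printf("node %s is index %ld, last_response updated\n",
		       node_tbl[idx].name, idx);
	}
	return 0;
}
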
diff --git a/src/slurmctld/slurmctld.h b/src/slurmctld/slurmctld.h
index 76bdc189f9088afd1dc706ad3393bff4252c2aa8..544dc472df213158467371c4849defb427a3b121 100644
--- a/src/slurmctld/slurmctld.h
+++ b/src/slurmctld/slurmctld.h
@@ -164,7 +164,7 @@
 extern time_t last_step_update;	/* time of last update to job steps */
 extern int job_count;		/* number of jobs in the system */
 
-/* job_details - specification of a job's constraints, not required after initiation */
+/* job_details - specification of a job's constraints */
 struct job_details {
 	uint32_t magic;			/* magic cookie to test data integrity */
 	uint32_t num_procs;		/* minimum number of processors */
@@ -175,6 +175,7 @@ struct job_details {
 	char *features;			/* required features */
 	uint16_t shared;		/* 1 if more than one job can execute on a node */
 	uint16_t contiguous;		/* requires contiguous nodes, 1=true, 0=false */
+	uint16_t kill_on_node_fail;	/* 1 if job should be killed on node failure */
 	uint32_t min_procs;		/* minimum processors per node, MB */
 	uint32_t min_memory;		/* minimum memory per node, MB */
 	uint32_t min_tmp_disk;		/* minimum temporary disk per node, MB */
@@ -318,6 +319,10 @@ extern struct node_record *find_node_record (char *name);
 /* find_part_record - find a record for partition with specified name */
 extern struct part_record *find_part_record (char *name);
 
+/* find_running_job_by_node_name - Given a node name, return a pointer to any
+ *	job currently running on that node */
+extern struct job_record *find_running_job_by_node_name (char *node_name);
+
 /* find_step_record - return a pointer to the step record with the given job_id and step_id */
 extern struct step_record * find_step_record(struct job_record *job_ptr, uint16_t step_id);
 
@@ -399,6 +404,9 @@
 int mkdir2 (char * path, int modes);
 /* node_name2bitmap - given a node name regular expression, build a bitmap representation */
 extern int node_name2bitmap (char *node_names, bitstr_t **bitmap);
 
+/* node_did_resp - record that the specified node is responding */
+extern void node_did_resp (char *name);
+
 /* node_not_resp - record that the specified node is not responding */
 extern void node_not_resp (char *name);
 
@@ -504,6 +512,11 @@ extern int update_node ( update_node_msg_t * update_node_msg ) ;
 /* update_part - update a partition's configuration data per the supplied specification */
 extern int update_part (update_part_msg_t * part_desc );
 
+/* validate_jobs_on_node - validate that any jobs that should be on the node are
+ *	actually running, if not clean up the job records and/or node records,
+ *	call this function after validate_node_specs() sets the node state properly */
+extern void validate_jobs_on_node ( char *node_name, uint32_t job_count, uint32_t *job_id_ptr);
+
 /* validate_group - validate that the submit uid is authorized to run in this partition */
 extern int validate_group (struct part_record *part_ptr, uid_t submit_uid);
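The new kill_on_node_fail field drives the choice kill_running_job_by_node_name() makes when a node drops out: jobs with the flag set (the default of 1) are moved to JOB_NODE_FAIL and their resources released, while jobs that cleared the flag are left alone. A self-contained sketch of that decision follows; it is not part of the patch, and the types and names (toy_job, JOB_RUN, JOB_FAILED_NODE, handle_node_failure) are illustrative only, not the real job_record structures.

/*
 * Sketch only: per-job decision on node failure, keyed off a
 * kill_on_node_fail flag.
 */
#include <stdio.h>
#include <time.h>

enum toy_state { JOB_RUN, JOB_FAILED_NODE };

struct toy_job {
	unsigned int   job_id;
	unsigned short kill_on_node_fail;
	enum toy_state state;
	time_t         end_time;
};

static void handle_node_failure(struct toy_job *jobs, int njobs)
{
	int i;

	for (i = 0; i < njobs; i++) {
		if (jobs[i].state != JOB_RUN)
			continue;		/* only active jobs are affected */
		if (!jobs[i].kill_on_node_fail)
			continue;		/* job elects to ride out the failure */
		jobs[i].state    = JOB_FAILED_NODE;
		jobs[i].end_time = time(NULL);
		printf("job %u terminated by node failure\n", jobs[i].job_id);
	}
}

int main(void)
{
	struct toy_job jobs[] = {
		{ 100, 1, JOB_RUN, 0 },		/* default: killed on failure */
		{ 101, 0, JOB_RUN, 0 },		/* opted out: keeps running */
	};

	handle_node_failure(jobs, 2);
	return 0;
}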