From 174e842148b82b2a1bc8860958bc3ddc2efb1191 Mon Sep 17 00:00:00 2001 From: Moe Jette <jette1@llnl.gov> Date: Mon, 23 Sep 2002 18:50:37 +0000 Subject: [PATCH] Added step_id along with job_id list of active jobs on node registration. Expanded error recovery logic on slurmd registration. --- src/slurmctld/agent.c | 35 +++++++++++++++++---- src/slurmctld/controller.c | 3 +- src/slurmctld/job_mgr.c | 62 ++++++++++++++++++++++++++++++++------ src/slurmctld/slurmctld.h | 10 +++--- 4 files changed, 89 insertions(+), 21 deletions(-) diff --git a/src/slurmctld/agent.c b/src/slurmctld/agent.c index c87c89cd1f6..1076e657204 100644 --- a/src/slurmctld/agent.c +++ b/src/slurmctld/agent.c @@ -255,7 +255,8 @@ wdog (void *args) /* Locks: Write node */ slurmctld_lock_t node_write_lock = { NO_LOCK, NO_LOCK, WRITE_LOCK, NO_LOCK }; #else - struct sockaddr_in *slurm_addr; + int done_cnt; + char *slurm_names; #endif while (1) { @@ -301,17 +302,22 @@ wdog (void *args) unlock_slurmctld (node_write_lock); #else /* Build a list of all non-responding nodes and send it to slurmctld */ - error ("agent/wdog: %d nodes failed to respond", fail_cnt); - slurm_addr = xmalloc (fail_cnt * sizeof (struct sockaddr_in)); + slurm_names = xmalloc (fail_cnt * MAX_NAME_LEN); + fail_cnt = 0; for (i = 0; i < agent_ptr->thread_count; i++) { - if (thread_ptr[i].state == DSH_FAILED) - slurm_addr[fail_cnt++] = thread_ptr[i].slurm_addr; + if (thread_ptr[i].state == DSH_FAILED) { + strncpy (&slurm_names[MAX_NAME_LEN * fail_cnt], + thread_ptr[i].node_name, MAX_NAME_LEN); + error ("agent/wdog: node %s failed to respond", + thread_ptr[i].node_name); + fail_cnt++; + } } /* send RPC */ fatal ("Code development needed here if agent is not thread"); - xfree (slurm_addr); + xfree (slurm_names); #endif } #if AGENT_IS_THREAD @@ -322,6 +328,23 @@ wdog (void *args) node_did_resp (thread_ptr[i].node_name); } unlock_slurmctld (node_write_lock); +#else + /* Build a list of all responding nodes and send it to slurmctld to update time stamps */ + done_cnt = agent_ptr->thread_count - fail_cnt; + slurm_names = xmalloc (done_cnt * MAX_NAME_LEN); + done_cnt = 0; + for (i = 0; i < agent_ptr->thread_count; i++) { + if (thread_ptr[i].state == DSH_DONE) { + strncpy (&slurm_names[MAX_NAME_LEN * done_cnt], + thread_ptr[i].node_name, MAX_NAME_LEN); + done_cnt++; + } + } + + /* send RPC */ + fatal ("Code development needed here if agent is not thread"); + + xfree (slurm_addr); #endif if (max_delay) debug ("agent maximum delay %d seconds", max_delay); diff --git a/src/slurmctld/controller.c b/src/slurmctld/controller.c index c1ccaea6440..92204b369cb 100644 --- a/src/slurmctld/controller.c +++ b/src/slurmctld/controller.c @@ -1598,7 +1598,8 @@ slurm_rpc_node_registration ( slurm_msg_t * msg ) validate_jobs_on_node ( node_reg_stat_msg -> node_name , node_reg_stat_msg -> job_count , - node_reg_stat_msg -> job_id ) ; + node_reg_stat_msg -> job_id , + node_reg_stat_msg -> step_id ) ; unlock_slurmctld (job_write_lock); } diff --git a/src/slurmctld/job_mgr.c b/src/slurmctld/job_mgr.c index 8d9880de63e..769590648fe 100644 --- a/src/slurmctld/job_mgr.c +++ b/src/slurmctld/job_mgr.c @@ -32,6 +32,7 @@ #include <ctype.h> #include <errno.h> +#include <signal.h> #include <stdio.h> #include <stdlib.h> #include <string.h> @@ -94,12 +95,12 @@ int job_create (job_desc_msg_t * job_specs, uint32_t *new_job_id, int allocate, void list_delete_job (void *job_entry); int list_find_job_id (void *job_entry, void *key); int list_find_job_old (void *job_entry, void *key); +void signal_job_on_node (uint32_t job_id, uint16_t step_id, int signum, char *node_name); int top_priority (struct job_record *job_ptr); int validate_job_desc ( job_desc_msg_t * job_desc_msg , int allocate ) ; int write_data_to_file ( char * file_name, char * data ) ; int write_data_array_to_file ( char * file_name, char ** data, uint16_t size ) ; - /* * create_job_record - create an empty job_record including job_details. * load its values with defaults (zeros, nulls, and magic cookie) @@ -2327,19 +2328,24 @@ update_job (job_desc_msg_t * job_specs, uid_t uid) return error_code; } + /* validate_jobs_on_node - validate that any jobs that should be on the node are * actually running, if not clean up the job records and/or node records, * call this function after validate_node_specs() sets the node state properly */ -void -validate_jobs_on_node ( char *node_name, uint32_t job_count, uint32_t *job_id_ptr) +void +validate_jobs_on_node ( char *node_name, uint32_t job_count, + uint32_t *job_id_ptr, uint16_t *step_id_ptr) { - int i; + int i, node_inx; struct node_record *node_ptr; struct job_record *job_ptr; node_ptr = find_node_record (node_name); - if (node_ptr == NULL) + if (node_ptr == NULL) { + error ("slurmd registered on unknown node %s", node_name); return; + } + node_inx = node_ptr - node_record_table_ptr; /* If no job is running here, ensure none are assigned to this node */ if (job_count == 0) { @@ -2353,21 +2359,57 @@ validate_jobs_on_node ( char *node_name, uint32_t job_count, uint32_t *job_id_pt if (job_ptr == NULL) { error ("Orphan job_id %u reported on node %s", job_id_ptr[i], node_name); + signal_job_on_node (job_id_ptr[i], step_id_ptr[i], + SIGKILL, node_name); + /* We may well have pending purge job RPC to send slurmd, + * which would synchronize this */ continue; } if ( (job_ptr->job_state == JOB_STAGE_IN) || (job_ptr->job_state == JOB_RUNNING) || (job_ptr->job_state == JOB_STAGE_OUT) ) { - debug ("Registered job_id %u on node %s ", - job_id_ptr[i], node_name); - continue; /* All is well */ + if ( bit_test (job_ptr->node_bitmap, node_inx)) + debug3 ("Registered job_id %u on node %s ", + job_id_ptr[i], node_name); /* All is well */ + else { + error ("REGISTERED JOB_ID %u ON WRONG NODE %s ", + job_id_ptr[i], node_name); /* Very bad */ + signal_job_on_node (job_id_ptr[i], step_id_ptr[i], + SIGKILL, node_name); + } } + continue; + + if (job_ptr->job_state == JOB_PENDING) { + /* FIXME: In the future try to let job run */ + error ("REGISTERED PENDING JOB_ID %u ON NODE %s ", + job_id_ptr[i], node_name); /* Very bad */ + job_ptr->job_state = JOB_FAILED; + last_job_update = time (NULL); + job_ptr->end_time = time(NULL); + delete_job_details(job_ptr); + signal_job_on_node (job_id_ptr[i], step_id_ptr[i], + SIGKILL, node_name); + continue; + } + + /* else job is supposed to be done */ error ("Registered job_id %u in state %s on node %s ", job_id_ptr[i], job_state_string (job_ptr->job_state), node_name); - job_ptr->job_state = JOB_NODE_FAIL; -/* FIXME: Add code to kill job on this node */ + signal_job_on_node (job_id_ptr[i], step_id_ptr[i], + SIGKILL, node_name); + /* We may well have pending purge job RPC to send slurmd, + * which would synchronize this */ } return; } + +/* signal_job_on_node - send specific signal to specific job_id, step_id and node_name */ +void +signal_job_on_node (uint32_t job_id, uint16_t step_id, int signum, char *node_name) +{ + /* FIXME: add code to send RPC to specified node */ + error ("CODE DEVELOPMENT NEEDED HERE"); +} diff --git a/src/slurmctld/slurmctld.h b/src/slurmctld/slurmctld.h index 544dc472df2..59dcc929031 100644 --- a/src/slurmctld/slurmctld.h +++ b/src/slurmctld/slurmctld.h @@ -28,6 +28,7 @@ #define _HAVE_SLURM_H #include <pthread.h> +#include <stdint.h> #include <stdlib.h> #include <time.h> #include <sys/types.h> @@ -512,13 +513,14 @@ extern int update_node ( update_node_msg_t * update_node_msg ) ; /* update_part - update a partition's configuration data per the supplied specification */ extern int update_part (update_part_msg_t * part_desc ); +/* validate_group - validate that the submit uid is authorized to run in this partition */ +extern int validate_group (struct part_record *part_ptr, uid_t submit_uid); + /* validate_jobs_on_node - validate that any jobs that should be on the node are * actually running, if not clean up the job records and/or node records, * call this function after validate_node_specs() sets the node state properly */ -extern void validate_jobs_on_node ( char *node_name, uint32_t job_count, uint32_t *job_id_ptr); - -/* validate_group - validate that the submit uid is authorized to run in this partition */ -extern int validate_group (struct part_record *part_ptr, uid_t submit_uid); +extern void validate_jobs_on_node ( char *node_name, uint32_t job_count, + uint32_t *job_id_ptr, uint16_t *step_id_ptr); /* validate_node_specs - validate the node's specifications as valid */ extern int validate_node_specs (char *node_name, -- GitLab