diff --git a/src/slurmctld/agent.c b/src/slurmctld/agent.c index 6e3e42245309e26b18dcc8fb5fd0edd60f551bdc..ad630cfc84ecea4765b4a51084322902e2f3814f 100644 --- a/src/slurmctld/agent.c +++ b/src/slurmctld/agent.c @@ -121,6 +121,11 @@ typedef struct task_info { void *msg_args_ptr; /* ptr to RPC data to be used */ } task_info_t; +typedef struct queued_request { + agent_arg_t* agent_arg_ptr; /* The queued request */ + time_t last_attempt; /* Time of last xmit attempt */ +} queued_request_t; + static void _alarm_handler(int dummy); static inline void _comm_err(char *node_name); static void _list_delete_retry(void *retry_entry); @@ -624,6 +629,7 @@ static void _alarm_handler(int dummy) static void _queue_agent_retry(agent_info_t * agent_info_ptr, int count) { agent_arg_t *agent_arg_ptr; + queued_request_t *queued_req_ptr = NULL; thd_t *thread_ptr = agent_info_ptr->thread_struct; int i, j; @@ -656,17 +662,20 @@ static void _queue_agent_retry(agent_info_t * agent_info_ptr, int count) count, j); agent_arg_ptr->node_count = j; } - debug2("Queue RPC msg_type=%u, count=%d for retry", + debug2("Queue RPC msg_type=%u, nodes=%d for retry", agent_arg_ptr->msg_type, j); /* add the requeust to a list */ + queued_req_ptr = xmalloc(sizeof(queued_request_t)); + queued_req_ptr->agent_arg_ptr = agent_arg_ptr; + queued_req_ptr->last_attempt = time(NULL); slurm_mutex_lock(&retry_mutex); if (retry_list == NULL) { retry_list = list_create(&_list_delete_retry); if (retry_list == NULL) fatal("list_create failed"); } - if (list_enqueue(retry_list, (void *) agent_arg_ptr) == 0) + if (list_append(retry_list, (void *) queued_req_ptr) == 0) fatal("list_append failed"); slurm_mutex_unlock(&retry_mutex); } @@ -677,89 +686,80 @@ static void _queue_agent_retry(agent_info_t * agent_info_ptr, int count) */ static void _list_delete_retry(void *retry_entry) { - agent_arg_t *agent_arg_ptr; /* pointer to part_record */ + queued_request_t *queued_req_ptr; - agent_arg_ptr = (agent_arg_t *) retry_entry; - 
xfree(agent_arg_ptr->slurm_addr); - xfree(agent_arg_ptr->node_names); -#if AGENT_IS_THREAD - xfree(agent_arg_ptr->msg_args); -#endif - xfree(agent_arg_ptr); + if (! retry_entry) + return; + + queued_req_ptr = (queued_request_t *) retry_entry; + _purge_agent_args(queued_req_ptr->agent_arg_ptr); + xfree(queued_req_ptr); } /* - * agent_retry - Agent for retrying pending RPCs (top one on the queue), - * IN args - unused - * RET count of queued requests + * agent_retry - Agent for retrying pending RPCs. One pending request is + * issued if it has been pending for more than min_wait seconds + * IN min_wait - Minimum wait time between re-issue of a pending RPC + * RET count of queued requests on entry (includes any request issued by this call) */ -int agent_retry (void *args) - +extern int agent_retry (int min_wait) { int list_size = 0; - agent_arg_t *agent_arg_ptr = NULL; + time_t now = time(NULL); + queued_request_t *queued_req_ptr = NULL; slurm_mutex_lock(&retry_mutex); if (retry_list) { + double age = 0; list_size = list_count(retry_list); - agent_arg_ptr = (agent_arg_t *) list_dequeue(retry_list); + queued_req_ptr = (queued_request_t *) list_peek(retry_list); + if (queued_req_ptr) { + age = difftime(now, queued_req_ptr->last_attempt); + if (age > min_wait) + queued_req_ptr = (queued_request_t *) + list_pop(retry_list); + else /* too new */ + queued_req_ptr = NULL; + } } slurm_mutex_unlock(&retry_mutex); - if (agent_arg_ptr) - _spawn_retry_agent(agent_arg_ptr); + if (queued_req_ptr) { + agent_arg_t *agent_arg_ptr = queued_req_ptr->agent_arg_ptr; + xfree(queued_req_ptr); + if (agent_arg_ptr) + _spawn_retry_agent(agent_arg_ptr); + else + error("agent_retry found record with no agent_args"); + } return list_size; } /* - * agent_queue_request - put a request on the queue for later execution + * agent_queue_request - put a new request on the queue for later execution * IN agent_arg_ptr - the request to enqueue */ void agent_queue_request(agent_arg_t *agent_arg_ptr) { + queued_request_t *queued_req_ptr = NULL; + + 
queued_req_ptr = xmalloc(sizeof(queued_request_t)); + queued_req_ptr->agent_arg_ptr = agent_arg_ptr; +/* queued_req_ptr->last_attempt = 0; Implicit */ + slurm_mutex_lock(&retry_mutex); if (retry_list == NULL) { retry_list = list_create(&_list_delete_retry); if (retry_list == NULL) fatal("list_create failed"); } - list_enqueue(retry_list, (void *)agent_arg_ptr); + list_prepend(retry_list, (void *)queued_req_ptr); slurm_mutex_unlock(&retry_mutex); } -/* retry_pending - retry all pending RPCs for the given node name - * IN node_name - name of a node to executing pending RPCs for */ -void retry_pending(char *node_name) -{ - int list_size = 0, i, j, found; - agent_arg_t *agent_arg_ptr = NULL; - - slurm_mutex_lock(&retry_mutex); - if (retry_list) { - list_size = list_count(retry_list); - } - for (i = 0; i < list_size; i++) { - agent_arg_ptr = (agent_arg_t *) list_dequeue(retry_list); - found = 0; - for (j = 0; j < agent_arg_ptr->node_count; j++) { - if (strncmp - (&agent_arg_ptr->node_names[j * MAX_NAME_LEN], - node_name, MAX_NAME_LEN)) - continue; - found = 1; - break; - } - if (found) /* issue this RPC */ - _spawn_retry_agent(agent_arg_ptr); - else /* put the RPC back on the queue */ - list_enqueue(retry_list, (void *) agent_arg_ptr); - } - slurm_mutex_unlock(&retry_mutex); -} - -/* _spawn_retry_agent - pthread_crate an agent for the given task */ +/* _spawn_retry_agent - pthread_create an agent for the given task */ static void _spawn_retry_agent(agent_arg_t * agent_arg_ptr) { int retries = 0; @@ -808,17 +808,13 @@ static void _slurmctld_free_job_launch_msg(batch_job_launch_msg_t * msg) /* agent_purge - purge all pending RPC requests */ void agent_purge(void) { -#if AGENT_IS_THREAD - agent_arg_t *agent_arg_ptr = NULL; - if (retry_list == NULL) return; slurm_mutex_lock(&retry_mutex); - while ((agent_arg_ptr = (agent_arg_t *) list_dequeue(retry_list))) - _purge_agent_args(agent_arg_ptr); + list_destroy(retry_list); + retry_list = NULL; slurm_mutex_unlock(&retry_mutex); 
-#endif } static void _purge_agent_args(agent_arg_t *agent_arg_ptr) diff --git a/src/slurmctld/agent.h b/src/slurmctld/agent.h index db87b2b7d011a5a18c0d691e5d84f76845c2b958..61d3861448e864eb7ed4d0dcf3ebf898e8410de9 100644 --- a/src/slurmctld/agent.h +++ b/src/slurmctld/agent.h @@ -64,15 +64,12 @@ extern void *agent (void *args); extern void agent_queue_request(agent_arg_t *agent_arg_ptr); /* - * agent_retry - Agent for retrying pending RPCs (top one on the queue), - * IN args - unused - * RET count of queued requests + * agent_retry - Agent for retrying pending RPCs. One pending request is + * issued if it has been pending for more than min_wait seconds + * IN min_wait - Minimum wait time between re-issue of a pending RPC + * RET count of queued requests on entry (includes any request issued by this call) */ -extern int agent_retry (void *args); - -/* retry_pending - retry all pending RPCs for the given node name - * IN node_name - name of a node to executing pending RPCs for */ -extern void retry_pending (char *node_name); +extern int agent_retry (int min_wait); /* agent_purge - purge all pending RPC requests */ extern void agent_purge (void); diff --git a/src/slurmctld/controller.c b/src/slurmctld/controller.c index 8ffbd3e5f2f78056b3d6644ae5de9ba2f517cf37..43d08bbebf66b04c09515e77382358eb2e6a1314 100644 --- a/src/slurmctld/controller.c +++ b/src/slurmctld/controller.c @@ -569,7 +569,6 @@ static void *_slurmctld_background(void *no_data) static time_t last_checkpoint_time; static time_t last_group_time; static time_t last_ping_time; - static time_t last_rpc_retry_time; static time_t last_timelimit_time; time_t now; /* Locks: Write job, write node, read partition */ @@ -587,7 +586,7 @@ static void *_slurmctld_background(void *no_data) /* Let the dust settle before doing work */ now = time(NULL); last_sched_time = last_checkpoint_time = last_group_time = now; - last_timelimit_time = last_rpc_retry_time = now; + last_timelimit_time = now; last_ping_time = now + (time_t)MIN_CHECKIN_TIME - 
(time_t)slurmctld_conf.heartbeat_interval; (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); @@ -634,15 +633,7 @@ static void *_slurmctld_background(void *no_data) unlock_slurmctld(node_write_lock); } - if (difftime(now, last_rpc_retry_time) >= RPC_RETRY_INTERVAL) { - if (agent_retry(NULL) <= 5) { - /* FIXME: Make this timer based */ - /* if there are more than a few requests, - * don't reset the timer, re-issue one - * request per loop */ - last_rpc_retry_time = now; - } - } + (void) agent_retry(RPC_RETRY_INTERVAL); if (difftime(now, last_group_time) >= PERIODIC_GROUP_CHECK) { last_group_time = now; diff --git a/src/slurmctld/job_mgr.c b/src/slurmctld/job_mgr.c index 5bd973e82e3e2353809cb205ad0696eefc5d23c5..93b2132f88cf75317e9985389e8433a9124d11b5 100644 --- a/src/slurmctld/job_mgr.c +++ b/src/slurmctld/job_mgr.c @@ -2878,7 +2878,17 @@ static void _purge_lost_batch_jobs(int node_inx, time_t now) list_iterator_destroy(job_record_iterator); } -/* _kill_job_on_node - Kill the specific job_id on a specific node */ +/* + * _kill_job_on_node - Kill the specific job_id on a specific node, + * the request is not processed immediately, but queued. + * This is to prevent a flood of pthreads if slurmctld restarts + * without saved state and slurmd daemons register with a + * multitude of running jobs. Slurmctld will not recognize + * these jobs and use this function to kill them - one + * agent request per node as they register. 
+ * IN job_id - id of the job to be killed + * IN node_ptr - pointer to the node on which the job resides + */ static void _kill_job_on_node(uint32_t job_id, struct node_record *node_ptr) { @@ -2892,7 +2902,7 @@ _kill_job_on_node(uint32_t job_id, struct node_record *node_ptr) agent_info = xmalloc(sizeof(agent_arg_t)); agent_info->node_count = 1; - agent_info->retry = 0; + agent_info->retry = 0; agent_info->slurm_addr = xmalloc(sizeof(slurm_addr)); memcpy(agent_info->slurm_addr, &node_ptr->slurm_addr, sizeof(slurm_addr)); @@ -2900,9 +2910,6 @@ _kill_job_on_node(uint32_t job_id, struct node_record *node_ptr) agent_info->msg_type = REQUEST_KILL_JOB; agent_info->msg_args = kill_req; - /* Since we can get a flood of these requests at startup, - * just queue the request rather than creating a pthread - * and possibly creating an unmanagable number of threads */ agent_queue_request(agent_info); } diff --git a/src/slurmctld/node_mgr.c b/src/slurmctld/node_mgr.c index 3a131b1f7d1f28779bf1ea333ff6c17de635117b..bb1744e1ff952c4bde3c30d2c91862c2a6f63a77 100644 --- a/src/slurmctld/node_mgr.c +++ b/src/slurmctld/node_mgr.c @@ -51,6 +51,7 @@ #include "src/slurmctld/slurmctld.h" #define BUF_SIZE 4096 +#define MAX_RETRIES 10 /* Global variables */ List config_list = NULL; /* list of config_record entries */ @@ -1084,7 +1085,6 @@ validate_node_specs (char *node_name, uint32_t cpus, int error_code, node_inx; struct config_record *config_ptr; struct node_record *node_ptr; - uint16_t resp_state; char *reason_down = NULL; node_ptr = find_node_record (node_name); @@ -1098,9 +1098,10 @@ validate_node_specs (char *node_name, uint32_t cpus, if (cpus < config_ptr->cpus) { error ("Node %s has low cpu count %u", node_name, cpus); error_code = EINVAL; - reason_down = "Low CPUs"; } + reason_down = "Low CPUs"; + } node_ptr->cpus = cpus; - if ((config_ptr->cpus != cpus) && (node_ptr->partition_ptr)) + if ((config_ptr->cpus != cpus) && (node_ptr->partition_ptr)) node_ptr->partition_ptr->total_cpus 
+= (cpus - config_ptr->cpus); @@ -1120,7 +1121,6 @@ validate_node_specs (char *node_name, uint32_t cpus, } node_ptr->tmp_disk = tmp_disk; - resp_state = node_ptr->node_state & NODE_STATE_NO_RESPOND; node_ptr->node_state &= (uint16_t) (~NODE_STATE_NO_RESPOND); if (error_code) { if ((node_ptr->node_state != NODE_STATE_DRAINING) && @@ -1171,7 +1171,6 @@ validate_node_specs (char *node_name, uint32_t cpus, info ("validate_node_specs: node %s returned to service", node_name); xfree(node_ptr->reason); - resp_state = 1; /* just started responding */ reset_job_priority(); } else if ((node_ptr->node_state == NODE_STATE_ALLOCATED) && (job_count == 0)) { /* job vanished */ @@ -1184,8 +1183,6 @@ validate_node_specs (char *node_name, uint32_t cpus, node_inx = node_ptr - node_record_table_ptr; if (job_count == 0) bit_set (idle_node_bitmap, node_inx); - if (resp_state) /* Do all pending RPCs now */ - retry_pending (node_name); if ((node_ptr->node_state == NODE_STATE_DOWN) || (node_ptr->node_state == NODE_STATE_DRAINING) || @@ -1221,10 +1218,8 @@ void node_did_resp (char *name) node_ptr->node_state = NODE_STATE_IDLE; if (node_ptr->node_state == NODE_STATE_IDLE) bit_set (idle_node_bitmap, node_inx); - if (resp_state) { + if (resp_state) info("Node %s now responding", name); - retry_pending (name); /* Do all pending RPCs now */ - } if ((node_ptr->node_state == NODE_STATE_DOWN) || (node_ptr->node_state == NODE_STATE_DRAINING) || (node_ptr->node_state == NODE_STATE_DRAINED)) @@ -1389,6 +1384,7 @@ void msg_to_slurmd (slurm_msg_type_t msg_type) agent_arg_t *kill_agent_args; pthread_attr_t kill_attr_agent; pthread_t kill_thread_agent; + int retries = 0; kill_agent_args = xmalloc (sizeof (agent_arg_t)); kill_agent_args->msg_type = msg_type; @@ -1431,11 +1427,12 @@ void msg_to_slurmd (slurm_msg_type_t msg_type) PTHREAD_SCOPE_SYSTEM)) error ("pthread_attr_setscope error %m"); #endif - if (pthread_create (&kill_thread_agent, &kill_attr_agent, + while (pthread_create (&kill_thread_agent, 
&kill_attr_agent, agent, (void *)kill_agent_args)) { error ("pthread_create error %m"); - /* Queue the request for later processing */ - agent_queue_request(kill_agent_args); + if (++retries > MAX_RETRIES) + fatal("Can't create pthread"); + sleep(1); /* sleep and try again */ } } } diff --git a/src/slurmctld/proc_req.c b/src/slurmctld/proc_req.c index 630e0ba5fdaa2e67bb0a8d0461bffae6f7ff5e29..75c8773f899b9a2cc522d6dc6e844922cf070d64 100644 --- a/src/slurmctld/proc_req.c +++ b/src/slurmctld/proc_req.c @@ -56,7 +56,6 @@ # include "src/common/qsw.h" #endif -#include "src/slurmctld/agent.h" #include "src/slurmctld/locks.h" #include "src/slurmctld/proc_req.h" #include "src/slurmctld/read_config.h"