From 5ad795fc820cf12fa9e1cc4833f16a5dfc7f6f58 Mon Sep 17 00:00:00 2001
From: Moe Jette <jette1@llnl.gov>
Date: Tue, 23 Sep 2003 18:12:34 +0000
Subject: [PATCH] Add timers for handling queued agent requests to support
 better scalability. An arbitrary number of requests may be queued; they are
 processed one per second until the queue is empty or until the pending
 requests were last attempted too recently (the minimum retry interval
 parameter is set to 60 seconds).
---
 src/slurmctld/agent.c      | 112 ++++++++++++++++++-------------------
 src/slurmctld/agent.h      |  13 ++---
 src/slurmctld/controller.c |  13 +----
 src/slurmctld/job_mgr.c    |  17 ++++--
 src/slurmctld/node_mgr.c   |  23 ++++----
 src/slurmctld/proc_req.c   |   1 -
 6 files changed, 83 insertions(+), 96 deletions(-)

diff --git a/src/slurmctld/agent.c b/src/slurmctld/agent.c
index 6e3e4224530..ad630cfc84e 100644
--- a/src/slurmctld/agent.c
+++ b/src/slurmctld/agent.c
@@ -121,6 +121,11 @@ typedef struct task_info {
 	void *msg_args_ptr;	/* ptr to RPC data to be used */
 } task_info_t;
 
+typedef struct queued_request {
+	agent_arg_t* agent_arg_ptr;	/* The queued request */
+	time_t last_attempt;		/* Time of last xmit attempt */
+} queued_request_t;
+
 static void _alarm_handler(int dummy);
 static inline void _comm_err(char *node_name);
 static void _list_delete_retry(void *retry_entry);
@@ -624,6 +629,7 @@ static void _alarm_handler(int dummy)
 static void _queue_agent_retry(agent_info_t * agent_info_ptr, int count)
 {
 	agent_arg_t *agent_arg_ptr;
+	queued_request_t *queued_req_ptr = NULL;
 	thd_t *thread_ptr = agent_info_ptr->thread_struct;
 	int i, j;
 
@@ -656,17 +662,20 @@ static void _queue_agent_retry(agent_info_t * agent_info_ptr, int count)
 			count, j);
 		agent_arg_ptr->node_count = j;
 	}
-	debug2("Queue RPC msg_type=%u, count=%d for retry",
+	debug2("Queue RPC msg_type=%u, nodes=%d for retry",
 		agent_arg_ptr->msg_type, j);
 
 	/* add the requeust to a list */
+	queued_req_ptr = xmalloc(sizeof(queued_request_t));
+	queued_req_ptr->agent_arg_ptr = agent_arg_ptr;
+	queued_req_ptr->last_attempt = time(NULL);
 	slurm_mutex_lock(&retry_mutex);
 	if (retry_list == NULL) {
 		retry_list = list_create(&_list_delete_retry);
 		if (retry_list == NULL)
 			fatal("list_create failed");
 	}
-	if (list_enqueue(retry_list, (void *) agent_arg_ptr) == 0)
+	if (list_append(retry_list, (void *) queued_req_ptr) == 0)
 		fatal("list_append failed");
 	slurm_mutex_unlock(&retry_mutex);
 }
@@ -677,89 +686,80 @@ static void _queue_agent_retry(agent_info_t * agent_info_ptr, int count)
  */
 static void _list_delete_retry(void *retry_entry)
 {
-	agent_arg_t *agent_arg_ptr;	/* pointer to part_record */
+	queued_request_t *queued_req_ptr;
 
-	agent_arg_ptr = (agent_arg_t *) retry_entry;
-	xfree(agent_arg_ptr->slurm_addr);
-	xfree(agent_arg_ptr->node_names);
-#if AGENT_IS_THREAD
-	xfree(agent_arg_ptr->msg_args);
-#endif
-	xfree(agent_arg_ptr);
+	if (! retry_entry)
+		return;
+
+	queued_req_ptr = (queued_request_t *) retry_entry;
+	_purge_agent_args(queued_req_ptr->agent_arg_ptr);
+	xfree(queued_req_ptr);
 }
 
 /*
- * agent_retry - Agent for retrying pending RPCs (top one on the queue),
- * IN args - unused
- * RET count of queued requests
+ * agent_retry - Agent for retrying pending RPCs. One pending request is
+ *	issued if it has been pending for at least min_wait seconds
+ * IN min_wait - Minimum wait time between re-issue of a pending RPC
+ * RET count of queued requests remaining
  */
-int agent_retry (void *args)
-
+extern int agent_retry (int min_wait)
 {
 	int list_size = 0;
-	agent_arg_t *agent_arg_ptr = NULL;
+	time_t now = time(NULL);
+	queued_request_t *queued_req_ptr = NULL;
 
 	slurm_mutex_lock(&retry_mutex);
 	if (retry_list) {
+		double age = 0;
 		list_size = list_count(retry_list);
-		agent_arg_ptr = (agent_arg_t *) list_dequeue(retry_list);
+		queued_req_ptr = (queued_request_t *) list_peek(retry_list);
+		if (queued_req_ptr) {
+			age = difftime(now, queued_req_ptr->last_attempt);
+			if (age > min_wait)
+				queued_req_ptr = (queued_request_t *)
+						 list_pop(retry_list);
+			else	/* too new */
+				queued_req_ptr = NULL;
+		}
 	}
 	slurm_mutex_unlock(&retry_mutex);
 
-	if (agent_arg_ptr)
-		_spawn_retry_agent(agent_arg_ptr);
+	if (queued_req_ptr) {
+		agent_arg_t *agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
+		xfree(queued_req_ptr);
+		if (agent_arg_ptr)
+			_spawn_retry_agent(agent_arg_ptr);
+		else
+			error("agent_retry found record with no agent_args");
+	}
 
 	return list_size;
 }
 
 /*
- * agent_queue_request - put a request on the queue for later execution
+ * agent_queue_request - put a new request on the queue for later execution
  * IN agent_arg_ptr - the request to enqueue
  */
 void agent_queue_request(agent_arg_t *agent_arg_ptr)
 {
+	queued_request_t *queued_req_ptr = NULL;
+
+	queued_req_ptr = xmalloc(sizeof(queued_request_t));
+	queued_req_ptr->agent_arg_ptr = agent_arg_ptr;
+/*	queued_req_ptr->last_attempt = 0; Implicit */
+
 	slurm_mutex_lock(&retry_mutex);
 	if (retry_list == NULL) {
 		retry_list = list_create(&_list_delete_retry);
 		if (retry_list == NULL)
 			fatal("list_create failed");
 	}
-	list_enqueue(retry_list, (void *)agent_arg_ptr);
+	list_prepend(retry_list, (void *)queued_req_ptr);
 	slurm_mutex_unlock(&retry_mutex);
 }
 
-/* retry_pending - retry all pending RPCs for the given node name
- * IN node_name - name of a node to executing pending RPCs for */
-void retry_pending(char *node_name)
-{
-	int list_size = 0, i, j, found;
-	agent_arg_t *agent_arg_ptr = NULL;
-
-	slurm_mutex_lock(&retry_mutex);
-	if (retry_list) {
-		list_size = list_count(retry_list);
-	}
-	for (i = 0; i < list_size; i++) {
-		agent_arg_ptr = (agent_arg_t *) list_dequeue(retry_list);
-		found = 0;
-		for (j = 0; j < agent_arg_ptr->node_count; j++) {
-			if (strncmp
-			    (&agent_arg_ptr->node_names[j * MAX_NAME_LEN],
-			     node_name, MAX_NAME_LEN))
-				continue;
-			found = 1;
-			break;
-		}
-		if (found)	/* issue this RPC */
-			_spawn_retry_agent(agent_arg_ptr);
-		else	/* put the RPC back on the queue */
-			list_enqueue(retry_list, (void *) agent_arg_ptr);
-	}
-	slurm_mutex_unlock(&retry_mutex);
-}
-
-/* _spawn_retry_agent - pthread_crate an agent for the given task */
+/* _spawn_retry_agent - pthread_create an agent for the given task */
 static void _spawn_retry_agent(agent_arg_t * agent_arg_ptr)
 {
 	int retries = 0;
@@ -808,17 +808,13 @@ static void _slurmctld_free_job_launch_msg(batch_job_launch_msg_t * msg)
 /* agent_purge - purge all pending RPC requests */
 void agent_purge(void)
 {
-#if AGENT_IS_THREAD
-	agent_arg_t *agent_arg_ptr = NULL;
-
 	if (retry_list == NULL)
 		return;
 
 	slurm_mutex_lock(&retry_mutex);
-	while ((agent_arg_ptr = (agent_arg_t *) list_dequeue(retry_list)))
-		_purge_agent_args(agent_arg_ptr);
+	list_destroy(retry_list);
+	retry_list = NULL;
 	slurm_mutex_unlock(&retry_mutex);
-#endif
 }
 
 static void _purge_agent_args(agent_arg_t *agent_arg_ptr)
diff --git a/src/slurmctld/agent.h b/src/slurmctld/agent.h
index db87b2b7d01..61d3861448e 100644
--- a/src/slurmctld/agent.h
+++ b/src/slurmctld/agent.h
@@ -64,15 +64,12 @@ extern void *agent (void *args);
 extern void agent_queue_request(agent_arg_t *agent_arg_ptr);
 
 /*
- * agent_retry - Agent for retrying pending RPCs (top one on the queue),
- * IN args - unused
- * RET count of queued requests
+ * agent_retry - Agent for retrying pending RPCs. One pending request is
+ *	issued if it has been pending for at least min_wait seconds
+ * IN min_wait - Minimum wait time between re-issue of a pending RPC
+ * RET count of queued requests remaining
  */
-extern int agent_retry (void *args);
-
-/* retry_pending - retry all pending RPCs for the given node name
- * IN node_name - name of a node to executing pending RPCs for */
-extern void retry_pending (char *node_name);
+extern int agent_retry (int min_wait);
 
 /* agent_purge - purge all pending RPC requests */
 extern void agent_purge (void);
diff --git a/src/slurmctld/controller.c b/src/slurmctld/controller.c
index 8ffbd3e5f2f..43d08bbebf6 100644
--- a/src/slurmctld/controller.c
+++ b/src/slurmctld/controller.c
@@ -569,7 +569,6 @@ static void *_slurmctld_background(void *no_data)
 	static time_t last_checkpoint_time;
 	static time_t last_group_time;
 	static time_t last_ping_time;
-	static time_t last_rpc_retry_time;
 	static time_t last_timelimit_time;
 	time_t now;
 	/* Locks: Write job, write node, read partition */
@@ -587,7 +586,7 @@ static void *_slurmctld_background(void *no_data)
 	/* Let the dust settle before doing work */
 	now = time(NULL);
 	last_sched_time = last_checkpoint_time = last_group_time = now;
-	last_timelimit_time = last_rpc_retry_time = now;
+	last_timelimit_time = now;
 	last_ping_time = now + (time_t)MIN_CHECKIN_TIME -
 			 (time_t)slurmctld_conf.heartbeat_interval;
 	(void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
@@ -634,15 +633,7 @@ static void *_slurmctld_background(void *no_data)
 			unlock_slurmctld(node_write_lock);
 		}
 
-		if (difftime(now, last_rpc_retry_time) >= RPC_RETRY_INTERVAL) {
-			if (agent_retry(NULL) <= 5) {
-				/* FIXME: Make this timer based */
-				/* if there are more than a few requests,
-				 * don't reset the timer, re-issue one
-				 * request per loop */
-				last_rpc_retry_time = now;
-			}
-		}
+		(void) agent_retry(RPC_RETRY_INTERVAL);
 
 		if (difftime(now, last_group_time) >= PERIODIC_GROUP_CHECK) {
 			last_group_time = now;
diff --git a/src/slurmctld/job_mgr.c b/src/slurmctld/job_mgr.c
index 5bd973e82e3..93b2132f88c 100644
--- a/src/slurmctld/job_mgr.c
+++ b/src/slurmctld/job_mgr.c
@@ -2878,7 +2878,17 @@ static void _purge_lost_batch_jobs(int node_inx, time_t now)
 	list_iterator_destroy(job_record_iterator);
 }
 
-/* _kill_job_on_node - Kill the specific job_id on a specific node */
+/*
+ * _kill_job_on_node - Kill the specific job_id on a specific node,
+ *	the request is not processed immediately, but queued.
+ *	This is to prevent a flood of pthreads if slurmctld restarts
+ *	without saved state and slurmd daemons register with a
+ *	multitude of running jobs. Slurmctld will not recognize
+ *	these jobs and use this function to kill them - one
+ *	agent request per node as they register.
+ * IN job_id - id of the job to be killed
+ * IN node_ptr - pointer to the node on which the job resides
+ */
 static void
 _kill_job_on_node(uint32_t job_id, struct node_record *node_ptr)
 {
@@ -2892,7 +2902,7 @@ _kill_job_on_node(uint32_t job_id, struct node_record *node_ptr)
 
 	agent_info = xmalloc(sizeof(agent_arg_t));
 	agent_info->node_count	= 1;
-	agent_info->retry	= 0; 
+	agent_info->retry	= 0;
 	agent_info->slurm_addr	= xmalloc(sizeof(slurm_addr));
 	memcpy(agent_info->slurm_addr,
 	       &node_ptr->slurm_addr, sizeof(slurm_addr));
@@ -2900,9 +2910,6 @@ _kill_job_on_node(uint32_t job_id, struct node_record *node_ptr)
 	agent_info->msg_type	= REQUEST_KILL_JOB;
 	agent_info->msg_args	= kill_req;
 
-	/* Since we can get a flood of these requests at startup,
-	 * just queue the request rather than creating a pthread
-	 * and possibly creating an unmanagable number of threads */
 	agent_queue_request(agent_info);
 }
 
diff --git a/src/slurmctld/node_mgr.c b/src/slurmctld/node_mgr.c
index 3a131b1f7d1..bb1744e1ff9 100644
--- a/src/slurmctld/node_mgr.c
+++ b/src/slurmctld/node_mgr.c
@@ -51,6 +51,7 @@
 #include "src/slurmctld/slurmctld.h"
 
 #define BUF_SIZE 4096
+#define MAX_RETRIES 10
 
 /* Global variables */
 List config_list = NULL;	/* list of config_record entries */
@@ -1084,7 +1085,6 @@ validate_node_specs (char *node_name, uint32_t cpus,
 	int error_code, node_inx;
 	struct config_record *config_ptr;
 	struct node_record *node_ptr;
-	uint16_t resp_state;
 	char *reason_down = NULL;
 
 	node_ptr = find_node_record (node_name);
@@ -1098,9 +1098,10 @@ validate_node_specs (char *node_name, uint32_t cpus,
 	if (cpus < config_ptr->cpus) {
 		error ("Node %s has low cpu count %u",
 		       node_name, cpus);
 		error_code = EINVAL;
-		reason_down = "Low CPUs";	}
+		reason_down = "Low CPUs";
+	}
 	node_ptr->cpus = cpus;
-	if ((config_ptr->cpus != cpus) && (node_ptr->partition_ptr)) 
+	if ((config_ptr->cpus != cpus) && (node_ptr->partition_ptr))
 		node_ptr->partition_ptr->total_cpus +=
 			(cpus - config_ptr->cpus);
@@ -1120,7 +1121,6 @@ validate_node_specs (char *node_name, uint32_t cpus,
 	}
 	node_ptr->tmp_disk = tmp_disk;
 
-	resp_state = node_ptr->node_state & NODE_STATE_NO_RESPOND;
 	node_ptr->node_state &= (uint16_t) (~NODE_STATE_NO_RESPOND);
 	if (error_code) {
 		if ((node_ptr->node_state != NODE_STATE_DRAINING) &&
@@ -1171,7 +1171,6 @@ validate_node_specs (char *node_name, uint32_t cpus,
 			info ("validate_node_specs: node %s returned to service",
 			      node_name);
 			xfree(node_ptr->reason);
-			resp_state = 1;	/* just started responding */
 			reset_job_priority();
 		} else if ((node_ptr->node_state == NODE_STATE_ALLOCATED) &&
 			   (job_count == 0)) {	/* job vanished */
@@ -1184,8 +1183,6 @@ validate_node_specs (char *node_name, uint32_t cpus,
 	node_inx = node_ptr - node_record_table_ptr;
 	if (job_count == 0)
 		bit_set (idle_node_bitmap, node_inx);
-	if (resp_state)	/* Do all pending RPCs now */
-		retry_pending (node_name);
 
 	if ((node_ptr->node_state == NODE_STATE_DOWN) ||
 	    (node_ptr->node_state == NODE_STATE_DRAINING) ||
@@ -1221,10 +1218,8 @@ void node_did_resp (char *name)
 		node_ptr->node_state = NODE_STATE_IDLE;
 	if (node_ptr->node_state == NODE_STATE_IDLE)
 		bit_set (idle_node_bitmap, node_inx);
-	if (resp_state) {
+	if (resp_state)
 		info("Node %s now responding", name);
-		retry_pending (name);	/* Do all pending RPCs now */
-	}
 	if ((node_ptr->node_state == NODE_STATE_DOWN) ||
 	    (node_ptr->node_state == NODE_STATE_DRAINING) ||
 	    (node_ptr->node_state == NODE_STATE_DRAINED))
@@ -1389,6 +1384,7 @@ void msg_to_slurmd (slurm_msg_type_t msg_type)
 	agent_arg_t *kill_agent_args;
 	pthread_attr_t kill_attr_agent;
 	pthread_t kill_thread_agent;
+	int retries = 0;
 
 	kill_agent_args = xmalloc (sizeof (agent_arg_t));
 	kill_agent_args->msg_type = msg_type;
@@ -1431,11 +1427,12 @@ void msg_to_slurmd (slurm_msg_type_t msg_type)
 					    PTHREAD_SCOPE_SYSTEM))
 			error ("pthread_attr_setscope error %m");
 #endif
-		if (pthread_create (&kill_thread_agent, &kill_attr_agent,
+		while (pthread_create (&kill_thread_agent, &kill_attr_agent,
 					agent, (void *)kill_agent_args)) {
 			error ("pthread_create error %m");
-			/* Queue the request for later processing */
-			agent_queue_request(kill_agent_args);
+			if (++retries > MAX_RETRIES)
+				fatal("Can't create pthread");
+			sleep(1);	/* sleep and try again */
 		}
 	}
 }
diff --git a/src/slurmctld/proc_req.c b/src/slurmctld/proc_req.c
index 630e0ba5fda..75c8773f899 100644
--- a/src/slurmctld/proc_req.c
+++ b/src/slurmctld/proc_req.c
@@ -56,7 +56,6 @@
 #  include "src/common/qsw.h"
 #endif
 
-#include "src/slurmctld/agent.h"
 #include "src/slurmctld/locks.h"
 #include "src/slurmctld/proc_req.h"
 #include "src/slurmctld/read_config.h"
-- 
GitLab
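
For readers who want to see the queueing discipline in isolation, here is a minimal, self-contained C sketch of the behavior the patch introduces: new requests are prepended with last_attempt == 0 so they are issued on the next pass, failed requests are appended stamped with the current time, and each pass of the background loop issues at most one request whose last attempt is older than min_wait. This is a sketch under stated assumptions, not the SLURM implementation: the hand-rolled linked list, queue_new_request()/queue_retry()/retry_one(), request_id, MIN_RETRY_INTERVAL, and the printf "issue" step are simplified stand-ins for SLURM's List API, agent_queue_request()/_queue_agent_retry()/agent_retry(), agent_arg_t, RPC_RETRY_INTERVAL, and _spawn_retry_agent(); only the queued_request_t/last_attempt idea mirrors the patch directly.

/*
 * Simplified model of the timer-based retry queue (illustrative only).
 */
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#define MIN_RETRY_INTERVAL 60	/* seconds, per the commit message */

typedef struct queued_request {
	int    request_id;		/* stand-in for agent_arg_t */
	time_t last_attempt;		/* time of last transmit attempt */
	struct queued_request *next;
} queued_request_t;

static queued_request_t *retry_head = NULL;
static queued_request_t *retry_tail = NULL;

/* New work goes to the head with last_attempt == 0, so it is issued on the
 * very next pass (cf. agent_queue_request() + list_prepend in the patch). */
static void queue_new_request(int request_id)
{
	queued_request_t *req = calloc(1, sizeof(*req));
	req->request_id = request_id;
	req->next = retry_head;
	retry_head = req;
	if (retry_tail == NULL)
		retry_tail = req;
}

/* A failed request is re-queued at the tail, stamped with the current time
 * (cf. _queue_agent_retry() + list_append in the patch). */
static void queue_retry(queued_request_t *req)
{
	req->last_attempt = time(NULL);
	req->next = NULL;
	if (retry_tail)
		retry_tail->next = req;
	else
		retry_head = req;
	retry_tail = req;
}

/* Issue at most one queued request per call, and only if its last attempt is
 * older than min_wait seconds; returns the queue length measured before the
 * pop, as agent_retry() does. */
static int retry_one(int min_wait)
{
	int count = 0;
	time_t now = time(NULL);
	queued_request_t *req;

	for (req = retry_head; req; req = req->next)
		count++;

	req = retry_head;
	if (req && (difftime(now, req->last_attempt) > min_wait)) {
		retry_head = req->next;
		if (retry_head == NULL)
			retry_tail = NULL;
		printf("issuing request %d\n", req->request_id);
		free(req);	/* a real agent would transmit here */
	}
	return count;
}

int main(void)
{
	queued_request_t *failed = calloc(1, sizeof(*failed));

	failed->request_id = 99;
	queue_new_request(1);
	queue_new_request(2);
	queue_retry(failed);	/* re-queued at the tail, stamped "now" */

	/* The controller's background loop calls this roughly once per
	 * second; the fresh requests go out immediately, while the
	 * re-queued one must wait out MIN_RETRY_INTERVAL. */
	printf("left in queue: %d\n", retry_one(MIN_RETRY_INTERVAL));
	printf("left in queue: %d\n", retry_one(MIN_RETRY_INTERVAL));
	printf("left in queue: %d\n", retry_one(MIN_RETRY_INTERVAL));
	return 0;
}

Prepending new work while appending timestamped retries is what lets fresh RPCs go out promptly while failed ones wait out the minimum retry interval (RPC_RETRY_INTERVAL, 60 seconds per the commit message) at the tail of the queue.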