diff --git a/src/slurmctld/proc_req.c b/src/slurmctld/proc_req.c
index 9685db7aa63d08490a907facbcd624410c65a5ca..59a25f57f2850576a5588b0ae83c12ac6a7bf095 100644
--- a/src/slurmctld/proc_req.c
+++ b/src/slurmctld/proc_req.c
@@ -198,7 +198,9 @@ inline static void _update_cred_key(void);
 static void _slurm_rpc_composite_msg(slurm_msg_t *msg);
 static void _slurm_rpc_comp_msg_list(composite_msg_t * comp_msg,
 				     bool *run_scheduler,
-				     List msg_list_in);
+				     List msg_list_in,
+				     struct timeval *start_tv,
+				     int timeout);
 
 extern diag_stats_t slurmctld_diag_stats;
 
@@ -5285,10 +5287,27 @@ _slurm_rpc_kill_job2(slurm_msg_t *msg)
 	END_TIMER2("_slurm_rpc_kill_job2");
 }
 
+/* Return the number of micro-seconds between now and argument "tv" */
+static int _delta_tv(struct timeval *tv)
+{
+	struct timeval now = {0, 0};
+	int delta_t;
+
+	if (gettimeofday(&now, NULL))
+		return 1;		/* Some error */
+
+	delta_t  = (now.tv_sec - tv->tv_sec) * 1000000;
+	delta_t += (now.tv_usec - tv->tv_usec);
+	return delta_t;
+}
+
 static void _slurm_rpc_composite_msg(slurm_msg_t *msg)
 {
 	static time_t config_update = 0;
 	static bool defer_sched = false;
+	static int sched_timeout = 0;
+	static int active_rpc_cnt = 0;
+	struct timeval start_tv;
 	bool run_scheduler = false;
 	composite_msg_t *comp_msg, comp_resp_msg;
 	/* Locks: Read configuration, write job, write node */
@@ -5299,22 +5318,55 @@ static void _slurm_rpc_composite_msg(slurm_msg_t *msg)
 	comp_resp_msg.msg_list = list_create(slurm_free_comp_msg_list);
 
 	comp_msg = (composite_msg_t *) msg->data;
-	if (slurmctld_conf.debug_flags & DEBUG_FLAG_ROUTE)
-		info("Processing RPC: MESSAGE_COMPOSITE msg with %d direct "
-		     "messages", comp_msg->msg_list ?
-		     list_count(comp_msg->msg_list) : 0);
 
 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_ROUTE)
-		info("msg aggr: entering _slurm_rpc_composite_msg");
+		info("Processing RPC: MESSAGE_COMPOSITE msg with %d messages",
+		     comp_msg->msg_list ? list_count(comp_msg->msg_list) : 0);
+
 	if (config_update != slurmctld_conf.last_update) {
 		char *sched_params = slurm_get_sched_params();
+		int time_limit;
+		char *tmp_ptr;
+
 		defer_sched = (sched_params && strstr(sched_params, "defer"));
+
+		/*
+		 * max_sched_time is given in seconds, but the timeout is
+		 * compared with _delta_tv(), which counts micro-seconds.
+		 * Reset the cached value on every configuration change so
+		 * that a removed or invalid option selects the default
+		 * instead of reusing the previous setting.
+		 */
+		sched_timeout = 0;
+		time_limit = slurm_get_msg_timeout() / 2;
+		if (sched_params &&
+		    (tmp_ptr = strstr(sched_params, "max_sched_time="))) {
+			sched_timeout = atoi(tmp_ptr + 15);
+			if ((sched_timeout <= 0) ||
+			    (sched_timeout > time_limit)) {
+				error("Invalid max_sched_time: %d",
+				      sched_timeout);
+				sched_timeout = 0;
+			}
+		}
+
+		if (sched_timeout == 0) {
+			sched_timeout = MAX(time_limit, 1);
+			sched_timeout = MIN(sched_timeout, 2);
+		}
+		sched_timeout *= 1000000;	/* seconds to micro-seconds */
 		xfree(sched_params);
+		config_update = slurmctld_conf.last_update;
 	}
+
+	_throttle_start(&active_rpc_cnt);
 	lock_slurmctld(job_write_lock);
+	gettimeofday(&start_tv, NULL);
 	_slurm_rpc_comp_msg_list(comp_msg, &run_scheduler,
-				 comp_resp_msg.msg_list);
+				 comp_resp_msg.msg_list, &start_tv,
+				 sched_timeout);
 	unlock_slurmctld(job_write_lock);
+	_throttle_fini(&active_rpc_cnt);
 
 	if (list_count(comp_resp_msg.msg_list)) {
 		slurm_msg_t resp_msg;
@@ -5348,15 +5400,39 @@ static void _slurm_rpc_composite_msg(slurm_msg_t *msg)
 
 static void _slurm_rpc_comp_msg_list(composite_msg_t * comp_msg,
 				     bool *run_scheduler,
-				     List msg_list_in)
+				     List msg_list_in,
+				     struct timeval *start_tv,
+				     int timeout)
 {
 	ListIterator itr;
 	slurm_msg_t *next_msg;
 	composite_msg_t *ncomp_msg;
 	composite_msg_t *comp_resp_msg;
+	/* Locks: Read configuration, write job, write node */
+	slurmctld_lock_t job_write_lock = {
+		READ_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };
+	DEF_TIMERS;
+
+	START_TIMER;
 
 	itr = list_iterator_create(comp_msg->msg_list);
 	while ((next_msg = list_next(itr))) {
+		/*
+		 * Release the locks briefly once "timeout" micro-seconds
+		 * have passed since start_tv, then restart the interval.
+		 */
+		if (_delta_tv(start_tv) >= timeout) {
+			END_TIMER;
+			if (slurmctld_conf.debug_flags & DEBUG_FLAG_ROUTE)
+				info("composite message processing "
+				     "yielding locks after running for %s",
+				     TIME_STR);
+			unlock_slurmctld(job_write_lock);
+			usleep(10);
+			lock_slurmctld(job_write_lock);
+			gettimeofday(start_tv, NULL);
+			START_TIMER;
+		}
 		/* The ret_list is used by slurm_send_rc_msg to house
 		   replys going back to the nodes.
 		*/
@@ -5375,7 +5451,8 @@ static void _slurm_rpc_comp_msg_list(composite_msg_t * comp_msg,
 				     "messages", ncomp_msg->msg_list ?
 				     list_count(ncomp_msg->msg_list) : 0);
 			_slurm_rpc_comp_msg_list(ncomp_msg, run_scheduler,
-						 comp_resp_msg->msg_list);
+						 comp_resp_msg->msg_list,
+						 start_tv, timeout);
 			if (list_count(comp_resp_msg->msg_list)) {
 				slurm_msg_t *resp_msg =
 					xmalloc_nz(sizeof(slurm_msg_t));
@@ -5414,6 +5491,7 @@ static void _slurm_rpc_comp_msg_list(composite_msg_t * comp_msg,
 		next_msg->ret_list = NULL;
 	}
 	list_iterator_destroy(itr);
+	END_TIMER;
 	/* NOTE: RPC has no response */
 }
 