diff --git a/src/slurmctld/proc_req.c b/src/slurmctld/proc_req.c index 829c568c0e4000080f22b7162bb91f2f0a187d63..80071b93c5cdc507ca7d64a7ee095089f9f715c7 100644 --- a/src/slurmctld/proc_req.c +++ b/src/slurmctld/proc_req.c @@ -118,6 +118,8 @@ static int _make_step_cred(struct step_record *step_rec, slurm_cred_t **slurm_cred, uint16_t protocol_version); inline static void _proc_multi_msg(uint32_t rpc_uid, slurm_msg_t *msg); +static int _route_msg_to_origin(slurm_msg_t *msg, char *job_id_str, + uint32_t job_id, uid_t uid); static void _throttle_fini(int *active_rpc_cnt); static void _throttle_start(int *active_rpc_cnt); @@ -3404,42 +3406,12 @@ static void _slurm_rpc_update_job(slurm_msg_t * msg) /* Locks: Read config, write job, write node, read partition, read fed*/ slurmctld_lock_t job_write_lock = { READ_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK }; - slurmctld_lock_t fed_read_lock = - {NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK }; uid_t uid = g_slurm_auth_get_uid(msg->auth_cred, slurmctld_config.auth_info); - /* route msg to origin cluster if a federated job */ - lock_slurmctld(fed_read_lock); - if (!error_code && !msg->conn && fed_mgr_fed_rec) { - /* Don't send reroute if coming from a federated cluster (aka - * has a msg->conn). */ - uint32_t job_id, origin_id; - - if (job_desc_msg->job_id_str) - job_id = strtol(job_desc_msg->job_id_str, NULL, 10); - else - job_id = job_desc_msg->job_id; - origin_id = fed_mgr_get_cluster_id(job_id); - - if (origin_id && (origin_id != fed_mgr_cluster_rec->fed.id)) { - slurmdb_cluster_rec_t *dst = - fed_mgr_get_cluster_by_id(origin_id); - if (!dst) { - error("couldn't find cluster by cluster id %d", - origin_id); - slurm_send_rc_msg(msg, SLURM_ERROR); - } else { - slurm_send_reroute_msg(msg, dst); - info("%s: REQUEST_UPDATE_JOB job %d uid %d routed to %s", - __func__, job_id, uid, dst->name); - } - - unlock_slurmctld(fed_read_lock); - return; - } - } - unlock_slurmctld(fed_read_lock); + if (!_route_msg_to_origin(msg, job_desc_msg->job_id_str, + job_desc_msg->job_id, uid)) + return; START_TIMER; debug2("Processing RPC: REQUEST_UPDATE_JOB from uid=%d", uid); @@ -5982,3 +5954,50 @@ static void _proc_multi_msg(uint32_t rpc_uid, slurm_msg_t *msg) free_buf(resp_buf); return; } + +/* Route msg to federated job's origin. + * RET returns SLURM_SUCCESS if the msg was routed. + */ +static int _route_msg_to_origin(slurm_msg_t *msg, char *src_job_id_str, + uint32_t src_job_id, uid_t uid) +{ + slurmctld_lock_t fed_read_lock = { + NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK }; + + xassert(msg); + + /* route msg to origin cluster if a federated job */ + lock_slurmctld(fed_read_lock); + if (!msg->conn && fed_mgr_fed_rec) { + /* Don't send reroute if coming from a federated cluster (aka + * has a msg->conn). */ + uint32_t job_id, origin_id; + + if (src_job_id_str) + job_id = strtol(src_job_id_str, NULL, 10); + else + job_id = src_job_id; + origin_id = fed_mgr_get_cluster_id(job_id); + + if (origin_id && (origin_id != fed_mgr_cluster_rec->fed.id)) { + slurmdb_cluster_rec_t *dst = + fed_mgr_get_cluster_by_id(origin_id); + if (!dst) { + error("couldn't find cluster by cluster id %d", + origin_id); + slurm_send_rc_msg(msg, SLURM_ERROR); + } else { + slurm_send_reroute_msg(msg, dst); + info("%s: %s job %d uid %d routed to %s", + __func__, rpc_num2string(msg->msg_type), + job_id, uid, dst->name); + } + + unlock_slurmctld(fed_read_lock); + return SLURM_SUCCESS; + } + } + unlock_slurmctld(fed_read_lock); + + return SLURM_ERROR; +}