From 398c4d9f6bef8f6b5ed1bb6edeb8d1c5b8c2f3a3 Mon Sep 17 00:00:00 2001 From: Brian Christiansen <brian@schedmd.com> Date: Thu, 25 May 2017 13:18:33 -0600 Subject: [PATCH] Fix fed communications to talk on siblings proto Clusters in the federation could be different rpc_versions so each cluster needs to talk each other's language. --- src/slurmctld/fed_mgr.c | 82 +++++++++++++++++++++++++++-------------- 1 file changed, 54 insertions(+), 28 deletions(-) diff --git a/src/slurmctld/fed_mgr.c b/src/slurmctld/fed_mgr.c index 5559ea4fdce..c1e236b4005 100644 --- a/src/slurmctld/fed_mgr.c +++ b/src/slurmctld/fed_mgr.c @@ -850,7 +850,7 @@ static int _persist_update_job(slurmdb_cluster_rec_t *conn, uint32_t job_id, slurm_msg_t_init(&tmp_msg); tmp_msg.msg_type = REQUEST_UPDATE_JOB; tmp_msg.data = data; - tmp_msg.protocol_version = SLURM_PROTOCOL_VERSION; + tmp_msg.protocol_version = conn->rpc_version; buffer = init_buf(BUF_SIZE); pack_msg(&tmp_msg, buffer); @@ -864,8 +864,9 @@ static int _persist_update_job(slurmdb_cluster_rec_t *conn, uint32_t job_id, sib_msg.job_id = job_id; slurm_msg_t_init(&req_msg); - req_msg.msg_type = REQUEST_SIB_MSG; - req_msg.data = &sib_msg; + req_msg.msg_type = REQUEST_SIB_MSG; + req_msg.protocol_version = tmp_msg.protocol_version; + req_msg.data = &sib_msg; rc = _queue_rpc(conn, &req_msg, 0, false); @@ -889,8 +890,9 @@ static int _persist_update_job_resp(slurmdb_cluster_rec_t *conn, sib_msg.return_code = return_code; slurm_msg_t_init(&req_msg); - req_msg.msg_type = REQUEST_SIB_MSG; - req_msg.data = &sib_msg; + req_msg.msg_type = REQUEST_SIB_MSG; + req_msg.protocol_version = conn->rpc_version; + req_msg.data = &sib_msg; rc = _queue_rpc(conn, &req_msg, job_id, false); @@ -922,8 +924,9 @@ static int _persist_fed_job_revoke(slurmdb_cluster_rec_t *conn, uint32_t job_id, sib_msg.return_code = return_code; slurm_msg_t_init(&req_msg); - req_msg.msg_type = REQUEST_SIB_MSG; - req_msg.data = &sib_msg; + req_msg.msg_type = REQUEST_SIB_MSG; + req_msg.protocol_version = conn->rpc_version; + req_msg.data = &sib_msg; rc = _queue_rpc(conn, &req_msg, job_id, false); @@ -945,8 +948,9 @@ static int _persist_fed_job_response(slurmdb_cluster_rec_t *conn, uint32_t job_i sib_msg.return_code = return_code; slurm_msg_t_init(&req_msg); - req_msg.msg_type = REQUEST_SIB_MSG; - req_msg.data = &sib_msg; + req_msg.msg_type = REQUEST_SIB_MSG; + req_msg.protocol_version = conn->rpc_version; + req_msg.data = &sib_msg; rc = _queue_rpc(conn, &req_msg, job_id, false); @@ -983,7 +987,8 @@ static int _persist_fed_job_lock(slurmdb_cluster_rec_t *conn, uint32_t job_id, else req_msg.msg_type = REQUEST_SIB_JOB_UNLOCK; - req_msg.data = &sib_msg; + req_msg.protocol_version = conn->rpc_version; + req_msg.data = &sib_msg; if (_send_recv_msg(conn, &req_msg, &resp_msg, false)) { rc = SLURM_PROTOCOL_ERROR; @@ -1038,8 +1043,9 @@ static int _persist_fed_job_start(slurmdb_cluster_rec_t *conn, sib_msg.cluster_id = cluster_id; sib_msg.start_time = start_time; - req_msg.msg_type = REQUEST_SIB_MSG; - req_msg.data = &sib_msg; + req_msg.msg_type = REQUEST_SIB_MSG; + req_msg.protocol_version = conn->rpc_version; + req_msg.data = &sib_msg; rc = _queue_rpc(conn, &req_msg, job_id, false); @@ -1075,7 +1081,7 @@ static int _persist_fed_job_cancel(slurmdb_cluster_rec_t *conn, uint32_t job_id, slurm_msg_t_init(&tmp_msg); tmp_msg.msg_type = REQUEST_CANCEL_JOB_STEP; tmp_msg.data = &kill_req; - tmp_msg.protocol_version = SLURM_PROTOCOL_VERSION; + tmp_msg.protocol_version = conn->rpc_version; buffer = init_buf(BUF_SIZE); pack_msg(&tmp_msg, buffer); @@ -1088,8 +1094,9 @@ static int _persist_fed_job_cancel(slurmdb_cluster_rec_t *conn, uint32_t job_id, sib_msg.req_uid = uid; slurm_msg_t_init(&req_msg); - req_msg.msg_type = REQUEST_SIB_MSG; - req_msg.data = &sib_msg; + req_msg.msg_type = REQUEST_SIB_MSG; + req_msg.protocol_version = tmp_msg.protocol_version; + req_msg.data = &sib_msg; rc = _queue_rpc(conn, &req_msg, job_id, false); @@ -1124,7 +1131,7 @@ static int _persist_fed_job_requeue(slurmdb_cluster_rec_t *conn, slurm_msg_t_init(&tmp_msg); tmp_msg.msg_type = REQUEST_JOB_REQUEUE; tmp_msg.data = &requeue_req; - tmp_msg.protocol_version = SLURM_PROTOCOL_VERSION; + tmp_msg.protocol_version = conn->rpc_version; buffer = init_buf(BUF_SIZE); pack_msg(&tmp_msg, buffer); @@ -1137,8 +1144,9 @@ static int _persist_fed_job_requeue(slurmdb_cluster_rec_t *conn, sib_msg.data_version = tmp_msg.protocol_version; slurm_msg_t_init(&req_msg); - req_msg.msg_type = REQUEST_SIB_MSG; - req_msg.data = &sib_msg; + req_msg.msg_type = REQUEST_SIB_MSG; + req_msg.protocol_version = tmp_msg.protocol_version; + req_msg.data = &sib_msg; rc = _queue_rpc(conn, &req_msg, job_id, false); @@ -1749,8 +1757,9 @@ extern int _handle_fed_send_job_sync(fed_job_update_info_t *job_update_info) sib_msg.start_time = sync_time; slurm_msg_t_init(&req_msg); - req_msg.msg_type = REQUEST_SIB_MSG; - req_msg.data = &sib_msg; + req_msg.msg_type = REQUEST_SIB_MSG; + req_msg.protocol_version = job_msg.protocol_version; + req_msg.data = &sib_msg; rc = _queue_rpc(sibling, &req_msg, 0, false); @@ -2731,6 +2740,8 @@ static int _submit_sibling_jobs(job_desc_msg_t *job_desc, slurm_msg_t *msg, sib_msg_t sib_msg = {0}; slurmdb_cluster_rec_t *sibling = NULL; slurm_msg_t req_msg; + uint16_t last_rpc_version = NO_VAL16; + Buf buffer = NULL; xassert(job_desc); xassert(msg); @@ -2767,6 +2778,25 @@ static int _submit_sibling_jobs(job_desc_msg_t *job_desc, slurm_msg_t *msg, else sib_msg.sib_msg_type = FED_JOB_SUBMIT_BATCH; + /* Pack message buffer according to sibling's rpc version. A + * submission from a client will already have a buffer with the + * packed job_desc from the client. If this controller is + * submitting new sibling jobs then the buffer needs to be + * packed according to each siblings rpc_version. */ + if (!msg->buffer && + (last_rpc_version != sibling->rpc_version)) { + free_buf(buffer); + msg->protocol_version = sibling->rpc_version; + buffer = init_buf(BUF_SIZE); + pack_msg(msg, buffer); + sib_msg.data_buffer = buffer; + sib_msg.data_version = msg->protocol_version; + + last_rpc_version = sibling->rpc_version; + } + + req_msg.protocol_version = sibling->rpc_version; + if (!(rc = _queue_rpc(sibling, &req_msg, 0, false))) job_desc->fed_siblings_active |= FED_SIBLING_BIT(sibling->fed.id); @@ -2774,6 +2804,8 @@ static int _submit_sibling_jobs(job_desc_msg_t *job_desc, slurm_msg_t *msg, } list_iterator_destroy(sib_itr); + free_buf(buffer); + return ret_rc; } @@ -2789,7 +2821,6 @@ static int _prepare_submit_siblings(struct job_record *job_ptr, int rc = SLURM_SUCCESS; uint32_t origin_id; job_desc_msg_t *job_desc; - Buf buffer; slurm_msg_t msg; xassert(job_ptr); @@ -2804,15 +2835,11 @@ static int _prepare_submit_siblings(struct job_record *job_ptr, if (!(job_desc = copy_job_record_to_job_desc(job_ptr))) return SLURM_ERROR; - /* have to pack job_desc into a buffer */ + /* Have to pack job_desc into a buffer. _submit_sibling_jobs will pack + * the job_desc according to each sibling's rpc_version. */ slurm_msg_t_init(&msg); msg.msg_type = REQUEST_RESOURCE_ALLOCATION; msg.data = job_desc; - msg.protocol_version = SLURM_PROTOCOL_VERSION; - - buffer = init_buf(BUF_SIZE); - pack_msg(&msg, buffer); - msg.buffer = buffer; if (_submit_sibling_jobs(job_desc, &msg, false, dest_sibs)) error("Failed to submit fed job to siblings"); @@ -2827,7 +2854,6 @@ static int _prepare_submit_siblings(struct job_record *job_ptr, job_ptr->fed_details->siblings_active |= job_desc->fed_siblings_active; update_job_fed_details(job_ptr); - free_buf(buffer); /* free the environment since all strings are stored in one * xmalloced buffer */ if (job_desc->environment) { -- GitLab