diff --git a/src/common/slurm_protocol_defs.c b/src/common/slurm_protocol_defs.c
index 321d5616e21edad64de404aefacc532c7f5f99f6..b01f92f06f37183fa0ee9e7c76bb74460034a2ad 100644
--- a/src/common/slurm_protocol_defs.c
+++ b/src/common/slurm_protocol_defs.c
@@ -3841,6 +3841,7 @@ extern int slurm_free_msg_data(slurm_msg_type_t type, void *data)
 		break;
 	case REQUEST_SIB_JOB_WILL_RUN:
 	case REQUEST_SIB_SUBMIT_BATCH_JOB:
+	case REQUEST_SIB_RESOURCE_ALLOCATION:
 		slurm_free_sib_msg(data);
 		break;
 	case RESPONSE_JOB_WILL_RUN:
@@ -4448,6 +4449,8 @@ rpc_num2string(uint16_t opcode)
 		return "REQUEST_SIB_JOB_WILL_RUN";
 	case REQUEST_SIB_SUBMIT_BATCH_JOB:
 		return "REQUEST_SIB_SUBMIT_BATCH_JOB";
+	case REQUEST_SIB_RESOURCE_ALLOCATION:
+		return "REQUEST_SIB_RESOURCE_ALLOCATION";
 	case RESPONSE_JOB_WILL_RUN:
 		return "RESPONSE_JOB_WILL_RUN";
 	case REQUEST_JOB_ALLOCATION_INFO:
diff --git a/src/common/slurm_protocol_defs.h b/src/common/slurm_protocol_defs.h
index 77f3e044c892ad434fbab8e4bdf6d01db92737cc..e88a1ebe2517e6f61798cb02991350704a713a23 100644
--- a/src/common/slurm_protocol_defs.h
+++ b/src/common/slurm_protocol_defs.h
@@ -288,6 +288,7 @@ typedef enum {
 	RESPONSE_JOB_SBCAST_CRED,
 	REQUEST_SIB_JOB_WILL_RUN,
 	REQUEST_SIB_SUBMIT_BATCH_JOB,
+	REQUEST_SIB_RESOURCE_ALLOCATION,

 	REQUEST_JOB_STEP_CREATE = 5001,
 	RESPONSE_JOB_STEP_CREATE,
diff --git a/src/common/slurm_protocol_pack.c b/src/common/slurm_protocol_pack.c
index b042573041828089d866e29224982087e5d57be4..38087582b3e12241da851d87a8316891df6795b0 100644
--- a/src/common/slurm_protocol_pack.c
+++ b/src/common/slurm_protocol_pack.c
@@ -1092,6 +1092,7 @@ pack_msg(slurm_msg_t const *msg, Buf buffer)
 		break;
 	case REQUEST_SIB_JOB_WILL_RUN:
 	case REQUEST_SIB_SUBMIT_BATCH_JOB:
+	case REQUEST_SIB_RESOURCE_ALLOCATION:
 		_pack_sib_msg((sib_msg_t *)msg->data, buffer,
 			      msg->protocol_version);
 		break;
@@ -1768,6 +1769,7 @@ unpack_msg(slurm_msg_t * msg, Buf buffer)
 		break;
 	case REQUEST_SIB_JOB_WILL_RUN:
 	case REQUEST_SIB_SUBMIT_BATCH_JOB:
+	case REQUEST_SIB_RESOURCE_ALLOCATION:
 		rc = _unpack_sib_msg((sib_msg_t **)&(msg->data), buffer,
 				     msg->protocol_version);
 		break;
diff --git a/src/slurmctld/fed_mgr.c b/src/slurmctld/fed_mgr.c
index 39345251cda4bdba99f1b101ac6e99f79c3f4cd4..2b2eb7bffd36320c5f0290366a5150b2f76057c0 100644
--- a/src/slurmctld/fed_mgr.c
+++ b/src/slurmctld/fed_mgr.c
@@ -600,6 +600,49 @@ end_it:
 	return rc;
 }

+static int _persist_allocate_resources(slurmdb_cluster_rec_t *conn,
+				       sib_msg_t *sib_msg,
+				       resource_allocation_response_msg_t **resp)
+{
+	int rc = SLURM_PROTOCOL_SUCCESS;
+	slurm_msg_t req_msg, resp_msg;
+
+	*resp = NULL;
+
+	slurm_msg_t_init(&req_msg);
+	slurm_msg_t_init(&resp_msg);
+
+	req_msg.msg_type = REQUEST_SIB_RESOURCE_ALLOCATION;
+	req_msg.data = sib_msg;
+
+	rc = _send_recv_msg(conn, &req_msg, &resp_msg, false);
+	if (rc) {
+		rc = SLURM_PROTOCOL_ERROR;
+		goto end_it;
+	}
+
+	switch (resp_msg.msg_type) {
+	case RESPONSE_SLURM_RC:
+		if ((rc = ((return_code_msg_t *) resp_msg.data)->return_code)) {
+			slurm_seterrno(rc);
+			rc = SLURM_PROTOCOL_ERROR;
+		}
+		break;
+	case RESPONSE_RESOURCE_ALLOCATION:
+		*resp = (resource_allocation_response_msg_t *) resp_msg.data;
+		resp_msg.data = NULL;
+		break;
+	default:
+		slurm_seterrno(SLURM_UNEXPECTED_MSG_ERROR);
+		rc = SLURM_PROTOCOL_ERROR;
+	}
+
+end_it:
+	slurm_free_msg_members(&resp_msg);
+
+	return rc;
+}
+
 static int _persist_update_job(slurmdb_cluster_rec_t *conn,
 			       job_desc_msg_t *data)
 {
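The round trip in _persist_allocate_resources() follows the usual Slurm request/response pattern: send one typed message over the persistent sibling connection, switch on the reply type, and on RESPONSE_RESOURCE_ALLOCATION steal the payload by NULLing resp_msg.data so the closing slurm_free_msg_members() cannot free what the caller now owns. Below is a minimal stand-alone sketch of that ownership handoff; the types and fake_send_recv() are simplified stand-ins, not the real slurm_msg_t machinery.

/* Sketch of the send/recv + payload-steal pattern above; stand-in
 * types only, not the real Slurm API.  Build: cc -Wall sketch.c */
#include <stdio.h>
#include <stdlib.h>

typedef enum { RESPONSE_RC, RESPONSE_ALLOC } msg_type_t;
typedef struct { int return_code; } rc_msg_t;
typedef struct { unsigned job_id; } alloc_resp_t;
typedef struct {
	msg_type_t msg_type;
	void *data;		/* owned by the message until stolen */
} msg_t;

/* Stand-in for _send_recv_msg(): pretend the sibling granted job 42. */
static int fake_send_recv(msg_t *resp)
{
	alloc_resp_t *a = malloc(sizeof(*a));
	if (!a)
		return -1;
	a->job_id = 42;
	resp->msg_type = RESPONSE_ALLOC;
	resp->data = a;
	return 0;
}

static int allocate_from_sibling(alloc_resp_t **out)
{
	msg_t resp = { RESPONSE_RC, NULL };
	int rc;

	*out = NULL;
	if ((rc = fake_send_recv(&resp)))
		return rc;

	switch (resp.msg_type) {
	case RESPONSE_RC:	/* sibling refused: plain return code */
		rc = ((rc_msg_t *) resp.data)->return_code;
		break;
	case RESPONSE_ALLOC:
		/* Steal the payload: clearing resp.data keeps the
		 * cleanup below from freeing what the caller now owns. */
		*out = resp.data;
		resp.data = NULL;
		break;
	}
	free(resp.data);	/* no-op when the payload was stolen */
	return rc;
}

int main(void)
{
	alloc_resp_t *a = NULL;
	if (!allocate_from_sibling(&a) && a)
		printf("granted job %u\n", a->job_id);
	free(a);
	return 0;
}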
@@ -1376,7 +1419,31 @@ static slurmdb_cluster_rec_t *_find_start_now_sib(slurm_msg_t *msg,
 	return ret_sib;
 }

-static void *_submit_sibling_job(void *arg)
+static void *_submit_sibling_allocation(void *arg)
+{
+	int rc = SLURM_SUCCESS;
+	resource_allocation_response_msg_t *alloc_resp = NULL;
+	sib_submit_t *sub = (sib_submit_t *)arg;
+	slurmdb_cluster_rec_t *sibling = sub->sibling;
+	sib_msg_t *sib_msg = sub->sib_msg;
+
+	if ((rc = _persist_allocate_resources(sibling, sib_msg, &alloc_resp))) {
+		error("Failed to submit job to sibling %s: %m", sibling->name);
+	} else if (!alloc_resp) {
+		error("Got a success back without a resp. This shouldn't happen");
+		rc = SLURM_ERROR;
+	} else if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR) {
+		info("Submitted federated allocation %u to %s",
+		     alloc_resp->job_id, sibling->name);
+	}
+	sub->thread_rc = rc;
+
+	slurm_free_resource_allocation_response_msg(alloc_resp);
+
+	return NULL;
+}
+
+static void *_submit_sibling_batch_job(void *arg)
 {
 	int rc = SLURM_SUCCESS;
 	submit_response_msg_t *resp = NULL;
@@ -1415,12 +1482,14 @@ static void *_update_sibling_job(void *arg)
  *
  * IN job_desc - job_desc containing job_id and fed_siblings of job to be.
  * IN msg - contains the original job_desc buffer to send to the siblings.
+ * IN alloc_only - true if just an allocation, false if a batch job.
  * RET returns SLURM_SUCCESS if all siblings recieved the job sucessfully or
  *	SLURM_ERROR if any siblings failed to receive the job. If a sibling
  *	fails, then the sucessful siblings will be updated with the correct
  *	sibling bitmap.
  */
-static int _submit_sibling_jobs(job_desc_msg_t *job_desc, slurm_msg_t *msg)
+static int _submit_sibling_jobs(job_desc_msg_t *job_desc, slurm_msg_t *msg,
+				bool alloc_only)
 {
 	int rc = SLURM_SUCCESS;
 	ListIterator sib_itr, thread_itr;
@@ -1459,7 +1528,9 @@ static int _submit_sibling_jobs(job_desc_msg_t *job_desc, slurm_msg_t *msg)
 		sub->sibling = sibling;
 		sub->sib_msg = &sib_msg;
 		if (pthread_create(&thread_id, &attr,
-				   _submit_sibling_job, sub) != 0) {
+				   ((alloc_only) ?
+				    _submit_sibling_allocation :
+				    _submit_sibling_batch_job), sub) != 0) {
 			error("failed to create submit_sibling_job_thread");
 			xfree(sub);
 			continue;
@@ -1552,6 +1623,7 @@ static int _submit_sibling_jobs(job_desc_msg_t *job_desc, slurm_msg_t *msg)
  *
 * IN msg - msg that contains packed job_desc msg to send to siblings.
 * IN job_desc - original job_desc msg.
+ * IN alloc_only - true if requesting just an allocation (srun/salloc).
 * IN uid - uid of user requesting allocation.
 * IN protocol_version - version of the code the caller is using
 * OUT job_id_ptr - job_id of allocated job
@@ -1561,7 +1633,8 @@ static int _submit_sibling_jobs(job_desc_msg_t *job_desc, slurm_msg_t *msg,
 *	otherwise.
 */
 extern int fed_mgr_job_allocate(slurm_msg_t *msg, job_desc_msg_t *job_desc,
-				uid_t uid, uint16_t protocol_version,
+				bool alloc_only, uid_t uid,
+				uint16_t protocol_version,
 				uint32_t *job_id_ptr, int *alloc_code,
 				char **err_msg)
 {
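_submit_sibling_jobs() starts one thread per sibling cluster, and the new alloc_only flag selects the thread entry point right in the pthread_create() call. Below is a runnable sketch of that fan-out-and-collect shape under simplified assumptions: all names are hypothetical stand-ins, and the real code tracks sib_submit_t records on a list and prunes failed siblings from the job's sibling bitmap rather than just OR-ing return codes.

/* Sketch of the per-sibling thread fan-out with a flag-selected entry
 * point, as in _submit_sibling_jobs(); all names are hypothetical
 * stand-ins.  Build: cc -Wall sketch.c -lpthread */
#include <pthread.h>
#include <stdio.h>

typedef struct {
	const char *sibling;
	int thread_rc;		/* per-thread result, read after join */
} sub_t;

static void *submit_allocation(void *arg)
{
	sub_t *sub = arg;
	printf("allocation -> %s\n", sub->sibling);
	sub->thread_rc = 0;
	return NULL;
}

static void *submit_batch_job(void *arg)
{
	sub_t *sub = arg;
	printf("batch job  -> %s\n", sub->sibling);
	sub->thread_rc = 0;
	return NULL;
}

int main(void)
{
	const char *siblings[] = { "fed1", "fed2", "fed3" };
	pthread_t tids[3];
	sub_t subs[3];
	int alloc_only = 1, rc = 0;

	for (int i = 0; i < 3; i++) {
		subs[i] = (sub_t){ .sibling = siblings[i], .thread_rc = -1 };
		/* Same ternary as the patch: the flag picks the entry
		 * point, the rest of the fan-out is shared. */
		if (pthread_create(&tids[i], NULL,
				   alloc_only ? submit_allocation
					      : submit_batch_job,
				   &subs[i]) != 0)
			return 1;
	}
	for (int i = 0; i < 3; i++) {
		pthread_join(tids[i], NULL);
		rc |= subs[i].thread_rc;	/* any failure poisons rc */
	}
	return rc;
}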
@@ -1611,7 +1684,8 @@ extern int fed_mgr_job_allocate(slurm_msg_t *msg, job_desc_msg_t *job_desc,
 	 * fails, then don't worry about sending to the siblings. */
 	lock_slurmctld(job_write_lock);
 	*alloc_code = job_allocate(job_desc, job_desc->immediate, false, NULL,
-				   uid, &job_ptr, err_msg, protocol_version);
+				   alloc_only, uid, &job_ptr, err_msg,
+				   protocol_version);

 	if (!job_ptr || (*alloc_code && job_ptr->job_state == JOB_FAILED)) {
 		unlock_slurmctld(job_write_lock);
@@ -1632,7 +1706,7 @@ extern int fed_mgr_job_allocate(slurm_msg_t *msg, job_desc_msg_t *job_desc,

 	unlock_slurmctld(job_write_lock);

-	if (_submit_sibling_jobs(job_desc, msg)) {
+	if (_submit_sibling_jobs(job_desc, msg, alloc_only)) {
 		/* failed to submit a sibling job to a sibling. Need to update
 		 * the local job's sibling bitmap */
diff --git a/src/slurmctld/fed_mgr.h b/src/slurmctld/fed_mgr.h
index a78491f9e894617bad47ae6c16986aecd45bb61b..f85782df39d342082c8155b3a3f68f00a3a6efd4 100644
--- a/src/slurmctld/fed_mgr.h
+++ b/src/slurmctld/fed_mgr.h
@@ -54,8 +54,8 @@ extern int fed_mgr_init(void *db_conn);
 extern bool fed_mgr_is_active();
 extern bool fed_mgr_is_tracker_only_job(struct job_record *job_ptr);
 extern int fed_mgr_job_allocate(slurm_msg_t *msg,
-				job_desc_msg_t *job_desc, uid_t uid,
-				uint16_t protocol_version,
+				job_desc_msg_t *job_desc, bool alloc_only,
+				uid_t uid, uint16_t protocol_version,
 				uint32_t *job_id_ptr, int *alloc_code,
 				char **err_msg);
 extern int fed_mgr_sib_will_run(slurm_msg_t *msg,
diff --git a/src/slurmctld/proc_req.c b/src/slurmctld/proc_req.c
index 1b176ee8e92478cee400c6b7569826765162e214..5593876e6958d3163713605c517be2026a3a7d45 100644
--- a/src/slurmctld/proc_req.c
+++ b/src/slurmctld/proc_req.c
@@ -390,6 +390,27 @@ void slurmctld_req(slurm_msg_t *msg, connection_arg_t *arg)
 		break;
 	}
+	case REQUEST_SIB_RESOURCE_ALLOCATION:
+	{
+		uint16_t tmp_version = msg->protocol_version;
+		sib_msg_t *sib_msg = msg->data;
+		job_desc_msg_t *job_desc = sib_msg->data;
+		job_desc->job_id = sib_msg->job_id;
+		job_desc->fed_siblings = sib_msg->fed_siblings;
+
+		/* set protocol version to that of the client's version so that
+		 * the job's start_protocol_version is that of the client's and
+		 * not the calling controller's. */
+		msg->protocol_version = sib_msg->data_version;
+		msg->data = job_desc;
+
+		_slurm_rpc_allocate_resources(msg);
+
+		msg->data = sib_msg;
+		msg->protocol_version = tmp_version;
+
+		break;
+	}
 	case MESSAGE_NODE_REGISTRATION_STATUS:
 		_slurm_rpc_node_registration(msg, 0);
 		break;
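The new dispatch case re-uses _slurm_rpc_allocate_resources() by temporarily pointing the message envelope at the embedded job description and at the submitting client's protocol version, then restoring both fields so the caller's normal cleanup still frees the sib_msg wrapper rather than the borrowed payload. Below is a compact sketch of that unwrap/dispatch/restore pattern, with simplified stand-in types (not the real slurm_msg_t) and arbitrary version numbers.

/* Sketch of the unwrap/dispatch/restore pattern used by the
 * REQUEST_SIB_RESOURCE_ALLOCATION case; simplified stand-in types,
 * not the real slurm_msg_t.  Build: cc -Wall sketch.c */
#include <stdio.h>

typedef struct { int job_id; void *data; int data_version; } sib_t;
typedef struct { int job_id; } job_desc_t;
typedef struct { int protocol_version; void *data; } msg_t;

/* Stand-in for _slurm_rpc_allocate_resources(). */
static void handle_allocation(msg_t *msg)
{
	job_desc_t *jd = msg->data;
	printf("allocating job %d at protocol %d\n",
	       jd->job_id, msg->protocol_version);
}

static void dispatch_sib_allocation(msg_t *msg)
{
	int tmp_version = msg->protocol_version;
	sib_t *sib = msg->data;
	job_desc_t *jd = sib->data;

	jd->job_id = sib->job_id;	/* propagate the origin job id */

	/* Dispatch with the submitting client's version so the job
	 * records that, not the forwarding controller's version. */
	msg->protocol_version = sib->data_version;
	msg->data = jd;
	handle_allocation(msg);

	/* Restore the envelope so the caller's generic cleanup still
	 * frees the original sib message, not the inner payload. */
	msg->data = sib;
	msg->protocol_version = tmp_version;
}

int main(void)
{
	job_desc_t jd = { 0 };
	sib_t sib = { .job_id = 1234, .data = &jd, .data_version = 7936 };
	msg_t msg = { .protocol_version = 8192, .data = &sib };

	dispatch_sib_allocation(&msg);
	return 0;
}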
this should never happen", + __func__); + reject_job = true; + error_code = SLURM_ERROR; + } + } + } else { + lock_slurmctld(job_write_lock); + + error_code = job_allocate( + job_desc_msg, immediate, false, + NULL, true, uid, &job_ptr, + &err_msg, + msg->protocol_version); + /* unlock after finished using the job structure + * data */ + + /* return result */ + if (!job_ptr || + (error_code && + job_ptr->job_state == JOB_FAILED)) + reject_job = true; + } END_TIMER2("_slurm_rpc_allocate_resources"); } - } else if (errno) - error_code = errno; - else - error_code = SLURM_ERROR; - - /* return result */ - if (!job_ptr || (error_code && job_ptr->job_state == JOB_FAILED)) + } else { reject_job = true; + if (errno) + error_code = errno; + else + error_code = SLURM_ERROR; + } if (!reject_job) { xassert(job_ptr); @@ -3498,7 +3555,7 @@ static void _slurm_rpc_submit_batch_job(slurm_msg_t * msg) if (job_desc_msg->job_id == SLURM_BATCH_SCRIPT && fed_mgr_is_active()) { /* make sure it's not a submitted sib job. */ - if (fed_mgr_job_allocate(msg, job_desc_msg, uid, + if (fed_mgr_job_allocate(msg, job_desc_msg, false, uid, msg->protocol_version, &job_id, &error_code, &err_msg)) reject_job = true;