diff --git a/src/common/msg_aggr.c b/src/common/msg_aggr.c index 9d9d709a6252014d8bc4823f5b22889680b6f438..448356f69af024fafff1b48fa02c550a33e4c5e5 100644 --- a/src/common/msg_aggr.c +++ b/src/common/msg_aggr.c @@ -76,6 +76,7 @@ typedef struct { typedef struct { uint16_t msg_index; + void (*resp_callback) (slurm_msg_t *msg); pthread_cond_t wait_cond; } msg_aggr_t; @@ -342,7 +343,8 @@ extern void msg_aggr_sender_fini(void) slurm_mutex_destroy(&msg_collection.mutex); } -extern void msg_aggr_add_msg(slurm_msg_t *msg, bool wait) +extern void msg_aggr_add_msg(slurm_msg_t *msg, bool wait, + void (*resp_callback) (slurm_msg_t *msg)) { int count; static uint16_t msg_index = 1; @@ -381,7 +383,7 @@ extern void msg_aggr_add_msg(slurm_msg_t *msg, bool wait) struct timespec timeout; msg_aggr->msg_index = msg->msg_index; - + msg_aggr->resp_callback = resp_callback; pthread_cond_init(&msg_aggr->wait_cond, NULL); slurm_mutex_lock(&msg_collection.aggr_mutex); @@ -422,7 +424,7 @@ extern void msg_aggr_add_comp(Buf buffer, void *auth_cred, header_t *header) msg->data = buffer; msg->data_size = remaining_buf(buffer); - msg_aggr_add_msg(msg, 0); + msg_aggr_add_msg(msg, 0, NULL); } extern void msg_aggr_resp(slurm_msg_t *msg) @@ -438,11 +440,12 @@ extern void msg_aggr_resp(slurm_msg_t *msg) info("msg_aggr_resp: processing composite msg_list..."); while ((next_msg = list_next(itr))) { switch (next_msg->msg_type) { + case REQUEST_BATCH_JOB_LAUNCH: case RESPONSE_SLURM_RC: /* signal sending thread that slurmctld received this - * epilog complete msg */ + * msg */ if (msg_collection.debug_flags & DEBUG_FLAG_ROUTE) - info("msg_aggr_resp: rc message found for " + info("msg_aggr_resp: response found for " "index %u signaling sending thread", next_msg->msg_index); slurm_mutex_lock(&msg_collection.aggr_mutex); @@ -454,6 +457,9 @@ extern void msg_aggr_resp(slurm_msg_t *msg) slurm_mutex_unlock(&msg_collection.aggr_mutex); continue; } + if (msg_aggr->resp_callback && + (next_msg->msg_type != RESPONSE_SLURM_RC)) + (*(msg_aggr->resp_callback))(next_msg); pthread_cond_signal(&msg_aggr->wait_cond); slurm_mutex_unlock(&msg_collection.aggr_mutex); break; diff --git a/src/common/msg_aggr.h b/src/common/msg_aggr.h index aceec8655b10d44f116ee7cafd64f8d40540e61e..f2ad2b89c42e283e5178ca6e74f7dc7949922a9c 100644 --- a/src/common/msg_aggr.h +++ b/src/common/msg_aggr.h @@ -49,7 +49,13 @@ extern void msg_aggr_sender_init(char *host, uint16_t port, uint64_t window, extern void msg_aggr_sender_reconfig(uint64_t window, uint64_t max_msg_cnt); extern void msg_aggr_sender_fini(void); -extern void msg_aggr_add_msg(slurm_msg_t *msg, bool wait); +/* add a message that needs to be sent. + * IN: msg - message to be sent + * IN: wait - whether or not we need to wait for a response + * IN: resp_callback - function to process response + */ +extern void msg_aggr_add_msg(slurm_msg_t *msg, bool wait, + void (*resp_callback) (slurm_msg_t *msg)); extern void msg_aggr_add_comp(Buf buffer, void *auth_cred, header_t *header); extern void msg_aggr_resp(slurm_msg_t *msg); diff --git a/src/slurmd/slurmd/req.c b/src/slurmd/slurmd/req.c index f4fe03a34c06967a3d2f1638300fb8b369671b70..a7bc0ce351f6d082951d26428fa2434abf22f521 100644 --- a/src/slurmd/slurmd/req.c +++ b/src/slurmd/slurmd/req.c @@ -458,7 +458,7 @@ slurmd_req(slurm_msg_t *msg) case MESSAGE_COMPOSITE: error("Processing RPC: MESSAGE_COMPOSITE: " "This should never happen"); - msg_aggr_add_msg(msg, 0); + msg_aggr_add_msg(msg, 0, NULL); break; case RESPONSE_MESSAGE_COMPOSITE: debug2("Processing RPC: RESPONSE_MESSAGE_COMPOSITE"); @@ -3897,7 +3897,7 @@ _epilog_complete(uint32_t jobid, int rc) /* we need to copy this symbol */ req->node_name = xstrdup(conf->node_name); - msg_aggr_add_msg(msg, 0); + msg_aggr_add_msg(msg, 0, NULL); } else { slurm_msg_t msg; epilog_complete_msg_t req; diff --git a/src/slurmd/slurmd/slurmd.c b/src/slurmd/slurmd/slurmd.c index 66b2b996d47ca09dabbd6f9ad786a2a332aecdbc..82fb2e015d39d7cf49f446799d61a2b27ca999e8 100644 --- a/src/slurmd/slurmd/slurmd.c +++ b/src/slurmd/slurmd/slurmd.c @@ -622,7 +622,7 @@ send_registration_msg(uint32_t status, bool startup) req->msg_type = MESSAGE_NODE_REGISTRATION_STATUS; req->data = msg; - msg_aggr_add_msg(req, 1); + msg_aggr_add_msg(req, 1, NULL); } else { slurm_msg_t req; slurm_msg_t_init(&req);