From af4173b30fd3dd1ad5792da354bd30e17d77ba74 Mon Sep 17 00:00:00 2001 From: Danny Auble <da@llnl.gov> Date: Thu, 18 May 2006 16:32:58 +0000 Subject: [PATCH] Added Logic for fanout to failover to forward list if main node is unreachable --- NEWS | 2 + src/common/forward.c | 190 +++++++++++++++++++++++++++---- src/common/forward.h | 21 ++++ src/common/global_srun.c | 39 ++++++- src/common/read_config.c | 3 +- src/common/slurm_protocol_api.c | 71 ++++-------- src/common/slurm_protocol_pack.c | 3 +- src/sacct/sacct_stat.c | 2 +- src/sbcast/agent.c | 12 +- src/slurmctld/agent.c | 90 ++++++++++++--- src/slurmctld/node_mgr.c | 4 +- src/slurmctld/ping_nodes.c | 13 ++- src/slurmd/slurmd/req.c | 6 +- src/slurmd/slurmd/slurmd.c | 13 +-- 14 files changed, 356 insertions(+), 113 deletions(-) diff --git a/NEWS b/NEWS index 893b7bf06a5..b30dd132f78 100644 --- a/NEWS +++ b/NEWS @@ -10,6 +10,8 @@ documents those changes that are of interest to users and admins. times for all SLURM commands. NOTE: This may break Moab, Maui, and/or LSF schedulers. -- Fix for srun -n and -O options when paired with -b. + -- Added logic for fanout to failover to forward list if main node is + unreachable * Changes in SLURM 1.1.0-pre8 ============================= diff --git a/src/common/forward.c b/src/common/forward.c index 2d737c37373..4f366d73978 100644 --- a/src/common/forward.c +++ b/src/common/forward.c @@ -50,6 +50,7 @@ #define MAX_RETRIES 3 int _destroy_data_info_data(uint32_t type, ret_data_info_t *ret_data_info); + void *_forward_thread(void *arg) { forward_msg_t *fwd_msg = (forward_msg_t *)arg; @@ -65,12 +66,23 @@ void *_forward_thread(void *arg) char name[MAX_SLURM_NAME]; msg.forward.cnt = 0; - +start_again: + /* info("sending to %s with %d forwards", */ +/* fwd_msg->node_name, fwd_msg->header.forward.cnt); */ if ((fd = slurm_open_msg_conn(&fwd_msg->addr)) < 0) { - error("forward_thread: %m",fwd_msg->header.forward.cnt); - ret_list = list_create(destroy_ret_types); - no_resp_forwards(&fwd_msg->header.forward, &ret_list, errno); - goto nothing_sent; + error("forward_thread to %s: %m", fwd_msg->node_name); + slurm_mutex_lock(fwd_msg->forward_mutex); + if(forward_msg_to_next(fwd_msg, errno)) { + slurm_mutex_unlock(fwd_msg->forward_mutex); + free_buf(buffer); + buffer = init_buf(0); + goto start_again; + } + goto cleanup; + /* ret_list = list_create(destroy_ret_types); */ +/* no_resp_forwards(&fwd_msg->header.forward, &ret_list, errno); */ +/* goto nothing_sent; */ + } pack_header(&fwd_msg->header, buffer); @@ -93,9 +105,17 @@ void *_forward_thread(void *arg) get_buf_offset(buffer), SLURM_PROTOCOL_NO_SEND_RECV_FLAGS ) < 0) { error("forward_thread: slurm_msg_sendto: %m"); - ret_list = list_create(destroy_ret_types); - no_resp_forwards(&fwd_msg->header.forward, &ret_list, errno); - goto nothing_sent; + slurm_mutex_lock(fwd_msg->forward_mutex); + if(forward_msg_to_next(fwd_msg, errno)) { + slurm_mutex_unlock(fwd_msg->forward_mutex); + free_buf(buffer); + buffer = init_buf(0); + goto start_again; + } + goto cleanup; + /* ret_list = list_create(destroy_ret_types); */ +/* no_resp_forwards(&fwd_msg->header.forward, &ret_list, errno); */ +/* goto nothing_sent; */ } if ((fwd_msg->header.msg_type == REQUEST_SHUTDOWN) || @@ -124,10 +144,19 @@ void *_forward_thread(void *arg) ret_list = slurm_receive_msg(fd, &msg, fwd_msg->timeout); - if(!ret_list || list_count(ret_list) == 0) { - no_resp_forwards(&fwd_msg->header.forward, &ret_list, errno); + if(!ret_list || (fwd_msg->header.forward.cnt != 0 + && list_count(ret_list) == 0)) { + 
slurm_mutex_lock(fwd_msg->forward_mutex); + if(forward_msg_to_next(fwd_msg, errno)) { + slurm_mutex_unlock(fwd_msg->forward_mutex); + free_buf(buffer); + buffer = init_buf(0); + goto start_again; + } + goto cleanup; + //no_resp_forwards(&fwd_msg->header.forward, &ret_list, errno); } -nothing_sent: +//nothing_sent: type = xmalloc(sizeof(ret_types_t)); type->err = errno; list_push(ret_list, type); @@ -158,6 +187,8 @@ nothing_sent: ret_data_list))) { list_push(type->ret_data_list, ret_data_info); + /* info("got %s", */ +/* ret_data_info->node_name); */ } break; } @@ -174,6 +205,7 @@ nothing_sent: while((ret_data_info = list_pop(returned_type->ret_data_list))) { list_push(type->ret_data_list, ret_data_info); + //info("got %s",ret_data_info->node_name); } } destroy_ret_types(returned_type); @@ -238,6 +270,7 @@ int _destroy_data_info_data(uint32_t type, ret_data_info_t *ret_data_info) case REQUEST_OLD_JOB_RESOURCE_ALLOCATION: slurm_free_old_job_alloc_msg(ret_data_info->data); break; + case 0: case REQUEST_PING: case REQUEST_RECONFIGURE: case REQUEST_CONTROL: @@ -318,7 +351,7 @@ int _destroy_data_info_data(uint32_t type, ret_data_info_t *ret_data_info) slurm_free_return_code_msg(ret_data_info->data); break; default: - error("invalid RPC ret_type=%d", type); + error("invalid FORWARD ret_type=%d", type); break; } return SLURM_SUCCESS; @@ -333,8 +366,13 @@ int _destroy_data_info_data(uint32_t type, ret_data_info_t *ret_data_info) */ extern void forward_init(forward_t *forward, forward_t *from) { - if(from) { - forward = from; + if(from && from->init == FORWARD_INIT) { + forward->cnt = from->cnt; + forward->timeout = from->timeout; + forward->addr = from->addr; + forward->name = from->name; + forward->node_id = from->node_id; + forward->init = from->init; } else { forward->cnt = 0; forward->timeout = 0; @@ -427,6 +465,80 @@ extern int forward_msg(forward_struct_t *forward_struct, return SLURM_SUCCESS; } +/* + * forward_msg_to_next- logic to change the address and forward structure of a + * message to the next one in the queue and mark the + * current node as having an error adding it to the return + * list of the fwd_msg. + * + * IN: fwd_msg - forward_msg_t * - holds information about message + * and the childern it was suppose + * to forward to + * IN: err - int - error message from attempt + * + * RET: 0/1 - int - if 1 more to forward to 0 if + * no one left to forward to. 
+ * you need to slurm_mutex_lock(fwd_msg->forward_mutex); + * before coming in here + */ +extern int forward_msg_to_next(forward_msg_t *fwd_msg, int err) +{ + ret_data_info_t *ret_data_info = NULL; + ret_types_t *type = NULL; + int i = 0; + int prev_cnt = fwd_msg->header.forward.cnt; + forward_t forward; + ListIterator itr; + + forward_init(&forward, NULL); + debug3("problems with %s", fwd_msg->node_name); + if(fwd_msg->ret_list) { + ret_data_info = xmalloc(sizeof(ret_data_info_t)); + ret_data_info->node_name = xstrdup(fwd_msg->node_name); + ret_data_info->nodeid = fwd_msg->header.srun_node_id; + itr = list_iterator_create(fwd_msg->ret_list); + while((type = (ret_types_t *) list_next(itr)) != NULL) { + if(type->msg_rc == SLURM_ERROR){ + list_push(type->ret_data_list, ret_data_info); + break; + } + } + list_iterator_destroy(itr); + + if(!type) { + type = xmalloc(sizeof(ret_types_t)); + list_push(fwd_msg->ret_list, type); + type->type = REQUEST_PING; + type->msg_rc = SLURM_ERROR; + type->err = err; + type->ret_data_list = list_create(destroy_data_info); + list_push(type->ret_data_list, ret_data_info); + } + } + if(prev_cnt == 0) { + debug3("no more to send to"); + return 0; + } + + fwd_msg->header.srun_node_id = fwd_msg->header.forward.node_id[0]; + memcpy(&fwd_msg->addr, &fwd_msg->header.forward.addr[0], + sizeof(slurm_addr)); + strncpy(fwd_msg->node_name, + &fwd_msg->header.forward.name[i * MAX_SLURM_NAME], + MAX_SLURM_NAME); + i = 0; + + forward_set(&forward, + prev_cnt, + &i, + &fwd_msg->header.forward); + + destroy_forward(&fwd_msg->header.forward); + forward_init(&fwd_msg->header.forward, &forward); + + return 1; +} + /* * forward_set - divide a mesage up into components for forwarding * IN: forward - forward_t * - struct to store forward info @@ -444,7 +556,7 @@ extern int forward_set(forward_t *forward, int j = 1; int total = from->cnt; - /* char name[MAX_SLURM_NAME]; */ +/* char name[MAX_SLURM_NAME]; */ /* strncpy(name, */ /* &from->name[(*pos) * MAX_SLURM_NAME], */ /* MAX_SLURM_NAME); */ @@ -458,12 +570,11 @@ extern int forward_set(forward_t *forward, forward->node_id = xmalloc(sizeof(int32_t) * span); forward->timeout = from->timeout; forward->init = FORWARD_INIT; - + while(j<span && ((*pos+j) < total)) { memcpy(&forward->addr[j-1], &from->addr[*pos+j], sizeof(slurm_addr)); - //forward->addr[j-1] = forward_addr[*pos+j]; strncpy(&forward->name[(j-1) * MAX_SLURM_NAME], &from->name[(*pos+j) * MAX_SLURM_NAME], MAX_SLURM_NAME); @@ -473,7 +584,7 @@ extern int forward_set(forward_t *forward, else forward->node_id[j-1] = 0; - /* strncpy(name, */ +/* strncpy(name, */ /* &from->name[(*pos+j) * MAX_SLURM_NAME], */ /* MAX_SLURM_NAME); */ /* info("along with %s",name); */ @@ -569,6 +680,47 @@ extern int forward_set_launch(forward_t *forward, return SLURM_SUCCESS; } +extern void forward_wait(slurm_msg_t * msg) +{ + int count = 0; + ret_types_t *ret_type = NULL; + ListIterator itr; + + /* wait for all the other messages on the tree under us */ + if(msg->forward_struct_init == FORWARD_INIT && msg->forward_struct) { + debug2("looking for %d", msg->forward_struct->fwd_cnt); + slurm_mutex_lock(&msg->forward_struct->forward_mutex); + count = 0; + if (msg->ret_list != NULL) { + itr = list_iterator_create(msg->ret_list); + while((ret_type = list_next(itr)) != NULL) { + count += list_count(ret_type->ret_data_list); + } + list_iterator_destroy(itr); + } + debug2("Got back %d", count); + while((count < msg->forward_struct->fwd_cnt)) { + pthread_cond_wait(&msg->forward_struct->notify, + 
&msg->forward_struct->forward_mutex); + count = 0; + if (msg->ret_list != NULL) { + itr = list_iterator_create(msg->ret_list); + while((ret_type = list_next(itr)) != NULL) { + count += list_count( + ret_type->ret_data_list); + } + list_iterator_destroy(itr); + } + debug2("Got back %d", count); + + } + debug2("Got them all"); + slurm_mutex_unlock(&msg->forward_struct->forward_mutex); + destroy_forward_struct(msg->forward_struct); + } + return; +} + extern int no_resp_forwards(forward_t *forward, List *ret_list, int err) { ret_types_t *type = NULL; @@ -610,7 +762,7 @@ void destroy_data_info(void *object) void destroy_forward(forward_t *forward) { - if(forward->cnt > 0) { + if(forward->init == FORWARD_INIT) { xfree(forward->addr); xfree(forward->name); xfree(forward->node_id); diff --git a/src/common/forward.h b/src/common/forward.h index a02cc6062c8..13ca1907fba 100644 --- a/src/common/forward.h +++ b/src/common/forward.h @@ -77,6 +77,24 @@ if (forward_msg(forward_struct, &header) == SLURM_ERROR) { extern int forward_msg(forward_struct_t *forward_struct, header_t *header); +/* + * forward_msg_to_next- logic to change the address and forward structure of a + * message to the next one in the queue and mark the + * current node as having an error adding it to the return + * list of the fwd_msg. + * + * IN: fwd_msg - forward_msg_t * - holds information about message + * and the childern it was suppose + * to forward to + * IN: err - int - error message from attempt + * + * RET: 0/1 - int - if 1 more to forward to 0 if + * no one left to forward to. + * you need to slurm_mutex_lock(fwd_msg->forward_mutex); + * before coming in here + */ +extern int forward_msg_to_next(forward_msg_t *fwd_msg, int err); + /* * forward_set - divide a mesage up into components for forwarding * IN: forward - forward_t * - struct to store forward info @@ -211,6 +229,9 @@ extern int forward_set_launch (forward_t *forward, hostlist_iterator_t itr, int32_t timeout); + +extern void forward_wait(slurm_msg_t *msg); + /* * no_resp_forward - Used to respond for nodes not able to respond since * the parent had failed in some way diff --git a/src/common/global_srun.c b/src/common/global_srun.c index 358aa8de546..aa1b94b157f 100644 --- a/src/common/global_srun.c +++ b/src/common/global_srun.c @@ -43,6 +43,7 @@ #include "src/common/slurm_protocol_api.h" #include "src/common/slurm_protocol_defs.h" #include "src/common/xmalloc.h" +#include "src/common/xstring.h" #include "src/common/xsignal.h" #include "src/common/forward.h" #include "src/common/global_srun.h" @@ -198,19 +199,50 @@ static void * _p_signal_task(void *args) slurm_msg_t *req = info->req_ptr; srun_job_t *job = info->job_ptr; char *host = job->step_layout->host[info->host_inx]; + char *tmpchar = NULL; List ret_list = NULL; ListIterator itr; ret_types_t *ret_type = NULL; ret_data_info_t *ret_data_info = NULL; + List tmp_ret_list = NULL; + forward_msg_t fwd_msg; +send_rc_again: debug3("sending signal to host %s", host); if ((ret_list = slurm_send_recv_rc_msg(req, 0)) == NULL) { errno = SLURM_SOCKET_ERROR; error("%s: signal: %m", host); - goto done; + if(!tmp_ret_list) + tmp_ret_list = list_create(destroy_ret_types); + + fwd_msg.header.srun_node_id = req->srun_node_id; + fwd_msg.header.forward = req->forward; + fwd_msg.ret_list = tmp_ret_list; + strncpy(fwd_msg.node_name, host, MAX_SLURM_NAME); + fwd_msg.forward_mutex = NULL; + if(forward_msg_to_next(&fwd_msg, errno)) { + req->address = fwd_msg.addr; + req->forward = fwd_msg.header.forward; + req->srun_node_id = 
fwd_msg.header.srun_node_id; + xfree(tmpchar); + tmpchar = xstrdup(fwd_msg.node_name); + host = tmpchar; + goto send_rc_again; + } } - + if(tmp_ret_list) { + if(!ret_list) + ret_list = tmp_ret_list; + else { + while((ret_type = list_pop(tmp_ret_list))) + list_push(ret_list, ret_type); + list_destroy(tmp_ret_list); + } + } + xfree(tmpchar); + if(!ret_list) + goto done; itr = list_iterator_create(ret_list); while((ret_type = list_next(itr)) != NULL) { rc = ret_type->msg_rc; @@ -238,8 +270,7 @@ static void * _p_signal_task(void *args) } list_iterator_destroy(itr); list_destroy(ret_list); - - done: +done: slurm_mutex_lock(&active_mutex); active--; pthread_cond_signal(&active_cond); diff --git a/src/common/read_config.c b/src/common/read_config.c index 097c88fea8b..fe090f3ffbc 100644 --- a/src/common/read_config.c +++ b/src/common/read_config.c @@ -1139,8 +1139,9 @@ slurm_conf_reinit(char *file_name) /* could check modified time on slurm.conf here */ _destroy_slurm_conf(); } - + _init_slurm_conf(file_name); + conf_initialized = true; pthread_mutex_unlock(&conf_lock); diff --git a/src/common/slurm_protocol_api.c b/src/common/slurm_protocol_api.c index 53e1ab042db..28784276a60 100644 --- a/src/common/slurm_protocol_api.c +++ b/src/common/slurm_protocol_api.c @@ -730,6 +730,7 @@ List slurm_receive_msg(slurm_fd fd, slurm_msg_t *msg, int timeout) */ if (_slurm_msg_recvfrom_timeout(fd, &buf, &buflen, 0, timeout) < 0) { forward_init(&header.forward, NULL); + /* no need to init forward_struct_init here */ rc = errno; goto total_return; } @@ -776,6 +777,7 @@ List slurm_receive_msg(slurm_fd fd, slurm_msg_t *msg, int timeout) memcpy(msg->forward_struct->buf, &buffer->head[buffer->processed], msg->forward_struct->buf_len); + msg->forward_struct->ret_list = ret_list; msg->forward_struct->timeout = timeout-header.forward.timeout; @@ -945,9 +947,6 @@ int slurm_send_node_msg(slurm_fd fd, slurm_msg_t * msg) Buf buffer; int rc; void * auth_cred; - int count = 0; - ret_types_t *ret_type = NULL; - ListIterator itr; /* * Initialize header with Auth credential and message type. 
@@ -961,42 +960,10 @@ int slurm_send_node_msg(slurm_fd fd, slurm_msg_t * msg) if(msg->forward.init != FORWARD_INIT) { forward_init(&msg->forward, NULL); + /* no need to init forward_struct_init here */ msg->ret_list = NULL; } - - /* wait for all the other messages on the tree under us */ - if(msg->forward_struct_init == FORWARD_INIT && msg->forward_struct) { - debug3("looking for %d", msg->forward_struct->fwd_cnt); - slurm_mutex_lock(&msg->forward_struct->forward_mutex); - count = 0; - if (msg->ret_list != NULL) { - itr = list_iterator_create(msg->ret_list); - while((ret_type = (ret_types_t *) list_next(itr)) - != NULL) { - count += list_count(ret_type->ret_data_list); - } - list_iterator_destroy(itr); - } - debug3("Got back %d", count); - while((count < msg->forward_struct->fwd_cnt)) { - pthread_cond_wait(&msg->forward_struct->notify, - &msg->forward_struct->forward_mutex); - count = 0; - if (msg->ret_list != NULL) { - itr = list_iterator_create(msg->ret_list); - while((ret_type = (ret_types_t *) list_next(itr)) - != NULL) { - count += list_count(ret_type->ret_data_list); - } - list_iterator_destroy(itr); - } - debug3("Got back %d", count); - - } - debug3("Got them all"); - slurm_mutex_unlock(&msg->forward_struct->forward_mutex); - destroy_forward_struct(msg->forward_struct); - } + forward_wait(msg); init_header(&header, msg, SLURM_PROTOCOL_NO_FLAGS); @@ -1374,9 +1341,9 @@ _send_and_recv_msg(slurm_fd fd, slurm_msg_t *req, ret_list = slurm_receive_msg(fd, resp, timeout); } - if(!ret_list || list_count(ret_list) == 0) { - no_resp_forwards(&req->forward, &ret_list, errno); - } + /* if(!ret_list || list_count(ret_list) == 0) { */ +/* no_resp_forwards(&req->forward, &ret_list, errno); */ +/* } */ /* * Attempt to close an open connection @@ -1412,14 +1379,14 @@ int slurm_send_recv_controller_msg(slurm_msg_t *req, slurm_msg_t *resp) bool backup_controller_flag; uint16_t slurmctld_timeout; - if ((fd = slurm_open_controller_conn()) < 0) { - rc = SLURM_SOCKET_ERROR; - goto cleanup; - } forward_init(&req->forward, NULL); req->ret_list = NULL; req->orig_addr.sin_addr.s_addr = 0; req->forward_struct_init = 0; + if ((fd = slurm_open_controller_conn()) < 0) { + rc = SLURM_SOCKET_ERROR; + goto cleanup; + } //info("here 2"); conf = slurm_conf_lock(); @@ -1487,8 +1454,7 @@ List slurm_send_recv_node_msg(slurm_msg_t *req, slurm_msg_t *resp, int timeout) slurm_fd fd = -1; if ((fd = slurm_open_msg_conn(&req->address)) < 0) - return NULL; //SLURM_SOCKET_ERROR; - //info("here 3"); + return NULL; return _send_and_recv_msg(fd, req, resp, timeout); @@ -1729,14 +1695,16 @@ int slurm_send_recv_rc_msg_only_one(slurm_msg_t *req, int *rc, int timeout) ret_types_t *ret_type = NULL; int ret_c = SLURM_SUCCESS; - if ((fd = slurm_open_msg_conn(&req->address)) < 0) { - return SLURM_SOCKET_ERROR; - } - forward_init(&req->forward, NULL); req->ret_list = NULL; req->orig_addr.sin_addr.s_addr = 0; + /* no need to init forward_struct_init here */ + if ((fd = slurm_open_msg_conn(&req->address)) < 0) { + return SLURM_SOCKET_ERROR; + } + + ret_list = _send_recv_rc_msg(fd, req, timeout); if(ret_list) { if(list_count(ret_list)>1) @@ -1769,7 +1737,8 @@ int slurm_send_recv_controller_rc_msg(slurm_msg_t *req, int *rc) forward_init(&req->forward, NULL); req->ret_list = NULL; req->orig_addr.sin_addr.s_addr = 0; - + /* no need to init forward_struct_init here */ + if ((fd = slurm_open_controller_conn()) < 0) return SLURM_SOCKET_ERROR; ret_list = _send_recv_rc_msg(fd, req, 0); diff --git a/src/common/slurm_protocol_pack.c 
b/src/common/slurm_protocol_pack.c index 92c456c5e71..2d9dc430422 100644 --- a/src/common/slurm_protocol_pack.c +++ b/src/common/slurm_protocol_pack.c @@ -335,7 +335,8 @@ unpack_header(header_t * header, Buf buffer) forward_init(&header->forward, NULL); header->ret_list = NULL; - + /* There is no forward_struct_init here so no need to init */ + safe_unpack16(&header->version, buffer); safe_unpack16(&header->flags, buffer); safe_unpack16(&uint16_tmp, buffer); diff --git a/src/sacct/sacct_stat.c b/src/sacct/sacct_stat.c index 8c5089ce8bd..9aced08594d 100644 --- a/src/sacct/sacct_stat.c +++ b/src/sacct/sacct_stat.c @@ -66,7 +66,7 @@ void *_stat_thread(void *args) &resp_msg, msg->forward.timeout); if (!ret_list) { - error("got an error"); + error("got an error no list returned"); goto cleanup; } g_slurm_auth_destroy(resp_msg.auth_cred); diff --git a/src/sbcast/agent.c b/src/sbcast/agent.c index f715bf4358d..935a6ee98cc 100644 --- a/src/sbcast/agent.c +++ b/src/sbcast/agent.c @@ -56,6 +56,7 @@ typedef struct thd { pthread_t thread; /* thread ID */ slurm_msg_t *msg; /* message to send */ int rc; /* highest return codes from RPC */ + char node_name[MAX_SLURM_NAME]; } thd_t; static pthread_mutex_t agent_cnt_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -80,7 +81,7 @@ static void *_agent_thread(void *args) error("slurm_send_recv_rc_msg: %m"); exit(1); } - + itr = list_iterator_create(ret_list); while ((ret_type = list_next(itr)) != NULL) { data_itr = list_iterator_create(ret_type->ret_data_list); @@ -91,9 +92,7 @@ static void *_agent_thread(void *args) "localhost")) { xfree(ret_data_info->node_name); ret_data_info->node_name = - xmalloc(MAX_SLURM_NAME); - getnodename(ret_data_info->node_name, - MAX_SLURM_NAME); + xstrdup(thread_ptr->node_name); } error("REQUEST_FILE_BCAST(%s): %s", ret_data_info->node_name, @@ -150,6 +149,9 @@ extern void send_rpc(file_bcast_msg_t *bcast_msg, for (i=0; i<alloc_resp->node_cnt; i++) { int j = i; + strncpy(thread_info[threads_used].node_name, + &from.name[MAX_SLURM_NAME*i], MAX_SLURM_NAME); + forward_set(&forward[threads_used], span[threads_used], &i, &from); msg[threads_used].msg_type = REQUEST_FILE_BCAST; @@ -159,6 +161,8 @@ extern void send_rpc(file_bcast_msg_t *bcast_msg, msg[threads_used].ret_list = NULL; msg[threads_used].orig_addr.sin_addr.s_addr = 0; msg[threads_used].srun_node_id = 0; + + threads_used++; } xfree(span); diff --git a/src/slurmctld/agent.c b/src/slurmctld/agent.c index 6ba3fe1b2ab..e56342d2a9d 100644 --- a/src/slurmctld/agent.c +++ b/src/slurmctld/agent.c @@ -152,7 +152,7 @@ typedef struct mail_info { } mail_info_t; static void _alarm_handler(int dummy); -static inline void _comm_err(char *node_name); +static inline int _comm_err(char *node_name); static void _list_delete_retry(void *retry_entry); static agent_info_t *_make_agent_info(agent_arg_t *agent_arg_ptr); static task_info_t *_make_task_data(agent_info_t *agent_info_ptr, int inx); @@ -366,6 +366,7 @@ static agent_info_t *_make_agent_info(agent_arg_t *agent_arg_ptr) (agent_arg_ptr->msg_type != REQUEST_RECONFIGURE)) agent_info_ptr->get_reply = true; + forward_init(&forward, NULL); forward.cnt = agent_info_ptr->thread_count; forward.name = agent_arg_ptr->node_names; forward.addr = agent_arg_ptr->slurm_addr; @@ -698,12 +699,14 @@ finished: ; } /* Report a communications error for specified node */ -static inline void _comm_err(char *node_name) +static inline int _comm_err(char *node_name) { + int rc = 1; #if AGENT_IS_THREAD - if (is_node_resp (node_name)) + if ((rc = is_node_resp (node_name))) #endif 
error("agent/send_recv_msg: %s: %m", node_name); + return rc; } /* @@ -730,12 +733,14 @@ static void *_thread_per_group_rpc(void *args) slurm_msg_type_t msg_type = task_ptr->msg_type; bool is_kill_msg, srun_agent; List ret_list = NULL; + List tmp_ret_list = NULL; ListIterator itr; ListIterator data_itr; ret_types_t *ret_type = NULL; ret_data_info_t *ret_data_info = NULL; int found = 0; - + forward_msg_t fwd_msg; + #if AGENT_IS_THREAD /* Locks: Write job, write node */ slurmctld_lock_t job_write_lock = { @@ -801,7 +806,7 @@ static void *_thread_per_group_rpc(void *args) msg.address = thread_ptr->slurm_addr; msg.msg_type = msg_type; msg.data = task_ptr->msg_args_ptr; - msg.forward = thread_ptr->forward; + forward_init(&msg.forward, &thread_ptr->forward); msg.ret_list = NULL; msg.orig_addr.sin_addr.s_addr = 0; msg.srun_node_id = 0; @@ -810,22 +815,69 @@ static void *_thread_per_group_rpc(void *args) //info("forwarding to %d",msg.forward.cnt); thread_ptr->end_time = thread_ptr->start_time + COMMAND_TIMEOUT; if (task_ptr->get_reply) { + send_rc_again: if ((ret_list = slurm_send_recv_rc_msg(&msg, msg.forward.timeout)) == NULL) { - if (!srun_agent) - _comm_err(thread_ptr->node_name); - goto cleanup; + if(!tmp_ret_list) + tmp_ret_list = list_create(destroy_ret_types); + + fwd_msg.header.srun_node_id = msg.srun_node_id; + forward_init(&fwd_msg.header.forward, &msg.forward); + //destroy_forward(&msg.forward); + fwd_msg.ret_list = tmp_ret_list; + strncpy(fwd_msg.node_name, + thread_ptr->node_name, + MAX_SLURM_NAME); + fwd_msg.forward_mutex = NULL; + if(forward_msg_to_next(&fwd_msg, errno)) { + msg.address = fwd_msg.addr; + forward_init(&msg.forward, + &fwd_msg.header.forward); + //destroy_forward(&fwd_msg.header.forward); + msg.srun_node_id = fwd_msg.header.srun_node_id; + strncpy(thread_ptr->node_name, + fwd_msg.node_name, + MAX_SLURM_NAME); + goto send_rc_again; + } } } else { + send_node_again: if (slurm_send_only_node_msg(&msg) < 0) { if (!srun_agent) _comm_err(thread_ptr->node_name); - } else - thread_state = DSH_DONE; + fwd_msg.header.srun_node_id = msg.srun_node_id; + fwd_msg.header.forward = msg.forward; + fwd_msg.ret_list = NULL; + strncpy(fwd_msg.node_name, + thread_ptr->node_name, + MAX_SLURM_NAME); + fwd_msg.forward_mutex = NULL; + if(forward_msg_to_next(&fwd_msg, errno)) { + msg.address = fwd_msg.addr; + msg.forward = fwd_msg.header.forward; + msg.srun_node_id = fwd_msg.header.srun_node_id; + strncpy(thread_ptr->node_name, + fwd_msg.node_name, + MAX_SLURM_NAME); + goto send_node_again; + } + } + thread_state = DSH_DONE; goto cleanup; } - + + if(tmp_ret_list) { + if(!ret_list) + ret_list = tmp_ret_list; + else { + while((ret_type = list_pop(tmp_ret_list))) + list_push(ret_list, ret_type); + list_destroy(tmp_ret_list); + } + } + //info("got %d states back from the send", list_count(ret_list)); found = 0; itr = list_iterator_create(ret_list); @@ -841,7 +893,7 @@ static void *_thread_per_group_rpc(void *args) xstrdup(thread_ptr->node_name); found = 1; } - /* info("response for %s rc = %d", */ +/* info("response for %s rc = %d", */ /* ret_data_info->node_name, */ /* ret_type->msg_rc); */ if(rc == SLURM_ERROR) { @@ -940,12 +992,20 @@ static void *_thread_per_group_rpc(void *args) while((ret_data_info = list_next(data_itr)) != NULL) if (!srun_agent) { errno = ret_type->err; - _comm_err(ret_data_info->node_name); + rc = _comm_err( + ret_data_info->node_name); } list_iterator_destroy(data_itr); if (srun_agent) thread_state = DSH_FAILED; - else /* Not serious error, don't DRAIN node */ + else 
if(ret_type->type == REQUEST_PING) + /* check if a forward failed */ + thread_state = DSH_NO_RESP; + else /* some will fail that don't mean anything went + bad like a job term request on a job that is + already finished, we will just exit on those + cases + */ thread_state = DSH_DONE; } ret_type->msg_rc = thread_state; @@ -954,7 +1014,7 @@ static void *_thread_per_group_rpc(void *args) cleanup: xfree(args); - destroy_forward(&thread_ptr->forward); + destroy_forward(&msg.forward); slurm_mutex_lock(thread_mutex_ptr); thread_ptr->ret_list = ret_list; thread_ptr->state = thread_state; diff --git a/src/slurmctld/node_mgr.c b/src/slurmctld/node_mgr.c index 18bd2b74b36..700710100cd 100644 --- a/src/slurmctld/node_mgr.c +++ b/src/slurmctld/node_mgr.c @@ -815,7 +815,7 @@ int update_node ( update_node_msg_t * update_node_msg ) last_node_update = time (NULL); while ( (this_node_name = hostlist_shift (host_list)) ) { int err_code = 0; - state_val = update_node_msg -> node_state; + state_val = update_node_msg->node_state; node_ptr = find_node_record (this_node_name); node_inx = node_ptr - node_record_table_ptr; if (node_ptr == NULL) { @@ -1379,7 +1379,7 @@ void node_did_resp (char *name) _node_did_resp(node_ptr); } #else - debug3("updating %s",name); + debug2("updating %s",name); node_ptr = find_node_record (name); if (node_ptr == NULL) { error ("node_did_resp unable to find node %s", name); diff --git a/src/slurmctld/ping_nodes.c b/src/slurmctld/ping_nodes.c index a967497673e..5998fe79f66 100644 --- a/src/slurmctld/ping_nodes.c +++ b/src/slurmctld/ping_nodes.c @@ -136,7 +136,7 @@ void ping_nodes (void) agent_arg_t *reg_agent_args; pthread_attr_t reg_attr_agent; pthread_t reg_thread_agent; - + ping_agent_args = xmalloc (sizeof (agent_arg_t)); ping_agent_args->msg_type = REQUEST_PING; ping_agent_args->retry = 0; @@ -159,7 +159,7 @@ void ping_nodes (void) (last_ping_time == (time_t) 0) ) node_dead_time = (time_t) 0; else - node_dead_time = last_ping_time - slurmctld_conf.slurmd_timeout; + node_dead_time = last_ping_time-slurmctld_conf.slurmd_timeout; still_live_time = now - (slurmctld_conf.slurmd_timeout / 2); last_ping_time = now; @@ -170,10 +170,11 @@ void ping_nodes (void) for (i = 0; i < node_record_count; i++) { struct node_record *node_ptr; - + node_ptr = &node_record_table_ptr[i]; base_state = node_ptr->node_state & NODE_STATE_BASE; no_resp_flag = node_ptr->node_state & NODE_STATE_NO_RESPOND; + if ((slurmctld_conf.slurmd_timeout == 0) && (base_state != NODE_STATE_UNKNOWN)) continue; @@ -218,8 +219,8 @@ void ping_nodes (void) xrealloc ((reg_agent_args->node_names), (MAX_SLURM_NAME * reg_buf_rec_size)); } - reg_agent_args->slurm_addr[reg_agent_args->node_count] = - node_ptr->slurm_addr; + reg_agent_args->slurm_addr[reg_agent_args->node_count] + = node_ptr->slurm_addr; pos = MAX_SLURM_NAME * reg_agent_args->node_count; strncpy (®_agent_args->node_names[pos], node_ptr->name, MAX_SLURM_NAME); @@ -253,7 +254,7 @@ void ping_nodes (void) hostlist_uniq(ping_hostlist); hostlist_ranged_string(ping_hostlist, sizeof(host_str), host_str); - debug2 ("Spawning ping agent for %s", host_str); + info("Spawning ping agent for %s", host_str); ping_begin(); slurm_attr_init (&ping_attr_agent); if (pthread_attr_setdetachstate (&ping_attr_agent, diff --git a/src/slurmd/slurmd/req.c b/src/slurmd/slurmd/req.c index 6a0d12fca80..a518d7a2bb5 100644 --- a/src/slurmd/slurmd/req.c +++ b/src/slurmd/slurmd/req.c @@ -287,6 +287,7 @@ _send_slurmstepd_init(int fd, slurmd_step_type_t type, void *req, "NodeName %s", parent_alias); /* 
parent_rank = -1; */ } + free(parent_alias); } #else /* In FRONT_END mode, one slurmd pretends to be all @@ -949,7 +950,7 @@ _rpc_reconfig(slurm_msg_t *msg, slurm_addr *cli_addr) (unsigned int) req_uid); else kill(conf->pid, SIGHUP); - + forward_wait(msg); /* Never return a message, slurmctld does not expect one */ } @@ -965,7 +966,8 @@ _rpc_shutdown(slurm_msg_t *msg, slurm_addr *cli_addr) if (kill(conf->pid, SIGTERM) != 0) error("kill(%u,SIGTERM): %m", conf->pid); } - + forward_wait(msg); + /* Never return a message, slurmctld does not expect one */ } diff --git a/src/slurmd/slurmd/slurmd.c b/src/slurmd/slurmd/slurmd.c index fbcc58cf40d..053a6b128d5 100644 --- a/src/slurmd/slurmd/slurmd.c +++ b/src/slurmd/slurmd/slurmd.c @@ -524,10 +524,9 @@ _read_config() { char *path_pubkey = NULL; slurm_ctl_conf_t *cf = NULL; - slurm_conf_reinit(conf->conffile); cf = slurm_conf_lock(); - + slurm_mutex_lock(&conf->config_mutex); if (conf->conffile == NULL) @@ -591,14 +590,13 @@ static void _reconfigure(void) { slurm_ctl_conf_t *cf; - + _reconfig = 0; - _read_config(); - + _update_logging(); _print_conf(); - + /* * Make best effort at changing to new public key */ @@ -961,8 +959,9 @@ _term_handler(int signum) static void _hup_handler(int signum) { - if (signum == SIGHUP) + if (signum == SIGHUP) { _reconfig = 1; + } } -- GitLab
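
Editorial note, not part of the patch itself: the heart of this commit is the retry loop added to _forward_thread() in src/common/forward.c. When the connection to the first node of a fanout (forward) list fails, forward_msg_to_next() records that node as an error in the caller's return list, promotes the next entry of the forward list to become the new target, and the send is retried ("goto start_again") until either a node answers or the list is exhausted. The standalone sketch below illustrates that failover pattern in plain C; it is not SLURM code, and the names fanout_target, try_send, and failover_to_next are hypothetical stand-ins for the real forward_msg_t / ret_list machinery.

/*
 * Standalone sketch (hypothetical names, not SLURM code): illustrates the
 * "fail over to the next node in the forward list" pattern this patch adds
 * to _forward_thread() / forward_msg_to_next().
 */
#include <stdio.h>
#include <string.h>

#define MAX_NODES 8
#define NAME_LEN  32

struct fanout_target {
	char node[NAME_LEN];             /* node we currently try to reach      */
	char fwd[MAX_NODES][NAME_LEN];   /* nodes it would forward to on success */
	int  fwd_cnt;
};

/* Pretend transport: fails for any node whose name starts with "bad". */
static int try_send(const char *node)
{
	return strncmp(node, "bad", 3) == 0 ? -1 : 0;
}

/*
 * Rough analogue of forward_msg_to_next(): record the failed node, then
 * promote the first entry of the forward list to be the new target.
 * Returns 1 if there is a new target to retry, 0 if the list is exhausted.
 */
static int failover_to_next(struct fanout_target *t,
			    char failed[][NAME_LEN], int *failed_cnt)
{
	strncpy(failed[(*failed_cnt)++], t->node, NAME_LEN);
	if (t->fwd_cnt == 0)
		return 0;
	strncpy(t->node, t->fwd[0], NAME_LEN);
	memmove(&t->fwd[0], &t->fwd[1], (size_t)(t->fwd_cnt - 1) * NAME_LEN);
	t->fwd_cnt--;
	return 1;
}

int main(void)
{
	struct fanout_target t = {
		.node    = "badnode1",
		.fwd     = { "badnode2", "goodnode3", "goodnode4" },
		.fwd_cnt = 3,
	};
	char failed[MAX_NODES][NAME_LEN];
	int  failed_cnt = 0;

	/* Same shape as the "goto start_again" loop in _forward_thread(). */
	while (try_send(t.node) < 0) {
		fprintf(stderr, "send to %s failed\n", t.node);
		if (!failover_to_next(&t, failed, &failed_cnt)) {
			fprintf(stderr, "no more nodes to forward to\n");
			return 1;
		}
	}
	printf("delivered via %s, still forwarding to %d node(s), %d failed\n",
	       t.node, t.fwd_cnt, failed_cnt);
	return 0;
}

Structured this way, a dead first-hop node costs only an extra connection attempt before the message reaches the rest of its subtree, instead of the entire subtree being reported as unreachable, which is what the pre-patch no_resp_forwards() path did on a connection error.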