From f4e0330e0a3b4b4155e8a4be734d9a78313fb0be Mon Sep 17 00:00:00 2001 From: Mark Grondona <mgrondona@llnl.gov> Date: Tue, 27 May 2003 23:02:57 +0000 Subject: [PATCH] o slurm_receive_msg() now takes timeout argument. Updated callers, including slurm_send_recv_node_msg(), slurm_send_recv_rc_msg(), etc. o Fixed fd leak in agent.c using slurm_send_recv_rc_msg() w/ timeout. --- src/api/config_info.c | 6 +- src/api/job_info.c | 2 +- src/api/reconfigure.c | 2 +- src/common/slurm_protocol_api.c | 66 +++++++------ src/common/slurm_protocol_api.h | 19 ++-- src/slurmctld/agent.c | 162 +++++++++----------------------- src/slurmctld/controller.c | 140 +++++---------------------- src/slurmd/slurmd.c | 2 +- src/srun/io.c | 6 +- src/srun/launch.c | 22 +---- src/srun/msg.c | 2 +- src/srun/signals.c | 2 +- 12 files changed, 136 insertions(+), 295 deletions(-) diff --git a/src/api/config_info.c b/src/api/config_info.c index 42361aff445..f2c17ace684 100644 --- a/src/api/config_info.c +++ b/src/api/config_info.c @@ -142,19 +142,19 @@ slurm_load_ctl_conf (time_t update_time, slurm_ctl_conf_t **confp) req_msg.msg_type = REQUEST_BUILD_INFO; req_msg.data = &req; - if (slurm_send_recv_controller_msg(&req_msg, &resp_msg) < 0) + if (slurm_send_recv_controller_msg(&req_msg, &resp_msg) < 0) return SLURM_ERROR; switch (resp_msg.msg_type) { case RESPONSE_BUILD_INFO: *confp = (slurm_ctl_conf_info_msg_t *) resp_msg.data; - break ; + break; case RESPONSE_SLURM_RC: rc = ((return_code_msg_t *) resp_msg.data)->return_code; slurm_free_return_code_msg(resp_msg.data); if (rc) slurm_seterrno_ret(rc); - break ; + break; default: slurm_seterrno_ret(SLURM_UNEXPECTED_MSG_ERROR); break; diff --git a/src/api/job_info.c b/src/api/job_info.c index 0d58e452068..7d4531100de 100644 --- a/src/api/job_info.c +++ b/src/api/job_info.c @@ -263,7 +263,7 @@ slurm_pid2jobid (pid_t job_pid, uint32_t *jobid) req_msg.msg_type = REQUEST_JOB_ID; req_msg.data = &req; - if (slurm_send_recv_node_msg(&req_msg, &resp_msg) < 0) + if (slurm_send_recv_node_msg(&req_msg, &resp_msg, 0) < 0) return SLURM_ERROR; switch (resp_msg.msg_type) { diff --git a/src/api/reconfigure.c b/src/api/reconfigure.c index 41b5d621107..ca88fbefc00 100644 --- a/src/api/reconfigure.c +++ b/src/api/reconfigure.c @@ -126,7 +126,7 @@ _send_message_controller (enum controller_id dest, slurm_msg_t *req) if (slurm_send_node_msg(fd, req) < 0) slurm_seterrno_ret(SLURM_COMMUNICATIONS_SEND_ERROR); - if ((rc = slurm_receive_msg(fd, &resp_msg)) < 0) + if ((rc = slurm_receive_msg(fd, &resp_msg, 0)) < 0) slurm_seterrno_ret(SLURM_COMMUNICATIONS_RECEIVE_ERROR); if (slurm_shutdown_msg_conn(fd) != SLURM_SUCCESS) diff --git a/src/common/slurm_protocol_api.c b/src/common/slurm_protocol_api.c index e9aca7d106f..5c3bca500ed 100644 --- a/src/common/slurm_protocol_api.c +++ b/src/common/slurm_protocol_api.c @@ -57,6 +57,7 @@ /* #DEFINES */ #define _DEBUG 0 +#define SLURM_DEFAULT_TIMEOUT 2000 /* STATIC VARIABLES */ static pthread_mutex_t config_lock = PTHREAD_MUTEX_INITIALIZER; @@ -309,7 +310,7 @@ int slurm_close_accepted_conn(slurm_fd open_fd) * OUT msg - a slurm_msg struct to be filled in by the function * RET int - size of msg received in bytes before being unpacked */ -int slurm_receive_msg(slurm_fd fd, slurm_msg_t * msg) +int slurm_receive_msg(slurm_fd fd, slurm_msg_t * msg, int timeout) { char *buf = NULL; size_t buflen = 0; @@ -319,13 +320,20 @@ int slurm_receive_msg(slurm_fd fd, slurm_msg_t * msg) void *auth_cred; Buf buffer; + xassert(fd >= 0); + + if ((timeout*=1000) == 0) + timeout = SLURM_DEFAULT_TIMEOUT; + /* * Receive a msg. slurm_msg_recvfrom() will read the message * length and allocate space on the heap for a buffer containing * the message. */ - if (_slurm_msg_recvfrom(fd, &buf, &buflen, 0) < 0) + if (_slurm_msg_recvfrom_timeout(fd, &buf, &buflen, 0, timeout) < 0) { + error("recvfrom_timeout: %m"); return SLURM_ERROR; + } #if _DEBUG _print_data (buftemp,rc); @@ -455,6 +463,9 @@ int slurm_send_node_msg(slurm_fd fd, slurm_msg_t * msg) get_buf_offset(buffer), SLURM_PROTOCOL_NO_SEND_RECV_FLAGS ); + if (rc < 0) + error("slurm_msg_sendto: %m"); + free_buf(buffer); return rc; } @@ -710,42 +721,43 @@ int slurm_unpack_slurm_addr_no_alloc(slurm_addr * slurm_address, * IN request_msg - slurm_msg the request msg * IN rc - the return_code to send back to the client */ -void slurm_send_rc_msg(slurm_msg_t * request_msg, int rc) +void slurm_send_rc_msg(slurm_msg_t *msg, int rc) { - slurm_msg_t response_msg; + slurm_msg_t resp_msg; return_code_msg_t rc_msg; - /* no change */ rc_msg.return_code = rc; - /* init response_msg structure */ - response_msg.address = request_msg->address; - response_msg.msg_type = RESPONSE_SLURM_RC; - response_msg.data = &rc_msg; + + resp_msg.address = msg->address; + resp_msg.msg_type = RESPONSE_SLURM_RC; + resp_msg.data = &rc_msg; /* send message */ - slurm_send_node_msg(request_msg->conn_fd, &response_msg); + slurm_send_node_msg(msg->conn_fd, &resp_msg); } /* * Send and recv a slurm request and response on the open slurm descriptor */ static int -_send_and_recv_msg(slurm_fd fd, slurm_msg_t *req, slurm_msg_t *resp) +_send_and_recv_msg(slurm_fd fd, slurm_msg_t *req, slurm_msg_t *resp, + int timeout) { - int rc = SLURM_SUCCESS; + int err = SLURM_SUCCESS; if ( (slurm_send_node_msg(fd, req) < 0) - || (slurm_receive_msg(fd, resp) < 0) ) - rc = SLURM_ERROR; + || (slurm_receive_msg(fd, resp, timeout) < 0) ) + err = errno; /* * Attempt to close an open connection */ if (slurm_shutdown_msg_conn(fd) < 0) - rc = SLURM_ERROR; + return SLURM_ERROR; - return rc; + if (err) slurm_seterrno_ret(err); + return SLURM_SUCCESS; } /* slurm_send_recv_controller_msg @@ -762,7 +774,7 @@ int slurm_send_recv_controller_msg(slurm_msg_t *req, slurm_msg_t *resp) if ((fd = slurm_open_controller_conn()) < 0) return SLURM_SOCKET_ERROR; - return _send_and_recv_msg(fd, req, resp); + return _send_and_recv_msg(fd, req, resp, 0); } /* slurm_send_recv_node_msg @@ -772,14 +784,14 @@ int slurm_send_recv_controller_msg(slurm_msg_t *req, slurm_msg_t *resp) * OUT response_msg - slurm_msg response * RET int - return code */ -int slurm_send_recv_node_msg(slurm_msg_t *req, slurm_msg_t *resp) +int slurm_send_recv_node_msg(slurm_msg_t *req, slurm_msg_t *resp, int timeout) { slurm_fd fd = -1; if ((fd = slurm_open_msg_conn(&req->address)) < 0) return SLURM_SOCKET_ERROR; - return _send_and_recv_msg(fd, req, resp); + return _send_and_recv_msg(fd, req, resp, timeout); } @@ -836,15 +848,15 @@ int slurm_send_only_node_msg(slurm_msg_t *req) * Send message and recv "return code" message on an already open * slurm file descriptor */ -static int _send_recv_rc_msg(slurm_fd fd, slurm_msg_t *req, int *rc) +static int _send_recv_rc_msg(slurm_fd fd, slurm_msg_t *req, int *rc, + int timeout) { int retval = SLURM_SUCCESS; slurm_msg_t msg; - retval = _send_and_recv_msg(fd, req, &msg); - slurm_shutdown_msg_conn(fd); + retval = _send_and_recv_msg(fd, req, &msg, timeout); - if (retval != SLURM_SUCCESS) + if (retval != SLURM_SUCCESS) goto done; if (msg.msg_type != RESPONSE_SLURM_RC) @@ -862,14 +874,14 @@ static int _send_recv_rc_msg(slurm_fd fd, slurm_msg_t *req, int *rc) * Then read back an "rc" message returning the "return_code" specified * in the response in the "rc" parameter. */ -int slurm_send_recv_rc_msg(slurm_msg_t *req, int *rc) +int slurm_send_recv_rc_msg(slurm_msg_t *req, int *rc, int timeout) { slurm_fd fd = -1; - if ((fd = slurm_open_msg_conn(&req->address)) < 0) + if ((fd = slurm_open_msg_conn(&req->address)) < 0) return SLURM_SOCKET_ERROR; - return _send_recv_rc_msg(fd, req, rc); + return _send_recv_rc_msg(fd, req, rc, timeout); } /* @@ -882,7 +894,7 @@ int slurm_send_recv_controller_rc_msg(slurm_msg_t *req, int *rc) if ((fd = slurm_open_controller_conn()) < 0) return SLURM_SOCKET_ERROR; - return _send_recv_rc_msg(fd, req, rc); + return _send_recv_rc_msg(fd, req, rc, 0); } /* diff --git a/src/common/slurm_protocol_api.h b/src/common/slurm_protocol_api.h index eda31b23b68..ee57636460a 100644 --- a/src/common/slurm_protocol_api.h +++ b/src/common/slurm_protocol_api.h @@ -143,13 +143,16 @@ int inline slurm_shutdown_msg_engine(slurm_fd open_fd); \**********************************************************************/ /* - * NOTE: memory is allocated for the returned msg and must be freed at - * some point using the slurm_free_functions - * IN open_fd - file descriptor to receive msg on - * OUT msg - a slurm_msg struct to be filled in by the function - * RET int - size of msg received in bytes before being unpacked + * Receive a slurm message on the open slurm descriptor "fd" waiting + * at most "timeout" seconds for the message data. If timeout is + * zero, a default timeout is used. Memory for the message data + * (msg->data) is allocated from within this function, and must be + * freed at some point using one of the slurm_free* functions. + * + * Returns SLURM_SUCCESS if an entire message is successfully + * received. Otherwise SLURM_ERROR is returned. */ -int slurm_receive_msg(slurm_fd open_fd, slurm_msg_t * msg); +int slurm_receive_msg(slurm_fd fd, slurm_msg_t *msg, int timeout); /**********************************************************************\ * send message functions @@ -392,13 +395,13 @@ int slurm_send_recv_controller_msg(slurm_msg_t * request_msg, * RET int - return code */ int slurm_send_recv_node_msg(slurm_msg_t * request_msg, - slurm_msg_t * response_msg); + slurm_msg_t * response_msg, int timeout); /* * Open a connection to req->address, send message and receive * a "return code" message, returning return code in "rc" */ -int slurm_send_recv_rc_msg(slurm_msg_t *req, int *rc); +int slurm_send_recv_rc_msg(slurm_msg_t *req, int *rc, int timeout); /* * Same as above, but send to controller diff --git a/src/slurmctld/agent.c b/src/slurmctld/agent.c index 9848532d5ea..23b2c02a111 100644 --- a/src/slurmctld/agent.c +++ b/src/slurmctld/agent.c @@ -65,10 +65,11 @@ #include "src/common/list.h" #include "src/common/log.h" #include "src/common/macros.h" -#include "src/common/slurm_protocol_defs.h" +#include "src/common/xsignal.h" #include "src/common/xassert.h" #include "src/common/xmalloc.h" #include "src/common/xstring.h" +#include "src/common/slurm_protocol_api.h" #include "src/slurmctld/agent.h" #include "src/slurmctld/locks.h" @@ -125,7 +126,6 @@ static void _spawn_retry_agent(agent_arg_t * agent_arg_ptr); static void *_thread_per_node_rpc(void *args); static int _valid_agent_arg(agent_arg_t *agent_arg_ptr); static void *_wdog(void *args); -static void _xsignal(int signal, void (*handler) (int)); static pthread_mutex_t retry_mutex = PTHREAD_MUTEX_INITIALIZER; static List retry_list = NULL; /* agent_arg_t list for retry */ @@ -151,6 +151,8 @@ void *agent(void *args) if (_valid_agent_arg(agent_arg_ptr)) goto cleanup; + xsignal(SIGALRM, _alarm_handler); + /* initialize the agent data structures */ agent_info_ptr = _make_agent_info(agent_arg_ptr); thread_ptr = agent_info_ptr->thread_struct; @@ -355,9 +357,12 @@ static void *_wdog(void *args) work_done = false; delay = difftime(time(NULL), thread_ptr[i].time); - if (delay >= COMMAND_TIMEOUT) + if (delay >= COMMAND_TIMEOUT) { + debug3("thd %d timed out\n", + thread_ptr[i].thread); pthread_kill(thread_ptr[i].thread, SIGALRM); + } break; case DSH_NEW: work_done = false; @@ -465,99 +470,44 @@ static void *_wdog(void *args) */ static void *_thread_per_node_rpc(void *args) { - int msg_size = 0; - int rc; - slurm_fd sockfd; - slurm_msg_t request_msg; - slurm_msg_t *response_msg = xmalloc(sizeof(slurm_msg_t)); - return_code_msg_t *slurm_rc_msg; + int rc = SLURM_SUCCESS; + int timeout = 0; + slurm_msg_t msg; task_info_t *task_ptr = (task_info_t *) args; thd_t *thread_ptr = task_ptr->thread_struct_ptr; state_t thread_state = DSH_NO_RESP; - sigset_t set; + #if AGENT_IS_THREAD struct node_record *node_ptr; /* Locks: Write write node */ slurmctld_lock_t node_write_lock = { NO_LOCK, NO_LOCK, WRITE_LOCK, NO_LOCK }; #endif + xassert(args != NULL); - /* set up SIGALRM handler */ - if (sigemptyset(&set)) - error("sigemptyset error: %m"); - if (sigaddset(&set, SIGALRM)) - error("sigaddset error on SIGALRM: %m"); - if (sigprocmask(SIG_UNBLOCK, &set, NULL) != 0) - fatal("sigprocmask error: %m"); - _xsignal(SIGALRM, _alarm_handler); - - if (args == NULL) - fatal("_thread_per_node_rpc has NULL argument"); slurm_mutex_lock(task_ptr->thread_mutex_ptr); thread_ptr->state = DSH_ACTIVE; thread_ptr->time = time(NULL); slurm_mutex_unlock(task_ptr->thread_mutex_ptr); - /* init message connection for message communication */ - if ((sockfd = slurm_open_msg_conn(&thread_ptr->slurm_addr)) - == SLURM_SOCKET_ERROR) { - error( - "_thread_per_node_rpc/slurm_open_msg_conn to host %s: %m", - thread_ptr->node_name); - goto cleanup; - } - /* send request message */ - request_msg.msg_type = task_ptr->msg_type; - request_msg.data = task_ptr->msg_args_ptr; - if ((rc = slurm_send_node_msg(sockfd, &request_msg)) - == SLURM_SOCKET_ERROR) { - error( - "_thread_per_node_rpc/slurm_send_node_msg to host %s: %m", - thread_ptr->node_name); - goto cleanup; - } - + msg.address = thread_ptr->slurm_addr; + msg.msg_type = task_ptr->msg_type; + msg.data = task_ptr->msg_args_ptr; - if (task_ptr->msg_type == REQUEST_KILL_TIMELIMIT) { - int kill_wait; -#if AGENT_IS_THREAD - kill_wait = slurmctld_conf.kill_wait; -#else - fatal("get kill_wait from elsewhere"); -#endif - slurm_mutex_lock(task_ptr->thread_mutex_ptr); - thread_ptr->time = time(NULL) + kill_wait; - slurm_mutex_unlock(task_ptr->thread_mutex_ptr); - sleep(kill_wait); - } + if (task_ptr->msg_type == REQUEST_KILL_TIMELIMIT) + timeout = slurmctld_conf.kill_wait; - /* receive message as needed (most message types) */ - if (task_ptr->get_reply && - ((msg_size = slurm_receive_msg(sockfd, response_msg)) - == SLURM_SOCKET_ERROR)) { - error( - "_thread_per_node_rpc/slurm_receive_msg host=%s msg=%u, error=%m", - thread_ptr->node_name, task_ptr->msg_type); + if (slurm_send_recv_rc_msg(&msg, &rc, timeout) < 0) { + error("agent: %s: %m", thread_ptr->node_name); goto cleanup; } - /* shutdown message connection */ - if ((rc = slurm_shutdown_msg_conn(sockfd)) == SLURM_SOCKET_ERROR) { - error( - "_thread_per_node_rpc/slurm_shutdown_msg_conn to host %s: %m", - thread_ptr->node_name); - goto cleanup; - } if (!task_ptr->get_reply) { thread_state = DSH_DONE; goto cleanup; } - if (msg_size) { - error("_thread_per_node_rpc/msg_size to host %s error %d", - thread_ptr->node_name, msg_size); - goto cleanup; - } + #if AGENT_IS_THREAD /* SPECIAL CASE: Immediately mark node as IDLE on job kill reply */ @@ -578,39 +528,31 @@ static void *_thread_per_node_rpc(void *args) } #endif - switch (response_msg->msg_type) { - case RESPONSE_SLURM_RC: - slurm_rc_msg = (return_code_msg_t *) response_msg->data; - rc = slurm_rc_msg->return_code; - slurm_free_return_code_msg(slurm_rc_msg); - if (rc == SLURM_SUCCESS) { - debug3("agent processed RPC to node %s", - thread_ptr->node_name); - thread_state = DSH_DONE; - } else if (rc == ESLURMD_EPILOG_FAILED) { - error("Epilog failure on host %s, setting DOWN", - thread_ptr->node_name); - thread_state = DSH_FAILED; - } else if (rc == ESLURMD_PROLOG_FAILED) { - error("Prolog failure on host %s, setting DOWN", - thread_ptr->node_name); - thread_state = DSH_FAILED; - } else if (rc == ESLURM_INVALID_JOB_ID) { - /* Not indicative of a real error */ - debug2("agent processed RPC to node %s, error %s", - thread_ptr->node_name, "Invalid Job Id"); - thread_state = DSH_DONE; - } else { - error("agent error from host %s: %s", - thread_ptr->node_name, - slurm_strerror(rc)); /* Don't use %m */ - thread_state = DSH_DONE; - } + switch (rc) { + case SLURM_SUCCESS: + debug3("agent processed RPC to node %s", thread_ptr->node_name); + thread_state = DSH_DONE; break; - default: - error("agent reply from host %s, bad msg_type %d", - thread_ptr->node_name, response_msg->msg_type); + case ESLURMD_EPILOG_FAILED: + error("Epilog failure on host %s, setting DOWN", + thread_ptr->node_name); + thread_state = DSH_FAILED; break; + case ESLURMD_PROLOG_FAILED: + error("Prolog failure on host %s, setting DOWN", + thread_ptr->node_name); + thread_state = DSH_FAILED; + break; + case ESLURM_INVALID_JOB_ID: /* Not indicative of a real error */ + debug2("agent processed RPC to node %s, error %s", + thread_ptr->node_name, "Invalid Job Id"); + thread_state = DSH_DONE; + break; + + default: + error("agent error from host %s: %s", + thread_ptr->node_name, slurm_strerror(rc)); + thread_state = DSH_DONE; } cleanup: @@ -623,26 +565,10 @@ static void *_thread_per_node_rpc(void *args) pthread_cond_signal(task_ptr->thread_cond_ptr); slurm_mutex_unlock(task_ptr->thread_mutex_ptr); - slurm_free_msg(response_msg); xfree(args); return (void *) NULL; } -/* - * Emulate signal() but with BSD semantics (i.e. don't restore signal to - * SIGDFL prior to executing handler). - */ -static void _xsignal(int signal, void (*handler) (int)) -{ - struct sigaction sa, old_sa; - - sa.sa_handler = handler; - sigemptyset(&sa.sa_mask); - sigaddset(&sa.sa_mask, signal); - sa.sa_flags = 0; - sigaction(signal, &sa, &old_sa); -} - /* * SIGALRM handler. This is just a stub because we are really interested * in interrupting connect() in k4cmd/rcmd or select() in rsh() below and diff --git a/src/slurmctld/controller.c b/src/slurmctld/controller.c index fe1865524c2..598c056f76c 100644 --- a/src/slurmctld/controller.c +++ b/src/slurmctld/controller.c @@ -470,15 +470,13 @@ static void *_slurmctld_rpc_mgr(void *no_data) */ static void *_service_connection(void *arg) { - int error_code; slurm_fd newsockfd = ((connection_arg_t *) arg)->newsockfd; slurm_msg_t *msg = NULL; void *return_code = NULL; msg = xmalloc(sizeof(slurm_msg_t)); - if ((error_code = slurm_receive_msg(newsockfd, msg)) - == SLURM_SOCKET_ERROR) { + if (slurm_receive_msg(newsockfd, msg, 0) < 0) { error("slurm_receive_msg (_service_connection) error %m"); } else { if (msg->msg_type == REQUEST_SHUTDOWN_IMMEDIATE) @@ -2307,8 +2305,7 @@ static void *_background_rpc_mgr(void *no_data) msg = xmalloc(sizeof(slurm_msg_t)); msg->conn_fd = newsockfd; - if (slurm_receive_msg(newsockfd, msg) - == SLURM_SOCKET_ERROR) + if (slurm_receive_msg(newsockfd, msg, 0) < 0) error("slurm_receive_msg error %m"); else { error_code = _background_process_msg(msg); @@ -2365,66 +2362,30 @@ static int _background_process_msg(slurm_msg_t * msg) * RET 0 if no error */ static int _ping_controller(void) { - int rc, msg_size; - slurm_fd sockfd; - slurm_msg_t request_msg; - slurm_msg_t response_msg; - return_code_msg_t *slurm_rc_msg; - slurm_addr primary_addr; + int rc; + slurm_msg_t req; debug3("pinging slurmctld at %s", slurmctld_conf.control_addr); - /* init message connection for message communication with - * primary controller */ - slurm_set_addr(&primary_addr, slurmctld_conf.slurmctld_port, - slurmctld_conf.control_addr); - if ((sockfd = slurm_open_msg_conn(&primary_addr)) - == SLURM_SOCKET_ERROR) { - error("_ping_controller/slurm_open_msg_conn: %m"); - return SLURM_SOCKET_ERROR; - } - /* send request message */ - request_msg.msg_type = REQUEST_PING; + /* + * Set address of controller to ping + */ + slurm_set_addr(&req.address, slurmctld_conf.slurmctld_port, + slurmctld_conf.control_addr); - if ((rc = slurm_send_node_msg(sockfd, &request_msg)) - == SLURM_SOCKET_ERROR) { - error("_ping_controller/slurm_send_node_msg error: %m"); - return SLURM_SOCKET_ERROR; - } - /* receive message */ - if ((msg_size = slurm_receive_msg(sockfd, &response_msg)) - == SLURM_SOCKET_ERROR) { - error("_ping_controller/slurm_receive_msg error: %m"); - return SLURM_SOCKET_ERROR; - } + req.msg_type = REQUEST_PING; - /* shutdown message connection */ - if ((rc = slurm_shutdown_msg_conn(sockfd)) - == SLURM_SOCKET_ERROR) { - error("_ping_controller/slurm_shutdown_msg_conn error: %m"); - return SLURM_SOCKET_ERROR; + if (slurm_send_recv_rc_msg(&req, &rc, 0) < 0) { + error("_ping_controller/slurm_send_node_msg error: %m"); + return SLURM_ERROR; } - if (msg_size) - return msg_size; - - switch (response_msg.msg_type) { - case RESPONSE_SLURM_RC: - slurm_rc_msg = (return_code_msg_t *) response_msg.data; - rc = slurm_rc_msg->return_code; - slurm_free_return_code_msg(slurm_rc_msg); - if (rc) { - error("_ping_controller/response error %d", rc); - return SLURM_PROTOCOL_ERROR; - } - break; - default: - error("_ping_controller/unexpected message type %d", - response_msg.msg_type); + if (rc) { + error("_ping_controller/response error %d", rc); return SLURM_PROTOCOL_ERROR; - break; } + return SLURM_PROTOCOL_SUCCESS; } @@ -2436,77 +2397,27 @@ static int _ping_controller(void) int shutdown_backup_controller(void) { int rc; - int msg_size; - slurm_fd sockfd; - slurm_msg_t request_msg; - slurm_msg_t response_msg; - return_code_msg_t *slurm_rc_msg; - slurm_addr secondary_addr; + slurm_msg_t req; if ((slurmctld_conf.backup_addr == NULL) || (strlen(slurmctld_conf.backup_addr) == 0)) return SLURM_PROTOCOL_SUCCESS; - /* init message connection for message communication with - * primary controller */ - slurm_set_addr(&secondary_addr, slurmctld_conf.slurmctld_port, + slurm_set_addr(&req.address, slurmctld_conf.slurmctld_port, slurmctld_conf.backup_addr); - if ((sockfd = slurm_open_msg_conn(&secondary_addr)) - == SLURM_SOCKET_ERROR) { - error - ("shutdown_backup_controller/slurm_open_msg_conn: %m"); - return SLURM_SOCKET_ERROR; - } /* send request message */ - request_msg.msg_type = REQUEST_CONTROL; - request_msg.data = NULL; - - if ((rc = slurm_send_node_msg(sockfd, &request_msg)) - == SLURM_SOCKET_ERROR) { - error - ("shutdown_backup_controller/slurm_send_node_msg error: %m"); - return SLURM_SOCKET_ERROR; - } - - /* receive message */ - if ((msg_size = slurm_receive_msg(sockfd, &response_msg)) - == SLURM_SOCKET_ERROR) { - error - ("shutdown_backup_controller/slurm_receive_msg error: %m"); - return SLURM_SOCKET_ERROR; - } + req.msg_type = REQUEST_CONTROL; + req.data = NULL; - /* shutdown message connection */ - if ((rc = slurm_shutdown_msg_conn(sockfd)) - == SLURM_SOCKET_ERROR) { - error - ("shutdown_backup_controller/slurm_shutdown_msg_conn error: %m"); + if (slurm_send_recv_rc_msg(&req, &rc, 0) < 0) { + error("shutdown_backup:send/recv: %m"); return SLURM_SOCKET_ERROR; } - if (msg_size) - return msg_size; - - switch (response_msg.msg_type) { - case RESPONSE_SLURM_RC: - slurm_rc_msg = (return_code_msg_t *) response_msg.data; - rc = slurm_rc_msg->return_code; - slurm_free_return_code_msg(slurm_rc_msg); - if (rc) { - error - ("shutdown_backup_controller/response error %d", - rc); - return SLURM_PROTOCOL_ERROR; - } else - info("BackupController told to shutdown"); - break; - default: - error - ("shutdown_backup_controller/unexpected message type %d", - response_msg.msg_type); - return SLURM_PROTOCOL_ERROR; - break; + if (rc) { + error("shutdown_backup: %s", slurm_strerror(rc)); + return SLURM_ERROR; } /* FIXME: Ideally the REQUEST_CONTROL RPC does not return until all @@ -2515,6 +2426,7 @@ int shutdown_backup_controller(void) * so the state save should occur right away). We sleep for a while * here and give the backup controller time to shutdown */ sleep(2); + return SLURM_PROTOCOL_SUCCESS; } diff --git a/src/slurmd/slurmd.c b/src/slurmd/slurmd.c index 66b61699406..2005fdc224a 100644 --- a/src/slurmd/slurmd.c +++ b/src/slurmd/slurmd.c @@ -295,7 +295,7 @@ _service_connection(void *arg) conn_t *con = (conn_t *) arg; slurm_msg_t *msg = xmalloc(sizeof(*msg)); - if ((rc = slurm_receive_msg(con->fd, msg)) < 0) { + if ((rc = slurm_receive_msg(con->fd, msg, 0)) < 0) { error("slurm_receive_msg: %m"); slurm_free_msg(msg); } else { diff --git a/src/srun/io.c b/src/srun/io.c index 69d1b7973c2..0c10ba3761d 100644 --- a/src/srun/io.c +++ b/src/srun/io.c @@ -236,6 +236,10 @@ _io_thr_init(job_t *job, struct pollfd *fds) xassert(job != NULL); + /* + * XXX: Handle job->ofname/efname == IO_ONE + */ + _set_iofds_nonblocking(job); if (job->ofname->type == IO_ALL) @@ -262,8 +266,6 @@ _io_thr_init(job_t *job, struct pollfd *fds) for (i = 0; i < job->niofds; i++) _poll_set_rd(fds[i], job->iofd[i]); - - } static void diff --git a/src/srun/launch.c b/src/srun/launch.c index 796bfdec739..ebad31a4a7a 100644 --- a/src/srun/launch.c +++ b/src/srun/launch.c @@ -277,24 +277,10 @@ static void _p_launch(slurm_msg_t *req_array_ptr, job_t *job) static int _send_msg_rc(slurm_msg_t *msg) { - slurm_msg_t resp; - return_code_msg_t *rcmsg = NULL; - int rc = 0; - - if ((rc = slurm_send_recv_node_msg(msg, &resp)) < 0) - return rc; - - switch (resp.msg_type) { - case RESPONSE_SLURM_RC: - rcmsg = resp.data; - rc = rcmsg->return_code; - slurm_free_return_code_msg(rcmsg); - break; - default: - error("recvd msg type %d. expected %d", resp.msg_type, - RESPONSE_SLURM_RC); - rc = SLURM_UNEXPECTED_MSG_ERROR; - } + int rc = 0; + + if ((rc = slurm_send_recv_rc_msg(msg, &rc, 0)) < 0) + return SLURM_ERROR; slurm_seterrno_ret (rc); } diff --git a/src/srun/msg.c b/src/srun/msg.c index a5205487bf9..b4252db7a31 100644 --- a/src/srun/msg.c +++ b/src/srun/msg.c @@ -524,7 +524,7 @@ _accept_msg_connection(job_t *job, int fdnum) msg = xmalloc(sizeof(*msg)); again: - if (slurm_receive_msg(fd, msg) == SLURM_SOCKET_ERROR) { + if (slurm_receive_msg(fd, msg, 0) < 0) { if (errno == EINTR) goto again; error("slurm_receive_msg[%s]: %m", host); diff --git a/src/srun/signals.c b/src/srun/signals.c index c7e8bf09691..cc47a223052 100644 --- a/src/srun/signals.c +++ b/src/srun/signals.c @@ -327,7 +327,7 @@ static void * _p_signal_task(void *args) char *host = job->host[info->host_inx]; debug3("sending signal to host %s", host); - if (slurm_send_recv_rc_msg(req, &rc) < 0) { + if (slurm_send_recv_rc_msg(req, &rc, 0) < 0) { error("%s: signal: %m", host); goto done; } -- GitLab