diff --git a/src/common/slurm_protocol_api.c b/src/common/slurm_protocol_api.c index 57f0770114d1df77ed63e4e40b22a429992e8c16..e9aca7d106f6a36b07909a0d0c1af3e1b1170300 100644 --- a/src/common/slurm_protocol_api.c +++ b/src/common/slurm_protocol_api.c @@ -1,5 +1,6 @@ /*****************************************************************************\ * slurm_protocol_api.c - high-level slurm communication functions + * $Id$ ***************************************************************************** * Copyright (C) 2002 The Regents of the University of California. * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). @@ -55,7 +56,7 @@ /* EXTERNAL VARIABLES */ /* #DEFINES */ -#define _DEBUG 0 +#define _DEBUG 0 /* STATIC VARIABLES */ static pthread_mutex_t config_lock = PTHREAD_MUTEX_INITIALIZER; @@ -66,152 +67,146 @@ static slurm_ctl_conf_t slurmctld_conf; /**********************************************************************\ * protocol configuration functions \**********************************************************************/ - -#if _DEBUG -static void _print_data(char *data, int len) -{ - int i; - for (i = 0; i < len; i++) { - if ((i % 10 == 0) && (i != 0)) - printf("\n"); - printf("%2.2x ", ((int) data[i] & 0xff)); - if (i >= 200) - break; - } - printf("\n\n"); -} -#endif - /* slurm_set_api_config * sets the slurm_protocol_config object * NOT THREAD SAFE - * IN protocol_conf - slurm_protocol_config object + * IN protocol_conf - slurm_protocol_config object + * + * XXX: Why isn't the "config_lock" mutex used here? */ int slurm_set_api_config(slurm_protocol_config_t * protocol_conf) { - proto_conf = protocol_conf; - return SLURM_SUCCESS; + proto_conf = protocol_conf; + return SLURM_SUCCESS; } /* slurm_get_api_config * returns a pointer to the current slurm_protocol_config object - * RET slurm_protocol_config_t - current slurm_protocol_config object + * RET slurm_protocol_config_t - current slurm_protocol_config object */ slurm_protocol_config_t *slurm_get_api_config() { - return proto_conf; + return proto_conf; } /* slurm_api_set_default_config - * called by the send_controller_msg function to insure that at least - * the compiled in default slurm_protocol_config object is initialized - * RET int - return code + * called by the send_controller_msg function to insure that at least + * the compiled in default slurm_protocol_config object is initialized + * RET int - return code */ int slurm_api_set_default_config() { - int rc = SLURM_SUCCESS; - - slurm_mutex_lock(&config_lock); - if ((slurmctld_conf.control_addr != NULL) && - (slurmctld_conf.slurmctld_port != 0)) - goto cleanup; - - read_slurm_conf_ctl(&slurmctld_conf); - if ((slurmctld_conf.control_addr == NULL) || - (slurmctld_conf.slurmctld_port == 0)) { - error("Unable to establish control machine or port"); - rc =SLURM_ERROR; - goto cleanup; - } - - slurm_set_addr(&proto_conf_default.primary_controller, - slurmctld_conf.slurmctld_port, - slurmctld_conf.control_addr); - if (proto_conf_default.primary_controller.sin_port == 0) { - error("Unable to establish control machine address"); - rc =SLURM_ERROR; - goto cleanup; - } - - if (slurmctld_conf.backup_addr) { - slurm_set_addr(&proto_conf_default.secondary_controller, - slurmctld_conf.slurmctld_port, - slurmctld_conf.backup_addr); - } - proto_conf = &proto_conf_default; + int rc = SLURM_SUCCESS; + + slurm_mutex_lock(&config_lock); + if ((slurmctld_conf.control_addr != NULL) && + (slurmctld_conf.slurmctld_port != 0)) + goto cleanup; + + read_slurm_conf_ctl(&slurmctld_conf); + if ((slurmctld_conf.control_addr == NULL) || + (slurmctld_conf.slurmctld_port == 0)) { + error("Unable to establish control machine or port"); + rc =SLURM_ERROR; + goto cleanup; + } + + slurm_set_addr(&proto_conf_default.primary_controller, + slurmctld_conf.slurmctld_port, + slurmctld_conf.control_addr); + if (proto_conf_default.primary_controller.sin_port == 0) { + error("Unable to establish control machine address"); + rc =SLURM_ERROR; + goto cleanup; + } + + if (slurmctld_conf.backup_addr) { + slurm_set_addr(&proto_conf_default.secondary_controller, + slurmctld_conf.slurmctld_port, + slurmctld_conf.backup_addr); + } + proto_conf = &proto_conf_default; cleanup: - slurm_mutex_unlock(&config_lock); - return rc; + slurm_mutex_unlock(&config_lock); + return rc; } /* slurm_get_slurmd_port * returns slurmd port from slurmctld_conf object - * RET short int - slurmd port + * RET short int - slurmd port */ short int slurm_get_slurmd_port(void) { - if (slurmctld_conf.slurmd_port == 0) /* ==0 if config unread */ - slurm_api_set_default_config(); + if (slurmctld_conf.slurmd_port == 0) /* ==0 if config unread */ + slurm_api_set_default_config(); - return slurmctld_conf.slurmd_port; + return slurmctld_conf.slurmd_port; } /* slurm_get_slurm_user_id * returns slurmd uid from slurmctld_conf object - * RET uint32_t - slurm user id + * RET uint32_t - slurm user id */ uint32_t slurm_get_slurm_user_id(void) { - if (slurmctld_conf.slurmd_port == 0) /* ==0 if config unread */ - slurm_api_set_default_config(); + if (slurmctld_conf.slurmd_port == 0) /* ==0 if config unread */ + slurm_api_set_default_config(); - return slurmctld_conf.slurm_user_id; + return slurmctld_conf.slurm_user_id; } /**********************************************************************\ * general message management functions used by slurmctld, slurmd \**********************************************************************/ -/* In the socket implementation it creates a socket, binds to it, and - * listens for connections. - * IN port - port to bind the msg server to - * RET slurm_fd - file descriptor of the connection created +/* + * Initialize a slurm server at port "port" + * + * IN port - port to bind the msg server to + * RET slurm_fd - file descriptor of the connection created */ slurm_fd slurm_init_msg_engine_port(uint16_t port) { - slurm_addr slurm_address; + slurm_addr addr; - slurm_set_addr_any(&slurm_address, port); - return _slurm_init_msg_engine(&slurm_address); + slurm_set_addr_any(&addr, port); + return _slurm_init_msg_engine(&addr); } -/* In the socket implementation it creates a socket, binds to it, and - * listens for connections. - * IN slurm_address - slurm_addr to bind the msg server to - * RET slurm_fd - file descriptor of the connection created +/* + * Same as above, but initialize using a slurm address "addr" + * + * IN addr - slurm_addr to bind the msg server to + * RET slurm_fd - file descriptor of the connection created */ -slurm_fd slurm_init_msg_engine(slurm_addr * slurm_address) +slurm_fd slurm_init_msg_engine(slurm_addr *addr) { - return _slurm_init_msg_engine(slurm_address); + return _slurm_init_msg_engine(addr); } -/* just calls close on an established msg connection - * IN open_fd - an open file descriptor to close - * RET int - the return code +/* + * Close an established message engine. + * Returns SLURM_SUCCESS or SLURM_FAILURE. + * + * IN fd - an open file descriptor to close + * RET int - the return code */ -int slurm_shutdown_msg_engine(slurm_fd open_fd) +int slurm_shutdown_msg_engine(slurm_fd fd) { - return _slurm_close(open_fd); + return _slurm_close(fd); } -/* just calls close on an established msg connection to close - * IN open_fd - an open file descriptor to close - * RET int - the return code +/* + * Close an established message connection. + * Returns SLURM_SUCCESS or SLURM_FAILURE. + * + * IN fd - an open file descriptor to close + * RET int - the return code */ -int slurm_shutdown_msg_conn(slurm_fd open_fd) +int slurm_shutdown_msg_conn(slurm_fd fd) { - return _slurm_close(open_fd); + return _slurm_close(fd); } /**********************************************************************\ @@ -219,101 +214,88 @@ int slurm_shutdown_msg_conn(slurm_fd open_fd) \**********************************************************************/ /* In the bsd socket implementation it creates a SOCK_STREAM socket - * and calls connect on it a SOCK_DGRAM socket called with connect - * is defined to only receive messages from the address/port pair - * argument of the connect call slurm_address - for now it is - * really just a sockaddr_in - * IN slurm_address - slurm_addr of the connection destination - * RET slurm_fd - file descriptor of the connection created + * and calls connect on it a SOCK_DGRAM socket called with connect + * is defined to only receive messages from the address/port pair + * argument of the connect call slurm_address - for now it is + * really just a sockaddr_in + * IN slurm_address - slurm_addr of the connection destination + * RET slurm_fd - file descriptor of the connection created */ slurm_fd slurm_open_msg_conn(slurm_addr * slurm_address) { - return _slurm_open_msg_conn(slurm_address); + return _slurm_open_msg_conn(slurm_address); } /* calls connect to make a connection-less datagram connection to the - * primary or secondary slurmctld message engine - * RET slurm_fd - file descriptor of the connection created + * primary or secondary slurmctld message engine + * RET slurm_fd - file descriptor of the connection created */ slurm_fd slurm_open_controller_conn() { - slurm_fd connection_fd; + slurm_fd fd; + + if (slurm_api_set_default_config() < 0) + return SLURM_FAILURE; + + if ((fd = slurm_open_msg_conn(&proto_conf->primary_controller)) >= 0) + return fd; + + debug("Failed to contact primary controller: %m"); - connection_fd = slurm_api_set_default_config(); + if (!slurmctld_conf.backup_controller) + goto fail; - /* try to send to primary first then secondary */ - if ((connection_fd == SLURM_SUCCESS) && - ((connection_fd = - slurm_open_msg_conn(&proto_conf->primary_controller)) == - SLURM_SOCKET_ERROR)) { - debug("Open connection to primary controller failed: %m"); + if ((fd = slurm_open_msg_conn(&proto_conf->secondary_controller)) >= 0) + return fd; - if ((slurmctld_conf.backup_controller) && - ((connection_fd = - slurm_open_msg_conn(&proto_conf-> - secondary_controller)) - == SLURM_SOCKET_ERROR)) - debug - ("Open connection to secondary controller failed: %m"); - } + debug("Failed to contact secondary controller: %m"); - return connection_fd; + fail: + slurm_seterrno_ret(SLURM_COMMUNICATIONS_CONNECTION_ERROR); } /* calls connect to make a connection-less datagram connection to the - * primary or secondary slurmctld message engine - * RET slurm_fd - file descriptor of the connection created - * IN dest - controller to contact, primary or secondary + * primary or secondary slurmctld message engine + * RET slurm_fd - file descriptor of the connection created + * IN dest - controller to contact, primary or secondary */ slurm_fd slurm_open_controller_conn_spec(enum controller_id dest) { - slurm_fd connection_fd; - - connection_fd = slurm_api_set_default_config(); - - if (connection_fd != SLURM_SUCCESS) { - debug3("slurm_api_set_default_config error"); - } else if (dest == PRIMARY_CONTROLLER) { - if ((connection_fd = - slurm_open_msg_conn(&proto_conf-> - primary_controller)) == - SLURM_SOCKET_ERROR) - debug - ("Open connection to primary controller failed: %m"); - } else if (slurmctld_conf.backup_controller) { - if ((connection_fd = - slurm_open_msg_conn(&proto_conf-> - secondary_controller)) == - SLURM_SOCKET_ERROR) - debug - ("Open connection to secondary controller failed: %m"); - } else { - debug("No secondary controller to contact"); - connection_fd = SLURM_SOCKET_ERROR; - } - - return connection_fd; + slurm_addr *addr; + + if (slurm_api_set_default_config() < 0) { + debug3("Error: Unable to set default config"); + return SLURM_ERROR; + } + + addr = (dest == PRIMARY_CONTROLLER) ? + &proto_conf->primary_controller : + &proto_conf->secondary_controller; + + if (!addr) return SLURM_ERROR; + + return slurm_open_msg_conn(addr); } /* In the bsd implmentation maps directly to a accept call - * IN open_fd - file descriptor to accept connection on - * OUT slurm_address - slurm_addr of the accepted connection - * RET slurm_fd - file descriptor of the connection created + * IN open_fd - file descriptor to accept connection on + * OUT slurm_address - slurm_addr of the accepted connection + * RET slurm_fd - file descriptor of the connection created */ slurm_fd slurm_accept_msg_conn(slurm_fd open_fd, - slurm_addr * slurm_address) + slurm_addr * slurm_address) { - return _slurm_accept_msg_conn(open_fd, slurm_address); + return _slurm_accept_msg_conn(open_fd, slurm_address); } /* In the bsd implmentation maps directly to a close call, to close - * the socket that was accepted - * IN open_fd - an open file descriptor to close - * RET int - the return code + * the socket that was accepted + * IN open_fd - an open file descriptor to close + * RET int - the return code */ int slurm_close_accepted_conn(slurm_fd open_fd) { - return _slurm_close_accepted_conn(open_fd); + return _slurm_close_accepted_conn(open_fd); } /**********************************************************************\ @@ -322,162 +304,159 @@ int slurm_close_accepted_conn(slurm_fd open_fd) /* * NOTE: memory is allocated for the returned msg and must be freed at - * some point using the slurm_free_functions - * IN open_fd - file descriptor to receive msg on - * OUT msg - a slurm_msg struct to be filled in by the function - * RET int - size of msg received in bytes before being unpacked - */ -int slurm_receive_msg(slurm_fd open_fd, slurm_msg_t * msg) -{ - char *buftemp; - header_t header; - int rc; - void *auth_cred; - Buf buffer; - - buftemp = xmalloc(SLURM_PROTOCOL_MAX_MESSAGE_BUFFER_SIZE); - if ((rc = _slurm_msg_recvfrom(open_fd, buftemp, - SLURM_PROTOCOL_MAX_MESSAGE_BUFFER_SIZE, - SLURM_PROTOCOL_NO_SEND_RECV_FLAGS)) - == SLURM_SOCKET_ERROR) { - xfree(buftemp); - return rc; - } - -#if _DEBUG - _print_data (buftemp,rc); + * some point using the slurm_free_functions + * IN open_fd - file descriptor to receive msg on + * OUT msg - a slurm_msg struct to be filled in by the function + * RET int - size of msg received in bytes before being unpacked + */ +int slurm_receive_msg(slurm_fd fd, slurm_msg_t * msg) +{ + char *buf = NULL; + size_t buflen = 0; + + header_t header; + int rc; + void *auth_cred; + Buf buffer; + + /* + * Receive a msg. slurm_msg_recvfrom() will read the message + * length and allocate space on the heap for a buffer containing + * the message. + */ + if (_slurm_msg_recvfrom(fd, &buf, &buflen, 0) < 0) + return SLURM_ERROR; + +#if _DEBUG + _print_data (buftemp,rc); #endif - buffer = create_buf(buftemp, rc); - - /* unpack header */ - unpack_header(&header, buffer); - if ((rc = check_header_version(&header)) != SLURM_SUCCESS) { - free_buf(buffer); - slurm_seterrno_ret(SLURM_PROTOCOL_VERSION_ERROR); - } - - /* unpack authentication cred */ - if ( ( auth_cred = g_slurm_auth_unpack( buffer) ) == NULL ) { - error( "authentication: %s ", - g_slurm_auth_errstr( g_slurm_auth_errno( NULL ) ) ); - free_buf(buffer); - slurm_seterrno_ret(ESLURM_PROTOCOL_INCOMPLETE_PACKET); - } - - /* verify credentials */ - rc = g_slurm_auth_verify( auth_cred, NULL, 2 ); - if ( rc != SLURM_SUCCESS) { - error( "authentication: %s ", - g_slurm_auth_errstr( g_slurm_auth_errno( auth_cred ) ) ); - (void) g_slurm_auth_destroy(auth_cred); - free_buf(buffer); - slurm_seterrno_ret(SLURM_PROTOCOL_AUTHENTICATION_ERROR); - } - - /* unpack msg body */ - msg->msg_type = header.msg_type; - if ((header.body_length > remaining_buf(buffer)) || - (unpack_msg(msg, buffer) != SLURM_SUCCESS)) { - g_slurm_auth_destroy(auth_cred); - free_buf(buffer); - slurm_seterrno_ret(ESLURM_PROTOCOL_INCOMPLETE_PACKET); - } - - msg->cred = (void *) auth_cred; - - free_buf(buffer); - return SLURM_SUCCESS; + buffer = create_buf(buf, buflen); + + unpack_header(&header, buffer); + + if (check_header_version(&header) < 0) { + free_buf(buffer); + slurm_seterrno_ret(SLURM_PROTOCOL_VERSION_ERROR); + } + + if ((auth_cred = g_slurm_auth_unpack(buffer)) == NULL) { + error( "authentication: %s ", + g_slurm_auth_errstr(g_slurm_auth_errno(NULL))); + free_buf(buffer); + slurm_seterrno_ret(ESLURM_PROTOCOL_INCOMPLETE_PACKET); + } + + rc = g_slurm_auth_verify( auth_cred, NULL, 2 ); + + if (rc != SLURM_SUCCESS) { + error( "authentication: %s ", + g_slurm_auth_errstr(g_slurm_auth_errno(auth_cred))); + (void) g_slurm_auth_destroy(auth_cred); + free_buf(buffer); + slurm_seterrno_ret(SLURM_PROTOCOL_AUTHENTICATION_ERROR); + } + + /* + * Unpack message body + */ + msg->msg_type = header.msg_type; + if ( (header.body_length > remaining_buf(buffer)) + || (unpack_msg(msg, buffer) != SLURM_SUCCESS) ) { + (void) g_slurm_auth_destroy(auth_cred); + free_buf(buffer); + slurm_seterrno_ret(ESLURM_PROTOCOL_INCOMPLETE_PACKET); + } + + msg->cred = (void *) auth_cred; + + free_buf(buffer); + return SLURM_SUCCESS; } /**********************************************************************\ * send message functions \**********************************************************************/ -/* sends a slurm_protocol msg to the slurmctld based on location - * information retrieved from the slurmd.conf. if unable to contant - * the primary slurmctld attempts will be made to contact the backup - * controller - * - * IN open_fd - file descriptor to send msg on - * IN msg - a slurm msg struct to be sent - * RET int - size of msg sent in bytes - */ -int slurm_send_controller_msg(slurm_fd open_fd, slurm_msg_t * msg) -{ - int rc; - /* try to send to primary first then secondary */ - msg->address = proto_conf->primary_controller; - if ((rc = slurm_send_node_msg(open_fd, msg)) == SLURM_SOCKET_ERROR) { - debug("Send message to primary controller failed: %m"); - msg->address = proto_conf->secondary_controller; - if ((rc = slurm_send_node_msg(open_fd, msg)) - == SLURM_SOCKET_ERROR) - debug - ("Send messge to secondary controller failed: %m"); - } - return rc; -} - -/* sends a message to an arbitrary node - * - * IN open_fd - file descriptor to send msg on - * IN msg - a slurm msg struct to be sent - * RET int - size of msg sent in bytes - */ -int slurm_send_node_msg(slurm_fd open_fd, slurm_msg_t * msg) -{ - header_t header; - int rc; - unsigned int msg_len, tmp_len; - Buf buffer; - void *auth_cred; - - /* initialize header */ - auth_cred = g_slurm_auth_create( NULL, 2 ); - if ( auth_cred == NULL ) { - error( "authentication: %s", - g_slurm_auth_errstr( g_slurm_auth_errno( NULL ) ) ); - return SLURM_PROTOCOL_AUTHENTICATION_ERROR; - } - init_header(&header, msg->msg_type, SLURM_PROTOCOL_NO_FLAGS); - - /* pack header */ - buffer = init_buf(0); - pack_header(&header, buffer); - - /* pack creds */ - if ( g_slurm_auth_pack(auth_cred, buffer) ) { - error( "authentication: %s", - g_slurm_auth_errstr( g_slurm_auth_errno( auth_cred ) ) ); - } - (void) g_slurm_auth_destroy(auth_cred); - - /* pack msg */ - tmp_len = get_buf_offset(buffer); - pack_msg(msg, buffer); - msg_len = get_buf_offset(buffer) - tmp_len; - - /* update header with correct cred and msg lengths */ - update_header(&header, msg_len); - - /* repack updated header */ - tmp_len = get_buf_offset(buffer); - set_buf_offset(buffer, 0); - pack_header(&header, buffer); - set_buf_offset(buffer, tmp_len); - - /* send msg */ -#if _DEBUG - _print_data (get_buf_data(buffer),get_buf_offset(buffer)); +/* + * Do the wonderful stuff that needs be done to pack msg + * and hdr into buffer + */ +static void +_pack_msg(slurm_msg_t *msg, header_t *hdr, Buf buffer) +{ + unsigned int tmplen, msglen; + + tmplen = get_buf_offset(buffer); + pack_msg(msg, buffer); + msglen = get_buf_offset(buffer) - tmplen; + + /* update header with correct cred and msg lengths */ + update_header(hdr, msglen); + + /* repack updated header */ + tmplen = get_buf_offset(buffer); + set_buf_offset(buffer, 0); + pack_header(hdr, buffer); + set_buf_offset(buffer, tmplen); +} + + +/* + * Send a slurm message over an open file descriptor `fd' + * Returns the size of the message sent in bytes, or -1 on failure. + */ +int slurm_send_node_msg(slurm_fd fd, slurm_msg_t * msg) +{ + header_t header; + Buf buffer; + int rc; + void * auth_cred; + + /* + * Initialize header with Auth credential and message type. + */ + auth_cred = g_slurm_auth_create(NULL, 2); + if (auth_cred == NULL) { + error("authentication: %s", + g_slurm_auth_errstr(g_slurm_auth_errno(NULL)) ); + slurm_seterrno_ret(SLURM_PROTOCOL_AUTHENTICATION_ERROR); + } + + init_header(&header, msg->msg_type, SLURM_PROTOCOL_NO_FLAGS); + + /* + * Pack header into buffer for transmission + */ + buffer = init_buf(0); + pack_header(&header, buffer); + + /* + * Pack auth credential + */ + if (g_slurm_auth_pack(auth_cred, buffer)) { + error("authentication: %s", + g_slurm_auth_errstr(g_slurm_auth_errno(auth_cred))); + } + (void) g_slurm_auth_destroy(auth_cred); + + + /* + * Pack message into buffer + */ + _pack_msg(msg, &header, buffer); + +#if _DEBUG + _print_data (get_buf_data(buffer),get_buf_offset(buffer)); #endif - if ((rc = _slurm_msg_sendto(open_fd, get_buf_data(buffer), - get_buf_offset(buffer), - SLURM_PROTOCOL_NO_SEND_RECV_FLAGS )) - == SLURM_SOCKET_ERROR) - error("Error sending msg socket: %m"); + /* + * Send message + */ + rc = _slurm_msg_sendto( fd, get_buf_data(buffer), + get_buf_offset(buffer), + SLURM_PROTOCOL_NO_SEND_RECV_FLAGS ); - free_buf(buffer); - return rc; + free_buf(buffer); + return rc; } /**********************************************************************\ @@ -486,111 +465,111 @@ int slurm_send_node_msg(slurm_fd open_fd, slurm_msg_t * msg) /* slurm_listen_stream * opens a stream server and listens on it - * IN slurm_address - slurm_addr to bind the server stream to - * RET slurm_fd - file descriptor of the stream created + * IN slurm_address - slurm_addr to bind the server stream to + * RET slurm_fd - file descriptor of the stream created */ slurm_fd slurm_listen_stream(slurm_addr * slurm_address) { - return _slurm_listen_stream(slurm_address); + return _slurm_listen_stream(slurm_address); } /* slurm_accept_stream * accepts a incomming stream connection on a stream server slurm_fd - * IN open_fd - file descriptor to accept connection on - * OUT slurm_address - slurm_addr of the accepted connection - * RET slurm_fd - file descriptor of the accepted connection + * IN open_fd - file descriptor to accept connection on + * OUT slurm_address - slurm_addr of the accepted connection + * RET slurm_fd - file descriptor of the accepted connection */ slurm_fd slurm_accept_stream(slurm_fd open_fd, slurm_addr * slurm_address) { - return _slurm_accept_stream(open_fd, slurm_address); + return _slurm_accept_stream(open_fd, slurm_address); } /* slurm_open_stream * opens a client connection to stream server - * IN slurm_address - slurm_addr of the connection destination + * IN slurm_address - slurm_addr of the connection destination * RET slurm_fd - file descriptor of the connection created */ slurm_fd slurm_open_stream(slurm_addr * slurm_address) { - return _slurm_open_stream(slurm_address); + return _slurm_open_stream(slurm_address); } /* slurm_write_stream * writes a buffer out a stream file descriptor - * IN open_fd - file descriptor to write on - * IN buffer - buffer to send - * IN size - size of buffer send - * IN timeout - how long to wait in milliseconds - * RET size_t - bytes sent , or -1 on errror + * IN open_fd - file descriptor to write on + * IN buffer - buffer to send + * IN size - size of buffer send + * IN timeout - how long to wait in milliseconds + * RET size_t - bytes sent , or -1 on errror */ size_t slurm_write_stream(slurm_fd open_fd, char *buffer, size_t size) { - return _slurm_send_timeout(open_fd, buffer, size, - SLURM_PROTOCOL_NO_SEND_RECV_FLAGS, - SLURM_MESSGE_TIMEOUT_MSEC_STATIC); + return _slurm_send_timeout(open_fd, buffer, size, + SLURM_PROTOCOL_NO_SEND_RECV_FLAGS, + SLURM_MESSGE_TIMEOUT_MSEC_STATIC); } size_t slurm_write_stream_timeout(slurm_fd open_fd, char *buffer, - size_t size, int timeout) + size_t size, int timeout) { - return _slurm_send_timeout(open_fd, buffer, size, - SLURM_PROTOCOL_NO_SEND_RECV_FLAGS, - timeout); + return _slurm_send_timeout(open_fd, buffer, size, + SLURM_PROTOCOL_NO_SEND_RECV_FLAGS, + timeout); } /* slurm_read_stream * read into buffer grom a stream file descriptor - * IN open_fd - file descriptor to read from - * OUT buffer - buffer to receive into - * IN size - size of buffer - * IN timeout - how long to wait in milliseconds - * RET size_t - bytes read , or -1 on errror + * IN open_fd - file descriptor to read from + * OUT buffer - buffer to receive into + * IN size - size of buffer + * IN timeout - how long to wait in milliseconds + * RET size_t - bytes read , or -1 on errror */ size_t slurm_read_stream(slurm_fd open_fd, char *buffer, size_t size) { - return _slurm_recv_timeout(open_fd, buffer, size, - SLURM_PROTOCOL_NO_SEND_RECV_FLAGS, - SLURM_MESSGE_TIMEOUT_MSEC_STATIC); + return _slurm_recv_timeout(open_fd, buffer, size, + SLURM_PROTOCOL_NO_SEND_RECV_FLAGS, + SLURM_MESSGE_TIMEOUT_MSEC_STATIC); } size_t slurm_read_stream_timeout(slurm_fd open_fd, char *buffer, - size_t size, int timeout) + size_t size, int timeout) { - return _slurm_recv_timeout(open_fd, buffer, size, - SLURM_PROTOCOL_NO_SEND_RECV_FLAGS, - timeout); + return _slurm_recv_timeout(open_fd, buffer, size, + SLURM_PROTOCOL_NO_SEND_RECV_FLAGS, + timeout); } /* slurm_get_stream_addr * esentially a encapsilated get_sockname - * IN open_fd - file descriptor to retreive slurm_addr for - * OUT address - address that open_fd to bound to + * IN open_fd - file descriptor to retreive slurm_addr for + * OUT address - address that open_fd to bound to */ int slurm_get_stream_addr(slurm_fd open_fd, slurm_addr * address) { - return _slurm_get_stream_addr(open_fd, address); + return _slurm_get_stream_addr(open_fd, address); } /* slurm_close_stream * closes either a server or client stream file_descriptor - * IN open_fd - an open file descriptor to close - * RET int - the return code + * IN open_fd - an open file descriptor to close + * RET int - the return code */ int slurm_close_stream(slurm_fd open_fd) { - return _slurm_close_stream(open_fd); + return _slurm_close_stream(open_fd); } /* make an open slurm connection blocking or non-blocking - * (i.e. wait or do not wait for i/o completion ) - * IN open_fd - an open file descriptor to change the effect - * RET int - the return code + * (i.e. wait or do not wait for i/o completion ) + * IN open_fd - an open file descriptor to change the effect + * RET int - the return code */ int slurm_set_stream_non_blocking(slurm_fd open_fd) { - return _slurm_set_stream_non_blocking(open_fd); + return _slurm_set_stream_non_blocking(open_fd); } int slurm_set_stream_blocking(slurm_fd open_fd) { - return _slurm_set_stream_blocking(open_fd); + return _slurm_set_stream_blocking(open_fd); } /**********************************************************************\ @@ -599,98 +578,98 @@ int slurm_set_stream_blocking(slurm_fd open_fd) /* slurm_set_addr_uint * initializes the slurm_address with the supplied port and ip_address - * OUT slurm_address - slurm_addr to be filled in - * IN port - port in host order - * IN ip_address - ipv4 address in uint32 host order form + * OUT slurm_address - slurm_addr to be filled in + * IN port - port in host order + * IN ip_address - ipv4 address in uint32 host order form */ void slurm_set_addr_uint(slurm_addr * slurm_address, uint16_t port, - uint32_t ip_address) + uint32_t ip_address) { - _slurm_set_addr_uint(slurm_address, port, ip_address); + _slurm_set_addr_uint(slurm_address, port, ip_address); } /* slurm_set_addr_any * initialized the slurm_address with the supplied port on INADDR_ANY - * OUT slurm_address - slurm_addr to be filled in - * IN port - port in host order + * OUT slurm_address - slurm_addr to be filled in + * IN port - port in host order */ void slurm_set_addr_any(slurm_addr * slurm_address, uint16_t port) { - _slurm_set_addr_uint(slurm_address, port, SLURM_INADDR_ANY); + _slurm_set_addr_uint(slurm_address, port, SLURM_INADDR_ANY); } /* slurm_set_addr * initializes the slurm_address with the supplied port and host name - * OUT slurm_address - slurm_addr to be filled in - * IN port - port in host order - * IN host - hostname or dns name + * OUT slurm_address - slurm_addr to be filled in + * IN port - port in host order + * IN host - hostname or dns name */ void slurm_set_addr(slurm_addr * slurm_address, uint16_t port, char *host) { - _slurm_set_addr_char(slurm_address, port, host); + _slurm_set_addr_char(slurm_address, port, host); } /* reset_slurm_addr * resets the address field of a slurm_addr, port and family unchanged - * OUT slurm_address - slurm_addr to be reset in - * IN new_address - source of address to write into slurm_address + * OUT slurm_address - slurm_addr to be reset in + * IN new_address - source of address to write into slurm_address */ void reset_slurm_addr(slurm_addr * slurm_address, slurm_addr new_address) { - _reset_slurm_addr(slurm_address, new_address); + _reset_slurm_addr(slurm_address, new_address); } /* slurm_set_addr_char * initializes the slurm_address with the supplied port and host - * OUT slurm_address - slurm_addr to be filled in - * IN port - port in host order - * IN host - hostname or dns name + * OUT slurm_address - slurm_addr to be filled in + * IN port - port in host order + * IN host - hostname or dns name */ void slurm_set_addr_char(slurm_addr * slurm_address, uint16_t port, - char *host) + char *host) { - _slurm_set_addr_char(slurm_address, port, host); + _slurm_set_addr_char(slurm_address, port, host); } /* slurm_get_addr * given a slurm_address it returns its port and hostname - * IN slurm_address - slurm_addr to be queried - * OUT port - port number - * OUT host - hostname - * IN buf_len - length of hostname buffer + * IN slurm_address - slurm_addr to be queried + * OUT port - port number + * OUT host - hostname + * IN buf_len - length of hostname buffer */ void slurm_get_addr(slurm_addr * slurm_address, uint16_t * port, - char *host, unsigned int buf_len) + char *host, unsigned int buf_len) { - _slurm_get_addr(slurm_address, port, host, buf_len); + _slurm_get_addr(slurm_address, port, host, buf_len); } /* slurm_get_peer_addr * get the slurm address of the peer connection, similar to getpeeraddr - * IN fd - an open connection - * OUT slurm_address - place to park the peer's slurm_addr + * IN fd - an open connection + * OUT slurm_address - place to park the peer's slurm_addr */ int slurm_get_peer_addr(slurm_fd fd, slurm_addr * slurm_address) { - struct sockaddr name; - socklen_t namelen = (socklen_t) sizeof(struct sockaddr); - int rc; + struct sockaddr name; + socklen_t namelen = (socklen_t) sizeof(struct sockaddr); + int rc; - if ((rc = _slurm_getpeername((int) fd, &name, &namelen))) - return rc; - memcpy(slurm_address, &name, sizeof(slurm_addr)); - return 0; + if ((rc = _slurm_getpeername((int) fd, &name, &namelen))) + return rc; + memcpy(slurm_address, &name, sizeof(slurm_addr)); + return 0; } /* slurm_print_slurm_addr * prints a slurm_addr into a buf - * IN address - slurm_addr to print - * IN buf - space for string representation of slurm_addr - * IN n - max number of bytes to write (including NUL) + * IN address - slurm_addr to print + * IN buf - space for string representation of slurm_addr + * IN n - max number of bytes to write (including NUL) */ void slurm_print_slurm_addr(slurm_addr * address, char *buf, size_t n) { - _slurm_print_slurm_addr(address, buf, n); + _slurm_print_slurm_addr(address, buf, n); } /**********************************************************************\ @@ -699,24 +678,24 @@ void slurm_print_slurm_addr(slurm_addr * address, char *buf, size_t n) /* slurm_pack_slurm_addr * packs a slurm_addr into a buffer to serialization transport - * IN slurm_address - slurm_addr to pack - * IN/OUT buffer - buffer to pack the slurm_addr into + * IN slurm_address - slurm_addr to pack + * IN/OUT buffer - buffer to pack the slurm_addr into */ void slurm_pack_slurm_addr(slurm_addr * slurm_address, Buf buffer) { - _slurm_pack_slurm_addr(slurm_address, buffer); + _slurm_pack_slurm_addr(slurm_address, buffer); } /* slurm_pack_slurm_addr * unpacks a buffer into a slurm_addr after serialization transport - * OUT slurm_address - slurm_addr to unpack to - * IN/OUT buffer - buffer to upack the slurm_addr from - * returns - SLURM error code + * OUT slurm_address - slurm_addr to unpack to + * IN/OUT buffer - buffer to upack the slurm_addr from + * returns - SLURM error code */ int slurm_unpack_slurm_addr_no_alloc(slurm_addr * slurm_address, - Buf buffer) + Buf buffer) { - return _slurm_unpack_slurm_addr_no_alloc(slurm_address, buffer); + return _slurm_unpack_slurm_addr_no_alloc(slurm_address, buffer); } /**********************************************************************\ @@ -727,193 +706,213 @@ int slurm_unpack_slurm_addr_no_alloc(slurm_addr * slurm_address, /* slurm_send_rc_msg * given the original request message this function sends a - * slurm_return_code message back to the client that made the request - * IN request_msg - slurm_msg the request msg - * IN rc - the return_code to send back to the client + * slurm_return_code message back to the client that made the request + * IN request_msg - slurm_msg the request msg + * IN rc - the return_code to send back to the client */ void slurm_send_rc_msg(slurm_msg_t * request_msg, int rc) { - slurm_msg_t response_msg; - return_code_msg_t rc_msg; + slurm_msg_t response_msg; + return_code_msg_t rc_msg; + + /* no change */ + rc_msg.return_code = rc; + /* init response_msg structure */ + response_msg.address = request_msg->address; + response_msg.msg_type = RESPONSE_SLURM_RC; + response_msg.data = &rc_msg; + + /* send message */ + slurm_send_node_msg(request_msg->conn_fd, &response_msg); +} + +/* + * Send and recv a slurm request and response on the open slurm descriptor + */ +static int +_send_and_recv_msg(slurm_fd fd, slurm_msg_t *req, slurm_msg_t *resp) +{ + int rc = SLURM_SUCCESS; + + if ( (slurm_send_node_msg(fd, req) < 0) + || (slurm_receive_msg(fd, resp) < 0) ) + rc = SLURM_ERROR; - /* no change */ - rc_msg.return_code = rc; - /* init response_msg structure */ - response_msg.address = request_msg->address; - response_msg.msg_type = RESPONSE_SLURM_RC; - response_msg.data = &rc_msg; + /* + * Attempt to close an open connection + */ + if (slurm_shutdown_msg_conn(fd) < 0) + rc = SLURM_ERROR; + + return rc; - /* send message */ - slurm_send_node_msg(request_msg->conn_fd, &response_msg); } /* slurm_send_recv_controller_msg * opens a connection to the controller, sends the controller a message, * listens for the response, then closes the connection - * IN request_msg - slurm_msg request - * OUT response_msg - slurm_msg response - * RET int - return code - */ -int slurm_send_recv_controller_msg(slurm_msg_t * request_msg, - slurm_msg_t * response_msg) -{ - int msg_size; - int rc; - slurm_fd sockfd; - int error_code = 0; - - /* init message connection for communication with controller */ - if ((sockfd = slurm_open_controller_conn()) == SLURM_SOCKET_ERROR) { - return SLURM_SOCKET_ERROR; - } - - /* send request message */ - if ((rc = - slurm_send_controller_msg(sockfd, - request_msg)) == - SLURM_SOCKET_ERROR) { - error_code = 1; - goto slurm_send_recv_controller_msg_cleanup; - } - - /* receive message */ - if ((msg_size = - slurm_receive_msg(sockfd, - response_msg)) == SLURM_SOCKET_ERROR) { - error_code = 1; - goto slurm_send_recv_controller_msg_cleanup; - } - - slurm_send_recv_controller_msg_cleanup: - /* shutdown message connection */ - if ((rc = slurm_shutdown_msg_conn(sockfd)) == SLURM_SOCKET_ERROR) - return SLURM_SOCKET_ERROR; - if (error_code) - return SLURM_SOCKET_ERROR; - - return SLURM_SUCCESS; + * IN request_msg - slurm_msg request + * OUT response_msg - slurm_msg response + * RET int - return code + */ +int slurm_send_recv_controller_msg(slurm_msg_t *req, slurm_msg_t *resp) +{ + slurm_fd fd = -1; + + if ((fd = slurm_open_controller_conn()) < 0) + return SLURM_SOCKET_ERROR; + + return _send_and_recv_msg(fd, req, resp); } /* slurm_send_recv_node_msg * opens a connection to node, sends the node a message, listens * for the response, then closes the connection - * IN request_msg - slurm_msg request - * OUT response_msg - slurm_msg response - * RET int - return code - */ -int slurm_send_recv_node_msg(slurm_msg_t * request_msg, - slurm_msg_t * response_msg) -{ - int msg_size; - int rc; - slurm_fd sockfd; - int error_code = 0; - - /* init message connection for communication with controller */ - if ((sockfd = - slurm_open_msg_conn(&request_msg->address)) == - SLURM_SOCKET_ERROR) - return SLURM_SOCKET_ERROR; - - /* send request message */ - if ((rc = - slurm_send_node_msg(sockfd, - request_msg)) == SLURM_SOCKET_ERROR) { - error_code = 1; - goto slurm_send_recv_node_msg_cleanup; - } - - /* receive message */ - if ((msg_size = - slurm_receive_msg(sockfd, - response_msg)) == SLURM_SOCKET_ERROR) { - error_code = 1; - goto slurm_send_recv_node_msg_cleanup; - } - - slurm_send_recv_node_msg_cleanup: - /* shutdown message connection */ - if ((rc = slurm_shutdown_msg_conn(sockfd)) == SLURM_SOCKET_ERROR) - return SLURM_SOCKET_ERROR; - if (error_code) - return SLURM_SOCKET_ERROR; - - return SLURM_SUCCESS; + * IN request_msg - slurm_msg request + * OUT response_msg - slurm_msg response + * RET int - return code + */ +int slurm_send_recv_node_msg(slurm_msg_t *req, slurm_msg_t *resp) +{ + slurm_fd fd = -1; + + if ((fd = slurm_open_msg_conn(&req->address)) < 0) + return SLURM_SOCKET_ERROR; + + return _send_and_recv_msg(fd, req, resp); } /* slurm_send_only_controller_msg * opens a connection to the controller, sends the controller a * message then, closes the connection - * IN request_msg - slurm_msg request - * RET int - return code + * IN request_msg - slurm_msg request + * RET int - return code */ -int slurm_send_only_controller_msg(slurm_msg_t * request_msg) +int slurm_send_only_controller_msg(slurm_msg_t *req) { - int rc; - slurm_fd sockfd; - int error_code = 0; + int rc = SLURM_SUCCESS; + slurm_fd fd = -1; - /* init message connection for communication with controller */ - if ((sockfd = slurm_open_controller_conn()) == SLURM_SOCKET_ERROR) - return SLURM_SOCKET_ERROR; + /* + * Open connection to SLURM controller: + */ + if ((fd = slurm_open_controller_conn()) < 0) + return SLURM_SOCKET_ERROR; - /* send request message */ - if ((rc = - slurm_send_controller_msg(sockfd, - request_msg)) == - SLURM_SOCKET_ERROR) { - error_code = 1; - goto slurm_send_only_controller_msg_cleanup; - } - slurm_send_only_controller_msg_cleanup: - /* shutdown message connection */ - if ((rc = slurm_shutdown_msg_conn(sockfd)) == SLURM_SOCKET_ERROR) - return SLURM_SOCKET_ERROR; - if (error_code) - return SLURM_SOCKET_ERROR; + rc = slurm_send_node_msg(fd, req); + if (slurm_shutdown_msg_conn(fd) < 0) + return SLURM_SOCKET_ERROR; - return SLURM_SUCCESS; + return rc; } -/* slurm_send_only_controller_msg - * opens a connection to the controller, sends the controller a - * message then, closes the connection - * IN request_msg - slurm_msg request - * RET int - return code +/* + * Open a connection to the "address" specified in the slurm msg `req' + * Then, immediately close the connection w/out waiting for a reply. + * + * Returns SLURM_SUCCESS on success SLURM_FAILURE (< 0) for failure. + */ +int slurm_send_only_node_msg(slurm_msg_t *req) +{ + int rc = SLURM_SUCCESS; + slurm_fd fd = -1; + + if ((fd = slurm_open_msg_conn(&req->address)) < 0) + return SLURM_SOCKET_ERROR; + + rc = slurm_send_node_msg(fd, req); + + if (slurm_shutdown_msg_conn(fd) < 0) + return SLURM_SOCKET_ERROR; + + return rc; +} + + +/* + * Send message and recv "return code" message on an already open + * slurm file descriptor + */ +static int _send_recv_rc_msg(slurm_fd fd, slurm_msg_t *req, int *rc) +{ + int retval = SLURM_SUCCESS; + slurm_msg_t msg; + + retval = _send_and_recv_msg(fd, req, &msg); + slurm_shutdown_msg_conn(fd); + + if (retval != SLURM_SUCCESS) + goto done; + + if (msg.msg_type != RESPONSE_SLURM_RC) + slurm_seterrno_ret(SLURM_UNEXPECTED_MSG_ERROR); + + *rc = ((return_code_msg_t *) msg.data)->return_code; + slurm_free_return_code_msg(msg.data); + + done: + return retval; +} + +/* + * Open a connection to the "address" specified in the the slurm msg "req" + * Then read back an "rc" message returning the "return_code" specified + * in the response in the "rc" parameter. */ -int slurm_send_only_node_msg(slurm_msg_t * request_msg) +int slurm_send_recv_rc_msg(slurm_msg_t *req, int *rc) { - int rc; - slurm_fd sockfd; - int error_code = 0; + slurm_fd fd = -1; - /* init message connection for communication with controller */ - if ((sockfd = slurm_open_msg_conn(&request_msg->address)) == - SLURM_SOCKET_ERROR) - return SLURM_SOCKET_ERROR; + if ((fd = slurm_open_msg_conn(&req->address)) < 0) + return SLURM_SOCKET_ERROR; - /* send request message */ - if ((rc = slurm_send_node_msg(sockfd, request_msg)) < 0) { - error_code = 1; - goto slurm_send_only_node_msg_cleanup; - } - slurm_send_only_node_msg_cleanup: - /* shutdown message connection */ - if ((rc = slurm_shutdown_msg_conn(sockfd)) == SLURM_SOCKET_ERROR) - return SLURM_SOCKET_ERROR; - if (error_code) - return SLURM_SOCKET_ERROR; + return _send_recv_rc_msg(fd, req, rc); +} +/* + * Same as above, but send message to controller + */ +int slurm_send_recv_controller_rc_msg(slurm_msg_t *req, int *rc) +{ + slurm_fd fd = -1; - return SLURM_SUCCESS; + if ((fd = slurm_open_controller_conn()) < 0) + return SLURM_SOCKET_ERROR; + + return _send_recv_rc_msg(fd, req, rc); } -/* Slurm message functions */ +/* + * Free a slurm message + */ void slurm_free_msg(slurm_msg_t * msg) { - (void) g_slurm_auth_destroy(msg->cred); - xfree(msg); + (void) g_slurm_auth_destroy(msg->cred); + xfree(msg); +} + + +#if _DEBUG + +static void _print_data(char *data, int len) +{ + int i; + for (i = 0; i < len; i++) { + if ((i % 10 == 0) && (i != 0)) + printf("\n"); + printf("%2.2x ", ((int) data[i] & 0xff)); + if (i >= 200) + break; + } + printf("\n\n"); } + +#endif + + +/* + * vi: shiftwidth=8 tabstop=8 expandtab + */ diff --git a/src/common/slurm_protocol_api.h b/src/common/slurm_protocol_api.h index d57790d0c115c57def9af2b1a79ff594e842a63a..eda31b23b68ab14d980d548204a6bca9c22374c7 100644 --- a/src/common/slurm_protocol_api.h +++ b/src/common/slurm_protocol_api.h @@ -155,17 +155,6 @@ int slurm_receive_msg(slurm_fd open_fd, slurm_msg_t * msg); * send message functions \**********************************************************************/ -/* sends a slurm_protocol msg to the slurmctld based on location - * information retrieved from the slurmd.conf. if unable to contant - * the primary slurmctld attempts will be made to contact the backup - * controller - * - * IN open_fd - file descriptor to send msg on - * IN msg - a slurm msg struct to be sent - * RET int - size of msg sent in bytes - */ -int slurm_send_controller_msg(slurm_fd open_fd, slurm_msg_t * msg); - /* sends a message to an arbitrary node * * IN open_fd - file descriptor to send msg on @@ -405,6 +394,17 @@ int slurm_send_recv_controller_msg(slurm_msg_t * request_msg, int slurm_send_recv_node_msg(slurm_msg_t * request_msg, slurm_msg_t * response_msg); +/* + * Open a connection to req->address, send message and receive + * a "return code" message, returning return code in "rc" + */ +int slurm_send_recv_rc_msg(slurm_msg_t *req, int *rc); + +/* + * Same as above, but send to controller + */ +int slurm_send_recv_controller_rc_msg(slurm_msg_t *req, int *rc); + /* slurm_send_only_controller_msg * opens a connection to the controller, sends the node a message then, * closes the connection diff --git a/src/common/slurm_protocol_interface.h b/src/common/slurm_protocol_interface.h index deda358ec20885487e58059484dd129b331620e0..0571ad4e90056ff6918818e8e0b472047dfc817b 100644 --- a/src/common/slurm_protocol_interface.h +++ b/src/common/slurm_protocol_interface.h @@ -64,7 +64,7 @@ #include "src/common/pack.h" #include "src/common/slurm_protocol_common.h" -#define SLURM_MESSGE_TIMEOUT_MSEC_STATIC 5000 +#define SLURM_MESSGE_TIMEOUT_MSEC_STATIC 2000 /****************\ ** Data Types ** @@ -123,18 +123,22 @@ slurm_fd _slurm_open_msg_conn ( slurm_addr * slurm_address ) ; /* _slurm_msg_recvfrom * Get message over the given connection, default timeout value - * IN open_fd - an open file descriptor - * OUT buffer - loaded with data - * IN size - size of buffer in bytes - * IN flags - communication specific flags + * IN fd - an open file descriptor + * OUT pbuf - xmalloc'd buffer, loaded with message data + * OUT buflen - size of allocated buffer in bytes + * IN flags - communication specific flags + * * RET number of bytes read */ -ssize_t _slurm_msg_recvfrom ( slurm_fd open_fd, char *buffer , - size_t size , uint32_t flags ) ; -/* _slurm_msg_recvfrom_timeout is identical to _slurm_msg_recvfrom except - * IN timeout - maximum time to wait for a message in milliseconds */ -ssize_t _slurm_msg_recvfrom_timeout ( slurm_fd open_fd, char *buffer , - size_t size , uint32_t flags, int timeout) ; +ssize_t _slurm_msg_recvfrom(slurm_fd fd, char **pbuf, size_t *buflen, + uint32_t flags); + +/* _slurm_msg_recvfrom_timeout reads len bytes from file descriptor fd + * timing out after `timeout' milliseconds. + * + */ +ssize_t _slurm_msg_recvfrom_timeout(slurm_fd fd, char **buf, size_t *len, + uint32_t flags, int timeout); /* _slurm_msg_sendto * Send message over the given connection, default timeout value diff --git a/src/common/slurm_protocol_socket_implementation.c b/src/common/slurm_protocol_socket_implementation.c index 28f9dac749880d521a6cbdb15e69335f7eae76a8..57e2869b358e39675635f8039b138b5328187479 100644 --- a/src/common/slurm_protocol_socket_implementation.c +++ b/src/common/slurm_protocol_socket_implementation.c @@ -1,6 +1,7 @@ /*****************************************************************************\ * slurm_protocol_socket_implementation.c - slurm communications interfaces - * based upon sockets + * based upon sockets. + * $Id$ ***************************************************************************** * Copyright (C) 2002 The Regents of the University of California. * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). @@ -41,6 +42,7 @@ #include <stdio.h> #include <stdarg.h> #include <arpa/inet.h> +#include <sys/param.h> #if HAVE_SYS_SOCKET_H # include <sys/socket.h> @@ -53,569 +55,377 @@ #include "src/common/slurm_protocol_interface.h" #include "src/common/slurm_protocol_defs.h" #include "src/common/log.h" +#include "src/common/fd.h" +#include "src/common/xsignal.h" +#include "src/common/xmalloc.h" #include "src/common/util-net.h" -#define TEMP_BUFFER_SIZE 1024 +/* + * Maximum message size. Messages larger than this value (in bytes) + * will not be received. + */ +#define MAX_MSG_SIZE (512*1024) -/* internal static prototypes */ -/***************************************************************** +/**************************************************************** * MIDDLE LAYER MSG FUNCTIONS ****************************************************************/ -/* return time size start_time in msec, rounded off */ +/* + * Return time in msec since "start time" + */ static int _tot_wait (struct timeval *start_time) { - struct timeval end_time; - int msec_delay; + struct timeval end_time; + int msec_delay; - gettimeofday(&end_time, NULL); - msec_delay = (end_time.tv_sec - start_time->tv_sec ) * 1000; - msec_delay += ((end_time.tv_usec - start_time->tv_usec + 500) / 1000); - return msec_delay; + gettimeofday(&end_time, NULL); + msec_delay = (end_time.tv_sec - start_time->tv_sec ) * 1000; + msec_delay += ((end_time.tv_usec - start_time->tv_usec + 500) / 1000); + return msec_delay; } slurm_fd _slurm_init_msg_engine ( slurm_addr * slurm_address ) { - return _slurm_listen_stream ( slurm_address ) ; + return _slurm_listen_stream ( slurm_address ) ; } slurm_fd _slurm_open_msg_conn ( slurm_addr * slurm_address ) { - return _slurm_open_stream ( slurm_address ) ; -} - -/* this should be a no-op that just returns open_fd in a message - * implementation */ -slurm_fd _slurm_accept_msg_conn (slurm_fd open_fd , - slurm_addr * slurm_address) -{ - return _slurm_accept_stream ( open_fd , slurm_address ) ; -} -/* this should be a no-op in a message implementation */ -int _slurm_close_accepted_conn ( slurm_fd open_fd ) -{ - return _slurm_close ( open_fd ) ; -} - -ssize_t _slurm_msg_recvfrom ( slurm_fd open_fd, char *buffer , size_t size , - uint32_t flags ) -{ - return _slurm_msg_recvfrom_timeout ( open_fd , buffer , size , flags , - SLURM_MESSGE_TIMEOUT_MSEC_STATIC ) ; -} - -ssize_t _slurm_msg_recvfrom_timeout ( slurm_fd open_fd, char *buffer , - size_t size , uint32_t flags, - int timeout) -{ - size_t recv_len ; - - char size_buffer_temp [TEMP_BUFFER_SIZE] ; - char * size_buffer = size_buffer_temp ; - char * moving_buffer = NULL ; - unsigned int size_buffer_len = 8 ; - uint32_t transmit_size, nw_format_size ; - unsigned int total_len ; - unsigned int excess_len = 0 ; - - moving_buffer = (char *)&nw_format_size ; - total_len = 0 ; - while ( total_len < sizeof ( uint32_t ) ) - { - if ( ( recv_len = _slurm_recv_timeout ( open_fd , - moving_buffer , - (sizeof ( uint32_t ) - total_len), - SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , timeout ) ) - == SLURM_SOCKET_ERROR ) - { - if ( errno == EINTR ) - continue ; - else - return SLURM_PROTOCOL_ERROR ; - } - else if ( recv_len > 0 ) - { - total_len += recv_len ; - moving_buffer += recv_len ; - } - else if ( recv_len == 0 ) - { - /*debug ( "Error datagram recv_len = 0 ") ; */ - slurm_seterrno ( - SLURM_PROTOCOL_SOCKET_IMPL_ZERO_RECV_LENGTH); - return SLURM_PROTOCOL_ERROR ; - } - else - { - /*debug ( "Error datagram recv_len < -1") ;*/ - slurm_seterrno ( - SLURM_PROTOCOL_SOCKET_IMPL_NEGATIVE_RECV_LENGTH ) ; - return SLURM_PROTOCOL_ERROR ; - } - } - - transmit_size = ntohl(nw_format_size); - if (transmit_size > size) { - error ("_slurm_msg_recvfrom_timeout buffer too small (%d of %u), excess discarded", - size, transmit_size); - excess_len = transmit_size - size; - transmit_size = size; - - } - - moving_buffer = buffer ; - total_len = 0 ; - while ( total_len < transmit_size ) - { - if ( ( recv_len = _slurm_recv_timeout (open_fd, moving_buffer, - (transmit_size-total_len) , - SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , - timeout ) ) == SLURM_SOCKET_ERROR ) - { - if ( errno == EINTR ) - { - continue ; - } - else - { - return SLURM_PROTOCOL_ERROR ; - } - return recv_len ; - } - else if ( recv_len > 0 ) - { - total_len += recv_len ; - moving_buffer += recv_len ; - } - else if ( recv_len == 0 ) - { - /*debug ( "Error datagram recv_len = 0 ") ; */ - slurm_seterrno ( - SLURM_PROTOCOL_SOCKET_IMPL_ZERO_RECV_LENGTH ) ; - return SLURM_PROTOCOL_ERROR ; - } - else - { - /*debug ( "Error datagram recv_len < -1") ;*/ - slurm_seterrno ( - SLURM_PROTOCOL_SOCKET_IMPL_NEGATIVE_RECV_LENGTH ) ; - return SLURM_PROTOCOL_ERROR ; - } - } - - if ( excess_len ) { - /* read and toss any data transmitted over buffer size */ - moving_buffer = size_buffer ; - size_buffer_len = TEMP_BUFFER_SIZE; - while ( excess_len ) - { - if (size_buffer_len > excess_len) - size_buffer_len = excess_len; - if ( ( recv_len = _slurm_recv_timeout ( open_fd , - moving_buffer , size_buffer_len , - SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , - timeout ) ) == SLURM_SOCKET_ERROR ) - { - if ( errno == EINTR ) - continue ; - else - return SLURM_PROTOCOL_ERROR ; - } - else if ( recv_len > 0 ) - { - excess_len -= recv_len ; - } - else if ( recv_len == 0 ) - { - /*debug ( "Error datagram recv_len = 0 ") ; */ - slurm_seterrno ( - SLURM_PROTOCOL_SOCKET_IMPL_ZERO_RECV_LENGTH ) ; - return SLURM_PROTOCOL_ERROR ; - } - else - { - /*debug ( "Error datagram recv_len < -1") ;*/ - slurm_seterrno ( - SLURM_PROTOCOL_SOCKET_IMPL_NEGATIVE_RECV_LENGTH ) ; - return SLURM_PROTOCOL_ERROR ; - } - } - slurm_seterrno ( SLURM_COMMUNICATIONS_RECEIVE_ERROR ) ; - return SLURM_PROTOCOL_ERROR ; - } - - return total_len ; -} - -ssize_t _slurm_msg_sendto ( slurm_fd open_fd, char *buffer , size_t size , - uint32_t flags ) -{ - return _slurm_msg_sendto_timeout ( open_fd, buffer , size , flags, - SLURM_MESSGE_TIMEOUT_MSEC_STATIC ) ; -} - -ssize_t _slurm_msg_sendto_timeout ( slurm_fd open_fd, char *buffer , - size_t size , uint32_t flags , int timeout ) -{ - size_t send_len ; - - uint32_t usize; - - struct sigaction newaction ; - struct sigaction oldaction ; - - newaction . sa_handler = SIG_IGN ; - - /* ignore SIGPIPE so that send can return a error code if the - * other side closes the socket */ - sigaction(SIGPIPE, &newaction , & oldaction ); - - usize = htonl(size); - - while ( true ) - { - if ( ( send_len = _slurm_send_timeout ( open_fd , - (char *) &usize , sizeof ( uint32_t ) , - SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , - timeout ) ) == SLURM_PROTOCOL_ERROR ) - { - if ( errno == EINTR ) - continue ; - else - goto _slurm_msg_sendto_exit_error ; - } - else if ( send_len != sizeof ( uint32_t ) ) - { - /*debug ( "_slurm_msg_sendto only transmitted %i of %i bytes", - send_len , sizeof ( uint32_t ) ) ;*/ - slurm_seterrno ( - SLURM_PROTOCOL_SOCKET_IMPL_NOT_ALL_DATA_SENT ) ; - goto _slurm_msg_sendto_exit_error ; - } - else - break ; - } - while ( true ) - { - if ( ( send_len = _slurm_send_timeout ( open_fd , buffer , size , - SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , - timeout ) ) == SLURM_PROTOCOL_ERROR ) - { - if ( errno == EINTR ) - continue ; - else - goto _slurm_msg_sendto_exit_error ; - } - else if ( send_len != size ) - { - /*debug ( "_slurm_msg_sendto only transmitted %i of %i bytes", - send_len , size ) ;*/ - slurm_seterrno ( - SLURM_PROTOCOL_SOCKET_IMPL_NOT_ALL_DATA_SENT ) ; - goto _slurm_msg_sendto_exit_error ; - } - else - break ; - } - - sigaction(SIGPIPE, &oldaction , & newaction ); - return send_len ; - - _slurm_msg_sendto_exit_error: - sigaction(SIGPIPE, &oldaction , & newaction ); - return SLURM_PROTOCOL_ERROR ; -} - -int _slurm_send_timeout ( slurm_fd open_fd, char *buffer , size_t size , - uint32_t flags, int timeout ) -{ - int rc ; - int bytes_sent = 0 ; - int fd_flags ; - int msec_wait = 0; - struct pollfd ufds; - struct timeval start_time; - - ufds.fd = open_fd; - ufds.events = POLLOUT; - fd_flags = _slurm_fcntl ( open_fd , F_GETFL ) ; - _slurm_set_stream_non_blocking ( open_fd ) ; - gettimeofday(&start_time, NULL); - while ( bytes_sent < size ) - { - rc = poll(&ufds, 1, (timeout-msec_wait)); - if ( rc < 0 ) - { - if ( (errno != EINTR ) || - ((msec_wait = _tot_wait(&start_time)) >= timeout)) - goto _slurm_send_timeout_exit_error; - else - continue; - } - else if ( rc == 0 ) - { - slurm_seterrno ( SLURM_PROTOCOL_SOCKET_IMPL_TIMEOUT ) ; - goto _slurm_send_timeout_exit_error; - } - else - { - rc = _slurm_send ( open_fd, &buffer[bytes_sent] , - (size-bytes_sent) , flags ) ; - if ( rc == SLURM_PROTOCOL_ERROR || rc < 0 ) - { - if ( errno == EINTR ) - continue ; - else - goto _slurm_send_timeout_exit_error; - } - else if ( rc == 0 ) - { - slurm_seterrno ( - SLURM_PROTOCOL_SOCKET_ZERO_BYTES_SENT); - goto _slurm_send_timeout_exit_error; - } - else - { - bytes_sent += rc ; - } - } - } - - if ( fd_flags != SLURM_PROTOCOL_ERROR ) - { - _slurm_fcntl ( open_fd , F_SETFL , fd_flags ) ; - } - return bytes_sent ; - - _slurm_send_timeout_exit_error: - if ( fd_flags != SLURM_PROTOCOL_ERROR ) - { - _slurm_fcntl ( open_fd , F_SETFL , fd_flags ) ; - } - return SLURM_PROTOCOL_ERROR ; - -} - -int _slurm_recv_timeout ( slurm_fd open_fd, char *buffer , size_t size , - uint32_t flags, int timeout ) -{ - int rc ; - int bytes_recv = 0 ; - int fd_flags ; - int msec_wait = 0; - struct pollfd ufds; - struct timeval start_time; - - ufds.fd = open_fd; - ufds.events = POLLIN; - fd_flags = _slurm_fcntl ( open_fd , F_GETFL ) ; - _slurm_set_stream_non_blocking ( open_fd ) ; - gettimeofday(&start_time, NULL); - while ( bytes_recv < size ) - { - rc = poll(&ufds, 1, (timeout-msec_wait)); - if ( rc < 0 ) - { - if ( (errno != EINTR ) || /* Real error */ - ((msec_wait = _tot_wait(&start_time)) >= timeout)) - goto _slurm_recv_timeout_exit_error; - else - continue; - } - else if ( rc == 0 ) - { - slurm_seterrno ( SLURM_PROTOCOL_SOCKET_IMPL_TIMEOUT ) ; - goto _slurm_recv_timeout_exit_error; - } - else - { - rc = _slurm_recv ( open_fd, &buffer[bytes_recv], - (size-bytes_recv), flags ) ; - if ( (rc == SLURM_PROTOCOL_ERROR) || (rc < 0) ) - { - - if ( errno == EINTR ) - continue ; - else - goto _slurm_recv_timeout_exit_error; - } - else if ( rc == 0 ) - { - slurm_seterrno ( - SLURM_PROTOCOL_SOCKET_ZERO_BYTES_SENT); - goto _slurm_recv_timeout_exit_error; - } - else - { - bytes_recv += rc ; - break ; - } - } - } - if ( fd_flags != SLURM_PROTOCOL_ERROR ) - { - _slurm_fcntl ( open_fd , F_SETFL , fd_flags ) ; - } - return bytes_recv ; - - _slurm_recv_timeout_exit_error: - if ( fd_flags != SLURM_PROTOCOL_ERROR ) - { - _slurm_fcntl ( open_fd , F_SETFL , fd_flags ) ; - } - return SLURM_PROTOCOL_ERROR ; + return _slurm_open_stream ( slurm_address ) ; +} + +slurm_fd _slurm_accept_msg_conn (slurm_fd fd, slurm_addr *addr) +{ + return _slurm_accept_stream(fd, addr); +} + +/* + * This would be a no-op in a message implementation + */ +int _slurm_close_accepted_conn (slurm_fd fd) +{ + return _slurm_close (fd); +} + +ssize_t _slurm_msg_recvfrom(slurm_fd fd, char **pbuf, size_t *lenp, + uint32_t flags) +{ + return _slurm_msg_recvfrom_timeout(fd, pbuf, lenp, flags, + SLURM_MESSGE_TIMEOUT_MSEC_STATIC); +} + +ssize_t _slurm_msg_recvfrom_timeout(slurm_fd fd, char **pbuf, size_t *lenp, + uint32_t flags, int tmout) +{ + ssize_t len; + uint32_t msglen; + + len = _slurm_recv_timeout( fd, (char *)&msglen, + sizeof(msglen), 0, tmout ); + + if (len < 0) return SLURM_ERROR; + + if (len < ((ssize_t) sizeof(msglen))) + slurm_seterrno_ret(SLURM_COMMUNICATIONS_RECEIVE_ERROR); + + msglen = ntohl(msglen); + + if (msglen > MAX_MSG_SIZE) + slurm_seterrno_ret(SLURM_PROTOCOL_INSANE_MSG_LENGTH); + + /* + * Allocate memory on heap for message + */ + *pbuf = xmalloc(msglen); + + if (_slurm_recv_timeout(fd, *pbuf, msglen, 0, tmout) != msglen) { + xfree(*pbuf); + *pbuf = NULL; + return SLURM_PROTOCOL_ERROR; + } + + *lenp = msglen; + + return (ssize_t) msglen; +} + +ssize_t _slurm_msg_sendto(slurm_fd fd, char *buffer, size_t size, + uint32_t flags) +{ + return _slurm_msg_sendto_timeout( fd, buffer, size, flags, + SLURM_MESSGE_TIMEOUT_MSEC_STATIC); +} + +ssize_t _slurm_msg_sendto_timeout(slurm_fd fd, char *buffer, size_t size, + uint32_t flags, int timeout) +{ + size_t len; + uint32_t usize; + SigFunc *ohandler; + + /* + * Ignore SIGPIPE so that send can return a error code if the + * other side closes the socket + */ + ohandler = xsignal(SIGPIPE, SIG_IGN); + + usize = htonl(size); + + len = _slurm_send_timeout( fd, (char *)&usize, sizeof(usize), 0, + timeout ); + + if (len < sizeof(usize)) { + len = SLURM_PROTOCOL_ERROR; + goto done; + } + + if ((len = _slurm_send_timeout(fd, buffer, size, 0, timeout)) < 0) + goto done; + else if (len < size) { + len = SLURM_PROTOCOL_ERROR; + slurm_seterrno(SLURM_PROTOCOL_SOCKET_IMPL_NOT_ALL_DATA_SENT); + goto done; + } + + done: + xsignal(SIGPIPE, ohandler); + return len; +} + +int _slurm_send_timeout(slurm_fd fd, char *buf, size_t size, + uint32_t flags, int timeout) +{ + int rc; + int sent = 0; + int fd_flags; + struct pollfd ufds; + struct timeval tstart; + + ufds.fd = fd; + ufds.events = POLLOUT; + + fd_flags = _slurm_fcntl(fd, F_GETFL); + fd_set_nonblocking(fd); + + gettimeofday(&tstart, NULL); + + while (sent < size) { + if ((timeout -= _tot_wait(&tstart)) <= 0) { + slurm_seterrno(SLURM_PROTOCOL_SOCKET_IMPL_TIMEOUT); + sent = SLURM_ERROR; + goto done; + } + if ((rc = poll(&ufds, 1, timeout)) <= 0) { + if ((rc == 0) || (errno == EINTR)) { + continue; + else { + sent = SLURM_ERROR; + goto done; + } + } + rc = _slurm_send(fd, &buf[sent], (size - sent), flags) + if (rc < 0) { + if (errno == EINTR) + continue; + else { + sent = SLURM_ERROR; + goto done; + } + } + if (rc == 0) { + slurm_seterrno(SLURM_PROTOCOL_SOCKET_ZERO_BYTES_SENT); + sent = SLURM_ERROR; + goto done; + } + + sent += rc; + } + + done: + if (fd_flags != SLURM_PROTOCOL_ERROR) + _slurm_fcntl(fd , F_SETFL , fd_flags); + + return sent; + +} + +int _slurm_recv_timeout(slurm_fd fd, char *buffer, size_t size, + uint32_t flags, int timeout ) +{ + int rc; + int recvlen = 0; + int fval; + struct pollfd ufds; + struct timeval tstart; + + ufds.fd = fd; + ufds.events = POLLIN; + + fval = _slurm_fcntl(fd, F_GETFL); + fd_set_nonblocking(fd); + + gettimeofday(&tstart, NULL); + + while (recvlen < size) { + + if ((timeout -= _tot_wait(&tstart)) < 0) { + slurm_seterrno(SLURM_PROTOCOL_SOCKET_IMPL_TIMEOUT); + recvlen = SLURM_ERROR; + goto done; + } + + if ((rc = poll(&ufds, 1, timeout)) <= 0) { + if ((errno == EINTR) || (rc == 0)) + continue; + else { + recvlen = SLURM_ERROR; + goto done; + } + } + rc = _slurm_recv(fd, &buffer[recvlen], size - recvlen, flags); + if (rc < 0) { + if (errno == EINTR) + continue; + else { + recvlen = SLURM_ERROR; + goto done; + } + } + if (rc == 0) { + slurm_seterrno(SLURM_PROTOCOL_SOCKET_ZERO_BYTES_SENT); + recvlen = SLURM_ERROR; + goto done; + } + recvlen += rc; + } + + done: + /* + * Reset fd flags to prior state + */ + if (fval != SLURM_PROTOCOL_ERROR) + _slurm_fcntl(fd, F_SETFL, fval); + + return recvlen; } int _slurm_shutdown_msg_engine ( slurm_fd open_fd ) { - return _slurm_close ( open_fd ) ; -} - -slurm_fd _slurm_listen_stream ( slurm_addr * slurm_address ) -{ - int rc ; - slurm_fd connection_fd ; - const int one = 1; - if ( ( connection_fd =_slurm_create_socket ( SLURM_STREAM ) ) - == SLURM_SOCKET_ERROR ) - { - debug ( "Error creating slurm stream socket: %m" ) ; - return connection_fd ; - } - - if ( ( rc = _slurm_setsockopt(connection_fd , SOL_SOCKET, - SO_REUSEADDR, &one, sizeof(one) ) ) ) - { - debug ("setsockopt SO_REUSEADDR failed"); - goto error_cleanup ; - } - - if ( ( rc = _slurm_bind ( connection_fd , - ( struct sockaddr const * ) slurm_address , - sizeof ( slurm_addr ) ) ) == SLURM_SOCKET_ERROR ) - { - debug ( "Error binding slurm stream socket: %m" ) ; - goto error_cleanup ; - } - - if ( ( rc = _slurm_listen ( connection_fd , - SLURM_PROTOCOL_DEFAULT_LISTEN_BACKLOG ) ) - == SLURM_SOCKET_ERROR ) - { - debug ( "Error listening on slurm stream socket: %m" ) ; - goto error_cleanup ; - } - - - return connection_fd ; - - error_cleanup: - _slurm_close_stream ( connection_fd ) ; - return rc; - + return _slurm_close ( open_fd ) ; +} + +slurm_fd _slurm_listen_stream(slurm_addr *addr) +{ + int rc; + slurm_fd fd; + const int one = 1; + const size_t sz1 = sizeof(one); + + if ((fd = _slurm_create_socket(SLURM_STREAM)) < 0) { + debug("Error creating slurm stream socket: %m"); + return fd; + } + + rc = _slurm_setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sz1); + if (rc < 0) { + debug("setsockopt SO_REUSEADDR failed: %m"); + goto error; + } + + rc = _slurm_bind(fd, (struct sockaddr const *) addr, sizeof(*addr)); + if (rc < 0) { + debug("Error binding slurm stream socket: %m"); + goto error; + } + + if (_slurm_listen(fd, SLURM_PROTOCOL_DEFAULT_LISTEN_BACKLOG) < 0) { + debug ( "Error listening on slurm stream socket: %m" ) ; + rc = SLURM_ERROR; + goto error; + } + + return fd; + + error: + _slurm_close_stream(fd); + return rc; + } slurm_fd _slurm_accept_stream(slurm_fd fd, slurm_addr *addr) { - socklen_t len = sizeof(slurm_addr); - return _slurm_accept(fd, (struct sockaddr *)addr, &len); + socklen_t len = sizeof(slurm_addr); + return _slurm_accept(fd, (struct sockaddr *)addr, &len); } -slurm_fd _slurm_open_stream ( slurm_addr * slurm_address ) +slurm_fd _slurm_open_stream(slurm_addr *addr) { - int rc ; - slurm_fd connection_fd ; + int rc; + slurm_fd fd; + + if ( (addr->sin_family == 0) || (addr->sin_port == 0) ) + return SLURM_SOCKET_ERROR; - if ( (slurm_address->sin_family == 0) && - (slurm_address->sin_port == 0) ) - { - error ( "Attempt to open socket with null address" ); - return SLURM_SOCKET_ERROR; - } + if ((fd =_slurm_create_socket(SLURM_STREAM)) < 0) { + debug("Error creating slurm stream socket: %m"); + return fd; + } - if ( ( connection_fd =_slurm_create_socket ( SLURM_STREAM ) ) - == SLURM_SOCKET_ERROR ) - { - debug ( "Error creating slurm stream socket: %m" ) ; - return connection_fd ; - } + rc = _slurm_connect(fd, (struct sockaddr const *)addr, sizeof(*addr)); - if ( ( rc = _slurm_connect ( connection_fd , - ( struct sockaddr const * ) slurm_address , - sizeof ( slurm_addr ) ) ) - == SLURM_SOCKET_ERROR ) - { - debug ( "Error connecting on slurm stream socket: %m" ) ; - goto error_cleanup ; - } + if (rc < 0) goto error; - return connection_fd ; + return fd; - error_cleanup: - _slurm_close_stream ( connection_fd ) ; - return rc; + error: + _slurm_close_stream(fd); + return rc; } - -int _slurm_get_stream_addr ( slurm_fd open_fd , slurm_addr * address ) + +int _slurm_get_stream_addr(slurm_fd fd, slurm_addr *addr ) { - int size ; - - size = sizeof ( address ) ; - return _slurm_getsockname ( open_fd , ( struct sockaddr * ) address , - & size ) ; + int size = sizeof(addr); + return _slurm_getsockname(fd, (struct sockaddr *)addr, &size); } int _slurm_close_stream ( slurm_fd open_fd ) { - return _slurm_close ( open_fd ) ; + return _slurm_close ( open_fd ) ; } -int _slurm_set_stream_non_blocking ( slurm_fd open_fd ) +int _slurm_set_stream_non_blocking(slurm_fd fd) { - int flags ; - if ( ( flags = _slurm_fcntl ( open_fd , F_GETFL ) ) - == SLURM_SOCKET_ERROR ) - { - return SLURM_SOCKET_ERROR ; - } - flags |= O_NONBLOCK ; - - return _slurm_fcntl ( open_fd , F_SETFL , flags ) ; + fd_set_nonblocking(fd); + return SLURM_SUCCESS; } -int _slurm_set_stream_blocking ( slurm_fd open_fd ) +int _slurm_set_stream_blocking(slurm_fd fd) { - int flags ; - if ( ( flags = _slurm_fcntl ( open_fd , F_GETFL ) ) - == SLURM_SOCKET_ERROR ) - { - return SLURM_SOCKET_ERROR ; - } - flags &= !O_NONBLOCK ; - return _slurm_fcntl ( open_fd , F_SETFL , flags ) ; + fd_set_blocking(fd); + return SLURM_SUCCESS; } extern int _slurm_socket (int __domain, int __type, int __protocol) { - return socket ( __domain, __type, __protocol ) ; -} + return socket ( __domain, __type, __protocol ) ; +} extern slurm_fd _slurm_create_socket ( slurm_socket_type_t type ) { - switch ( type ) - { - case SLURM_STREAM : - return _slurm_socket ( AF_INET, SOCK_STREAM, - IPPROTO_TCP) ; - break; - case SLURM_MESSAGE : - return _slurm_socket ( AF_INET, SOCK_DGRAM, - IPPROTO_UDP ) ; - break; - default : - return SLURM_SOCKET_ERROR; - } + switch ( type ) + { + case SLURM_STREAM : + return _slurm_socket ( AF_INET, SOCK_STREAM, + IPPROTO_TCP) ; + break; + case SLURM_MESSAGE : + return _slurm_socket ( AF_INET, SOCK_DGRAM, + IPPROTO_UDP ) ; + break; + default : + return SLURM_SOCKET_ERROR; + } } /* Create two new sockets, of type TYPE in domain DOMAIN and using @@ -623,23 +433,23 @@ extern slurm_fd _slurm_create_socket ( slurm_socket_type_t type ) * descriptors for them in FDS[0] and FDS[1]. If PROTOCOL is zero, * one will be chosen automatically. Returns 0 on success, -1 for errors. */ extern int _slurm_socketpair (int __domain, int __type, - int __protocol, int __fds[2]) + int __protocol, int __fds[2]) { - return SLURM_PROTOCOL_FUNCTION_NOT_IMPLEMENTED ; + return SLURM_PROTOCOL_FUNCTION_NOT_IMPLEMENTED ; } /* Give the socket FD the local address ADDR (which is LEN bytes long). */ extern int _slurm_bind (int __fd, struct sockaddr const * __addr, - socklen_t __len) + socklen_t __len) { - return bind ( __fd , __addr , __len ) ; + return bind ( __fd , __addr , __len ) ; } /* Put the local address of FD into *ADDR and its length in *LEN. */ extern int _slurm_getsockname (int __fd, struct sockaddr * __addr, - socklen_t *__restrict __len) + socklen_t *__restrict __len) { - return getsockname ( __fd , __addr , __len ) ; + return getsockname ( __fd , __addr , __len ) ; } /* Open a connection on socket FD to peer at ADDR (which LEN bytes long). @@ -647,85 +457,84 @@ extern int _slurm_getsockname (int __fd, struct sockaddr * __addr, * and the only address from which to accept transmissions. * Return 0 on success, -1 for errors. */ extern int _slurm_connect (int __fd, struct sockaddr const * __addr, - socklen_t __len) + socklen_t __len) { - return connect ( __fd , __addr , __len ) ; + return connect ( __fd , __addr , __len ) ; } /* Put the address of the peer connected to socket FD into *ADDR * (which is *LEN bytes long), and its actual length into *LEN. */ extern int _slurm_getpeername (int __fd, struct sockaddr * __addr, - socklen_t *__restrict __len) + socklen_t *__restrict __len) { - return getpeername ( __fd , __addr , __len ) ; + return getpeername ( __fd , __addr , __len ) ; } /* Send N bytes of BUF to socket FD. Returns the number sent or -1. */ extern ssize_t _slurm_send (int __fd, __const void *__buf, size_t __n, - int __flags) + int __flags) { - return send ( __fd , __buf , __n , __flags ) ; + return send ( __fd , __buf , __n , __flags ) ; } /* Read N bytes into BUF from socket FD. * Returns the number read or -1 for errors. */ extern ssize_t _slurm_recv (int __fd, void *__buf, size_t __n, int __flags) { - return recv ( __fd , __buf , __n , __flags ) ; + return recv ( __fd , __buf , __n , __flags ) ; } /* Send N bytes of BUF on socket FD to peer at address ADDR (which is * ADDR_LEN bytes long). Returns the number sent, or -1 for errors. */ -extern ssize_t _slurm_sendto (int __fd, __const void *__buf, size_t __n, - int __flags, struct sockaddr const * __addr, - socklen_t __addr_len) +extern ssize_t _slurm_sendto (int __fd, __const void *__buf, size_t __n, int __flags, struct sockaddr const * __addr, + socklen_t __addr_len) { - return sendto ( __fd , __buf , __n , __flags , __addr, __addr_len) ; + return sendto ( __fd , __buf , __n , __flags , __addr, __addr_len) ; } /* Read N bytes into BUF through socket FD. * If ADDR is not NULL, fill in *ADDR_LEN bytes of it with tha address of * the sender, and store the actual size of the address in *ADDR_LEN. * Returns the number of bytes read or -1 for errors. */ extern ssize_t _slurm_recvfrom (int __fd, void *__restrict __buf, - size_t __n, int __flags, - struct sockaddr * __addr, - socklen_t *__restrict __addr_len) + size_t __n, int __flags, + struct sockaddr * __addr, + socklen_t *__restrict __addr_len) { - return recvfrom ( __fd , __buf , __n , __flags , __addr, __addr_len) ; + return recvfrom ( __fd , __buf , __n , __flags , __addr, __addr_len) ; } /* Send a msg described MESSAGE on socket FD. * Returns the number of bytes sent, or -1 for errors. */ extern ssize_t _slurm_sendmsg (int __fd, __const struct msghdr *__msg, - int __flags) + int __flags) { - return sendmsg ( __fd , __msg , __flags ) ; + return sendmsg ( __fd , __msg , __flags ) ; } /* Send a msg described MESSAGE on socket FD. * Returns the number of bytes read or -1 for errors. */ extern ssize_t _slurm_recvmsg (int __fd, struct msghdr *__msg, int __flags) { - return recvmsg ( __fd , __msg , __flags ); + return recvmsg ( __fd , __msg , __flags ); } /* Put the current value for socket FD's option OPTNAME at protocol level LEVEL * into OPTVAL (which is *OPTLEN bytes long), and set *OPTLEN to the value's * actual length. Returns 0 on success, -1 for errors. */ extern int _slurm_getsockopt (int __fd, int __level, int __optname, - void *__restrict __optval, - socklen_t *__restrict __optlen) + void *__restrict __optval, + socklen_t *__restrict __optlen) { - return getsockopt ( __fd , __level , __optname , __optval , __optlen ) ; + return getsockopt ( __fd , __level , __optname , __optval , __optlen ) ; } /* Set socket FD's option OPTNAME at protocol level LEVEL * to *OPTVAL (which is OPTLEN bytes long). * Returns 0 on success, -1 for errors. */ extern int _slurm_setsockopt (int __fd, int __level, int __optname, - __const void *__optval, socklen_t __optlen) + __const void *__optval, socklen_t __optlen) { - return setsockopt ( __fd , __level , __optname , __optval , __optlen ) ; + return setsockopt ( __fd , __level , __optname , __optval , __optlen ) ; } @@ -734,7 +543,7 @@ extern int _slurm_setsockopt (int __fd, int __level, int __optname, * Returns 0 on success, -1 for errors. */ extern int _slurm_listen (int __fd, int __n) { - return listen ( __fd , __n ) ; + return listen ( __fd , __n ) ; } /* Await a connection on socket FD. @@ -743,9 +552,9 @@ extern int _slurm_listen (int __fd, int __n) * peer and *ADDR_LEN to the address's actual length, and return the * new socket's descriptor, or -1 for errors. */ extern int _slurm_accept (int __fd, struct sockaddr * __addr, - socklen_t *__restrict __addr_len) + socklen_t *__restrict __addr_len) { - return accept ( __fd , __addr , __addr_len ) ; + return accept ( __fd , __addr , __addr_len ) ; } /* Shut down all or part of the connection open on socket FD. @@ -756,128 +565,136 @@ extern int _slurm_accept (int __fd, struct sockaddr * __addr, * Returns 0 on success, -1 for errors. */ extern int _slurm_shutdown (int __fd, int __how) { - return shutdown ( __fd , __how ); + return shutdown ( __fd , __how ); } extern int _slurm_close (int __fd ) { - return close ( __fd ) ; + return close ( __fd ) ; } extern int _slurm_fcntl(int fd, int cmd, ... ) { - int rc ; - va_list va ; + int rc ; + va_list va ; - va_start ( va , cmd ) ; - rc =_slurm_vfcntl ( fd , cmd , va ) ; - va_end ( va ) ; - return rc ; + va_start ( va , cmd ) ; + rc =_slurm_vfcntl ( fd , cmd , va ) ; + va_end ( va ) ; + return rc ; } extern int _slurm_vfcntl(int fd, int cmd, va_list va ) { - long arg ; + long arg ; - switch ( cmd ) - { - case F_GETFL : - return fcntl ( fd , cmd ) ; - break ; - case F_SETFL : - arg = va_arg ( va , long ) ; - return fcntl ( fd , cmd , arg) ; - break ; - default : - return SLURM_PROTOCOL_ERROR ; - break ; - } + switch ( cmd ) + { + case F_GETFL : + return fcntl ( fd , cmd ) ; + break ; + case F_SETFL : + arg = va_arg ( va , long ) ; + return fcntl ( fd , cmd , arg) ; + break ; + default : + return SLURM_PROTOCOL_ERROR ; + break ; + } } /* sets the fields of a slurm_addr */ -void _slurm_set_addr_uint ( slurm_addr * slurm_address , uint16_t port , - uint32_t ip_address ) +void _slurm_set_addr_uint (slurm_addr *addr, uint16_t port, uint32_t ipaddr) { - slurm_address -> sin_family = AF_SLURM ; - slurm_address -> sin_port = htons ( port ) ; - slurm_address -> sin_addr.s_addr = htonl ( ip_address ) ; + addr->sin_family = AF_SLURM ; + addr->sin_port = htons(port); + addr->sin_addr.s_addr = htonl(ipaddr); } /* resets the address field of a slurm_addr, port and family are unchanged */ -void _reset_slurm_addr ( slurm_addr * slurm_address , slurm_addr new_address ) -{ - slurm_address -> sin_addr.s_addr = new_address.sin_addr.s_addr ; -} - -void _slurm_set_addr_char ( slurm_addr * slurm_address , uint16_t port , - char * host ) -{ - /* If NULL hostname passed in, we only update the port - * of slurm_address - */ - if (host != NULL) { - struct hostent * host_info; - char hostent_buf[TEMP_BUFFER_SIZE]; - host_info = get_host_by_name( host, (void *) &hostent_buf, - sizeof(hostent_buf), NULL ); - if (host_info == NULL) { - error ("get_host_by_name failure on %s", host); - slurm_address->sin_family = 0; - slurm_address->sin_port = 0; - return; - } - memcpy ( & slurm_address -> sin_addr . s_addr , - host_info -> h_addr , host_info -> h_length ) ; - } - slurm_address -> sin_family = AF_SLURM ; - slurm_address -> sin_port = htons ( port ) ; -} - -void _slurm_get_addr ( slurm_addr * slurm_address , uint16_t * port , - char * host , unsigned int buf_len ) -{ - struct hostent * host_info; - char hostent_buf[TEMP_BUFFER_SIZE]; - - host_info = get_host_by_addr ( - ( char * ) &( slurm_address -> sin_addr . s_addr ) , - sizeof ( slurm_address -> sin_addr . s_addr ) , - AF_SLURM, - (void *) &hostent_buf, sizeof(hostent_buf), NULL ); - if (host_info == NULL) { - error ("get_host_by_addr failure: %m"); - *port = 0; - strncpy ( host, "", buf_len); - } else { - *port = slurm_address -> sin_port ; - strncpy ( host , host_info -> h_name , buf_len ) ; - } +void _reset_slurm_addr (slurm_addr *addr, slurm_addr new_addr) +{ + addr->sin_addr.s_addr = new_addr.sin_addr.s_addr; +} + +void _slurm_set_addr_char (slurm_addr * addr, uint16_t port, char *host) +{ + struct hostent * he = NULL; + int h_err = 0; + char * h_buf[4096]; + + /* + * If NULL hostname passed in, we only update the port + * of addr + */ + addr->sin_family = AF_SLURM; + addr->sin_port = htons(port); + if (host == NULL) + return; + + he = get_host_by_name(host, (void *)&h_buf, sizeof(h_buf), &h_err); + + if (he != NULL) + memcpy (&addr->sin_addr.s_addr, he->h_addr, he->h_length); + else { + error("Unable to resolve \"%s\": %s", host, hstrerror(h_err)); + addr->sin_family = 0; + addr->sin_port = 0; + } + return; +} + +void _slurm_get_addr (slurm_addr *addr, uint16_t *port, char *host, + unsigned int buflen ) +{ + struct hostent *he; + char h_buf[4096]; + int h_err = 0; + char * s_addr = (char *) &addr->sin_addr.s_addr; + int len = sizeof(addr->sin_addr.s_addr); + + he = get_host_by_addr( s_addr, len, AF_SLURM, + (void *) &h_buf, sizeof(h_buf), &h_err ); + + if (he != NULL) { + *port = addr->sin_port; + strncpy(host, he->h_name, buflen); + } else { + error("Lookup failed: %s", host_strerror(h_err)); + *port = 0; + strncpy(host, "", buflen); + } + return; } void _slurm_print_slurm_addr ( slurm_addr * address, char *buf, size_t n ) { - char addrbuf[INET_ADDRSTRLEN]; - inet_ntop(AF_INET, &address->sin_addr, addrbuf, INET_ADDRSTRLEN); - /* warning: silently truncates */ - snprintf(buf, n, "%s:%d", addrbuf, ntohs(address->sin_port)); + char addrbuf[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &address->sin_addr, addrbuf, INET_ADDRSTRLEN); + /* warning: silently truncates */ + snprintf(buf, n, "%s:%d", addrbuf, ntohs(address->sin_port)); } - -void _slurm_pack_slurm_addr ( slurm_addr * slurm_address , Buf buffer ) + +void _slurm_pack_slurm_addr(slurm_addr *addr, Buf buffer) { - pack32 ( ntohl ( slurm_address -> sin_addr.s_addr ) , buffer ) ; - pack16 ( ntohs ( slurm_address -> sin_port ) , buffer ) ; + pack32( ntohl( addr->sin_addr.s_addr ), buffer ); + pack16( ntohs( addr->sin_port ), buffer ); } -int _slurm_unpack_slurm_addr_no_alloc ( slurm_addr * slurm_address , Buf buffer ) +int _slurm_unpack_slurm_addr_no_alloc(slurm_addr *addr, Buf buffer) { - slurm_address -> sin_family = AF_SLURM ; - safe_unpack32 ( & slurm_address -> sin_addr.s_addr , buffer ) ; - slurm_address -> sin_addr.s_addr = - htonl ( slurm_address -> sin_addr.s_addr ); - safe_unpack16 ( & slurm_address -> sin_port , buffer ) ; - slurm_address -> sin_port = htons ( slurm_address -> sin_port ) ; - return SLURM_SUCCESS; + addr->sin_family = AF_SLURM ; + safe_unpack32(&addr->sin_addr.s_addr, buffer); + safe_unpack16(&addr->sin_port, buffer); + + addr->sin_addr.s_addr = htonl(addr->sin_addr.s_addr); + addr->sin_port = htons(addr->sin_port); + return SLURM_SUCCESS; unpack_error: - return SLURM_ERROR; + return SLURM_ERROR; } + +/* + * vi: tabstop=8 shiftwidth=8 expandtab + */