From 22c8e808a53e52833baef553e22e355fc480e91a Mon Sep 17 00:00:00 2001 From: tewk <tewk@unknown> Date: Thu, 22 Aug 2002 01:47:54 +0000 Subject: [PATCH] Added timeout on socket calls --- src/common/slurm_protocol_api.c | 38 +++++++++++++++ src/common/slurm_protocol_api.h | 2 + src/common/slurm_protocol_common.h | 4 ++ src/common/slurm_protocol_interface.h | 5 ++ src/common/slurm_protocol_socket_common.h | 1 - .../slurm_protocol_socket_implementation.c | 47 +++++++++++-------- src/slurmd/nbio.c | 6 ++- 7 files changed, 81 insertions(+), 22 deletions(-) diff --git a/src/common/slurm_protocol_api.c b/src/common/slurm_protocol_api.c index de6e49fae26..2a31f434e20 100644 --- a/src/common/slurm_protocol_api.c +++ b/src/common/slurm_protocol_api.c @@ -424,6 +424,43 @@ slurm_fd slurm_open_stream ( slurm_addr * slurm_address ) return _slurm_open_stream ( slurm_address ) ; } +size_t slurm_write_stream_timeout_tv ( slurm_fd open_fd , char * buffer , size_t size , struct timeval * timeout ) +{ + return _slurm_send_timeout ( open_fd , buffer , size , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , timeout ) ; +} + +size_t slurm_read_stream_timeout_tv ( slurm_fd open_fd , char * buffer , size_t size , struct timeval * timeout ) +{ + return _slurm_recv_timeout ( open_fd , buffer , size , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , timeout ) ; +} + +size_t slurm_write_stream_timeout ( slurm_fd open_fd , char * buffer , size_t size , int timeout ) +{ + struct timeval time_out ; + time_out . tv_sec = timeout ; + time_out . tv_usec = 0 ; + return _slurm_send_timeout ( open_fd , buffer , size , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , & time_out ) ; +} + +size_t slurm_read_stream_timeout ( slurm_fd open_fd , char * buffer , size_t size , int timeout ) +{ + struct timeval time_out ; + time_out . tv_sec = timeout ; + time_out . tv_usec = 0 ; + return _slurm_recv_timeout ( open_fd , buffer , size , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , & time_out ) ; +} + +size_t slurm_write_stream ( slurm_fd open_fd , char * buffer , size_t size ) +{ + return _slurm_send_timeout ( open_fd , buffer , size , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , SLURM_MESSGE_TIMEOUT_SEC ) ; +} + +size_t slurm_read_stream ( slurm_fd open_fd , char * buffer , size_t size ) +{ + return _slurm_recv_timeout ( open_fd , buffer , size , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , SLURM_MESSGE_TIMEOUT_SEC ) ; +} + +/* size_t slurm_write_stream ( slurm_fd open_fd , char * buffer , size_t size ) { int rc ; @@ -469,6 +506,7 @@ size_t slurm_read_stream ( slurm_fd open_fd , char * buffer , size_t size ) } } } +*/ int slurm_get_stream_addr ( slurm_fd open_fd , slurm_addr * address ) { diff --git a/src/common/slurm_protocol_api.h b/src/common/slurm_protocol_api.h index 5e035363f8a..647172486b5 100644 --- a/src/common/slurm_protocol_api.h +++ b/src/common/slurm_protocol_api.h @@ -215,6 +215,7 @@ int inline slurm_close_stream ( slurm_fd open_fd ) ; * RET size_t - bytes sent , or -1 on errror */ size_t inline slurm_write_stream ( slurm_fd open_fd , char * buffer , size_t size ) ; +size_t inline slurm_write_stream_timeout ( slurm_fd open_fd , char * buffer , size_t size , int timeout ) ; /* slurm_read_stream * read into buffer grom a stream file descriptor @@ -224,6 +225,7 @@ size_t inline slurm_write_stream ( slurm_fd open_fd , char * buffer , size_t siz * RET size_t - bytes read , or -1 on errror */ size_t inline slurm_read_stream ( slurm_fd open_fd , char * buffer , size_t size ) ; +size_t inline slurm_read_stream_timeout ( slurm_fd open_fd , char * buffer , size_t size , int timeout ) ; /* slurm_get_stream_addr * esentially a encapsilated get_sockname diff --git a/src/common/slurm_protocol_common.h b/src/common/slurm_protocol_common.h index 4629effb53c..f43c926c045 100644 --- a/src/common/slurm_protocol_common.h +++ b/src/common/slurm_protocol_common.h @@ -1,7 +1,10 @@ #ifndef _SLURM_PROTOCOL_COMMON_H #define _SLURM_PROTOCOL_COMMON_H +#include <sys/time.h> +#include <time.h> #include <src/common/slurm_errno.h> + /* for sendto and recvfrom commands */ #define SLURM_PROTOCOL_NO_SEND_RECV_FLAGS 0 /* for accpet commands */ @@ -15,6 +18,7 @@ this may need to be increased to 350k-512k */ #define SLURM_PROTOCOL_VERSION 1 #define SLURM_PROTOCOL_NO_FLAGS 0 /* used in the header to set flags to empty */ +extern struct timeval * SLURM_MESSGE_TIMEOUT_SEC ; #if MONGO_IMPLEMENTATION # include <src/common/slurm_protocol_mongo_common.h> diff --git a/src/common/slurm_protocol_interface.h b/src/common/slurm_protocol_interface.h index 3dc9e0c6fb7..5ed26762bb0 100644 --- a/src/common/slurm_protocol_interface.h +++ b/src/common/slurm_protocol_interface.h @@ -71,7 +71,9 @@ slurm_fd _slurm_create_socket (slurm_socket_type_t type) ; slurm_fd _slurm_init_msg_engine ( slurm_addr * slurm_address ) ; slurm_fd _slurm_open_msg_conn ( slurm_addr * slurm_address ) ; ssize_t _slurm_msg_recvfrom ( slurm_fd open_fd, char *buffer , size_t size , uint32_t flags, slurm_addr * slurm_address ) ; +ssize_t _slurm_msg_recvfrom_timeout ( slurm_fd open_fd, char *buffer , size_t size , uint32_t flags, slurm_addr * slurm_address , struct timeval * timeout) ; ssize_t _slurm_msg_sendto ( slurm_fd open_fd, char *buffer , size_t size , uint32_t flags, slurm_addr * slurm_address ) ; +ssize_t _slurm_msg_sendto_timeout ( slurm_fd open_fd, char *buffer , size_t size , uint32_t flags, slurm_addr * slurm_address , struct timeval * timeout ) ; slurm_fd _slurm_accept_msg_conn ( slurm_fd open_fd , slurm_addr * slurm_address ) ; int _slurm_close_accepted_conn ( slurm_fd open_fd ) ; @@ -87,6 +89,9 @@ extern int _slurm_close_stream ( slurm_fd open_fd ) ; extern inline int _slurm_set_stream_non_blocking ( slurm_fd open_fd ) ; extern inline int _slurm_set_stream_blocking ( slurm_fd open_fd ) ; +int _slurm_send_timeout ( slurm_fd open_fd, char *buffer , size_t size , uint32_t flags, struct timeval * timeout ) ; +int _slurm_recv_timeout ( slurm_fd open_fd, char *buffer , size_t size , uint32_t flags, struct timeval * timeout ) ; + /***************************/ /* slurm address functions */ /***************************/ diff --git a/src/common/slurm_protocol_socket_common.h b/src/common/slurm_protocol_socket_common.h index bf41053c18d..9867ac75b01 100644 --- a/src/common/slurm_protocol_socket_common.h +++ b/src/common/slurm_protocol_socket_common.h @@ -18,7 +18,6 @@ #define AF_SLURM AF_INET #define SLURM_INADDR_ANY 0x00000000 -#define SLURM_MESSGE_TIMEOUT_SEC 10 /* LINUX SPECIFIC */ /* this is the slurm equivalent of the operating system file descriptor, which in linux is just an int */ diff --git a/src/common/slurm_protocol_socket_implementation.c b/src/common/slurm_protocol_socket_implementation.c index a1d9c39abba..badb6c1091b 100644 --- a/src/common/slurm_protocol_socket_implementation.c +++ b/src/common/slurm_protocol_socket_implementation.c @@ -50,6 +50,12 @@ #include <src/common/log.h> #include <src/common/pack.h> +/* global constants */ +struct timeval SLURM_MESSGE_TIMEOUT_SEC_STATIC = { tv_sec:10L , tv_usec:0 } ; +struct timeval * SLURM_MESSGE_TIMEOUT_SEC = & SLURM_MESSGE_TIMEOUT_SEC_STATIC ; + + +/* internal static prototypes */ /***************************************************************** * MIDDLE LAYER MSG FUNCTIONS ****************************************************************/ @@ -75,6 +81,11 @@ int _slurm_close_accepted_conn ( slurm_fd open_fd ) } ssize_t _slurm_msg_recvfrom ( slurm_fd open_fd, char *buffer , size_t size , uint32_t flags, slurm_addr * slurm_address ) +{ + return _slurm_msg_recvfrom_timeout ( open_fd , buffer , size , flags , slurm_address , SLURM_MESSGE_TIMEOUT_SEC ) ; +} + +ssize_t _slurm_msg_recvfrom_timeout ( slurm_fd open_fd, char *buffer , size_t size , uint32_t flags, slurm_addr * slurm_address , struct timeval * timeout) { size_t recv_len ; @@ -89,8 +100,8 @@ ssize_t _slurm_msg_recvfrom ( slurm_fd open_fd, char *buffer , size_t size , uin total_len = 0 ; while ( total_len < sizeof ( uint32_t ) ) { - //if ( ( recv_len = _slurm_recv_timeout ( open_fd , moving_buffer , sizeof ( uint32_t ) , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , SLURM_MESSGE_TIMEOUT_SEC ) ) == SLURM_SOCKET_ERROR ) - if ( ( recv_len = _slurm_recv ( open_fd , moving_buffer , sizeof ( uint32_t ) , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS ) ) == SLURM_SOCKET_ERROR ) + //if ( ( recv_len = _slurm_recv ( open_fd , moving_buffer , sizeof ( uint32_t ) , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS ) ) == SLURM_SOCKET_ERROR ) + if ( ( recv_len = _slurm_recv_timeout ( open_fd , moving_buffer , sizeof ( uint32_t ) , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , timeout ) ) == SLURM_SOCKET_ERROR ) { if ( errno == EINTR ) { @@ -125,8 +136,8 @@ ssize_t _slurm_msg_recvfrom ( slurm_fd open_fd, char *buffer , size_t size , uin total_len = 0 ; while ( total_len < transmit_size ) { - //if ( ( recv_len = _slurm_recv_timeout ( open_fd , moving_buffer , transmit_size , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , SLURM_MESSGE_TIMEOUT_SEC ) ) == SLURM_SOCKET_ERROR ) - if ( ( recv_len = _slurm_recv ( open_fd , moving_buffer , transmit_size , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS ) ) == SLURM_SOCKET_ERROR ) + //if ( ( recv_len = _slurm_recv ( open_fd , moving_buffer , transmit_size , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS ) ) == SLURM_SOCKET_ERROR ) + if ( ( recv_len = _slurm_recv_timeout ( open_fd , moving_buffer , transmit_size , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , timeout ) ) == SLURM_SOCKET_ERROR ) { if ( errno == EINTR ) { @@ -161,6 +172,11 @@ ssize_t _slurm_msg_recvfrom ( slurm_fd open_fd, char *buffer , size_t size , uin } ssize_t _slurm_msg_sendto ( slurm_fd open_fd, char *buffer , size_t size , uint32_t flags, slurm_addr * slurm_address ) +{ + return _slurm_msg_sendto_timeout ( open_fd, buffer , size , flags, slurm_address , SLURM_MESSGE_TIMEOUT_SEC ) ; +} + +ssize_t _slurm_msg_sendto_timeout ( slurm_fd open_fd, char *buffer , size_t size , uint32_t flags, slurm_addr * slurm_address , struct timeval * timeout ) { size_t send_len ; @@ -180,8 +196,8 @@ ssize_t _slurm_msg_sendto ( slurm_fd open_fd, char *buffer , size_t size , uint3 while ( true ) { - //if ( ( send_len = _slurm_send ( open_fd , size_buffer_temp , sizeof ( uint32_t ) , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , SLURM_MESSGE_TIMEOUT_SEC ) ) == SLURM_PROTOCOL_ERROR ) - if ( ( send_len = _slurm_send ( open_fd , size_buffer_temp , sizeof ( uint32_t ) , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS) ) == SLURM_PROTOCOL_ERROR ) + //if ( ( send_len = _slurm_send ( open_fd , size_buffer_temp , sizeof ( uint32_t ) , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS) ) == SLURM_PROTOCOL_ERROR ) + if ( ( send_len = _slurm_send_timeout ( open_fd , size_buffer_temp , sizeof ( uint32_t ) , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , timeout ) ) == SLURM_PROTOCOL_ERROR ) { if ( errno == EINTR ) { @@ -210,8 +226,8 @@ ssize_t _slurm_msg_sendto ( slurm_fd open_fd, char *buffer , size_t size , uint3 } while ( true ) { - //if ( ( send_len = _slurm_send ( open_fd , buffer , size , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , SLURM_MESSGE_TIMEOUT_SEC ) ) == SLURM_PROTOCOL_ERROR ) - if ( ( send_len = _slurm_send ( open_fd , buffer , size , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS ) ) == SLURM_PROTOCOL_ERROR ) + //if ( ( send_len = _slurm_send ( open_fd , buffer , size , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS ) ) == SLURM_PROTOCOL_ERROR ) + if ( ( send_len = _slurm_send_timeout ( open_fd , buffer , size , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , timeout ) ) == SLURM_PROTOCOL_ERROR ) { if ( errno == EINTR ) { @@ -246,16 +262,12 @@ ssize_t _slurm_msg_sendto ( slurm_fd open_fd, char *buffer , size_t size , uint3 return SLURM_PROTOCOL_ERROR ; } -int _slurm_send_timeout ( slurm_fd open_fd, char *buffer , size_t size , uint32_t flags, int timeout ) +int _slurm_send_timeout ( slurm_fd open_fd, char *buffer , size_t size , uint32_t flags, struct timeval * timeout ) { int rc ; int bytes_sent = 0 ; int fd_flags ; _slurm_fd_set set ; - struct timeval time_out_val ; - - time_out_val . tv_sec = timeout ; - time_out_val . tv_usec = 0 ; _slurm_FD_ZERO ( & set ) ; fd_flags = _slurm_fcntl ( open_fd , F_GETFL ) ; @@ -263,7 +275,7 @@ int _slurm_send_timeout ( slurm_fd open_fd, char *buffer , size_t size , uint32_ while ( bytes_sent < size ) { _slurm_FD_SET ( open_fd , &set ) ; - rc = _slurm_select ( open_fd + 1 , NULL , & set, NULL , & time_out_val ) ; + rc = _slurm_select ( open_fd + 1 , NULL , & set, NULL , timeout ) ; if ( rc == SLURM_PROTOCOL_ERROR || rc < 0 ) { if ( errno == EINTR ) @@ -322,16 +334,13 @@ int _slurm_send_timeout ( slurm_fd open_fd, char *buffer , size_t size , uint32_ } -int _slurm_recv_timeout ( slurm_fd open_fd, char *buffer , size_t size , uint32_t flags, int timeout ) +int _slurm_recv_timeout ( slurm_fd open_fd, char *buffer , size_t size , uint32_t flags, struct timeval * timeout ) { int rc ; int bytes_recv = 0 ; int fd_flags ; _slurm_fd_set set ; - struct timeval time_out_val ; - time_out_val . tv_sec = timeout ; - time_out_val . tv_usec = 0 ; _slurm_FD_ZERO ( & set ) ; fd_flags = _slurm_fcntl ( open_fd , F_GETFL ) ; @@ -339,7 +348,7 @@ int _slurm_recv_timeout ( slurm_fd open_fd, char *buffer , size_t size , uint32_ while ( bytes_recv < size ) { _slurm_FD_SET ( open_fd , &set ) ; - rc = _slurm_select ( open_fd + 1 , & set , NULL , NULL , & time_out_val ) ; + rc = _slurm_select ( open_fd + 1 , & set , NULL , NULL , timeout ) ; if ( rc == SLURM_PROTOCOL_ERROR || rc < 0 ) { if ( errno == EINTR ) diff --git a/src/slurmd/nbio.c b/src/slurmd/nbio.c index e0ef98912b1..ee874d5922d 100644 --- a/src/slurmd/nbio.c +++ b/src/slurmd/nbio.c @@ -483,8 +483,10 @@ int error_task_socket ( nbio_attr_t * nbio_attr , int fd_index ) case ESLURMD_UNKNOWN_SOCKET_ERROR : case ESLURMD_SOCKET_DISCONNECT : case ESLURMD_EOF_ON_SOCKET : - slurm_close_stream ( nbio_attr -> fd [fd_index] ) ; - nbio_attr -> fd [fd_index] = -1 ; + if ( !slurm_close_stream ( nbio_attr -> fd [fd_index] ) ) ; + { + nbio_attr -> fd [fd_index] = -1 ; + } switch ( nbio_attr -> reconnect_flags[fd_index] ) { case CONNECTED : -- GitLab