diff --git a/src/slurmd/Makefile.am b/src/slurmd/Makefile.am index ba01a920df2420bb297b6311b3bcd7c758e43576..50443a7b566bdb5b03ce622f8895ce3e781e77a0 100644 --- a/src/slurmd/Makefile.am +++ b/src/slurmd/Makefile.am @@ -15,5 +15,7 @@ slurmd_SOURCES = slurmd.c \ task_mgr.c \ shmem_struct.c \ circular_buffer.c \ - credential_utils.c + credential_utils.c \ + io.c \ + pipes.c diff --git a/src/slurmd/io.c b/src/slurmd/io.c new file mode 100644 index 0000000000000000000000000000000000000000..cc69f9508696a5980cf659cac48f578aac70eb9d --- /dev/null +++ b/src/slurmd/io.c @@ -0,0 +1,435 @@ +#include <stdlib.h> +#include <sys/types.h> +#include <pwd.h> +#include <grp.h> +#include <sys/wait.h> +#include <errno.h> +#include <unistd.h> +#include <string.h> +#include <pthread.h> + +#include <src/common/log.h> +#include <src/common/list.h> +#include <src/common/xerrno.h> +#include <src/common/xmalloc.h> +#include <src/common/slurm_protocol_api.h> +#include <src/common/slurm_errno.h> +#include <src/common/util_signals.h> + +#include <src/slurmd/task_mgr.h> +#include <src/slurmd/shmem_struct.h> +#include <src/slurmd/circular_buffer.h> +#include <src/slurmd/io.h> +#include <src/slurmd/pipes.h> + +/* global variables */ +int connect_io_stream ( task_start_t * task_start , int out_or_err ) ; +int send_io_stream_header ( task_start_t * task_start , int out_or_err ) ; +ssize_t read_EINTR(int fd, void *buf, size_t count) ; + + +/****************************************************************** + *task launch method call hierarchy + * + *launch_tasks() + * interconnect_init() + * fan_out_task_launch() (pthread_create) + * task_exec_thread() (fork) for task exec + * task_exec_thread() (pthread_create) for io piping + ******************************************************************/ +int forward_io ( task_start_t * task_start ) +{ + pthread_attr_t pthread_attr ; + +#define STDIN_OUT_SOCK 0 +#define SIG_STDERR_SOCK 1 + + posix_signal_pipe_ignore ( ) ; + + /* open stdout*/ + connect_io_stream ( task_start , STDIN_OUT_SOCK ) ; + /* open stderr*/ + connect_io_stream ( task_start , SIG_STDERR_SOCK ) ; + + /* spawn io pipe threads */ + /* set detatch state */ + pthread_attr_init( & pthread_attr ) ; + /*pthread_attr_setdetachstate ( & pthread_attr , PTHREAD_CREATE_DETACHED ) ;*/ + if ( pthread_create ( & task_start->io_pthread_id[STDIN_FILENO] , NULL , stdin_io_pipe_thread , task_start ) ) + goto return_label; + if ( pthread_create ( & task_start->io_pthread_id[STDOUT_FILENO] , NULL , stdout_io_pipe_thread , task_start ) ) + goto kill_stdin_thread; + if ( pthread_create ( & task_start->io_pthread_id[STDERR_FILENO] , NULL , stderr_io_pipe_thread , task_start ) ) + goto kill_stdout_thread; + + /* threads have been detatched*/ + + pthread_join ( task_start->io_pthread_id[STDERR_FILENO] , NULL ) ; + info ( "errexit" ) ; + pthread_join ( task_start->io_pthread_id[STDOUT_FILENO] , NULL ) ; + info ( "outexit" ) ; + /*pthread_join ( task_start->io_pthread_id[STDIN_FILENO] , NULL ) ;*/ + pthread_cancel ( task_start->io_pthread_id[STDIN_FILENO] ); + info ( "inexit" ) ; + /* thread join on stderr or stdout signifies task termination we should kill the stdin thread */ + + goto return_label; + + kill_stdout_thread: + pthread_kill ( task_start->io_pthread_id[STDOUT_FILENO] , SIGKILL ); + kill_stdin_thread: + pthread_kill ( task_start->io_pthread_id[STDIN_FILENO] , SIGKILL ); + return_label: + return SLURM_SUCCESS ; +} + +void * stdin_io_pipe_thread ( void * arg ) +{ + task_start_t * task_start = ( task_start_t * ) arg ; + int bytes_read ; + int bytes_written ; + int local_errno ; + circular_buffer_t * cir_buf ; + + init_circular_buffer ( & cir_buf ) ; + + posix_signal_pipe_ignore ( ) ; + + while ( true ) + { + if ( ( cir_buf->write_size == 0 ) ) + { + info ( "stdin cir_buf->write_size == 0 this shouldn't happen" ) ; + continue ; + } + + if ( ( bytes_read = slurm_read_stream ( task_start->sockets[STDIN_OUT_SOCK] , cir_buf->tail , cir_buf->write_size ) ) <= 0 ) + { + local_errno = errno ; + if ( bytes_read == 0) + { + info ( "0 returned EOF on socket ") ; + break ; + } + else if ( bytes_read == -1 ) + { + switch ( local_errno ) + { + case EBADF: + case EPIPE: + case ECONNREFUSED: + case ECONNRESET: + case ENOTCONN: + break ; + default: + info ( "error reading stdin stream for task %i, errno %i , bytes read %i ", 1 , local_errno , bytes_read ) ; + error ( "uncaught errno %i", local_errno ) ; + break; + } + } + else + { + info ( "bytes_read: %i don't know what to do with this return code ", bytes_read ) ; + } + } + else + { + cir_buf_write_update ( cir_buf , bytes_read ) ; + } + + /* debug */ + //write ( 1 , "stdin-", 6 ) ; + //write ( 1 , cir_buf->head , cir_buf->read_size ) ; + info ( "%i stdin bytes read", bytes_read ) ; + /* debug */ + + while ( true) + { + + if ( ( bytes_written = write ( task_start->pipes[CHILD_IN_WR] , cir_buf->head , cir_buf->read_size ) ) <= 0 ) + { + if ( ( bytes_written == SLURM_PROTOCOL_ERROR ) && ( errno == EINTR ) ) + { + continue ; + } + else + { + + local_errno = errno ; + info ( "error sending stdin stream for task %i, errno %i , bytes read %i ", 1 , local_errno , bytes_read ) ; + goto stdin_return ; + } + } + else + { + cir_buf_read_update ( cir_buf , bytes_written ) ; + break ; + } + } + } + stdin_return: + free_circular_buffer ( cir_buf ) ; + pthread_exit ( NULL ) ; +} + +#define RECONNECT_RETRY_TIME 1 +void * stdout_io_pipe_thread ( void * arg ) +{ + task_start_t * task_start = ( task_start_t * ) arg ; + int bytes_read ; + int sock_bytes_written ; + int local_errno ; + int attempt_reconnect = false ; + time_t last_reconnect_try = 0 ; + circular_buffer_t * cir_buf ; + + init_circular_buffer ( & cir_buf ) ; + + posix_signal_pipe_ignore ( ) ; + + while ( true ) + { + if ( ( cir_buf->write_size == 0 ) ) + { + info ( "stdout cir_buf->write_size == 0 this shouldn't happen" ) ; + continue ; + } + + /* read stdout code */ + if ( ( bytes_read = read_EINTR ( task_start->pipes[CHILD_OUT_RD] , cir_buf->tail , cir_buf->write_size ) ) <= 0 ) + { + local_errno = errno ; + info ( "error reading stdout stream for task %i, errno %i , bytes read %i ", + task_start -> launch_msg -> global_task_ids[ task_start -> local_task_id ] , local_errno , bytes_read ) ; + goto stdout_return ; + } + else + { + cir_buf_write_update ( cir_buf , bytes_read ) ; + } + + /* debug */ + write ( 1 , cir_buf->head , cir_buf->read_size ) ; + info ( "%i stdout bytes read", bytes_read ) ; + /* debug */ + + /* reconnect code */ + if ( attempt_reconnect ) + { + time_t curr_time = time ( NULL ) ; + if ( difftime ( curr_time , last_reconnect_try ) > RECONNECT_RETRY_TIME ) + { + slurm_close_stream ( task_start->sockets[STDIN_OUT_SOCK] ) ; + if ( ( task_start->sockets[STDIN_OUT_SOCK] = slurm_open_stream ( & ( task_start -> io_streams_dest ) ) ) == SLURM_PROTOCOL_ERROR ) + { + local_errno = errno ; + info ( "error reconnecting socket to srun to pipe stdout errno %i" , local_errno ) ; + last_reconnect_try = time ( NULL ) ; + continue ; + } + attempt_reconnect = false ; + } + else + { + continue ; + } + } + + /* write out socket code */ + if ( ( sock_bytes_written = slurm_write_stream ( task_start->sockets[STDIN_OUT_SOCK] , cir_buf->head , cir_buf->read_size ) ) == SLURM_PROTOCOL_ERROR ) + { + local_errno = errno ; + switch ( local_errno ) + { + case EBADF: + case EPIPE: + case ECONNREFUSED: + case ECONNRESET: + case ENOTCONN: + info ( "std out connection losti %i", local_errno ) ; + attempt_reconnect = true ; + slurm_close_stream ( task_start->sockets[STDIN_OUT_SOCK] ) ; + break ; + default: + info ( "error sending stdout stream for task %i , errno %i", 1 , local_errno ) ; + error ( "uncaught errno %i", local_errno ) ; + break ; + } + continue ; + } + cir_buf_read_update ( cir_buf , sock_bytes_written ) ; + } + + stdout_return: + free_circular_buffer ( cir_buf ) ; + slurm_close_stream ( task_start->sockets[STDIN_OUT_SOCK] ) ; + pthread_exit ( NULL ) ; +} + +void * stderr_io_pipe_thread ( void * arg ) +{ + task_start_t * task_start = ( task_start_t * ) arg ; + int bytes_read ; + int sock_bytes_written ; + int local_errno ; + int attempt_reconnect = false ; + time_t last_reconnect_try = 0 ; + circular_buffer_t * cir_buf ; + + init_circular_buffer ( & cir_buf ) ; + + while ( true ) + { + if ( ( cir_buf->write_size == 0 ) ) + { + info ( "stderr cir_buf->write_size == 0 this shouldn't happen" ) ; + continue ; + } + + /* read stderr code */ + if ( ( bytes_read = read ( task_start->pipes[CHILD_ERR_RD] , cir_buf->tail , cir_buf->write_size ) ) <= 0 ) + { + debug ( "bytes_read: %i , errno: %i", bytes_read , errno ) ; + if ( ( bytes_read == SLURM_PROTOCOL_ERROR ) && ( errno == EINTR ) ) + { + continue ; + } + else + { + + local_errno = errno ; + info ( "error reading stderr stream for task %i, errno %i , bytes read %i ", 1 , local_errno , bytes_read ) ; + goto stderr_return ; + } + } + else + { + cir_buf_write_update ( cir_buf , bytes_read ) ; + } + + /* debug */ + /* + info ( "%i stderr bytes read", bytes_read ) ; + write ( 2 , cir_buf->head , cir_buf->read_size ) ; + */ + /* debug */ + + /* reconnect code */ + if ( attempt_reconnect ) + { + time_t curr_time = time ( NULL ) ; + if ( difftime ( curr_time , last_reconnect_try ) > RECONNECT_RETRY_TIME ) + { + slurm_close_stream ( task_start->sockets[SIG_STDERR_SOCK] ) ; + if ( ( task_start->sockets[SIG_STDERR_SOCK] = slurm_open_stream ( &( task_start -> io_streams_dest ) ) ) == SLURM_PROTOCOL_ERROR ) + { + local_errno = errno ; + info ( "error reconnecting socket to srun to pipe stderr errno %i" , local_errno ) ; + last_reconnect_try = time ( NULL ) ; + continue ; + } + attempt_reconnect = false ; + } + else + { + continue ; + } + } + + /* write out socket code */ + if ( ( sock_bytes_written = slurm_write_stream ( task_start->sockets[SIG_STDERR_SOCK] , cir_buf->head , cir_buf->read_size ) ) == SLURM_PROTOCOL_ERROR ) + { + local_errno = errno ; + switch ( local_errno ) + { + case EBADF: + case EPIPE: + case ECONNREFUSED: + case ECONNRESET: + case ENOTCONN: + info ( "std err connection lost %i ", local_errno ) ; + attempt_reconnect = true ; + slurm_close_stream ( task_start->sockets[SIG_STDERR_SOCK] ) ; + break ; + default: + info ( "error sending stderr stream for task %i , errno %i", 1 , local_errno ) ; + error ( "uncaught errno %i", local_errno ) ; + break ; + } + continue ; + } + cir_buf_read_update ( cir_buf , sock_bytes_written ) ; + } + + stderr_return: + free_circular_buffer ( cir_buf ) ; + slurm_close_stream ( task_start->sockets[SIG_STDERR_SOCK] ) ; + pthread_exit ( NULL ) ; +} + +int connect_io_stream ( task_start_t * task_start , int out_or_err ) +{ + int local_errno ; + if ( ( task_start->sockets[out_or_err] = slurm_open_stream ( & ( task_start -> io_streams_dest ) ) ) == SLURM_PROTOCOL_ERROR ) + { + local_errno = errno ; + info ( "error opening socket to srun to pipe %s errno %i" , out_or_err ? "stdout" : "stderr" , local_errno ) ; + return SLURM_PROTOCOL_ERROR ; + } + else + { + return send_io_stream_header ( task_start , out_or_err) ; + } + +} + +int send_io_stream_header ( task_start_t * task_start , int out_or_err ) +{ + slurm_io_stream_header_t io_header ; + char buffer[sizeof(slurm_io_stream_header_t)] ; + char * buf_ptr = buffer ; + int buf_size = sizeof(slurm_io_stream_header_t) ; + int size = sizeof(slurm_io_stream_header_t) ; + + + if( out_or_err == STDIN_OUT_SOCK ) + { + init_io_stream_header ( & io_header , + task_start -> launch_msg -> credential -> signature , + task_start -> launch_msg -> global_task_ids[task_start -> local_task_id ] , + SLURM_IO_STREAM_INOUT + ) ; + pack_io_stream_header ( & io_header , (void ** ) & buf_ptr , & size ) ; + return slurm_write_stream ( task_start->sockets[STDIN_OUT_SOCK] , buffer , buf_size - size ) ; + } + else + { + + init_io_stream_header ( & io_header , + task_start -> launch_msg -> credential -> signature , + task_start -> launch_msg -> global_task_ids[task_start -> local_task_id ] , + SLURM_IO_STREAM_SIGERR + ) ; + pack_io_stream_header ( & io_header , (void ** ) & buf_ptr , & size ) ; + return slurm_write_stream ( task_start->sockets[SIG_STDERR_SOCK] , buffer , buf_size - size ) ; + } +} + + +ssize_t read_EINTR(int fd, void *buf, size_t count) +{ + ssize_t bytes_read ; + while ( true ) + { + if ( ( bytes_read = read ( fd, buf, count ) ) <= 0 ) + { + debug ( "bytes_read: %i , errno: %i", bytes_read , errno ) ; + if ( ( bytes_read == SLURM_PROTOCOL_ERROR ) && ( errno == EINTR ) ) + { + continue ; + } + } + return bytes_read ; + } +} + diff --git a/src/slurmd/io.h b/src/slurmd/io.h new file mode 100644 index 0000000000000000000000000000000000000000..19bf92b3c40130f0df69beedbac864e7d8c882df --- /dev/null +++ b/src/slurmd/io.h @@ -0,0 +1,28 @@ +#ifndef _SLURMD_IO_H_ +#define _SLURMD_IO_H_ +#include <src/slurmd/task_mgr.h> + +/* file descriptor defines */ + +#define STDIN_FILENO 0 +#define STDOUT_FILENO 1 +#define STDERR_FILENO 2 +#define MAX_TASKS_PER_LAUNCH 64 +#define CHILD_IN 0 +#define CHILD_IN_RD 0 +#define CHILD_IN_WR 1 +#define CHILD_OUT 2 +#define CHILD_OUT_RD 2 +#define CHILD_OUT_WR 3 +#define CHILD_ERR 4 +#define CHILD_ERR_RD 4 +#define CHILD_ERR_WR 5 + +/* prototypes */ + +int forward_io ( task_start_t * task_arg ) ; +void * stdin_io_pipe_thread ( void * arg ) ; +void * stdout_io_pipe_thread ( void * arg ) ; +void * stderr_io_pipe_thread ( void * arg ) ; + +#endif diff --git a/src/slurmd/pipes.c b/src/slurmd/pipes.c new file mode 100644 index 0000000000000000000000000000000000000000..077667a59655c5e227f409050a235d5b46fa87fd --- /dev/null +++ b/src/slurmd/pipes.c @@ -0,0 +1,59 @@ +#include <unistd.h> +#include <errno.h> + +#include <src/common/slurm_errno.h> +#include <src/common/log.h> +#include <src/slurmd/pipes.h> +#include <src/slurmd/io.h> +void setup_parent_pipes ( int * pipes ) +{ + close ( pipes[CHILD_IN_RD] ) ; + close ( pipes[CHILD_OUT_WR] ) ; + close ( pipes[CHILD_ERR_WR] ) ; +} + +int init_parent_pipes ( int * pipes ) +{ + int rc ; + /* open pipes to be used in dup after fork */ + if( ( rc = pipe ( & pipes[CHILD_IN] ) ) ) + { + return ESLRUMD_PIPE_ERROR_ON_TASK_SPAWN ; + } + if( ( rc = pipe ( & pipes[CHILD_OUT] ) ) ) + { + return ESLRUMD_PIPE_ERROR_ON_TASK_SPAWN ; + } + if( ( rc = pipe ( & pipes[CHILD_ERR] ) ) ) + { + return ESLRUMD_PIPE_ERROR_ON_TASK_SPAWN ; + } + return SLURM_SUCCESS ; +} + +int setup_child_pipes ( int * pipes ) +{ + int error_code = SLURM_SUCCESS ; + int local_errno; + + /*dup stdin*/ + //close ( STDIN_FILENO ); + if ( SLURM_ERROR == ( error_code |= dup2 ( pipes[CHILD_IN_RD] , STDIN_FILENO ) ) ) + { + local_errno = errno ; + error ("dup failed on child standard in pipe, errno %i" , local_errno ); + //return error_code ; + } + close ( CHILD_IN_RD ); + close ( CHILD_IN_WR ); + + /*dup stdout*/ + //close ( STDOUT_FILENO ); + if ( SLURM_ERROR == ( error_code |= dup2 ( pipes[CHILD_OUT_WR] , STDOUT_FILENO ) ) ) + { + local_errno = errno ; + error ("dup failed on child standard out pipe, errno %i" , local_errno ); + //return error_code ; + } + return error_code ; +} diff --git a/src/slurmd/pipes.h b/src/slurmd/pipes.h new file mode 100644 index 0000000000000000000000000000000000000000..017b1a8b0116ac35a61a1e0b79508fa461e23dc9 --- /dev/null +++ b/src/slurmd/pipes.h @@ -0,0 +1,10 @@ +#ifndef _SLURMD_PIPES_H_ +#define _SLURMD_PIPES_H_ + +/*pipes.c*/ +int init_parent_pipes ( int * pipes ) ; +void setup_parent_pipes ( int * pipes ) ; +int setup_child_pipes ( int * pipes ) ; + +#endif + diff --git a/src/slurmd/task_mgr.c b/src/slurmd/task_mgr.c index 03e88a51e21514e53b49b2b93c41801236578fac..23c9d51a6a4c0c52da93ea21f0864bc14c49e441 100644 --- a/src/slurmd/task_mgr.c +++ b/src/slurmd/task_mgr.c @@ -19,45 +19,18 @@ #include <src/slurmd/task_mgr.h> #include <src/slurmd/shmem_struct.h> #include <src/slurmd/circular_buffer.h> +#include <src/slurmd/pipes.h> +#include <src/slurmd/io.h> /* global variables */ -/* file descriptor defines */ - -#define STDIN_FILENO 0 -#define STDOUT_FILENO 1 -#define STDERR_FILENO 2 -#define MAX_TASKS_PER_LAUNCH 64 -#define CHILD_IN 0 -#define CHILD_IN_RD 0 -#define CHILD_IN_WR 1 -#define CHILD_OUT 2 -#define CHILD_OUT_RD 2 -#define CHILD_OUT_WR 3 -#define CHILD_ERR 4 -#define CHILD_ERR_RD 4 -#define CHILD_ERR_WR 5 - -/* extern slurmd_shmem_t * shmem_seg ; */ - /* prototypes */ -void slurm_free_task ( void * _task ) ; int kill_task ( task_t * task ) ; int interconnect_init ( launch_tasks_request_msg_t * launch_msg ); int fan_out_task_launch ( launch_tasks_request_msg_t * launch_msg ); -int forward_io ( task_start_t * task_arg ) ; void * task_exec_thread ( void * arg ) ; - -int init_parent_pipes ( int * pipes ) ; -void setup_parent_pipes ( int * pipes ) ; -int setup_child_pipes ( int * pipes ) ; - -void * stdin_io_pipe_thread ( void * arg ) ; -void * stdout_io_pipe_thread ( void * arg ) ; -void * stderr_io_pipe_thread ( void * arg ) ; - -int setup_task_env (task_start_t * task_start ) ; +int send_task_exit_msg ( int task_return_code , task_start_t * task_start ) ; /****************************************************************** *task launch method call hierarchy @@ -149,355 +122,12 @@ int fan_out_task_launch ( launch_tasks_request_msg_t * launch_msg ) return SLURM_SUCCESS ; } -int forward_io ( task_start_t * task_arg ) -{ - pthread_attr_t pthread_attr ; - int local_errno; - slurm_io_stream_header_t io_header ; - -#define STDIN_OUT_SOCK 0 -#define SIG_STDERR_SOCK 1 - - posix_signal_pipe_ignore ( ) ; - - /* open stdout & stderr sockets */ - if ( ( task_arg->sockets[STDIN_OUT_SOCK] = slurm_open_stream ( & ( task_arg -> io_streams_dest ) ) ) == SLURM_PROTOCOL_ERROR ) - { - local_errno = errno ; - info ( "error opening socket to srun to pipe stdout errno %i" , local_errno ) ; - /* pthread_exit ( 0 ) ; */ - } - else - { - char buffer[sizeof(slurm_io_stream_header_t)] ; - char * buf_ptr = buffer ; - int buf_size = sizeof(slurm_io_stream_header_t) ; - int size = sizeof(slurm_io_stream_header_t) ; - - init_io_stream_header ( & io_header , - task_arg -> launch_msg -> credential -> signature , - task_arg -> launch_msg -> global_task_ids[task_arg -> local_task_id ] , - SLURM_IO_STREAM_INOUT - ) ; - pack_io_stream_header ( & io_header , (void ** ) & buf_ptr , & size ) ; - slurm_write_stream ( task_arg->sockets[STDIN_OUT_SOCK] , buffer , buf_size - size ) ; - } - - if ( ( task_arg->sockets[SIG_STDERR_SOCK] = slurm_open_stream ( &( task_arg -> io_streams_dest ) ) ) == SLURM_PROTOCOL_ERROR ) - { - local_errno = errno ; - info ( "error opening socket to srun to pipe stdout errno %i" , local_errno ) ; - /* pthread_exit ( 0 ) ; */ - } - else - { - char buffer[sizeof(slurm_io_stream_header_t)] ; - char * buf_ptr = buffer ; - int buf_size = sizeof(slurm_io_stream_header_t) ; - int size = sizeof(slurm_io_stream_header_t) ; - - init_io_stream_header ( & io_header , - task_arg -> launch_msg -> credential -> signature , - task_arg -> launch_msg -> global_task_ids[task_arg -> local_task_id ] , - SLURM_IO_STREAM_SIGERR - ) ; - pack_io_stream_header ( & io_header , (void ** ) & buf_ptr , & size ) ; - slurm_write_stream ( task_arg->sockets[SIG_STDERR_SOCK] , buffer , buf_size - size ) ; - } - - /* spawn io pipe threads */ - pthread_attr_init( & pthread_attr ) ; - //pthread_attr_setdetachstate ( & pthread_attr , PTHREAD_CREATE_DETACHED ) ; - if ( pthread_create ( & task_arg->io_pthread_id[STDIN_FILENO] , NULL , stdin_io_pipe_thread , task_arg ) ) - goto return_label; - if ( pthread_create ( & task_arg->io_pthread_id[STDOUT_FILENO] , NULL , stdout_io_pipe_thread , task_arg ) ) - goto kill_stdin_thread; - if ( pthread_create ( & task_arg->io_pthread_id[STDERR_FILENO] , NULL , stderr_io_pipe_thread , task_arg ) ) - goto kill_stdout_thread; - - ///* threads have been detatched*/ - - pthread_join ( task_arg->io_pthread_id[STDERR_FILENO] , NULL ) ; - info ( "errexit" ) ; - pthread_join ( task_arg->io_pthread_id[STDOUT_FILENO] , NULL ) ; - info ( "outexit" ) ; - /* thread join on stderr or stdout signifies task termination we should kill the stdin thread */ - /*pthread_kill ( task_arg->io_pthread_id[STDIN_FILENO] , SIGKILL );*/ - - goto return_label; - - kill_stdout_thread: - pthread_kill ( task_arg->io_pthread_id[STDOUT_FILENO] , SIGKILL ); - kill_stdin_thread: - pthread_kill ( task_arg->io_pthread_id[STDIN_FILENO] , SIGKILL ); - return_label: - return SLURM_SUCCESS ; -} - -void * stdin_io_pipe_thread ( void * arg ) -{ - task_start_t * io_arg = ( task_start_t * ) arg ; - //char buffer[SLURMD_IO_MAX_BUFFER_SIZE] ; - int bytes_read ; - int bytes_written ; - int local_errno ; - circular_buffer_t * cir_buf ; - - init_circular_buffer ( & cir_buf ) ; - - posix_signal_pipe_ignore ( ) ; - - while ( true ) - { - /*if ( ( bytes_read = slurm_read_stream ( io_arg->sockets[0] , buffer , SLURMD_IO_MAX_BUFFER_SIZE ) ) == SLURM_PROTOCOL_ERROR )*/ - if ( ( bytes_read = slurm_read_stream ( io_arg->sockets[STDIN_OUT_SOCK] , cir_buf->tail , cir_buf->write_size ) ) <= 0 ) - { - local_errno = errno ; - switch ( local_errno ) - { - case 0: - case EBADF: - case EPIPE: - case ECONNREFUSED: - case ECONNRESET: - case ENOTCONN: - break ; - default: - info ( "error reading stdin stream for task %i, errno %i , bytes read %i ", 1 , local_errno , bytes_read ) ; - error ( "uncaught errno %i", local_errno ) ; - } - continue ; - } - cir_buf_write_update ( cir_buf , bytes_read ) ; - /* debug */ - //write ( 1 , "stdin-", 6 ) ; - //write ( 1 , buffer , bytes_read ) ; - //write ( 1 , cir_buf->head , cir_buf->read_size ) ; - info ( "%i stdin bytes read", bytes_read ) ; - /* debug */ - while ( true) - { - /*if ( ( bytes_written = write ( io_arg->pipes[CHILD_IN_WR] , buffer , bytes_read ) ) <= 0 )*/ - if ( ( bytes_written = write ( io_arg->pipes[CHILD_IN_WR] , cir_buf->head , cir_buf->read_size ) ) <= 0 ) - { - if ( ( bytes_written == SLURM_PROTOCOL_ERROR ) && ( errno == EINTR ) ) - { - continue ; - } - else - { - - local_errno = errno ; - info ( "error sending stdin stream for task %i, errno %i , bytes read %i ", 1 , local_errno , bytes_read ) ; - goto stdin_return ; - } - } - else - { - cir_buf_read_update ( cir_buf , bytes_written ) ; - break ; - } - } - } - stdin_return: - free_circular_buffer ( cir_buf ) ; - pthread_exit ( NULL ) ; -} - -#define RECONNECT_RETRY_TIME 1 -void * stdout_io_pipe_thread ( void * arg ) -{ - task_start_t * io_arg = ( task_start_t * ) arg ; - //char buffer[SLURMD_IO_MAX_BUFFER_SIZE] ; - int bytes_read ; - int sock_bytes_written ; - int local_errno ; - int attempt_reconnect = false ; - time_t last_reconnect_try = 0 ; - circular_buffer_t * cir_buf ; - - init_circular_buffer ( & cir_buf ) ; - - posix_signal_pipe_ignore ( ) ; - - while ( true ) - { - /*sleep ( 1 ) ;*/ - /* read stderr code */ - /*if ( ( bytes_read = read ( io_arg->pipes[CHILD_OUT_RD] , buffer , SLURMD_IO_MAX_BUFFER_SIZE ) ) <= 0 )*/ - if ( ( bytes_read = read ( io_arg->pipes[CHILD_OUT_RD] , cir_buf->tail , cir_buf->write_size ) ) <= 0 ) - { - debug ( "bytes_read: %i , errno: %i", bytes_read , errno ) ; - if ( ( bytes_read == SLURM_PROTOCOL_ERROR ) && ( errno == EINTR ) ) - { - continue ; - } - else - { - - local_errno = errno ; - info ( "error reading stdout stream for task %i, errno %i , bytes read %i ", 1 , local_errno , bytes_read ) ; - goto stdout_return ; - } - } - cir_buf_write_update ( cir_buf , bytes_read ) ; - /* debug */ - //write ( 1 , buffer , bytes_read ) ; - write ( 1 , cir_buf->head , cir_buf->read_size ) ; - info ( "%i stdout bytes read", bytes_read ) ; - /* debug */ - /* reconnect code */ - if ( attempt_reconnect ) - { - time_t curr_time = time ( NULL ) ; - if ( difftime ( curr_time , last_reconnect_try ) > RECONNECT_RETRY_TIME ) - { - slurm_close_stream ( io_arg->sockets[STDIN_OUT_SOCK] ) ; - if ( ( io_arg->sockets[STDIN_OUT_SOCK] = slurm_open_stream ( & ( io_arg -> io_streams_dest ) ) ) == SLURM_PROTOCOL_ERROR ) - { - local_errno = errno ; - info ( "error reconnecting socket to srun to pipe stdout errno %i" , local_errno ) ; - last_reconnect_try = time ( NULL ) ; - continue ; - } - attempt_reconnect = false ; - } - else - { - continue ; - } - } - /* write out socket code */ - //if ( ( sock_bytes_written = slurm_write_stream ( io_arg->sockets[0] , buffer , bytes_read ) ) == SLURM_PROTOCOL_ERROR ) - if ( ( sock_bytes_written = slurm_write_stream ( io_arg->sockets[STDIN_OUT_SOCK] , cir_buf->head , cir_buf->read_size ) ) == SLURM_PROTOCOL_ERROR ) - { - local_errno = errno ; - switch ( local_errno ) - { - case EBADF: - case EPIPE: - case ECONNREFUSED: - case ECONNRESET: - case ENOTCONN: - info ( "std out connection lost" ) ; - attempt_reconnect = true ; - slurm_close_stream ( io_arg->sockets[STDIN_OUT_SOCK] ) ; - break ; - default: - info ( "error sending stdout stream for task %i , errno %i", 1 , local_errno ) ; - error ( "uncaught errno %i", local_errno ) ; - } - continue ; - } - cir_buf_read_update ( cir_buf , sock_bytes_written ) ; - } - - stdout_return: - free_circular_buffer ( cir_buf ) ; - slurm_close_stream ( io_arg->sockets[STDIN_OUT_SOCK] ) ; - pthread_exit ( NULL ) ; -} - -void * stderr_io_pipe_thread ( void * arg ) -{ - task_start_t * io_arg = ( task_start_t * ) arg ; - //char buffer[SLURMD_IO_MAX_BUFFER_SIZE] ; - int bytes_read ; - int sock_bytes_written ; - int local_errno ; - int attempt_reconnect = false ; - time_t last_reconnect_try = 0 ; - circular_buffer_t * cir_buf ; - - init_circular_buffer ( & cir_buf ) ; - - while ( true ) - { - /*sleep ( 1 ) ;*/ - /* read stderr code */ - //if ( ( bytes_read = read ( io_arg->pipes[CHILD_ERR_RD] , buffer , SLURMD_IO_MAX_BUFFER_SIZE ) ) <= 0 ) - if ( ( bytes_read = read ( io_arg->pipes[CHILD_ERR_RD] , cir_buf->tail , cir_buf->write_size ) ) <= 0 ) - { - debug ( "bytes_read: %i , errno: %i", bytes_read , errno ) ; - if ( ( bytes_read == SLURM_PROTOCOL_ERROR ) && ( errno == EINTR ) ) - { - continue ; - } - else - { - - local_errno = errno ; - info ( "error reading stderr stream for task %i, errno %i , bytes read %i ", 1 , local_errno , bytes_read ) ; - goto stderr_return ; - } - } - cir_buf_write_update ( cir_buf , bytes_read ) ; - /* debug */ - /* - write ( 2 , buffer , bytes_read ) ; - info ( "%i stderr bytes read", bytes_read ) ; - write ( 2 , cir_buf->head , cir_buf->read_size ) ; - */ - /* debug */ - /* reconnect code */ - if ( attempt_reconnect ) - { - time_t curr_time = time ( NULL ) ; - if ( difftime ( curr_time , last_reconnect_try ) > RECONNECT_RETRY_TIME ) - { - slurm_close_stream ( io_arg->sockets[SIG_STDERR_SOCK] ) ; - if ( ( io_arg->sockets[SIG_STDERR_SOCK] = slurm_open_stream ( &( io_arg -> io_streams_dest ) ) ) == SLURM_PROTOCOL_ERROR ) - { - local_errno = errno ; - info ( "error reconnecting socket to srun to pipe stderr errno %i" , local_errno ) ; - last_reconnect_try = time ( NULL ) ; - continue ; - } - attempt_reconnect = false ; - } - else - { - continue ; - } - } - /* write out socket code */ - /*if ( ( sock_bytes_written = slurm_write_stream ( io_arg->sockets[1] , buffer , bytes_read ) ) == SLURM_PROTOCOL_ERROR )*/ - if ( ( sock_bytes_written = slurm_write_stream ( io_arg->sockets[SIG_STDERR_SOCK] , cir_buf->head , cir_buf->read_size ) ) == SLURM_PROTOCOL_ERROR ) - { - local_errno = errno ; - switch ( local_errno ) - { - case EBADF: - case EPIPE: - case ECONNREFUSED: - case ECONNRESET: - case ENOTCONN: - info ( "std err connection lost %s ", local_errno ) ; - attempt_reconnect = true ; - slurm_close_stream ( io_arg->sockets[SIG_STDERR_SOCK] ) ; - break ; - default: - info ( "error sending stderr stream for task %i , errno %i", 1 , local_errno ) ; - error ( "uncaught errno %i", local_errno ) ; - } - continue ; - } - cir_buf_read_update ( cir_buf , sock_bytes_written ) ; - } - - stderr_return: - free_circular_buffer ( cir_buf ) ; - slurm_close_stream ( io_arg->sockets[SIG_STDERR_SOCK] ) ; - pthread_exit ( NULL ) ; -} void * task_exec_thread ( void * arg ) { task_start_t * task_start = ( task_start_t * ) arg ; launch_tasks_request_msg_t * launch_msg = task_start -> launch_msg ; - slurm_msg_t resp_msg ; - task_exit_msg_t task_exit ; int * pipes = task_start->pipes ; int rc ; int cpid ; @@ -507,10 +137,6 @@ void * task_exec_thread ( void * arg ) /* create pipes to read child stdin, stdout, sterr */ init_parent_pipes ( task_start->pipes ) ; - /* - setup_parent_pipes ( task_start->pipes ) ; - forward_io ( arg ) ; - */ #define FORK_ERROR -1 #define CHILD_PROCCESS 0 @@ -536,7 +162,7 @@ void * task_exec_thread ( void * arg ) _exit ( SLURM_FAILURE ) ; } - /* setuid and gid*/ + /* setgid and uid*/ if ( ( rc = setgid ( pwd -> pw_gid ) ) == SLURM_ERROR ) { info ( "set group id failed " ) ; @@ -560,13 +186,14 @@ void * task_exec_thread ( void * arg ) /* run bash and cmdline */ debug( "cwd %s", launch_msg->cwd ) ; chdir ( launch_msg->cwd ) ; - //execl ("/bin/bash", "bash", "-c", launch_msg->cmd_line, 0); - execve ( launch_msg->argv[0], launch_msg->argv , launch_msg->env ); + + /* error if execve returns + * clean up */ + error("execve(): %s: %m", launch_msg->argv[0]); close ( STDIN_FILENO ); close ( STDOUT_FILENO ); close ( STDERR_FILENO ); - error("execve(): %s: %m", launch_msg->argv[0]); local_errno = errno ; _exit ( local_errno ) ; break; @@ -576,92 +203,42 @@ void * task_exec_thread ( void * arg ) forward_io ( arg ) ; task_start->exec_pid = cpid ; waitpid ( cpid , & task_return_code , 0 ) ; - - - task_exit . return_code = task_return_code ; - task_exit . task_id = launch_msg -> global_task_ids[task_start -> local_task_id ] ; - - resp_msg . address = launch_msg -> response_addr ; - resp_msg . data = & task_exit ; - resp_msg . msg_type = MESSAGE_TASK_EXIT; - slurm_send_only_node_msg ( & resp_msg ) ; + send_task_exit_msg ( task_return_code , task_start ) ; break; } return ( void * ) SLURM_SUCCESS ; } -void setup_parent_pipes ( int * pipes ) -{ - close ( pipes[CHILD_IN_RD] ) ; - close ( pipes[CHILD_OUT_WR] ) ; - close ( pipes[CHILD_ERR_WR] ) ; -} -int init_parent_pipes ( int * pipes ) +int send_task_exit_msg ( int task_return_code , task_start_t * task_start ) { - int rc ; - /* open pipes to be used in dup after fork */ - if( ( rc = pipe ( & pipes[CHILD_IN] ) ) ) - { - return ESLRUMD_PIPE_ERROR_ON_TASK_SPAWN ; - } - if( ( rc = pipe ( & pipes[CHILD_OUT] ) ) ) - { - return ESLRUMD_PIPE_ERROR_ON_TASK_SPAWN ; - } - if( ( rc = pipe ( & pipes[CHILD_ERR] ) ) ) - { - return ESLRUMD_PIPE_ERROR_ON_TASK_SPAWN ; - } - return SLURM_SUCCESS ; -} - -int setup_child_pipes ( int * pipes ) -{ - int error_code = 0 ; - int local_errno; + slurm_msg_t resp_msg ; + task_exit_msg_t task_exit ; - /*dup stdin*/ - //close ( STDIN_FILENO ); - if ( SLURM_ERROR == ( error_code |= dup2 ( pipes[CHILD_IN_RD] , STDIN_FILENO ) ) ) - { - local_errno = errno ; - error ("dup failed on child standard in pipe, errno %i" , local_errno ); - //return error_code ; - } - close ( CHILD_IN_RD ); - close ( CHILD_IN_WR ); + /* init task_exit_message */ + task_exit . return_code = task_return_code ; + task_exit . task_id = task_start -> launch_msg -> global_task_ids[ task_start -> local_task_id ] ; - /*dup stdout*/ - //close ( STDOUT_FILENO ); - if ( SLURM_ERROR == ( error_code |= dup2 ( pipes[CHILD_OUT_WR] , STDOUT_FILENO ) ) ) - { - local_errno = errno ; - error ("dup failed on child standard out pipe, errno %i" , local_errno ); - //return error_code ; - } - close ( CHILD_OUT_RD ); - close ( CHILD_OUT_WR ); + /* init slurm_msg_t */ + resp_msg . address = task_start -> launch_msg -> response_addr ; + resp_msg . data = & task_exit ; + resp_msg . msg_type = MESSAGE_TASK_EXIT; - /*dup stderr*/ - //close ( STDERR_FILENO ); - if ( SLURM_ERROR == ( error_code |= dup2 ( pipes[CHILD_ERR_WR] , STDERR_FILENO ) ) ) - { - local_errno = errno ; - error ("dup failed on child standard err pipe, errno %i" , local_errno ); - //return error_code ; - } - close ( CHILD_ERR_RD ); - close ( CHILD_ERR_WR ); - - return error_code ; + /* send message */ + return slurm_send_only_node_msg ( & resp_msg ) ; } int kill_tasks ( kill_tasks_msg_t * kill_task_msg ) { int error_code = SLURM_SUCCESS ; + + /* get shmemptr */ slurmd_shmem_t * shmem_ptr = get_shmem ( ) ; + + /* find job step */ job_step_t * job_step_ptr = find_job_step ( shmem_ptr , kill_task_msg -> job_id , kill_task_msg -> job_step_id ) ; + + /* cycle through job_step and kill tasks*/ task_t * task_ptr = job_step_ptr -> head_task ; while ( task_ptr != NULL ) { @@ -671,20 +248,22 @@ int kill_tasks ( kill_tasks_msg_t * kill_task_msg ) return error_code ; } - int kill_task ( task_t * task ) { - kill ( task -> task_start . exec_pid , SIGKILL ) ; - return SLURM_SUCCESS ; + return kill ( task -> task_start . exec_pid , SIGKILL ) ; } int reattach_tasks_streams ( reattach_tasks_streams_msg_t * req_msg ) { int i; int error_code = SLURM_SUCCESS ; + /* get shmemptr */ slurmd_shmem_t * shmem_ptr = get_shmem ( ) ; + + /* find job step */ job_step_t * job_step_ptr = find_job_step ( shmem_ptr , req_msg->job_id , req_msg->job_step_id ) ; + /* cycle through tasks and set streams address */ for ( i = 0 ; i < req_msg->tasks_to_reattach ; i ++ ) { task_t * task = find_task ( job_step_ptr , req_msg->global_task_ids[i] ) ; diff --git a/src/slurmd/task_mgr.h b/src/slurmd/task_mgr.h index c3906b5b478c5fa2a1bf822590557e2b55aae7e5..79458503d6afb85d9f6a909dd004d9ca79d7640f 100644 --- a/src/slurmd/task_mgr.h +++ b/src/slurmd/task_mgr.h @@ -14,6 +14,8 @@ # include <inttypes.h> #endif /* HAVE_CONFIG_H */ +#include <src/common/slurm_protocol_api.h> + #define STDIN_IO_THREAD 0 #define STDOUT_IO_THREAD 1 #define STDERR_IO_THREAD 2