Skip to content
Snippets Groups Projects
Commit b4eba531 authored by tewk's avatar tewk
Browse files

incremental checkin for nbio, don't want to lose it

made io_threads die when buffer is full on write to buffer, this should never happen
parent c833e5b1
No related branches found
No related tags found
No related merge requests found
......@@ -39,7 +39,7 @@ void * stdin_io_pipe_thread ( void * arg )
if ( ( cir_buf->write_size == 0 ) )
{
debug3 ( "stdin cir_buf->write_size == 0 this shouldn't happen" ) ;
continue ;
break ;
}
if ( ( bytes_read = slurm_read_stream ( task_start->sockets[STDIN_OUT_SOCK] , cir_buf->tail , cir_buf->write_size ) ) <= 0 )
......@@ -130,7 +130,7 @@ void * stdout_io_pipe_thread ( void * arg )
if ( ( cir_buf->write_size == 0 ) )
{
debug3 ( "stdout cir_buf->write_size == 0 this shouldn't happen" ) ;
continue ;
break ;
}
/* read stdout code */
......@@ -241,7 +241,7 @@ void * stderr_io_pipe_thread ( void * arg )
if ( ( cir_buf->write_size == 0 ) )
{
debug3 ( "stderr cir_buf->write_size == 0 this shouldn't happen" ) ;
continue ;
break ;
}
/* read stderr code */
......
......@@ -40,6 +40,14 @@ typedef enum
ER_SET
} nbio_set_t ;
typedef enum
{
CONNECTED ,
RECONNECT ,
DRAIN,
DRAINED
} reconnect_state_t ;
typedef struct nbio_attr
{
task_start_t * task_start ;
......@@ -49,24 +57,44 @@ typedef struct nbio_attr
circular_buffer_t * in_cir_buf ;
circular_buffer_t * out_cir_buf ;
circular_buffer_t * err_cir_buf ;
int nbio_flags [5] ;
int flush_flag ;
int die ;
int reconnect_flags [2] ;
time_t reconnect_timers [2] ;
int max_fd ;
} nbio_attr_t ;
typedef struct io_debug
{
char * name ;
int local_task_id ;
int global_task_id ;
} io_debug_t ;
int nbio_set_init ( nbio_attr_t * nbio_attr , slurm_fd_set * set_ptr ) ;
int memcpy_sets ( slurm_fd_set * init_set , slurm_fd_set * next_set ) ;
int write_task_socket ( circular_buffer_t * cir_buf, task_start_t * task_start, slurm_fd write_fd , char * const name ) ;
int read_task_pipe ( circular_buffer_t * cir_buf, task_start_t * task_start, slurm_fd write_fd , char * const name ) ;
int write_task_pipe ( circular_buffer_t * cir_buf, task_start_t * task_start, slurm_fd write_fd , char * const name ) ;
int read_task_socket ( circular_buffer_t * cir_buf, task_start_t * task_start, slurm_fd read_fd , char * const name ) ;
int error_task_pipe ( ) ;
int error_task_socket ( ) ;
int write_task_socket ( circular_buffer_t * cir_buf, slurm_fd write_fd , io_debug_t * dbg ) ;
int read_task_pipe ( circular_buffer_t * cir_buf, slurm_fd write_fd , io_debug_t * dbg ) ;
int write_task_pipe ( circular_buffer_t * cir_buf, slurm_fd write_fd , io_debug_t * dbg ) ;
int read_task_socket ( circular_buffer_t * cir_buf, slurm_fd read_fd , io_debug_t * dbg ) ;
int error_task_pipe ( nbio_attr_t * nbio_attr , int fd_index ) ;
int error_task_socket ( nbio_attr_t * nbio_attr , int fd_index ) ;
int set_max_fd ( nbio_attr_t * nbio_attr ) ;
int nbio_cleanup ( nbio_attr_t * nbio_attr ) ;
int init_io_debug ( io_debug_t * io_dbg , task_start_t * task_start , char * name )
{
io_dbg -> name = name ;
io_dbg -> local_task_id = task_start -> local_task_id ;
io_dbg -> global_task_id = task_start -> launch_msg -> global_task_ids[ task_start -> local_task_id ] ;
return SLURM_SUCCESS ;
}
int init_nbio_attr ( nbio_attr_t * nbio_attr , task_start_t * task_start )
{
int i;
nbio_attr -> max_fd = 0 ;
nbio_attr -> flush_flag = false ;
nbio_attr -> task_start = task_start ;
nbio_attr -> fd[IN_OUT_FD] = task_start -> sockets[STDIN_OUT_SOCK];
nbio_attr -> fd[SIG_ERR_FD] = task_start -> sockets[SIG_STDERR_SOCK];
......@@ -77,6 +105,7 @@ int init_nbio_attr ( nbio_attr_t * nbio_attr , task_start_t * task_start )
init_circular_buffer ( & nbio_attr -> out_cir_buf ) ;
init_circular_buffer ( & nbio_attr -> err_cir_buf ) ;
nbio_set_init ( nbio_attr , nbio_attr -> init_set ) ;
for ( i=0 ; i < 2 ; i ++ ) { nbio_attr -> reconnect_flags[i] = RECONNECT ; }
return SLURM_SUCCESS ;
}
......@@ -84,6 +113,13 @@ void * do_nbio ( void * arg )
{
nbio_attr_t nbio_attr ;
task_start_t * task_start = ( task_start_t * ) arg ;
io_debug_t in_dbg ;
io_debug_t out_dbg ;
io_debug_t err_dbg ;
init_io_debug ( & in_dbg , task_start , "stdin" ) ;
init_io_debug ( & out_dbg , task_start , "stdout" ) ;
init_io_debug ( & err_dbg , task_start , "stderr" ) ;
posix_signal_pipe_ignore ( ) ;
init_nbio_attr ( & nbio_attr , task_start ) ;
......@@ -95,75 +131,120 @@ void * do_nbio ( void * arg )
select_timer . tv_sec = RECONNECT_TIMEOUT_SECONDS ;
select_timer . tv_usec = RECONNECT_TIMEOUT_MICROSECONDS ;
set_max_fd ( & nbio_attr ) ;
rc = slurm_select ( nbio_attr . max_fd , & nbio_attr . init_set[RD_SET] , & nbio_attr . init_set[WR_SET] , & nbio_attr . init_set[ER_SET] , NULL ) ;
if ( rc == SLURM_ERROR)
{
debug3 ( "select errror %m errno: %i", errno ) ;
continue ;
}
else if ( rc == 0 )
{
}
else if ( rc < 0 )
{
debug3 ( "select has unknown error: %i", rc ) ;
}
if ( nbio_attr . die )
{
break ;
}
reconnect ( ) ;
nbio_set_init ( & nbio_attr , nbio_attr . next_set ) ;
/* error fd set */
if ( slurm_FD_ISSET ( nbio_attr . fd [CHILD_IN_WR_FD] , & nbio_attr . init_set [ER_SET] ) )
{
error_task_pipe ( ) ;
break ;
error_task_pipe ( &nbio_attr , CHILD_IN_WR_FD ) ;
}
if ( slurm_FD_ISSET ( nbio_attr . fd [CHILD_OUT_RD_FD] , & nbio_attr . init_set [ER_SET] ) )
{
error_task_pipe ( ) ;
break ;
error_task_pipe ( &nbio_attr , CHILD_OUT_RD_FD ) ;
}
if ( slurm_FD_ISSET ( nbio_attr . fd [CHILD_ERR_RD_FD] , & nbio_attr . init_set [ER_SET] ) )
{
error_task_pipe ( ) ;
break ;
error_task_pipe ( &nbio_attr , CHILD_ERR_RD_FD ) ;
}
if ( slurm_FD_ISSET ( nbio_attr . fd [IN_OUT_FD] , & nbio_attr . init_set [ER_SET] ) )
{
error_task_socket ( ) ;
error_task_socket ( &nbio_attr , IN_OUT_FD ) ;
}
if ( slurm_FD_ISSET ( nbio_attr . fd [SIG_ERR_FD] , & nbio_attr . init_set [ER_SET] ) )
{
error_task_socket ( ) ;
error_task_socket ( &nbio_attr , SIG_ERR_FD ) ;
}
if ( slurm_FD_ISSET ( nbio_attr . fd [IN_OUT_FD] , & nbio_attr . init_set [RD_SET] ) )
/* read fd set */
if ( slurm_FD_ISSET ( nbio_attr . fd [IN_OUT_FD] , & nbio_attr . init_set [RD_SET] )
&& nbio_attr . reconnect_flags [IN_OUT_FD] == CONNECTED )
{
read_task_socket ( nbio_attr . in_cir_buf , nbio_attr . task_start , nbio_attr . fd [IN_OUT_FD] , "stdin" );
if ( read_task_socket ( nbio_attr . in_cir_buf , nbio_attr . fd [IN_OUT_FD] , & in_dbg ) )
{
error_task_socket ( &nbio_attr , IN_OUT_FD ) ;
}
slurm_FD_SET ( nbio_attr . fd [CHILD_IN_WR_FD] , & nbio_attr . next_set [WR_SET] );
}
if ( slurm_FD_ISSET ( nbio_attr . fd [CHILD_OUT_RD_FD] , & nbio_attr . init_set [RD_SET] ) )
{
read_task_pipe ( nbio_attr . out_cir_buf , nbio_attr . task_start , nbio_attr . fd [CHILD_OUT_RD_FD] , "stdout" );
if ( read_task_pipe ( nbio_attr . out_cir_buf , nbio_attr . fd [CHILD_OUT_RD_FD] , & out_dbg ) )
{
error_task_pipe ( & nbio_attr , CHILD_OUT_RD_FD ) ;
}
slurm_FD_SET ( nbio_attr . fd [IN_OUT_FD] , & nbio_attr . next_set [WR_SET] );
}
if ( slurm_FD_ISSET ( nbio_attr . fd [CHILD_ERR_RD_FD] , & nbio_attr . init_set [RD_SET] ) )
{
read_task_pipe ( nbio_attr . err_cir_buf , nbio_attr . task_start , nbio_attr . fd [CHILD_ERR_RD_FD] , "stderr" );
if ( read_task_pipe ( nbio_attr . err_cir_buf , nbio_attr . fd [CHILD_ERR_RD_FD] , & err_dbg ) )
{
error_task_pipe ( & nbio_attr , CHILD_ERR_RD_FD ) ;
}
slurm_FD_SET ( nbio_attr . fd [SIG_ERR_FD] , & nbio_attr . next_set [WR_SET] );
}
/* write fd set */
if ( slurm_FD_ISSET ( nbio_attr . fd [CHILD_IN_WR_FD] , & nbio_attr . next_set [WR_SET] ) )
{
write_task_pipe ( nbio_attr . in_cir_buf , nbio_attr . task_start , nbio_attr . fd [CHILD_IN_WR_FD] , "stdin" );
if ( write_task_pipe ( nbio_attr . in_cir_buf , nbio_attr . fd [CHILD_IN_WR_FD] , & in_dbg ) )
{
error_task_pipe ( & nbio_attr , CHILD_IN_WR_FD ) ;
}
slurm_FD_CLR ( nbio_attr . fd [CHILD_IN_WR_FD] , & nbio_attr . next_set [WR_SET] );
}
if ( slurm_FD_ISSET ( nbio_attr . fd [IN_OUT_FD] , & nbio_attr . next_set [WR_SET] ) )
if ( slurm_FD_ISSET ( nbio_attr . fd [IN_OUT_FD] , & nbio_attr . next_set [WR_SET] )
&& nbio_attr . reconnect_flags [IN_OUT_FD] == CONNECTED )
{
write_task_socket ( nbio_attr . out_cir_buf , nbio_attr . task_start , nbio_attr . fd [IN_OUT_FD] , "stdout" );
if ( write_task_socket ( nbio_attr . out_cir_buf , nbio_attr . fd [IN_OUT_FD] , & out_dbg ) )
{
error_task_socket ( &nbio_attr , IN_OUT_FD ) ;
}
slurm_FD_CLR ( nbio_attr . fd [IN_OUT_FD] , & nbio_attr . next_set [WR_SET] );
}
if ( slurm_FD_ISSET ( nbio_attr . fd [SIG_ERR_FD] , & nbio_attr . next_set [WR_SET] ) )
if ( slurm_FD_ISSET ( nbio_attr . fd [SIG_ERR_FD] , & nbio_attr . next_set [WR_SET] )
&& nbio_attr . reconnect_flags [IN_OUT_FD] == CONNECTED )
{
write_task_socket ( nbio_attr . err_cir_buf , nbio_attr . task_start , nbio_attr . fd [SIG_ERR_FD] , "stderr" );
if ( write_task_socket ( nbio_attr . err_cir_buf , nbio_attr . fd [SIG_ERR_FD] , & err_dbg ) )
{
error_task_socket ( &nbio_attr , SIG_ERR_FD ) ;
}
slurm_FD_CLR ( nbio_attr . fd [SIG_ERR_FD] , & nbio_attr . next_set [WR_SET] );
}
memcpy_sets ( nbio_attr . init_set , nbio_attr . next_set ) ;
}
nbio_cleanup ( & nbio_attr ) ;
return SLURM_SUCCESS ;
}
int memcpy_sets ( slurm_fd_set * init_set , slurm_fd_set * next_set )
{
int i ;
for ( i=0 ; i < 3 ; i++ )
{
memcpy ( & init_set[i] , & next_set[i] , sizeof ( slurm_fd_set ) ) ;
......@@ -171,15 +252,15 @@ int memcpy_sets ( slurm_fd_set * init_set , slurm_fd_set * next_set )
return SLURM_SUCCESS ;
}
int read_task_pipe ( circular_buffer_t * cir_buf, task_start_t * task_start, slurm_fd read_fd , char * const name )
int read_task_pipe ( circular_buffer_t * cir_buf, slurm_fd read_fd , io_debug_t * dbg )
{
int bytes_read ;
int local_errno ;
/* test for wierd state */
if ( ( cir_buf->write_size == 0 ) )
{
info ( "%s cir_buf->write_size == 0 this shouldn't happen" , name ) ;
if ( dbg ) debug3 ( "%s cir_buf->write_size == 0 this shouldn't happen" , dbg -> name ) ;
slurm_seterrno_ret ( ESLURMD_CIRBUF_POINTER_0 ) ;
}
......@@ -187,8 +268,8 @@ int read_task_pipe ( circular_buffer_t * cir_buf, task_start_t * task_start, slu
if ( ( bytes_read = read_EINTR ( read_fd , cir_buf->tail , cir_buf->write_size ) ) <= 0 )
{
local_errno = errno ;
info ( "error reading %s stream for task %i, %m errno: %i , bytes read %i ",
name , task_start -> launch_msg -> global_task_ids[ task_start -> local_task_id ] , local_errno , bytes_read ) ;
if ( dbg ) debug3 ( "%i error reading %s pipe stream, %m errno: %i , bytes read %i ",
dbg -> global_task_id , dbg -> name , local_errno , bytes_read ) ;
slurm_seterrno_ret ( ESLURMD_PIPE_DISCONNECT ) ;
}
else
......@@ -199,23 +280,23 @@ int read_task_pipe ( circular_buffer_t * cir_buf, task_start_t * task_start, slu
}
int write_task_pipe ( circular_buffer_t * cir_buf, task_start_t * task_start, slurm_fd write_fd , char * const name )
int write_task_pipe ( circular_buffer_t * cir_buf, slurm_fd write_fd , io_debug_t * dbg )
{
int bytes_written ;
int local_errno ;
/* test for wierd state */
if ( ( cir_buf->read_size == 0 ) )
{
info ( "%s cir_buf->read_size == 0 this shouldn't happen" , name ) ;
if ( dbg ) debug3 ( "%s cir_buf->read_size == 0 this shouldn't happen" , dbg -> name ) ;
slurm_seterrno_ret ( ESLURMD_CIRBUF_POINTER_0 ) ;
}
if ( ( bytes_written = write_EINTR ( write_fd , cir_buf->head , cir_buf->read_size ) ) <= 0 )
{
local_errno = errno ;
info ( "error sending %s stream for task %i, %m errno: %i , bytes written %i ",
name , task_start -> launch_msg -> global_task_ids[ task_start -> local_task_id ] , local_errno , bytes_written ) ;
if ( dbg ) debug3 ( "%i error sending %s pipe stream, %m errno: %i , bytes written %i ",
dbg -> global_task_id , dbg -> name , local_errno , bytes_written ) ;
slurm_seterrno_ret ( ESLURMD_PIPE_DISCONNECT ) ;
}
else
......@@ -225,15 +306,15 @@ int write_task_pipe ( circular_buffer_t * cir_buf, task_start_t * task_start, sl
}
}
int read_task_socket ( circular_buffer_t * cir_buf, task_start_t * task_start, slurm_fd read_fd , char * const name )
int read_task_socket ( circular_buffer_t * cir_buf, slurm_fd read_fd , io_debug_t * dbg )
{
int bytes_read ;
int local_errno ;
/* test for wierd state */
if ( ( cir_buf->write_size == 0 ) )
{
info ( "%s cir_buf->write_size == 0 this shouldn't happen" , name ) ;
if ( dbg ) debug3 ( "%s cir_buf->write_size == 0 this shouldn't happen" , dbg -> name ) ;
slurm_seterrno_ret ( ESLURMD_CIRBUF_POINTER_0 ) ;
}
......@@ -243,7 +324,7 @@ int read_task_socket ( circular_buffer_t * cir_buf, task_start_t * task_start, s
/* test for EOF on socket */
if ( bytes_read == 0)
{
info ( "0 returned EOF on socket ") ;
if ( dbg ) debug3 ( "%i 0 returned EOF on socket" , dbg -> global_task_id ) ;
slurm_seterrno_ret ( ESLURMD_EOF_ON_SOCKET ) ;
}
else if ( bytes_read == -1 )
......@@ -255,38 +336,38 @@ int read_task_socket ( circular_buffer_t * cir_buf, task_start_t * task_start, s
case ECONNREFUSED:
case ECONNRESET:
case ENOTCONN:
info ("lost %s socket connectiont" , name );
if ( dbg ) debug3 ("lost %s socket connection %m errno: %i" , dbg -> name , local_errno );
slurm_seterrno_ret ( ESLURMD_SOCKET_DISCONNECT ) ;
break ;
default:
info ( "error reading %s stream for task %i, %m errno: %i , bytes read %i ",
name , task_start -> launch_msg -> global_task_ids[ task_start -> local_task_id ] , local_errno , bytes_read ) ;
error ( "uncaught errno %i", local_errno ) ;
if ( dbg ) debug3 ( "%i error reading %s sock stream, %m errno: %i , bytes read %i ",
dbg -> global_task_id , dbg -> name , local_errno , bytes_read ) ;
slurm_seterrno_ret ( ESLURMD_UNKNOWN_SOCKET_ERROR ) ;
break;
}
}
else
{
info ( "bytes_read: %i don't know what to do with this return code ", bytes_read ) ;
debug3 ( "bytes_read: %i don't know what to do with this return code ", bytes_read ) ;
slurm_seterrno_ret ( ESLURMD_UNKNOWN_SOCKET_ERROR ) ;
}
}
else
{
cir_buf_write_update ( cir_buf , bytes_read ) ;
return SLURM_SUCCESS ;
}
return SLURM_SUCCESS ;
}
int write_task_socket ( circular_buffer_t * cir_buf, task_start_t * task_start, slurm_fd write_fd , char * const name )
int write_task_socket ( circular_buffer_t * cir_buf, slurm_fd write_fd , io_debug_t * dbg )
{
int sock_bytes_written ;
int local_errno ;
/* test for wierd state */
if ( ( cir_buf->read_size == 0 ) )
{
info ( "%s cir_buf->read_size == 0 this shouldn't happen" , name ) ;
if ( dbg ) debug3 ( "%s cir_buf->read_size == 0 this shouldn't happen" , dbg -> name ) ;
slurm_seterrno_ret ( ESLURMD_CIRBUF_POINTER_0 ) ;
}
......@@ -296,7 +377,7 @@ int write_task_socket ( circular_buffer_t * cir_buf, task_start_t * task_start,
/* test for EOF on socket */
if ( sock_bytes_written == 0)
{
info ( "0 returned EOF on socket ") ;
if ( dbg ) debug3 ( "%i 0 returned EOF on socket" , dbg -> global_task_id ) ;
slurm_seterrno_ret ( ESLURMD_EOF_ON_SOCKET ) ;
}
else if ( sock_bytes_written == -1 )
......@@ -308,61 +389,143 @@ int write_task_socket ( circular_buffer_t * cir_buf, task_start_t * task_start,
case ECONNREFUSED:
case ECONNRESET:
case ENOTCONN:
info ( "%s connection losti %i", name , local_errno ) ;
slurm_close_stream ( task_start->sockets[STDIN_OUT_SOCK] ) ;
if ( dbg ) debug3 ( "lost %s socket connection %m errno: %i", dbg -> name , local_errno ) ;
slurm_close_stream ( write_fd ) ;
slurm_seterrno_ret ( ESLURMD_SOCKET_DISCONNECT ) ;
break ;
default:
info ( "error sending %s stream for task %i, errno %i",
name , task_start -> launch_msg -> global_task_ids[ task_start -> local_task_id ] , local_errno ) ;
error ( "uncaught errno %i", local_errno ) ;
if ( dbg ) debug3 ( "%i error sending %s sock stream, %m errno %i, sock bytes written %i",
dbg -> global_task_id, dbg -> name , local_errno , sock_bytes_written ) ;
slurm_seterrno_ret ( ESLURMD_UNKNOWN_SOCKET_ERROR ) ;
break ;
}
}
else
{
info ( "bytes_read: %i don't know what to do with this return code ", sock_bytes_written ) ;
debug3 ( "bytes_read: %i don't know what to do with this return code ", sock_bytes_written ) ;
slurm_seterrno_ret ( ESLURMD_UNKNOWN_SOCKET_ERROR ) ;
}
}
else
{
cir_buf_read_update ( cir_buf , sock_bytes_written ) ;
return SLURM_SUCCESS ;
}
return SLURM_SUCCESS ;
}
int error_task_pipe ( )
int error_task_pipe ( nbio_attr_t * nbio_attr , int fd_index )
{
switch ( errno )
{
case ESLURMD_CIRBUF_POINTER_0 :
break ;
case ESLURMD_PIPE_DISCONNECT :
nbio_attr -> flush_flag = true ;
break ;
}
return SLURM_SUCCESS ;
}
int error_task_socket ( )
int error_task_socket ( nbio_attr_t * nbio_attr , int fd_index )
{
switch ( errno )
{
case ESLURMD_CIRBUF_POINTER_0 :
if ( nbio_attr -> flush_flag )
{
nbio_attr -> reconnect_flags[fd_index] = DRAINED ;
}
else
{
debug3 ( "ESLURMD_CIRBUF_POINTER_0 shouldn't have occured" ) ;
}
break ;
case ESLURMD_UNKNOWN_SOCKET_ERROR :
case ESLURMD_SOCKET_DISCONNECT :
case ESLURMD_EOF_ON_SOCKET :
switch ( nbio_attr -> reconnect_flags[fd_index] )
{
case CONNECTED :
nbio_attr -> reconnect_flags[fd_index] = RECONNECT ;
break ;
case DRAIN :
case DRAINED :
nbio_attr -> die = true;
break ;
case RECONNECT :
break ;
default :
debug3 ( "Unknown case in error_task_socket:ESLURMD_EOF_ON_SOCKET: %i" , nbio_attr -> reconnect_flags[fd_index] ) ;
break ;
}
break ;
default :
debug3 ( "Unknown case in error_task_socket: %i" , nbio_attr -> reconnect_flags[fd_index] ) ;
break ;
}
return SLURM_SUCCESS ;
}
int nbio_set_init ( nbio_attr_t * nbio_attr , slurm_fd_set * set_ptr )
{
int i ;
nbio_attr -> max_fd = 0 ;
for ( i=0 ; i < 3 ; i++ )
{
FD_ZERO ( & set_ptr[i] ) ;
}
slurm_FD_SET ( nbio_attr -> fd [IN_OUT_FD] , & set_ptr [RD_SET] ) ;
slurm_FD_SET ( nbio_attr -> fd [CHILD_OUT_RD_FD] , & set_ptr [RD_SET] ) ;
slurm_FD_SET ( nbio_attr -> fd [CHILD_ERR_RD_FD] , & set_ptr [RD_SET] ) ;
for ( i=0 ; i < 5 ; i++ )
if ( nbio_attr -> flush_flag )
{
slurm_FD_SET ( nbio_attr -> fd [i] , & set_ptr[ER_SET] ) ;
/* write fds */
slurm_FD_SET ( nbio_attr -> fd [IN_OUT_FD] , & set_ptr [WR_SET] ) ;
slurm_FD_SET ( nbio_attr -> fd [SIG_ERR_FD] , & set_ptr [WR_SET] ) ;
/* error fds */
slurm_FD_SET ( nbio_attr -> fd [IN_OUT_FD] , & set_ptr [ER_SET] ) ;
slurm_FD_SET ( nbio_attr -> fd [SIG_ERR_FD] , & set_ptr [ER_SET] ) ;
}
else
{
/* read fds */
slurm_FD_SET ( nbio_attr -> fd [IN_OUT_FD] , & set_ptr [RD_SET] ) ;
slurm_FD_SET ( nbio_attr -> fd [CHILD_OUT_RD_FD] , & set_ptr [RD_SET] ) ;
slurm_FD_SET ( nbio_attr -> fd [CHILD_ERR_RD_FD] , & set_ptr [RD_SET] ) ;
nbio_attr -> max_fd = 0 ;
/* error fds */
for ( i=0 ; i < 5 ; i++ )
{
slurm_FD_SET ( nbio_attr -> fd [i] , & set_ptr[ER_SET] ) ;
}
}
return SLURM_SUCCESS ;
}
int set_max_fd ( nbio_attr_t * nbio_attr )
{
int i ;
nbio_attr -> max_fd = 0 ;
for ( i=0 ; i < 5 ; i++ )
{
nbio_attr -> max_fd = MAX ( nbio_attr -> max_fd , nbio_attr -> fd [ i ] ) ;
}
return SLURM_SUCCESS ;
}
int nbio_cleanup ( nbio_attr_t * nbio_attr )
{
free_circular_buffer ( nbio_attr -> in_cir_buf ) ;
free_circular_buffer ( nbio_attr -> out_cir_buf ) ;
free_circular_buffer ( nbio_attr -> err_cir_buf ) ;
slurm_close_stream ( nbio_attr -> fd [IN_OUT_FD] ) ;
slurm_close_stream ( nbio_attr -> fd [SIG_ERR_FD] ) ;
close ( nbio_attr -> fd [CHILD_IN_WR_FD] ) ;
close ( nbio_attr -> fd [CHILD_OUT_RD_FD] ) ;
close ( nbio_attr -> fd [CHILD_ERR_RD_FD] ) ;
return SLURM_SUCCESS ;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment