Skip to content
Snippets Groups Projects
Commit 2ddf57f5 authored by tewk's avatar tewk
Browse files

moving from a fork implementation to a pthread implementation

TODO
fix shmem segment
write setenv
write slurm_addr packer
parent 90f45dd3
No related branches found
No related tags found
No related merge requests found
...@@ -8,7 +8,18 @@ ...@@ -8,7 +8,18 @@
#include <src/common/slurm_protocol_api.h> #include <src/common/slurm_protocol_api.h>
#include <src/slurmd/shmem_struct.h> #include <src/slurmd/shmem_struct.h>
#define SHMEM_KEY "slurm_shmem_key" /* function prototypes */
void * add_task ( slurmd_shmem_t * shmem , task_t * new_task );
void copy_task ( task_t * dest , task_t * const src );
void copy_job_step ( job_step_t * dest , job_step_t * src );
void clear_task ( task_t * task );
void clear_job_step( job_step_t * job_step );
/* gets a pointer to the slurmd shared memory segment
* if it doesn't exist, one is created
* returns - a void * pointer to the shared memory segment
*/
void * get_shmem ( ) void * get_shmem ( )
{ {
int shmem_id ; int shmem_id ;
...@@ -20,29 +31,37 @@ void * get_shmem ( ) ...@@ -20,29 +31,37 @@ void * get_shmem ( )
return shmem_addr ; return shmem_addr ;
} }
/* initializes the shared memory segment, this should only be called once by the master slurmd
* after the initial get_shmem call
* shmem - pointer to the shared memory segment returned by get_shmem ( )
*/
void init_shmem ( slurmd_shmem_t * shmem ) void init_shmem ( slurmd_shmem_t * shmem )
{ {
int i ; int i ;
/* set everything to zero */
memset ( shmem , 0 , sizeof ( slurmd_shmem_t ) ); memset ( shmem , 0 , sizeof ( slurmd_shmem_t ) );
/* sanity check */
/* set all task objects to unused */
for ( i=0 ; i < MAX_TASKS ; i ++ ) for ( i=0 ; i < MAX_TASKS ; i ++ )
{ {
clear_task ( & shmem->tasks[i] ) ; clear_task ( & shmem->tasks[i] ) ;
/*
shmem->tasks[i] . used = false ;
shmem->tasks[i] . job_step = NULL ;
shmem->tasks[i] . next = NULL ;
*/
} }
/* set all job_step objects to unused */
for ( i=0 ; i < MAX_JOB_STEPS ; i ++ ) for ( i=0 ; i < MAX_JOB_STEPS ; i ++ )
{ {
clear_job_step ( & shmem->job_steps[i] ) ; clear_job_step ( & shmem->job_steps[i] ) ;
/*
shmem->job_steps[i] . used = false ;
shmem->job_steps[i] . haed_task = NULL ;
*/
} }
} }
/* runs through the job_step array looking for an unused job_step.
* upon finding one the passed src job_step is copied into the shared mem job_step array
* shmem - pointer to the shared memory segment returned by get_shmem ( )
* job_step_t - src job_step to be added to the shared memory list
* returns - the address of the assigned job_step in the shared mem job_step array or
* the function dies on a fatal log call if the array is full
*/
void * add_job_step ( slurmd_shmem_t * shmem , job_step_t * new_job_step ) void * add_job_step ( slurmd_shmem_t * shmem , job_step_t * new_job_step )
{ {
int i ; int i ;
...@@ -59,6 +78,13 @@ void * add_job_step ( slurmd_shmem_t * shmem , job_step_t * new_job_step ) ...@@ -59,6 +78,13 @@ void * add_job_step ( slurmd_shmem_t * shmem , job_step_t * new_job_step )
return (void * ) SLURM_ERROR ; return (void * ) SLURM_ERROR ;
} }
/* runs through the task array looking for an unused task.
* upon finding one the passed src task is copied into the shared mem task array
* shmem - pointer to the shared memory segment returned by get_shmem ( )
* new_task - src task to be added to the shared memory list
* returns - the address of the assigned task in the shared mem task array
* the function dies on a fatal log call if the array is full
*/
void * add_task ( slurmd_shmem_t * shmem , task_t * new_task ) void * add_task ( slurmd_shmem_t * shmem , task_t * new_task )
{ {
int i ; int i ;
...@@ -75,6 +101,7 @@ void * add_task ( slurmd_shmem_t * shmem , task_t * new_task ) ...@@ -75,6 +101,7 @@ void * add_task ( slurmd_shmem_t * shmem , task_t * new_task )
return (void * ) SLURM_ERROR ; return (void * ) SLURM_ERROR ;
} }
/* copies the contents of one task to another; involves no memory allocation or dynamic length members */
void copy_task ( task_t * dest , task_t * const src ) void copy_task ( task_t * dest , task_t * const src )
{ {
dest -> threadid = src -> threadid; dest -> threadid = src -> threadid;
...@@ -84,12 +111,21 @@ void copy_task ( task_t * dest , task_t * const src ) ...@@ -84,12 +111,21 @@ void copy_task ( task_t * dest , task_t * const src )
dest -> gid = src -> gid; dest -> gid = src -> gid;
} }
/* copies the contents of one job_step to another; involves no memory allocation or dynamic length members */
void copy_job_step ( job_step_t * dest , job_step_t * src ) void copy_job_step ( job_step_t * dest , job_step_t * src )
{ {
dest -> job_id = src -> job_id ; dest -> job_id = src -> job_id ;
dest -> job_step_id = src -> job_step_id ; dest -> job_step_id = src -> job_step_id ;
} }
/* prepends a new task onto the front of a list of tasks associated with a job_step.
* it calls add_task which copies the passed task into a task array in shared memory
* sets pointers from the task to the corresponding job_step array
* note if the task array is full, the add_task function will assert and exit
* shmem - pointer to the shared memory segment returned by get_shmem ( )
* job_step - job_step to receive the new task
* task - task to be prepended
*/
int prepend_task ( slurmd_shmem_t * shmem , job_step_t * job_step , task_t * task ) int prepend_task ( slurmd_shmem_t * shmem , job_step_t * job_step , task_t * task )
{ {
task_t * new_task ; task_t * new_task ;
...@@ -108,6 +144,7 @@ int prepend_task ( slurmd_shmem_t * shmem , job_step_t * job_step , task_t * tas ...@@ -108,6 +144,7 @@ int prepend_task ( slurmd_shmem_t * shmem , job_step_t * job_step , task_t * tas
return SLURM_SUCCESS ; return SLURM_SUCCESS ;
} }
/* clears a job_step and associated task list for future use */
int deallocate_job_step ( job_step_t * jobstep ) int deallocate_job_step ( job_step_t * jobstep )
{ {
task_t * task_ptr = jobstep -> head_task ; task_t * task_ptr = jobstep -> head_task ;
...@@ -122,6 +159,7 @@ int deallocate_job_step ( job_step_t * jobstep ) ...@@ -122,6 +159,7 @@ int deallocate_job_step ( job_step_t * jobstep )
return SLURM_SUCCESS ; return SLURM_SUCCESS ;
} }
/* clears a task array member for future use */
void clear_task ( task_t * task ) void clear_task ( task_t * task )
{ {
task -> used = false ; task -> used = false ;
...@@ -129,6 +167,7 @@ void clear_task ( task_t * task ) ...@@ -129,6 +167,7 @@ void clear_task ( task_t * task )
task -> next = NULL ; task -> next = NULL ;
} }
/* clears a job_step array member for future use */
void clear_job_step( job_step_t * job_step ) void clear_job_step( job_step_t * job_step )
{ {
job_step -> used = false ; job_step -> used = false ;
......
...@@ -36,11 +36,7 @@ typedef struct slurmd_shmem ...@@ -36,11 +36,7 @@ typedef struct slurmd_shmem
void * get_shmem ( ); void * get_shmem ( );
void init_shmem ( slurmd_shmem_t * shmem ); void init_shmem ( slurmd_shmem_t * shmem );
void * add_job_step ( slurmd_shmem_t * shmem , job_step_t * new_job_step ) ; void * add_job_step ( slurmd_shmem_t * shmem , job_step_t * new_job_step ) ;
void * add_task ( slurmd_shmem_t * shmem , task_t * new_task );
void copy_task ( task_t * dest , task_t * const src );
void copy_job_step ( job_step_t * dest , job_step_t * src );
int prepend_task ( slurmd_shmem_t * shmem , job_step_t * job_step , task_t * task );
int deallocate_job_step ( job_step_t * jobstep ); int deallocate_job_step ( job_step_t * jobstep );
void clear_task ( task_t * task ); int prepend_task ( slurmd_shmem_t * shmem , job_step_t * job_step , task_t * task );
void clear_job_step( job_step_t * job_step );
#endif #endif
...@@ -54,26 +54,22 @@ time_t init_time; ...@@ -54,26 +54,22 @@ time_t init_time;
slurmd_shmem_t * shmem_seg ; slurmd_shmem_t * shmem_seg ;
/* function prototypes */ /* function prototypes */
void * request_thread ( void * arg ) ;
void slurmd_req ( slurm_msg_t * msg ); void slurmd_req ( slurm_msg_t * msg );
int send_node_registration_status_msg ( ) ;
int fill_in_node_registration_status_msg ( slurm_node_registration_status_msg_t * node_reg_msg ) ;
int slurmd_msg_engine ( void * args ) ; int slurmd_msg_engine ( void * args ) ;
void * request_thread ( void * arg ) ; int send_node_registration_status_msg ( ) ;
void init ( ) ;
void slurm_rpc_launch_tasks ( slurm_msg_t * msg ) ;
void slurm_rpc_kill_tasks ( slurm_msg_t * msg ) ; void slurm_rpc_kill_tasks ( slurm_msg_t * msg ) ;
void slurm_rpc_launch_tasks ( slurm_msg_t * msg ) ;
int fill_in_node_registration_status_msg ( slurm_node_registration_status_msg_t * node_reg_msg ) ;
int main (int argc, char *argv[]) int main (int argc, char *argv[])
{ {
int error_code ; int error_code ;
char node_name[MAX_NAME_LEN]; char node_name[MAX_NAME_LEN];
log_options_t opts = LOG_OPTS_STDERR_ONLY ; log_options_t opts = LOG_OPTS_STDERR_ONLY ;
init_time = time (NULL); init_time = time (NULL);
log_init(argv[0], opts, SYSLOG_FACILITY_DAEMON, NULL); log_init(argv[0], opts, SYSLOG_FACILITY_DAEMON, NULL);
/* /*
if ( ( error_code = init_slurm_conf () ) ) if ( ( error_code = init_slurm_conf () ) )
fatal ("slurmd: init_slurm_conf error %d", error_code); fatal ("slurmd: init_slurm_conf error %d", error_code);
...@@ -81,17 +77,22 @@ int main (int argc, char *argv[]) ...@@ -81,17 +77,22 @@ int main (int argc, char *argv[])
fatal ("slurmd: error %d from read_slurm_conf reading %s", error_code, SLURM_CONF); fatal ("slurmd: error %d from read_slurm_conf reading %s", error_code, SLURM_CONF);
*/ */
/* shared memory init */
shmem_seg = get_shmem ( ) ; shmem_seg = get_shmem ( ) ;
init_shmem ( shmem_seg ) ; init_shmem ( shmem_seg ) ;
if ( ( error_code = gethostname (node_name, MAX_NAME_LEN) ) ) if ( ( error_code = gethostname (node_name, MAX_NAME_LEN) ) )
fatal ("slurmd: errno %d from gethostname", errno); fatal ("slurmd: errno %d from gethostname", errno);
task_mgr_init ( ) ;
/* send registration message to slurmctld*/
send_node_registration_status_msg ( ) ; send_node_registration_status_msg ( ) ;
slurmd_msg_engine ( NULL ) ; slurmd_msg_engine ( NULL ) ;
return SLURM_SUCCESS ; return SLURM_SUCCESS ;
} }
/* sends a node_registration_status_msg to the slurmctld upon boot
* announcing availability for computation */
int send_node_registration_status_msg ( ) int send_node_registration_status_msg ( )
{ {
slurm_msg_t request_msg ; slurm_msg_t request_msg ;
...@@ -107,21 +108,31 @@ int send_node_registration_status_msg ( ) ...@@ -107,21 +108,31 @@ int send_node_registration_status_msg ( )
return SLURM_SUCCESS ; return SLURM_SUCCESS ;
} }
/* calls machine dependent system info calls to fill structure
* node_reg_msg - structure to fill with system info
* returns - return code
*/
int fill_in_node_registration_status_msg ( slurm_node_registration_status_msg_t * node_reg_msg ) int fill_in_node_registration_status_msg ( slurm_node_registration_status_msg_t * node_reg_msg )
{ {
int error_code ; int error_code ;
char node_name[MAX_NAME_LEN]; char node_name[MAX_NAME_LEN];
/* get hostname */
if ( ( error_code = gethostname (node_name, MAX_NAME_LEN) ) ) if ( ( error_code = gethostname (node_name, MAX_NAME_LEN) ) )
fatal ("slurmd: errno %d from gethostname", errno); fatal ("slurmd: errno %d from gethostname", errno);
/* fill in data structure */
node_reg_msg -> timestamp = time ( NULL ) ; node_reg_msg -> timestamp = time ( NULL ) ;
node_reg_msg -> node_name = xstrdup ( node_name ) ; node_reg_msg -> node_name = xstrdup ( node_name ) ;
get_procs ( & node_reg_msg -> cpus ); get_procs ( & node_reg_msg -> cpus );
get_memory ( & node_reg_msg -> real_memory_size ) ; get_memory ( & node_reg_msg -> real_memory_size ) ;
get_tmp_disk ( & node_reg_msg -> temporary_disk_space ) ; get_tmp_disk ( & node_reg_msg -> temporary_disk_space ) ;
return SLURM_SUCCESS ; return SLURM_SUCCESS ;
} }
/* accept thread for incoming slurm messages
* args - do nothing right now */
int slurmd_msg_engine ( void * args ) int slurmd_msg_engine ( void * args )
{ {
int error_code ; int error_code ;
...@@ -185,6 +196,10 @@ int slurmd_msg_engine ( void * args ) ...@@ -185,6 +196,10 @@ int slurmd_msg_engine ( void * args )
return 0 ; return 0 ;
} }
/* worker thread method for accepted message connections
* arg - a slurm_msg_t representing the accepted incoming message
* returns - nothing, void * because of pthread def
*/
void * request_thread ( void * arg ) void * request_thread ( void * arg )
{ {
slurm_msg_t * msg = ( slurm_msg_t * ) arg ; slurm_msg_t * msg = ( slurm_msg_t * ) arg ;
...@@ -196,6 +211,9 @@ void * request_thread ( void * arg ) ...@@ -196,6 +211,9 @@ void * request_thread ( void * arg )
return NULL ; return NULL ;
} }
/* multiplexing message handler
* msg - incoming request message
*/
void slurmd_req ( slurm_msg_t * msg ) void slurmd_req ( slurm_msg_t * msg )
{ {
...@@ -217,6 +235,11 @@ void slurmd_req ( slurm_msg_t * msg ) ...@@ -217,6 +235,11 @@ void slurmd_req ( slurm_msg_t * msg )
slurm_free_msg ( msg ) ; slurm_free_msg ( msg ) ;
} }
/******************************/
/* rpc methods */
/******************************/
/* Launches tasks */ /* Launches tasks */
void slurm_rpc_launch_tasks ( slurm_msg_t * msg ) void slurm_rpc_launch_tasks ( slurm_msg_t * msg )
{ {
...@@ -274,7 +297,6 @@ void slurm_rpc_kill_tasks ( slurm_msg_t * msg ) ...@@ -274,7 +297,6 @@ void slurm_rpc_kill_tasks ( slurm_msg_t * msg )
} }
} }
/* Reconfigure - re-initialized from configuration files */
void slurm_rpc_slurmd_example ( slurm_msg_t * msg ) void slurm_rpc_slurmd_example ( slurm_msg_t * msg )
{ {
/* init */ /* init */
......
...@@ -3,6 +3,8 @@ ...@@ -3,6 +3,8 @@
#include <sys/wait.h> #include <sys/wait.h>
#include <errno.h> #include <errno.h>
#include <unistd.h> #include <unistd.h>
#include <pthread.h>
#include <src/common/log.h>
#include <src/common/list.h> #include <src/common/list.h>
#include <src/common/xerrno.h> #include <src/common/xerrno.h>
#include <src/common/xmalloc.h> #include <src/common/xmalloc.h>
...@@ -17,32 +19,50 @@ global variables ...@@ -17,32 +19,50 @@ global variables
/* file descriptor defines */ /* file descriptor defines */
#define STDIN 0 #define STDIN_FILENO 0
#define STDOUT 1 #define STDOUT_FILENO 1
#define STDERR 2 #define STDERR_FILENO 2
#define READFD 0 #define MAX_TASKS_PER_LAUNCH 64
#define WRITEFD 1 #define CHILD_IN 0
#define CHILD_IN_RD 0
#define CHILD_IN_WR 1
#define CHILD_OUT 2
#define CHILD_OUT_RD 2
#define CHILD_OUT_WR 3
#define CHILD_ERR 4
#define CHILD_ERR_RD 4
#define CHILD_ERR_WR 5
/* prototypes */ /* prototypes */
void slurm_free_task ( void * _task ) ; void slurm_free_task ( void * _task ) ;
int iowatch_launch ( launch_tasks_msg_t * launch_msg ) ; void * iowatch_launch_thread ( void * arg ) ;
int match_job_id_job_step_id ( void * _x, void * _key ) ;
int append_task_to_list ( launch_tasks_msg_t * launch_msg , int pid ) ;
int kill_task ( task_t * task ) ; int kill_task ( task_t * task ) ;
int interconnect_init ( launch_tasks_msg_t * launch_msg ); int interconnect_init ( launch_tasks_msg_t * launch_msg );
int fan_out_task_launch ( launch_tasks_msg_t * launch_msg ); int fan_out_task_launch ( launch_tasks_msg_t * launch_msg );
void * task_exec_thread ( void * arg ) ;
int init_parent_pipes ( int * pipes ) ;
void setup_parent_pipes ( int * pipes ) ;
int setup_child_pipes ( int * pipes ) ;
int forward_io ( task_start_t * task_arg ) ;
void * stdout_io_pipe_thread ( void * arg ) ;
/******************************************************************
*task launch method call hierarchy
*
*launch_tasks()
* interconnect_init()
* fan_out_task_launch() (pthread_create)
* iowatch_launch_thread()
* task_exec_thread() (pthread_create)
******************************************************************/
void task_mgr_init ( ) /* exported module funtion to launch tasks */
{
}
int launch_tasks ( launch_tasks_msg_t * launch_msg ) int launch_tasks ( launch_tasks_msg_t * launch_msg )
{ {
return interconnect_init ( launch_msg ); return interconnect_init ( launch_msg );
} }
/* Contains interconnect specific setup instructions and then calls fan_out_task_launch */ /* Contains interconnect specific setup instructions and then calls
* fan_out_task_launch */
int interconnect_init ( launch_tasks_msg_t * launch_msg ) int interconnect_init ( launch_tasks_msg_t * launch_msg )
{ {
return fan_out_task_launch ( launch_msg ) ; return fan_out_task_launch ( launch_msg ) ;
...@@ -51,137 +71,264 @@ int interconnect_init ( launch_tasks_msg_t * launch_msg ) ...@@ -51,137 +71,264 @@ int interconnect_init ( launch_tasks_msg_t * launch_msg )
int fan_out_task_launch ( launch_tasks_msg_t * launch_msg ) int fan_out_task_launch ( launch_tasks_msg_t * launch_msg )
{ {
int i ; int i ;
int cpid[64] ; int rc ;
task_start_t task_start[MAX_TASKS_PER_LAUNCH] ;
/*place on the stack so we don't have to worry about mem clean up should
*this task_launch get brutally killed by a kill_job step message */
//task_start = xmalloc ( sizeof ( task_start[MAX_TASKS_PER_LAUNCH] ) );
/* launch requested number of threads */
for ( i = 0 ; i < launch_msg->tasks_to_launch ; i ++ ) for ( i = 0 ; i < launch_msg->tasks_to_launch ; i ++ )
{ {
cpid[i] = fork ( ) ; task_start[i].launch_msg = launch_msg ;
if ( cpid[i] == 0 ) task_start[i].slurmd_fanout_id = i ;
{ rc = pthread_create ( & task_start[i].pthread_id , NULL , iowatch_launch_thread , ( void * ) & task_start[i] ) ;
break ;
}
} }
/*parent*/ /* wait for all the launched threads to finish */
if ( i == launch_msg->tasks_to_launch ) for ( i = 0 ; i < launch_msg->tasks_to_launch ; i ++ )
{
int waiting = i ;
int j ;
int pid ;
while (waiting > 0) {
pid = waitpid(0, NULL, 0);
if (pid < 0) {
xperror("waitpid");
exit(1);
}
for (j = 0; j < launch_msg->tasks_to_launch ; j++) {
if (cpid[j] == pid)
waiting--;
}
}
}
/*child*/
else
{ {
iowatch_launch ( launch_msg ) ; rc = pthread_join( task_start[i].pthread_id , & task_start[i].thread_return ) ;
} }
//xfree ( tast_start )
return SLURM_SUCCESS ; return SLURM_SUCCESS ;
} }
int iowatch_launch ( launch_tasks_msg_t * launch_msg )
void * iowatch_launch_thread ( void * arg )
{ {
int rc ; task_start_t * task_start = ( task_start_t * ) arg ;
int newstdin[2] ; int pipes[6] ;
int newstdout[2] ;
int newstderr[2] ;
/* /*
int * childin = &newstdin[1] ; int * childin = &newstdin[1] ;
int * childout = &newstdout[0] ; int * childout = &newstdout[0] ;
int * childerr = &newstderr[0] ; int * childerr = &newstderr[0] ;
*/ */
task_t * task ; /* init arg to be passed to task launch thread */
int pid ; task_start->pipes = pipes ;
/* open pipes to be used in dup after fork */ /* create pipes to read child stdin, stdout, sterr */
if( ( rc = pipe ( newstdin ) ) ) init_parent_pipes ( pipes ) ;
/* create task thread */
pthread_create ( & task_start->exec_pthread_id , NULL , task_exec_thread , ( void * ) arg ) ;
/* pipe output from child task to srun over sockets */
setup_parent_pipes ( pipes ) ;
//forward_io ( arg ) ;
/* wait for thread to die */
pthread_join ( task_start->exec_pthread_id , & task_start->exec_thread_return ) ;
return ( void * ) SLURM_SUCCESS ;
}
/* starts the thread that forwards a task's stdout pipe to srun
 * task_arg - per-task launch state; io_pthread_id[STDOUT_FILENO] receives
 *	the id of the created thread, and task_arg itself is handed to
 *	stdout_io_pipe_thread as its argument
 * returns - SLURM_SUCCESS if the thread was created, SLURM_ERROR otherwise
 */
int forward_io ( task_start_t * task_arg )
{
	int rc ;

	/* pthread_create reports failure through its return value, not errno */
	rc = pthread_create ( & task_arg->io_pthread_id[STDOUT_FILENO] , NULL , stdout_io_pipe_thread , task_arg ) ;
	if ( rc != 0 )
		return SLURM_ERROR ;

	return SLURM_SUCCESS ;
}
void * stdout_io_pipe_thread ( void * arg )
{
slurm_fd connection_fd ;
slurm_addr dest_addr ;
task_start_t * io_arg = ( task_start_t * ) arg ;
char buffer[SLURMD_IO_MAX_BUFFER_SIZE] ;
int bytes_read ;
int sock_bytes_written ;
/* dest_addr = somethiong from arg */
if ( ( connection_fd = slurm_open_stream ( & dest_addr ) ) == SLURM_PROTOCOL_ERROR )
{ {
return ESLRUMD_PIPE_ERROR_ON_TASK_SPAWN ; info ( "error opening socket to srun to pipe stdout" ) ;
pthread_exit ( 0 ) ;
} }
if( ( rc = pipe ( newstdout ) ) )
while ( true )
{ {
return ESLRUMD_PIPE_ERROR_ON_TASK_SPAWN ; if ( ( bytes_read = read ( io_arg->pipes[CHILD_OUT_RD] , buffer , SLURMD_IO_MAX_BUFFER_SIZE ) ) <= 0 )
{
info ( "error reading stdout stream for task %i", 1 ) ;
pthread_exit ( 0 ) ;
}
if ( ( sock_bytes_written = slurm_write_stream ( connection_fd , buffer , bytes_read ) ) == SLURM_PROTOCOL_ERROR )
{
info ( "error sending stdout stream for task %i", 1 ) ;
pthread_exit ( 0 ) ;
}
} }
if( ( rc = pipe ( newstderr ) ) ) }
/*
int forward_io ( task_thread_arg_t * task_arg )
{
slurm_addr in_out_addr = task_arg->launch_msg->
slurm_addr sig_err_addr = task_arg->launch_msg->
slurm_fd in_out ;
slurm_fd sig_err ;
int * pipes = task_arg-> pipes ;
fd_set n_rd_set ;
fd_set n_wr_set ;
fd_set n_err_set ;
slurm_fd_set ns_rd_set ;
slurm_fd_set ns_wr_set ;
slurm_fd_set ns_err_set ;
fd_set rd_set ;
fd_set wr_set ;
fd_set err_set ;
slurm_fd_set s_rd_set ;
slurm_fd_set s_wr_set ;
slurm_fd_set s_err_set ;
int s_rc ;
int rc ;
struct timeval timeout ;
timeout . tv_sec = 0 ;
timeout . tv_sec = SLURM_SELECT_TIMEOUT ;
in_out = slurm_stream_connect ( & in_out_addr ) ;
sig_err = slurm_stream_connect ( & sig_err_addr ) ;
FD_ZERO ( n_rd_set ) ;
FD_ZERO ( n_wd_set ) ;
FD_ZERO ( n_err_set ) ;
slurm_FD_ZERO ( ns_rd_set ) ;
slurm_FD_ZERO ( ns_wr_set ) ;
slurm_FD_ZERO ( ns_err_set ) ;
slurm_FD_SET ( in_out, & ns_rd_set ) ;
slurm_FD_SET ( in_out, & ns_err_set ) ;
slurm_FD_SET ( ig_err & ns_rd_set ) ;
slurm_FD_SET ( ig_err & ns_err_set ) ;
FD_SET ( pipes[CHILD_OUT_RD], & n_rd_set ) ;
FD_SET ( pipes[CHILD_ERR_RD], & n_rd_set ) ;
FD_SET ( pipes[CHILD_IN_WR], & n_err_set ) ;
FD_SET ( pipes[CHILD_OUT_RD], & n_err_set ) ;
FD_SET ( pipes[CHILD_ERR_RD], & n_err_set ) ;
while ( )
{ {
return ESLRUMD_PIPE_ERROR_ON_TASK_SPAWN ; memcpy ( &rd_set , &n_rd_set , sizeof ( rd_set ) ) ;
memcpy ( &wr_set , &n_wr_set , sizeof ( wr_set ) ) ;
memcpy ( &err_set , &ns_err_set , sizeof ( err_set ) ) ;
memcpy ( &s_rd_set , &ns_rd_set , sizeof ( s_rd_set ) ) ;
memcpy ( &s_wr_set , &ns_wr_set , sizeof ( s_wr_set ) ) ;
memcpy ( &s_err_set , &ns_err_set , sizeof ( s_err_set ) ) ;
rc = select ( pipes[CHILD_ERR_RD] + 1 , rd_set , wr_set , err_set , & timeout ) ;
if ( rc > 0 )
timeout . tv_sec = 0 ;
timeout . tv_sec = SLURM_SELECT_TIMEOUT ;
s_rc = slurm_select ( sig_err_addr + 1 , s_rd_set , s_wr_set , s_err_set , & timeout ) ;
FD_ZERO ( n_rd_set ) ;
FD_ZERO ( n_wd_set ) ;
FD_ZERO ( n_err_set ) ;
slurm_FD_ZERO ( ns_rd_set ) ;
slurm_FD_ZERO ( ns_wr_set ) ;
slurm_FD_ZERO ( ns_err_set ) ;
slurm_FD_SET ( in_out, & ns_rd_set ) ;
slurm_FD_SET ( ig_err & ns_rd_set ) ;
slurm_FD_SET ( in_out, & ns_err_set ) ;
slurm_FD_SET ( ig_err & ns_err_set ) ;
FD_SET ( pipes[CHILD_OUT_RD], & n_rd_set ) ;
FD_SET ( pipes[CHILD_ERR_RD], & n_rd_set ) ;
FD_SET ( pipes[CHILD_IN_WR], & n_err_set ) ;
FD_SET ( pipes[CHILD_OUT_RD], & n_err_set ) ;
FD_SET ( pipes[CHILD_ERR_RD], & n_err_set ) ;
} }
switch ( ( pid = fork ( ) ) ) }
*/
/* thread body that becomes the user task: redirects the standard streams
 * onto the pipes set up by the io-watch thread, switches to the requested
 * gid and uid, changes to the requested working directory and execs the
 * command line through bash
 * arg - the task_start_t for this task ( launch_msg and pipes are read )
 * returns - only on failure, ( void * ) SLURM_ERROR ; a successful execl
 *	does not return
 * NOTE(review): execl replaces the whole process image, not only the calling
 *	thread - confirm this is what a pthread based launcher wants
 */
void * task_exec_thread ( void * arg )
{
	task_start_t * task_arg = ( task_start_t * ) arg ;
	launch_tasks_msg_t * launch_msg = task_arg -> launch_msg ;
	int * pipes = task_arg->pipes ;

	setup_child_pipes ( pipes ) ;

	/* the group must be changed while we still have the privilege to do
	 * so; once setuid has dropped root, setgid would be refused */
	if ( setgid ( launch_msg->gid ) == SLURM_ERROR )
		return ( void * ) SLURM_ERROR ;
	if ( setuid ( launch_msg->uid ) == SLURM_ERROR )
		return ( void * ) SLURM_ERROR ;

	/* run bash and cmdline */
	if ( chdir ( launch_msg->cwd ) == SLURM_ERROR )
		return ( void * ) SLURM_ERROR ;

	/* the variadic argument list has to end with a null char pointer */
	execl ( "/bin/bash" , "bash" , "-c" , launch_msg->cmd_line , ( char * ) NULL );

	/* execl only comes back when it could not start the program */
	return ( void * ) SLURM_ERROR ;
}
/* closes the pipe ends the parent side does not use: the read end of the
 * child's stdin pipe and the write ends of its stdout and stderr pipes
 * pipes - the six descriptor array filled in by init_parent_pipes
 */
void setup_parent_pipes ( int * pipes )
{
	static const int unused_ends[] = { CHILD_IN_RD , CHILD_OUT_WR , CHILD_ERR_WR } ;
	const int count = ( int ) ( sizeof ( unused_ends ) / sizeof ( unused_ends[0] ) ) ;
	int idx ;

	for ( idx = 0 ; idx < count ; idx ++ )
	{
		close ( pipes[ unused_ends[idx] ] ) ;
	}
}
int init_parent_pipes ( int * pipes )
{
int rc ;
/* open pipes to be used in dup after fork */
if( ( rc = pipe ( & pipes[CHILD_IN] ) ) )
{ {
/*error*/ return ESLRUMD_PIPE_ERROR_ON_TASK_SPAWN ;
case -1:
return SLURM_ERROR ;
break ;
/*child*/
case 0:
/*dup stdin*/
close ( STDIN );
dup ( newstdin[READFD] ) ;
close ( newstdin[READFD] );
close ( newstdin[READFD] );
/*dup stdout*/
close ( STDOUT );
dup ( newstdout[WRITEFD] ) ;
close ( newstdout[READFD] );
close ( newstdout[WRITEFD] );
/*dup stderr*/
close ( STDERR );
dup ( newstderr[WRITEFD] ) ;
close ( newstderr[READFD] );
close ( newstderr[READFD] );
/* setuid and gid*/
if ( ( rc = setuid ( launch_msg->uid ) ) == SLURM_ERROR )
if ( ( rc = setgid ( launch_msg->gid ) ) == SLURM_ERROR ) ;
/* run bash and cmdline */
chdir ( launch_msg->cwd ) ;
execl ( "/bin/bash" , "bash" , "-c" , launch_msg->cmd_line );
return SLURM_SUCCESS ;
break ;
/*parent*/
default:
task = xmalloc ( sizeof ( task_t ) ) ;
append_task_to_list ( launch_msg , pid ) ;
close ( newstdin[READFD] ) ;
close ( newstdout[WRITEFD] ) ;
close ( newstderr[WRITEFD] ) ;
break ;
} }
if( ( rc = pipe ( & pipes[CHILD_OUT] ) ) )
if ( pid != 0 ) /*parent*/
{ {
/* select and io copy from std streams to sockets */ return ESLRUMD_PIPE_ERROR_ON_TASK_SPAWN ;
}
if( ( rc = pipe ( & pipes[CHILD_ERR] ) ) )
{
return ESLRUMD_PIPE_ERROR_ON_TASK_SPAWN ;
} }
return SLURM_SUCCESS ; return SLURM_SUCCESS ;
} }
int setup_child_pipes ( int * pipes )
int append_task_to_list ( launch_tasks_msg_t * launch_msg , int pid )
{ {
task_t * task ; int error_code = 0 ;
task = ( task_t * ) xmalloc ( sizeof ( task_t ) ) ;
if ( task == NULL ) /*dup stdin*/
return ENOMEM ; close ( STDIN_FILENO );
if ( SLURM_ERROR == ( error_code = dup ( CHILD_IN_RD ) ) ) info ("dup failed on child standard in pipe");
task -> pid = pid; close ( CHILD_IN_RD );
task -> uid = launch_msg -> uid; close ( CHILD_IN_WR );
task -> gid = launch_msg -> gid;
/*dup stdout*/
return SLURM_SUCCESS ; close ( STDOUT_FILENO );
if ( SLURM_ERROR == ( error_code = dup ( CHILD_OUT_WR ) ) ) info ("dup failed on child standard out pipe");
close ( CHILD_OUT_RD );
close ( CHILD_OUT_WR );
/*dup stderr*/
close ( STDERR_FILENO );
if ( SLURM_ERROR == ( error_code = dup ( CHILD_ERR_WR ) ) ) info ("dup failed on child standard err pipe");
close ( CHILD_ERR_RD );
close ( CHILD_ERR_WR );
return error_code ;
} }
int kill_tasks ( kill_tasks_msg_t * kill_task_msg ) int kill_tasks ( kill_tasks_msg_t * kill_task_msg )
......
...@@ -14,9 +14,31 @@ ...@@ -14,9 +14,31 @@
#endif /* HAVE_CONFIG_H */ #endif /* HAVE_CONFIG_H */
#endif #endif
#define STDIN_IO_THREAD 0
#define STDOUT_IO_THREAD 1
#define STDERR_IO_THREAD 2
#define STDSIG_IO_THREAD 3
#define HEART_BEAT_IO_THREAD 4
#define SLURMD_NUMBER_OF_IO_THREADS 5
#define SLURMD_IO_MAX_BUFFER_SIZE 4096
/* function prototypes */ /* function prototypes */
void task_mgr_init ( ) ;
int launch_tasks ( launch_tasks_msg_t * launch_msg ) ; int launch_tasks ( launch_tasks_msg_t * launch_msg ) ;
int kill_tasks ( kill_tasks_msg_t * kill_task_msg ) ; int kill_tasks ( kill_tasks_msg_t * kill_task_msg ) ;
/* per-task launch state: one element per launched task is filled in by
 * fan_out_task_launch and passed by pointer to the threads started for
 * that task */
typedef struct task_start
{
/* task control thread ( iowatch_launch_thread ): id written by pthread_create,
 * return value written by pthread_join in fan_out_task_launch */
pthread_t pthread_id;
void * thread_return;
/* exec thread ( task_exec_thread ): id and return value, created and joined
 * by the task control thread */
pthread_t exec_pthread_id;
void * exec_thread_return;
/* io forwarding threads: ids and return values, one slot per *_IO_THREAD index */
pthread_t io_pthread_id[SLURMD_NUMBER_OF_IO_THREADS];
void * io_thread_return[SLURMD_NUMBER_OF_IO_THREADS];
/* launch request this task belongs to; the same pointer is stored for every
 * task of one fan out */
launch_tasks_msg_t * launch_msg;
int slurmd_fanout_id; /* the node specific task number used to compute port to stream std streams to */
/* pipe descriptors indexed by the CHILD_* defines; points at an array local
 * to iowatch_launch_thread, so it is only valid while that thread runs */
int * pipes;
} task_start_t ;
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment