From 2ddf57f5fc9ac9874b758e5814063ad5229c2149 Mon Sep 17 00:00:00 2001
From: tewk <tewk@unknown>
Date: Fri, 28 Jun 2002 18:03:31 +0000
Subject: [PATCH] moving from a fork implementation to a pthread implementation
 TODO fix shmem segment write setenv write slurm_addr packer

---
 src/slurmd/shmem_struct.c |  59 +++++-
 src/slurmd/shmem_struct.h |   8 +-
 src/slurmd/slurmd.c       |  46 +++--
 src/slurmd/task_mgr.c     | 375 ++++++++++++++++++++++++++------------
 src/slurmd/task_mgr.h     |  26 ++-
 5 files changed, 370 insertions(+), 144 deletions(-)

diff --git a/src/slurmd/shmem_struct.c b/src/slurmd/shmem_struct.c
index 145978efe22..283e6265f0e 100644
--- a/src/slurmd/shmem_struct.c
+++ b/src/slurmd/shmem_struct.c
@@ -8,7 +8,18 @@
 #include <src/common/slurm_protocol_api.h>
 #include <src/slurmd/shmem_struct.h>
 
-#define SHMEM_KEY "slurm_shmem_key"
+/* function prototypes */
+void * add_task ( slurmd_shmem_t * shmem , task_t * new_task );
+void copy_task ( task_t * dest , task_t * const src );
+void copy_job_step ( job_step_t * dest , job_step_t * src );
+void clear_task ( task_t * task );
+void clear_job_step( job_step_t * job_step );
+
+
+/* gets a pointer to the slurmd shared memory segment
+ * if it doesn't exist, one is created 
+ * returns - a void * pointer to the shared memory segment
+ */
 void * get_shmem ( )
 {
 	int shmem_id ;
@@ -20,29 +31,37 @@ void * get_shmem ( )
 	return shmem_addr ;
 }
 
+/* initializes the shared memory segment, this should only be called once by the master slurmd 
+ * after the initial get_shmem call
+ * shmem - pointer to the shared memory segment returned by get_shmem ( )
+ */
 void init_shmem ( slurmd_shmem_t * shmem )
 {
 	int i ;
+	/* set everything to zero */
 	memset ( shmem , 0 , sizeof ( slurmd_shmem_t ) );
+	/* sanity check */
+	/* set all task objects to unused */
+
 	for ( i=0 ; i < MAX_TASKS ; i ++ )
 	{
 		clear_task ( & shmem->tasks[i] ) ;
-/*
-		shmem->tasks[i] . used = false ;
-		shmem->tasks[i] . job_step = NULL ;
-		shmem->tasks[i] . next = NULL ;
-*/
 	}
+	
+	/* set all job_step objects to unused */
 	for ( i=0 ; i < MAX_JOB_STEPS ; i ++ )
 	{
 		clear_job_step ( & shmem->job_steps[i] ) ;
-/*
-		shmem->job_steps[i] . used = false ;
-		shmem->job_steps[i] . haed_task = NULL ;
-*/
 	}
 }
 
+/* runs through the job_step array looking for an unused job_step.
+ * upon finding one the passed src job_step is copied into the shared mem job_step array
+ * shmem - pointer to the shared memory segment returned by get_shmem ( )
+ * job_step_t - src job_step to be added to the shared memory list
+ * returns - the address of the assigned job_step in the shared mem job_step array or
+ * the function dies on a fatal log call if the array is full
+ */
 void * add_job_step ( slurmd_shmem_t * shmem , job_step_t * new_job_step ) 
 {
 	int i ;
@@ -59,6 +78,13 @@ void * add_job_step ( slurmd_shmem_t * shmem , job_step_t * new_job_step )
 	return (void * ) SLURM_ERROR ;
 }
 
+/* runs through the task array looking for an unused task.
+ * upon finding one the passed src task is copied into the shared mem task array
+ * shmem - pointer to the shared memory segment returned by get_shmem ( )
+ * new_task - src task to be added to the shared memory list
+ * returns - the address of the assigned task in the shared mem task array
+ * the function dies on a fatal log call if the array is full
+ */
 void * add_task ( slurmd_shmem_t * shmem , task_t * new_task ) 
 {
 	int i ;
@@ -75,6 +101,7 @@ void * add_task ( slurmd_shmem_t * shmem , task_t * new_task )
 	return (void * ) SLURM_ERROR ;
 }
 
+/* copies the contents of one task to another, has no allocated memory or dynamic length members */
 void copy_task ( task_t * dest , task_t * const src ) 
 {
 	dest -> threadid 	= src -> threadid;
@@ -84,12 +111,21 @@ void copy_task ( task_t * dest , task_t * const src )
 	dest -> gid		= src -> gid;
 }
 
+/* copies the contents of one job_step to another, has no allocated memory or dynamic length members */
 void copy_job_step ( job_step_t * dest , job_step_t * src )
 {
 	dest -> job_id		= src -> job_id ;
 	dest -> job_step_id	= src -> job_step_id ;
 }
 
+/* prepends a new task onto the front of a list of tasks associated with a job_step.
+ * it calls add_task which copies the passed task into a task array in shared memory
+ * sets pointers from the task to the corresponding job_step array 
+ * note if the task array is full,  the add_task function will assert and exit
+ * shmem - pointer to the shared memory segment returned by get_shmem ( )
+ * job_step - job_step to receive the new task
+ * task - task to be prepended
+ */
 int prepend_task ( slurmd_shmem_t * shmem , job_step_t * job_step , task_t * task )
 {
 	task_t * new_task ;
@@ -108,6 +144,7 @@ int prepend_task ( slurmd_shmem_t * shmem , job_step_t * job_step , task_t * tas
 	return SLURM_SUCCESS ;
 }
 
+/* clears a job_step and associated task list for future use */
 int deallocate_job_step ( job_step_t * jobstep )
 {
 	task_t * task_ptr = jobstep -> head_task ;
@@ -122,6 +159,7 @@ int deallocate_job_step ( job_step_t * jobstep )
 	return SLURM_SUCCESS ;
 }
 
+/* clears a task array member for future use */
 void clear_task ( task_t * task )
 {
 	task -> used = false ;
@@ -129,6 +167,7 @@ void clear_task ( task_t * task )
 	task -> next = NULL ;
 }
 
+/* clears a job_step array member for future use */
 void clear_job_step( job_step_t * job_step )
 {
 	job_step -> used = false ;
diff --git a/src/slurmd/shmem_struct.h b/src/slurmd/shmem_struct.h
index 641591b194d..a37897724b6 100644
--- a/src/slurmd/shmem_struct.h
+++ b/src/slurmd/shmem_struct.h
@@ -36,11 +36,7 @@ typedef struct slurmd_shmem
 void * get_shmem ( );
 void init_shmem ( slurmd_shmem_t * shmem );
 void * add_job_step ( slurmd_shmem_t * shmem , job_step_t * new_job_step ) ;
-void * add_task ( slurmd_shmem_t * shmem , task_t * new_task );
-void copy_task ( task_t * dest , task_t * const src );
-void copy_job_step ( job_step_t * dest , job_step_t * src );
-int prepend_task ( slurmd_shmem_t * shmem , job_step_t * job_step , task_t * task );
 int deallocate_job_step ( job_step_t * jobstep );
-void clear_task ( task_t * task );
-void clear_job_step( job_step_t * job_step );
+int prepend_task ( slurmd_shmem_t * shmem , job_step_t * job_step , task_t * task );
+
 #endif
diff --git a/src/slurmd/slurmd.c b/src/slurmd/slurmd.c
index 2024381c7e6..11e007b60b5 100644
--- a/src/slurmd/slurmd.c
+++ b/src/slurmd/slurmd.c
@@ -54,26 +54,22 @@ time_t init_time;
 slurmd_shmem_t * shmem_seg ;
 
 /* function prototypes */
+void * request_thread ( void * arg ) ;
 void slurmd_req ( slurm_msg_t * msg );
-int send_node_registration_status_msg ( ) ;
-int fill_in_node_registration_status_msg ( slurm_node_registration_status_msg_t * node_reg_msg ) ;
 int slurmd_msg_engine ( void * args ) ;
-void * request_thread ( void * arg ) ;
-void init ( ) ;
-
-void slurm_rpc_launch_tasks ( slurm_msg_t * msg ) ;
+int send_node_registration_status_msg ( ) ;
 void slurm_rpc_kill_tasks ( slurm_msg_t * msg ) ;
+void slurm_rpc_launch_tasks ( slurm_msg_t * msg ) ;
+int fill_in_node_registration_status_msg ( slurm_node_registration_status_msg_t * node_reg_msg ) ;
 
 int main (int argc, char *argv[]) 
 {
 	int error_code ;
-	
-	
 	char node_name[MAX_NAME_LEN];
 	log_options_t opts = LOG_OPTS_STDERR_ONLY ;
-
 	init_time = time (NULL);
 	log_init(argv[0], opts, SYSLOG_FACILITY_DAEMON, NULL);
+
 /*
 	if ( ( error_code = init_slurm_conf () ) ) 
 		fatal ("slurmd: init_slurm_conf error %d", error_code);
@@ -81,17 +77,22 @@ int main (int argc, char *argv[])
 		fatal ("slurmd: error %d from read_slurm_conf reading %s", error_code, SLURM_CONF);
 */
 
+	/* shared memory init */
 	shmem_seg = get_shmem ( ) ;
 	init_shmem ( shmem_seg ) ;
+
 	if ( ( error_code = gethostname (node_name, MAX_NAME_LEN) ) ) 
 		fatal ("slurmd: errno %d from gethostname", errno);
-	task_mgr_init ( ) ;	
+
+	/* send registration message to slurmctld*/
 	send_node_registration_status_msg ( ) ;
+
 	slurmd_msg_engine ( NULL ) ;
 	return SLURM_SUCCESS ;
 }
 
-
+/* sends a node_registration_status_msg to the slurmctld upon boot
+ * announcing availability for computation */
 int send_node_registration_status_msg ( )
 {
 	slurm_msg_t request_msg ;
@@ -107,21 +108,31 @@ int send_node_registration_status_msg ( )
 	return SLURM_SUCCESS ;
 }
 
+/* calls machine dependent system info calls to fill structure
+ * node_reg_msg - structure to fill with system info
+ * returns - return code
+ */
 int fill_in_node_registration_status_msg ( slurm_node_registration_status_msg_t * node_reg_msg )
 {
 	int error_code ;
 	char node_name[MAX_NAME_LEN];
+
+	/* get hostname */
 	if ( ( error_code = gethostname (node_name, MAX_NAME_LEN) ) )
 		fatal ("slurmd: errno %d from gethostname", errno);
 
+	/* fill in data structure */
 	node_reg_msg -> timestamp = time ( NULL ) ;
 	node_reg_msg -> node_name = xstrdup ( node_name ) ; 
 	get_procs ( & node_reg_msg -> cpus );
 	get_memory ( & node_reg_msg -> real_memory_size ) ;
 	get_tmp_disk ( & node_reg_msg -> temporary_disk_space ) ;
+
 	return SLURM_SUCCESS ;
 }
 
+/* accept thread for incoming slurm messages
+ * args - do nothing right now */
 int slurmd_msg_engine ( void * args )
 {
 	int error_code ;
@@ -185,6 +196,10 @@ int slurmd_msg_engine ( void * args )
 	return 0 ;
 }
 
+/* worker thread method for accepted message connections
+ * arg - a slurm_msg_t representing the accepted incoming message
+ * returns - nothing, void * because of pthread def
+ */
 void * request_thread ( void * arg )
 {
 	slurm_msg_t * msg = ( slurm_msg_t * ) arg ;
@@ -196,6 +211,9 @@ void * request_thread ( void * arg )
 	return NULL ;
 }
 
+/* multiplexing message handler
+ * msg - incoming request message
+ */
 void slurmd_req ( slurm_msg_t * msg )
 {
 	
@@ -217,6 +235,11 @@ void slurmd_req ( slurm_msg_t * msg )
 	slurm_free_msg ( msg ) ;
 }
 
+
+/******************************/
+/* rpc methods */
+/******************************/
+
 /* Launches tasks */
 void slurm_rpc_launch_tasks ( slurm_msg_t * msg )
 {
@@ -274,7 +297,6 @@ void slurm_rpc_kill_tasks ( slurm_msg_t * msg )
 	}
 }
 
-/* Reconfigure - re-initialized from configuration files */
 void slurm_rpc_slurmd_example ( slurm_msg_t * msg )
 {
 	/* init */
diff --git a/src/slurmd/task_mgr.c b/src/slurmd/task_mgr.c
index 0bb6efacf92..0b6bd1af0ea 100644
--- a/src/slurmd/task_mgr.c
+++ b/src/slurmd/task_mgr.c
@@ -3,6 +3,8 @@
 #include <sys/wait.h>
 #include <errno.h>
 #include <unistd.h>
+#include <pthread.h>
+#include <src/common/log.h>
 #include <src/common/list.h>
 #include <src/common/xerrno.h>
 #include <src/common/xmalloc.h>
@@ -17,32 +19,50 @@ global variables
 
 /* file descriptor defines */
 
-#define STDIN 0
-#define STDOUT 1 
-#define STDERR 2
-#define READFD 0
-#define WRITEFD 1
-
+#define STDIN_FILENO 0
+#define STDOUT_FILENO 1 
+#define STDERR_FILENO 2
+#define MAX_TASKS_PER_LAUNCH 64
+#define CHILD_IN 0
+#define CHILD_IN_RD 0
+#define CHILD_IN_WR 1
+#define CHILD_OUT 2
+#define CHILD_OUT_RD 2
+#define CHILD_OUT_WR 3
+#define CHILD_ERR 4
+#define CHILD_ERR_RD 4
+#define CHILD_ERR_WR 5
 
 /* prototypes */
 void slurm_free_task ( void * _task ) ;
-int iowatch_launch (  launch_tasks_msg_t * launch_msg ) ;
-int match_job_id_job_step_id ( void * _x, void * _key ) ;
-int append_task_to_list (  launch_tasks_msg_t * launch_msg , int pid ) ;
+void * iowatch_launch_thread ( void * arg ) ;
 int kill_task ( task_t * task ) ;
 int interconnect_init ( launch_tasks_msg_t * launch_msg );
 int fan_out_task_launch ( launch_tasks_msg_t * launch_msg );
+void * task_exec_thread ( void * arg ) ;
+int init_parent_pipes ( int * pipes ) ;
+void setup_parent_pipes ( int * pipes ) ; 
+int setup_child_pipes ( int * pipes ) ;
+int forward_io ( task_start_t * task_arg ) ;
+void * stdout_io_pipe_thread ( void * arg ) ;
+/******************************************************************
+ *task launch method call hierarchy
+ *
+ *launch_tasks()
+ *	interconnect_init()
+ *		fan_out_task_launch() (pthread_create)
+ *			iowatch_launch_thread()
+ *				task_exec_thread() (pthread_create)
+ ******************************************************************/			
 
-void task_mgr_init ( ) 
-{
-}
-
+/* exported module function to launch tasks */
 int launch_tasks ( launch_tasks_msg_t * launch_msg )
 {
 	return interconnect_init ( launch_msg );
 }
 
-/* Contains interconnect specific setup instructions and then calls fan_out_task_launch */
+/* Contains interconnect specific setup instructions and then calls 
+ * fan_out_task_launch */
 int interconnect_init ( launch_tasks_msg_t * launch_msg )
 {
 	return fan_out_task_launch ( launch_msg ) ;
@@ -51,137 +71,264 @@ int interconnect_init ( launch_tasks_msg_t * launch_msg )
 int fan_out_task_launch ( launch_tasks_msg_t * launch_msg )
 {
 	int i ;
-	int cpid[64] ;
+	int rc ;
+	task_start_t task_start[MAX_TASKS_PER_LAUNCH] ;
+	/*place on the stack so we don't have to worry about mem clean up should
+	 *this task_launch get brutally killed by a kill_job step message */
+	//task_start = xmalloc ( sizeof ( task_start[MAX_TASKS_PER_LAUNCH] ) );
+
+	/* launch requested number of threads */
 	for ( i = 0 ; i < launch_msg->tasks_to_launch ; i ++ )
 	{
-		cpid[i] = fork ( ) ;
-		if ( cpid[i] == 0 )
-		{
-			break ;
-		}
+		task_start[i].launch_msg = launch_msg ;
+		task_start[i].slurmd_fanout_id = i ;
+		rc = pthread_create ( & task_start[i].pthread_id , NULL , iowatch_launch_thread , ( void * ) & task_start[i] ) ;
 	}
 	
-	/*parent*/
-	if ( i == launch_msg->tasks_to_launch ) 
-	{
-		int waiting = i ;
-		int j ;
-		int pid ;
-                while (waiting > 0) {
-                        pid = waitpid(0, NULL, 0);
-                        if (pid < 0) {
-                                xperror("waitpid");
-                                exit(1);
-                        }
-                        for (j = 0; j < launch_msg->tasks_to_launch ; j++) {
-                                if (cpid[j] == pid)
-                                        waiting--;
-                        }
-                }
-	}
-	/*child*/
-	else
+	/* wait for all the launched threads to finish */
+	for ( i = 0 ; i < launch_msg->tasks_to_launch ; i ++ )
 	{
-		iowatch_launch ( launch_msg ) ;
+		rc = pthread_join( task_start[i].pthread_id , & task_start[i].thread_return ) ;
 	}
+
+	//xfree ( tast_start )
 	return SLURM_SUCCESS ;
 }
 
-int iowatch_launch (  launch_tasks_msg_t * launch_msg )
+
+void * iowatch_launch_thread ( void * arg ) 
 {
-	int rc ;
-	int newstdin[2] ;		
-	int newstdout[2] ;		
-	int newstderr[2] ;		
+	task_start_t * task_start = ( task_start_t * ) arg ;
+	int pipes[6] ;
 	/*
 	int * childin = &newstdin[1] ;
 	int * childout = &newstdout[0] ;
 	int * childerr = &newstderr[0] ;
 	*/
-	task_t * task ;
-	int pid ;
+	/* init arg to be passed to task launch thread */
+	task_start->pipes = pipes ;
 
-	/* open pipes to be used in dup after fork */
-	if( ( rc = pipe ( newstdin ) ) ) 
+	/* create pipes to read child stdin, stdout, stderr */
+	init_parent_pipes ( pipes ) ;
+
+	/* create task thread */
+	pthread_create ( & task_start->exec_pthread_id , NULL , task_exec_thread , ( void * ) arg ) ;
+
+	/* pipe output from child task to srun over sockets */
+	setup_parent_pipes ( pipes ) ;
+	//forward_io ( arg ) ;
+
+	/* wait for thread to die */
+	pthread_join ( task_start->exec_pthread_id , & task_start->exec_thread_return ) ;
+
+	return ( void * ) SLURM_SUCCESS ;
+}
+
+/* starts the thread that copies the task's stdout pipe to srun
+ * returns - SLURM_SUCCESS, or SLURM_ERROR when the thread could not be created */
+int forward_io ( task_start_t * task_arg ) 
+{
+	if ( pthread_create ( & task_arg->io_pthread_id[STDOUT_IO_THREAD] , NULL , stdout_io_pipe_thread , task_arg ) ) return SLURM_ERROR ;
+	return SLURM_SUCCESS ;
+}
+
+void * stdout_io_pipe_thread ( void * arg )
+{
+	slurm_fd connection_fd ;
+	slurm_addr dest_addr ;
+	task_start_t * io_arg = ( task_start_t * ) arg ;
+	char buffer[SLURMD_IO_MAX_BUFFER_SIZE] ;
+	int bytes_read ;
+	int sock_bytes_written ;
+	
+	/* dest_addr = something from arg */
+
+	if ( ( connection_fd = slurm_open_stream ( & dest_addr ) ) == SLURM_PROTOCOL_ERROR )
 	{
-		return ESLRUMD_PIPE_ERROR_ON_TASK_SPAWN ;
+		info ( "error opening socket to srun to pipe stdout" ) ;
+		pthread_exit ( 0 ) ;
 	}
-	if( ( rc = pipe ( newstdout ) ) ) 
+	
+	while ( true )
 	{
-		return ESLRUMD_PIPE_ERROR_ON_TASK_SPAWN ;
+		if ( ( bytes_read = read ( io_arg->pipes[CHILD_OUT_RD] , buffer , SLURMD_IO_MAX_BUFFER_SIZE ) ) <= 0 )
+		{
+			info ( "error reading stdout stream for task %i", 1 ) ;
+			pthread_exit ( 0 ) ;
+		}
+		if ( ( sock_bytes_written = slurm_write_stream ( connection_fd , buffer , bytes_read ) ) == SLURM_PROTOCOL_ERROR )
+
+		{
+			info ( "error sending stdout stream for task %i", 1 ) ;
+			pthread_exit ( 0 ) ; 
+		}
 	}
-	if( ( rc = pipe ( newstderr ) ) ) 
+}
+
+/*
+int forward_io ( task_thread_arg_t * task_arg ) 
+{
+	slurm_addr in_out_addr = task_arg->launch_msg->
+		slurm_addr sig_err_addr = task_arg->launch_msg->
+	slurm_fd in_out ; 
+	slurm_fd sig_err ; 
+	int * pipes = task_arg-> pipes ;
+
+	fd_set n_rd_set ;
+	fd_set n_wr_set ;
+	fd_set n_err_set ;
+	slurm_fd_set ns_rd_set ;
+	slurm_fd_set ns_wr_set ;
+	slurm_fd_set ns_err_set ;
+
+	fd_set rd_set ;
+	fd_set wr_set ;
+	fd_set err_set ;
+	slurm_fd_set s_rd_set ;
+	slurm_fd_set s_wr_set ;
+	slurm_fd_set s_err_set ;
+	int s_rc ;
+	int rc ;
+	struct timeval timeout ;
+
+	timeout . tv_sec = 0 ;
+	timeout . tv_sec = SLURM_SELECT_TIMEOUT ;
+
+	in_out = slurm_stream_connect ( & in_out_addr ) ;
+	sig_err = slurm_stream_connect ( & sig_err_addr ) ;
+	
+
+	FD_ZERO ( n_rd_set ) ;
+	FD_ZERO ( n_wd_set ) ;
+	FD_ZERO ( n_err_set ) ;
+
+	slurm_FD_ZERO ( ns_rd_set ) ;
+	slurm_FD_ZERO ( ns_wr_set ) ;
+	slurm_FD_ZERO ( ns_err_set ) ;
+
+	slurm_FD_SET ( in_out, & ns_rd_set ) ;
+	slurm_FD_SET ( in_out, & ns_err_set ) ;
+
+	slurm_FD_SET ( ig_err & ns_rd_set ) ;
+	slurm_FD_SET ( ig_err & ns_err_set ) ;
+
+	FD_SET ( pipes[CHILD_OUT_RD], & n_rd_set ) ;
+	FD_SET ( pipes[CHILD_ERR_RD], & n_rd_set ) ;
+
+	FD_SET ( pipes[CHILD_IN_WR], & n_err_set ) ;
+	FD_SET ( pipes[CHILD_OUT_RD], & n_err_set ) ;
+	FD_SET ( pipes[CHILD_ERR_RD], & n_err_set ) ;
+
+
+	while ( )
 	{
-		return ESLRUMD_PIPE_ERROR_ON_TASK_SPAWN ;
+		memcpy ( &rd_set , &n_rd_set , sizeof ( rd_set ) ) ;
+		memcpy ( &wr_set , &n_wr_set , sizeof ( wr_set ) ) ;
+		memcpy ( &err_set , &ns_err_set , sizeof ( err_set ) ) ;
+
+		memcpy ( &s_rd_set , &ns_rd_set , sizeof ( s_rd_set ) ) ;
+		memcpy ( &s_wr_set , &ns_wr_set , sizeof ( s_wr_set ) ) ;
+		memcpy ( &s_err_set , &ns_err_set , sizeof ( s_err_set ) ) ;
+
+		rc = select ( pipes[CHILD_ERR_RD] + 1  , rd_set , wr_set , err_set , & timeout ) ; 
+		if ( rc > 0 )
+
+		timeout . tv_sec = 0 ;
+		timeout . tv_sec = SLURM_SELECT_TIMEOUT ;
+		s_rc = slurm_select ( sig_err_addr + 1 , s_rd_set , s_wr_set , s_err_set , & timeout ) ;
+
+		FD_ZERO ( n_rd_set ) ;
+		FD_ZERO ( n_wd_set ) ;
+		FD_ZERO ( n_err_set ) ;
+
+		slurm_FD_ZERO ( ns_rd_set ) ;
+		slurm_FD_ZERO ( ns_wr_set ) ;
+		slurm_FD_ZERO ( ns_err_set ) ;
+
+		slurm_FD_SET ( in_out, & ns_rd_set ) ;
+		slurm_FD_SET ( ig_err & ns_rd_set ) ;
+
+		slurm_FD_SET ( in_out, & ns_err_set ) ;
+		slurm_FD_SET ( ig_err & ns_err_set ) ;
+
+		FD_SET ( pipes[CHILD_OUT_RD], & n_rd_set ) ;
+		FD_SET ( pipes[CHILD_ERR_RD], & n_rd_set ) ;
+
+		FD_SET ( pipes[CHILD_IN_WR], & n_err_set ) ;
+		FD_SET ( pipes[CHILD_OUT_RD], & n_err_set ) ;
+		FD_SET ( pipes[CHILD_ERR_RD], & n_err_set ) ;
 	}
 
-	switch ( ( pid = fork ( ) ) )
+}
+*/
+/* exec thread: points stdio at the task pipes, drops privileges, then execs the command line.
+ * arg - task_start_t * supplying launch_msg and pipes ; returns - only when the task was not exec'd
+ * NOTE(review): setgid/setuid/execl act on the whole process, not only this thread - confirm intent */
+void * task_exec_thread ( void * arg )
+{
+	task_start_t * task_arg = ( task_start_t * ) arg ;
+	launch_tasks_msg_t * launch_msg = task_arg -> launch_msg ;
+	int * pipes = task_arg->pipes ;
+
+	setup_child_pipes ( pipes ) ;
+	/* group before user: once setuid has dropped privileges setgid is no longer permitted */
+	if ( setgid ( launch_msg->gid ) == SLURM_ERROR ) { info ( "setgid failed, task not started" ) ; return ( void * ) SLURM_ERROR ; }
+	if ( setuid ( launch_msg->uid ) == SLURM_ERROR ) { info ( "setuid failed, task not started" ) ; return ( void * ) SLURM_ERROR ; }
+	/* run bash and cmdline; execl requires its argument list to end with a null pointer */
+	if ( chdir ( launch_msg->cwd ) == SLURM_ERROR ) info ( "chdir failed for task working directory" ) ;
+	execl ( "/bin/bash" , "bash" , "-c" , launch_msg->cmd_line , ( char * ) NULL );
+	return ( void * ) SLURM_ERROR ; /* execl only returns on failure */
+}
+
+void setup_parent_pipes ( int * pipes )
+{
+	close ( pipes[CHILD_IN_RD] ) ;
+	close ( pipes[CHILD_OUT_WR] ) ;
+	close ( pipes[CHILD_ERR_WR] ) ;
+}
+
+int init_parent_pipes ( int * pipes )
+{
+	int rc ;
+	/* open pipes to be used in dup after fork */
+	if( ( rc = pipe ( & pipes[CHILD_IN] ) ) ) 
 	{
-		/*error*/
-		case -1:
-			return SLURM_ERROR ;
-			break ;
-		/*child*/
-		case 0:
-			/*dup stdin*/
-			close ( STDIN );
-			dup ( newstdin[READFD] ) ;
-			close ( newstdin[READFD] );
-			close ( newstdin[READFD] );
-
-			/*dup stdout*/
-			close ( STDOUT );
-			dup ( newstdout[WRITEFD] ) ;
-			close ( newstdout[READFD] );
-			close ( newstdout[WRITEFD] );
-
-			/*dup stderr*/
-			close ( STDERR );
-			dup ( newstderr[WRITEFD] ) ;
-			close ( newstderr[READFD] );
-			close ( newstderr[READFD] );
-			
-			/* setuid and gid*/
-			if ( ( rc = setuid ( launch_msg->uid ) ) == SLURM_ERROR ) 
-			
-			if ( ( rc = setgid ( launch_msg->gid ) ) == SLURM_ERROR ) ;
-			
-			/* run bash and cmdline */
-			chdir ( launch_msg->cwd ) ;
-			execl ( "/bin/bash" , "bash" , "-c" , launch_msg->cmd_line );
-			return SLURM_SUCCESS ;
-			break ;
-		/*parent*/
-		default:
-			
-			task = xmalloc ( sizeof ( task_t ) ) ;
-			append_task_to_list ( launch_msg , pid ) ;
-			close ( newstdin[READFD] ) ;
-			close ( newstdout[WRITEFD] ) ;
-			close ( newstderr[WRITEFD] ) ;
-			break ;
+		return ESLRUMD_PIPE_ERROR_ON_TASK_SPAWN ;
 	}
-
-	if ( pid != 0 ) /*parent*/
+	if( ( rc = pipe ( & pipes[CHILD_OUT] ) ) ) 
 	{
-		/* select and io copy from std streams to sockets */
+		return ESLRUMD_PIPE_ERROR_ON_TASK_SPAWN ;
+	}
+	if( ( rc = pipe ( & pipes[CHILD_ERR] ) ) ) 
+	{
+		return ESLRUMD_PIPE_ERROR_ON_TASK_SPAWN ;
 	}
 	return SLURM_SUCCESS ;
 }
 
-
-int append_task_to_list (  launch_tasks_msg_t * launch_msg , int pid )
+/* makes the child ends of the task pipes this process's stdin, stdout and stderr, then closes all six
+ * pipes - array of pipe descriptors ; the CHILD_* macros are indexes into it, not descriptors
+ * returns - SLURM_SUCCESS, or SLURM_ERROR if any stream could not be duplicated */
+int setup_child_pipes ( int * pipes )
 {
-	task_t * task ;
-	task = ( task_t * ) xmalloc ( sizeof ( task_t ) ) ;
-	if ( task == NULL )
-		return ENOMEM ;
-
-	task -> pid = pid;
-	task -> uid = launch_msg -> uid;
-	task -> gid = launch_msg -> gid;
-	
-	return SLURM_SUCCESS ;
+	int error_code = SLURM_SUCCESS ;
+
+	/*dup stdin*/
+	if ( dup2 ( pipes[CHILD_IN_RD] , STDIN_FILENO ) == SLURM_ERROR ) { info ("dup failed on child standard in pipe"); error_code = SLURM_ERROR ; }
+	close ( pipes[CHILD_IN_RD] );
+	close ( pipes[CHILD_IN_WR] );
+
+	/*dup stdout*/
+	if ( dup2 ( pipes[CHILD_OUT_WR] , STDOUT_FILENO ) == SLURM_ERROR ) { info ("dup failed on child standard out pipe"); error_code = SLURM_ERROR ; }
+	close ( pipes[CHILD_OUT_RD] );
+	close ( pipes[CHILD_OUT_WR] );
+
+	/*dup stderr*/
+	if ( dup2 ( pipes[CHILD_ERR_WR] , STDERR_FILENO ) == SLURM_ERROR ) { info ("dup failed on child standard err pipe"); error_code = SLURM_ERROR ; }
+	close ( pipes[CHILD_ERR_RD] );
+	close ( pipes[CHILD_ERR_WR] );
+
+	return error_code ;
 }
 
 int kill_tasks ( kill_tasks_msg_t * kill_task_msg )
diff --git a/src/slurmd/task_mgr.h b/src/slurmd/task_mgr.h
index fcaf46dcad4..caeadfd5deb 100644
--- a/src/slurmd/task_mgr.h
+++ b/src/slurmd/task_mgr.h
@@ -14,9 +14,31 @@
 #endif  /*  HAVE_CONFIG_H */
 #endif
 
+#define STDIN_IO_THREAD 0
+#define STDOUT_IO_THREAD 1
+#define STDERR_IO_THREAD 2
+#define STDSIG_IO_THREAD 3
+#define HEART_BEAT_IO_THREAD 4
+#define SLURMD_NUMBER_OF_IO_THREADS 5
+#define SLURMD_IO_MAX_BUFFER_SIZE 4096
+
 /* function prototypes */
-void task_mgr_init ( ) ;
 int launch_tasks ( launch_tasks_msg_t * launch_msg ) ;
 int kill_tasks ( kill_tasks_msg_t * kill_task_msg ) ;
 
-
+typedef struct task_start
+{
+	/*task control thread id*/
+	pthread_t		pthread_id;
+	void *			thread_return;
+	/*actual exec thread id*/
+	pthread_t		exec_pthread_id;
+	void *			exec_thread_return;
+	/*io threads ids*/
+	pthread_t		io_pthread_id[SLURMD_NUMBER_OF_IO_THREADS];
+	void *			io_thread_return[SLURMD_NUMBER_OF_IO_THREADS];
+	launch_tasks_msg_t * 	launch_msg;
+	int			slurmd_fanout_id; /* the node specific task number used to compute port to stream std streams to */
+	int *			pipes;
+	
+} task_start_t ;
-- 
GitLab