From 1a8eb8080c8a08355d5abec1411a1fcfcdf9cc5a Mon Sep 17 00:00:00 2001 From: tewk <tewk@unknown> Date: Wed, 3 Jul 2002 22:37:35 +0000 Subject: [PATCH] More revisions to slurmd removed gid from messages this will come from getpwuid call added a needed include to scancel to compile removed old test from job_mgr_test that wouldn't compile due to changes in the api --- src/common/slurm_protocol_defs.h | 2 +- src/common/slurm_protocol_pack.c | 4 +- src/scancel/scancel.c | 1 + src/slurmd/shmem_struct.c | 46 ++++++++-- src/slurmd/shmem_struct.h | 45 ++++++---- src/slurmd/task_mgr.c | 89 +++++++++++++------ src/slurmd/task_mgr.h | 5 +- testsuite/slurm_unit/slurmctld/job_mgr-test.c | 4 +- testsuite/slurm_unit/slurmd/srun_emu.pl | 4 +- testsuite/slurm_unit/slurmd/task_mgr-test.c | 1 - 10 files changed, 141 insertions(+), 60 deletions(-) diff --git a/src/common/slurm_protocol_defs.h b/src/common/slurm_protocol_defs.h index 87271b1d6ba..37712c69720 100644 --- a/src/common/slurm_protocol_defs.h +++ b/src/common/slurm_protocol_defs.h @@ -220,7 +220,6 @@ typedef struct launch_tasks_msg uint32_t job_id ; uint32_t job_step_id ; uint32_t uid ; - uint32_t gid ; char * credentials ; uint32_t tasks_to_launch ; uint16_t envc ; @@ -235,6 +234,7 @@ typedef struct kill_tasks_msg { uint32_t job_id ; uint32_t job_step_id ; + uint32_t signal ; } kill_tasks_msg_t ; diff --git a/src/common/slurm_protocol_pack.c b/src/common/slurm_protocol_pack.c index d1bc56eb85a..e2e5f4a000f 100644 --- a/src/common/slurm_protocol_pack.c +++ b/src/common/slurm_protocol_pack.c @@ -902,7 +902,6 @@ void pack_launch_tasks_msg ( launch_tasks_msg_t * msg , void ** buffer , uint32_ pack32 ( msg -> job_id , buffer , length ) ; pack32 ( msg -> job_step_id , buffer , length ) ; pack32 ( msg -> uid , buffer , length ) ; - pack32 ( msg -> gid , buffer , length ) ; packstr ( msg -> credentials , buffer , length ) ; pack32 ( msg -> tasks_to_launch , buffer , length ) ; packstring_array ( msg -> env , msg -> envc , buffer , length ) ; @@ -927,7 +926,6 @@ int unpack_launch_tasks_msg ( launch_tasks_msg_t ** msg_ptr , void ** buffer , u unpack32 ( & msg -> job_id , buffer , length ) ; unpack32 ( & msg -> job_step_id , buffer , length ) ; unpack32 ( & msg -> uid , buffer , length ) ; - unpack32 ( & msg -> gid , buffer , length ) ; unpackstr_xmalloc ( & msg -> credentials , & uint16_tmp , buffer , length ) ; unpack32 ( & msg -> tasks_to_launch , buffer , length ) ; unpackstring_array ( & msg -> env , & msg -> envc , buffer , length ) ; @@ -943,6 +941,7 @@ void pack_cancel_tasks_msg ( kill_tasks_msg_t * msg , void ** buffer , uint32_t { pack32 ( msg -> job_id , buffer , length ) ; pack32 ( msg -> job_step_id , buffer , length ) ; + pack32 ( msg -> signal , buffer , length ) ; } int unpack_cancel_tasks_msg ( kill_tasks_msg_t ** msg_ptr , void ** buffer , uint32_t * length ) @@ -958,6 +957,7 @@ int unpack_cancel_tasks_msg ( kill_tasks_msg_t ** msg_ptr , void ** buffer , uin unpack32 ( & msg -> job_id , buffer , length ) ; unpack32 ( & msg -> job_step_id , buffer , length ) ; + unpack32 ( & msg -> signal , buffer , length ) ; *msg_ptr = msg ; return 0 ; } diff --git a/src/scancel/scancel.c b/src/scancel/scancel.c index 285d44641ac..4ece6d07685 100644 --- a/src/scancel/scancel.c +++ b/src/scancel/scancel.c @@ -28,6 +28,7 @@ # include <config.h> #endif +#include <unistd.h> #include <errno.h> #include <stdio.h> #include <stdlib.h> diff --git a/src/slurmd/shmem_struct.c b/src/slurmd/shmem_struct.c index 0b852bee5cf..d01470a411a 100644 --- a/src/slurmd/shmem_struct.c +++ b/src/slurmd/shmem_struct.c @@ -1,5 +1,6 @@ #include <stdlib.h> #include <assert.h> +#include <pthread.h> #include <sys/types.h> #include <sys/shm.h> #include <string.h> @@ -12,9 +13,9 @@ #define OCTAL_RW_PERMISSIONS 0666 /* function prototypes */ -void clear_task ( task_t * task ); -void clear_job_step( job_step_t * job_step ); -int prepend_task ( slurmd_shmem_t * shmem , job_step_t * job_step , task_t * task ) ; +static void clear_task ( task_t * task ); +static void clear_job_step( job_step_t * job_step ); +static int prepend_task ( slurmd_shmem_t * shmem , job_step_t * job_step , task_t * task ) ; /* gets a pointer to the slurmd shared memory segment * if it doesn't exist, one is created @@ -61,6 +62,7 @@ void init_shmem ( slurmd_shmem_t * shmem ) { clear_job_step ( & shmem->job_steps[i] ) ; } + pthread_mutex_init ( & shmem -> mutex , NULL ) ; } /* runs through the job_step array looking for a unused job_step. @@ -74,6 +76,7 @@ void init_shmem ( slurmd_shmem_t * shmem ) void * alloc_job_step ( slurmd_shmem_t * shmem , int job_id , int job_step_id ) { int i ; + pthread_mutex_lock ( & shmem -> mutex ) ; for ( i=0 ; i < MAX_JOB_STEPS ; i ++ ) { if (shmem -> job_steps[i].used == false ) @@ -82,10 +85,12 @@ void * alloc_job_step ( slurmd_shmem_t * shmem , int job_id , int job_step_id ) shmem -> job_steps[i].used = true ; shmem -> job_steps[i].job_id=job_id; shmem -> job_steps[i].job_step_id=job_step_id; + pthread_mutex_unlock ( & shmem -> mutex ) ; return & shmem -> job_steps[i] ; } } - fatal ( "No available job_step slots in shmem segment"); + pthread_mutex_unlock ( & shmem -> mutex ) ; + fatal ( "No available job_step slots in shmem segment"); return (void * ) SLURM_ERROR ; } @@ -100,6 +105,7 @@ void * alloc_job_step ( slurmd_shmem_t * shmem , int job_id , int job_step_id ) void * alloc_task ( slurmd_shmem_t * shmem , job_step_t * job_step ) { int i ; + pthread_mutex_lock ( & shmem -> mutex ) ; for ( i=0 ; i < MAX_TASKS ; i ++ ) { if (shmem -> tasks[i].used == false ) @@ -107,10 +113,12 @@ void * alloc_task ( slurmd_shmem_t * shmem , job_step_t * job_step ) clear_task ( & shmem -> tasks[i] ) ; shmem -> tasks[i].used = true ; prepend_task ( shmem , job_step , & shmem -> tasks[i] ) ; + pthread_mutex_unlock ( & shmem -> mutex ) ; return & shmem -> tasks[i] ; } - } - fatal ( "No available task slots in shmem segment"); + } + pthread_mutex_unlock ( & shmem -> mutex ) ; + fatal ( "No available task slots in shmem segment"); return (void * ) SLURM_ERROR ; } @@ -124,7 +132,7 @@ void * alloc_task ( slurmd_shmem_t * shmem , job_step_t * job_step ) * task - task to be prepended */ -int prepend_task ( slurmd_shmem_t * shmem , job_step_t * job_step , task_t * task ) +static int prepend_task ( slurmd_shmem_t * shmem , job_step_t * job_step , task_t * task ) { /* prepend operation*/ /* newtask next pointer gets head of the jobstep task list */ @@ -152,7 +160,7 @@ int deallocate_job_step ( job_step_t * jobstep ) } /* clears a job_step array memeber for future use */ -void clear_task ( task_t * task ) +static void clear_task ( task_t * task ) { task -> used = false ; task -> job_step = NULL ; @@ -160,8 +168,28 @@ void clear_task ( task_t * task ) } /* clears a job_step array memeber for future use */ -void clear_job_step( job_step_t * job_step ) +static void clear_job_step( job_step_t * job_step ) { job_step -> used = false ; job_step -> head_task = NULL ; } + +/* api call for DPCS to return a job_id given a session_id */ +int find_job_id_for_session ( slurmd_shmem_t * shmem , int session_id ) +{ + int i ; + pthread_mutex_lock ( & shmem -> mutex ) ; + for ( i=0 ; i < MAX_JOB_STEPS ; i ++ ) + { + if (shmem -> job_steps[i].used == true ) + { + if (shmem -> job_steps[i].session_id == session_id ) + + pthread_mutex_unlock ( & shmem -> mutex ) ; + return shmem -> job_steps[i].job_id ; + } + } + pthread_mutex_unlock ( & shmem -> mutex ) ; + info ( "No job_id found for session_id %i", session_id ); + return SLURM_FAILURE ; +} diff --git a/src/slurmd/shmem_struct.h b/src/slurmd/shmem_struct.h index e52364dd75e..c3d1be92b49 100644 --- a/src/slurmd/shmem_struct.h +++ b/src/slurmd/shmem_struct.h @@ -8,37 +8,50 @@ typedef struct job_step job_step_t ; typedef struct task task_t ; +/* represents a task running on a node */ struct task { - uint32_t task_id; - uint32_t uid; - uint32_t gid; - task_start_t task_start; - char used; - job_step_t * job_step; - task_t * next; + uint32_t task_id; /* srun assigned globally unique taskid */ + task_start_t task_start; /* task_start_message see task_mgr.h */ + char used; /* boolean type that is marked when this record is used */ + job_step_t * job_step; /* reverse pointer to the controlling job_step */ + task_t * next; /* next task pointer in the job_step */ } ; - +/* represents a job_step consisting of a list of tasks */ struct job_step { - uint32_t job_id; - uint32_t job_step_id; - char used; - task_t * head_task; + uint32_t job_id; /* slurmctrld assigned jobid */ + uint32_t job_step_id; /* slurmctrld assigned job_step id */ + uint32_t session_id; + char used; /* boolean type that is marked when this record is used */ + task_t * head_task; /* fist task in the job_step */ } ; +/* shared memory structure. This structure is overlayed on top of the allocated shared ram */ typedef struct slurmd_shmem { - pthread_mutex_t mutex; - task_t tasks[MAX_TASKS]; - job_step_t job_steps[MAX_JOB_STEPS]; + pthread_mutex_t mutex; /* mutex to protect shared ram */ + task_t tasks[MAX_TASKS]; /* array of task objects */ + job_step_t job_steps[MAX_JOB_STEPS]; /* array of job_step objects */ } slurmd_shmem_t ; +/* gets shared memory segment, allocating it if needed */ void * get_shmem ( ); + +/* should only be called once after allocation of shared ram + * Marks all task and job_step objects as unused */ void init_shmem ( slurmd_shmem_t * shmem ); + +/* detaches from shared ram and deallocates shared ram if no other + * attachments exist */ int rel_shmem ( void * shmem_addr ); +/* allocates job step from shared memory array */ void * alloc_job_step ( slurmd_shmem_t * shmem , int job_id , int job_step_id ) ; +/* allocates task from shared memory array */ void * alloc_task ( slurmd_shmem_t * shmem , job_step_t * job_step ) ; - +/* api call for DPCS to return a job_id given a session_id */ +int find_job_id_for_session ( slurmd_shmem_t * shmem , int session_id ) ; +/* clears a job_step and associated task list for future use */ +int deallocate_job_step ( job_step_t * jobstep ) ; #endif diff --git a/src/slurmd/task_mgr.c b/src/slurmd/task_mgr.c index 505ed3c22fc..37c326e9526 100644 --- a/src/slurmd/task_mgr.c +++ b/src/slurmd/task_mgr.c @@ -1,5 +1,7 @@ #include <stdlib.h> #include <sys/types.h> +#include <pwd.h> +#include <grp.h> #include <sys/wait.h> #include <errno.h> #include <unistd.h> @@ -37,7 +39,6 @@ global variables /* prototypes */ void slurm_free_task ( void * _task ) ; -void * iowatch_launch_thread ( void * arg ) ; int kill_task ( task_t * task ) ; int interconnect_init ( launch_tasks_msg_t * launch_msg ); int fan_out_task_launch ( launch_tasks_msg_t * launch_msg ); @@ -56,8 +57,8 @@ int setup_task_env (task_start_t * task_start ) ; *launch_tasks() * interconnect_init() * fan_out_task_launch() (pthread_create) - * iowatch_launch_thread() (pthread_create) - * task_exec_thread() (pthread_create) + * task_exec_thread() (fork) for task exec + * task_exec_thread() (pthread_create) for io piping ******************************************************************/ /* exported module funtion to launch tasks */ @@ -78,6 +79,7 @@ int fan_out_task_launch ( launch_tasks_msg_t * launch_msg ) { int i ; int rc ; + int session_id ; /* shmem work - see slurmd.c shmem_seg this is probably not needed*/ slurmd_shmem_t * shmem_ptr = get_shmem ( ) ; @@ -92,15 +94,26 @@ int fan_out_task_launch ( launch_tasks_msg_t * launch_msg ) * launched*/ task_start_t * task_start[launch_msg->tasks_to_launch]; + if ( ( session_id = setsid () ) == SLURM_ERROR ) + { + info ( "set sid failed" ); + } + curr_job_step -> session_id = session_id ; + /* launch requested number of threads */ for ( i = 0 ; i < launch_msg->tasks_to_launch ; i ++ ) { curr_task = alloc_task ( shmem_ptr , curr_job_step ); task_start[i] = & curr_task -> task_start ; + + /* fill in task_start struct */ task_start[i] -> launch_msg = launch_msg ; + task_start[i] -> local_task_id = i ; + task_start[i] -> inout_dest = *( launch_msg -> streams + ( i * 2 ) ) ; + task_start[i] -> err_dest = *( launch_msg -> streams + ( i * 2 ) + 1 ) ; - if ( pthread_create ( & task_start[i]->pthread_id , NULL , iowatch_launch_thread , ( void * ) task_start[i] ) ) + if ( pthread_create ( & task_start[i]->pthread_id , NULL , task_exec_thread , ( void * ) task_start[i] ) ) goto kill_threads; } @@ -123,31 +136,21 @@ int fan_out_task_launch ( launch_tasks_msg_t * launch_msg ) return SLURM_SUCCESS ; } -void * iowatch_launch_thread ( void * arg ) -{ - task_start_t * task_start = ( task_start_t * ) arg ; - - /* create pipes to read child stdin, stdout, sterr */ - init_parent_pipes ( task_start->pipes ) ; - return task_exec_thread ( arg ) ; -} - int forward_io ( task_start_t * task_arg ) { pthread_attr_t pthread_attr ; - slurm_addr * dest_out_addr = task_arg -> launch_msg -> streams ; - slurm_addr * dest_err_addr = task_arg -> launch_msg -> streams + 1 ; int local_errno; - +#define STDIN_OUT_SOCK 0 +#define SIG_STDERR_SOCK 0 /* open stdout & stderr sockets */ - if ( ( task_arg->sockets[0] = slurm_open_stream ( dest_out_addr ) ) == SLURM_PROTOCOL_ERROR ) + if ( ( task_arg->sockets[STDIN_OUT_SOCK] = slurm_open_stream ( & ( task_arg -> inout_dest ) ) ) == SLURM_PROTOCOL_ERROR ) { local_errno = errno ; info ( "error opening socket to srun to pipe stdout errno %i" , local_errno ) ; pthread_exit ( 0 ) ; } - if ( ( task_arg->sockets[1] = slurm_open_stream ( dest_err_addr ) ) == SLURM_PROTOCOL_ERROR ) + if ( ( task_arg->sockets[SIG_STDERR_SOCK] = slurm_open_stream ( &( task_arg -> err_dest ) ) ) == SLURM_PROTOCOL_ERROR ) { local_errno = errno ; info ( "error opening socket to srun to pipe stdout errno %i" , local_errno ) ; @@ -217,6 +220,7 @@ void * stdout_io_pipe_thread ( void * arg ) info ( "error reading stdout stream for task %i , errno %i", 1 , local_errno ) ; pthread_exit ( NULL ) ; } + write ( 1 , buffer , bytes_read ) ; if ( ( sock_bytes_written = slurm_write_stream ( io_arg->sockets[0] , buffer , bytes_read ) ) == SLURM_PROTOCOL_ERROR ) { local_errno = errno ; @@ -259,7 +263,14 @@ void * task_exec_thread ( void * arg ) int * pipes = task_start->pipes ; int rc ; int cpid ; + struct passwd * pwd ; + struct sigaction newaction ; + struct sigaction oldaction ; + newaction . sa_handler = SIG_IGN ; + + /* create pipes to read child stdin, stdout, sterr */ + init_parent_pipes ( task_start->pipes ) ; #define FORK_ERROR -1 #define CHILD_PROCCESS 0 @@ -267,19 +278,43 @@ void * task_exec_thread ( void * arg ) { case FORK_ERROR: break ; - case CHILD_PROCCESS: - signal(SIGTTOU, SIG_IGN); // ignore tty output - signal(SIGTTIN, SIG_IGN); // ignore tty input - signal(SIGTSTP, SIG_IGN); // ignore user + case CHILD_PROCCESS: + sigaction(SIGTTOU, &newaction, &oldaction); /* ignore tty output */ + sigaction(SIGTTIN, &newaction, &oldaction); /* ignore tty input */ + sigaction(SIGTSTP, &newaction, &oldaction); /* ignore user */ + /* setup std stream pipes */ setup_child_pipes ( pipes ) ; - rc ++ ; + /* get passwd file info */ + if ( ( pwd = getpwuid ( launch_msg->uid ) ) == NULL ) + { + info ( "user id not found in passwd file" ) ; + _exit ( SLURM_FAILURE ) ; + } + /* setuid and gid*/ - //if ( ( rc = setuid ( launch_msg->uid ) ) == SLURM_ERROR ) ; - - //if ( ( rc = setgid ( launch_msg->gid ) ) == SLURM_ERROR ) ; + if ( ( rc = setuid ( launch_msg->uid ) ) == SLURM_ERROR ) + { + info ( "set user id failed " ) ; + _exit ( SLURM_FAILURE ) ; + } + + if ( ( rc = setgid ( pwd -> pw_gid ) ) == SLURM_ERROR ) + { + info ( "set group id failed " ) ; + _exit ( SLURM_FAILURE ) ; + } + + /* initgroups */ + if ( ( rc = initgroups ( pwd ->pw_name , pwd -> pw_gid ) ) == SLURM_ERROR ) + { + info ( "init groups failed " ) ; + _exit ( SLURM_FAILURE ) ; + } + + /* set session id */ /* setup requested env */ //setup_task_env ( task_arg ) ; @@ -288,7 +323,9 @@ void * task_exec_thread ( void * arg ) chdir ( launch_msg->cwd ) ; execl ( "/bin/bash" , "bash" , "-c" , launch_msg->cmd_line ); _exit ( SLURM_SUCCESS ) ; + default: /*parent proccess */ + task_start->exec_pid = cpid ; setup_parent_pipes ( task_start->pipes ) ; forward_io ( arg ) ; waitpid ( cpid , NULL , 0 ) ; diff --git a/src/slurmd/task_mgr.h b/src/slurmd/task_mgr.h index aa55b597355..373689cc640 100644 --- a/src/slurmd/task_mgr.h +++ b/src/slurmd/task_mgr.h @@ -31,7 +31,7 @@ typedef struct task_start pthread_t pthread_id; int thread_return; /*actual exec thread id*/ - pthread_t exec_pthread_id; + int exec_pid; int exec_thread_return; /*io threads ids*/ pthread_t io_pthread_id[SLURMD_NUMBER_OF_IO_THREADS]; @@ -40,5 +40,8 @@ typedef struct task_start int pipes[6]; int sockets[2]; int local_task_id; + char addr_update; + slurm_addr inout_dest; + slurm_addr err_dest; } task_start_t ; #endif diff --git a/testsuite/slurm_unit/slurmctld/job_mgr-test.c b/testsuite/slurm_unit/slurmctld/job_mgr-test.c index 27d9cf95fdd..5c07694cece 100644 --- a/testsuite/slurm_unit/slurmctld/job_mgr-test.c +++ b/testsuite/slurm_unit/slurmctld/job_mgr-test.c @@ -14,7 +14,7 @@ main (int argc, char *argv[]) log_options_t opts = LOG_OPTS_STDERR_ONLY; char *dump; uint16_t tmp_id; - char update_spec[] = "TimeLimit=1234 Priority=123"; + //char update_spec[] = "TimeLimit=1234 Priority=123"; note("This is BullShit"); @@ -59,7 +59,7 @@ main (int argc, char *argv[]) set_job_prio (job_rec); } - error_code = update_job (tmp_id, update_spec); + //error_code = update_job (tmp_id, update_spec); if (error_code) { fail ("update_job"); } diff --git a/testsuite/slurm_unit/slurmd/srun_emu.pl b/testsuite/slurm_unit/slurmd/srun_emu.pl index 393fcc7d3e7..1dd5e57ec27 100755 --- a/testsuite/slurm_unit/slurmd/srun_emu.pl +++ b/testsuite/slurm_unit/slurmd/srun_emu.pl @@ -13,7 +13,7 @@ if ($pid) Reuse => 1, ); my $new_sock1 = $sock1->accept(); - while(defined(<$new_sock1>)) + while(<$new_sock1>) { print $_; } @@ -30,7 +30,7 @@ else Reuse => 1, ); my $new_sock2 = $sock2->accept(); - while(defined(<$new_sock2>)) + while(<$new_sock2>) { print $_; } diff --git a/testsuite/slurm_unit/slurmd/task_mgr-test.c b/testsuite/slurm_unit/slurmd/task_mgr-test.c index e960f071bd8..6201e0c6d54 100644 --- a/testsuite/slurm_unit/slurmd/task_mgr-test.c +++ b/testsuite/slurm_unit/slurmd/task_mgr-test.c @@ -16,7 +16,6 @@ int main ( int argc , char ** argv ) launch_tasks_msg . job_id = 1000 ; launch_tasks_msg . job_step_id = 2000 ; launch_tasks_msg . uid = 801 ; - launch_tasks_msg . gid = 802 ; launch_tasks_msg . credentials = NULL ; launch_tasks_msg . tasks_to_launch = 1 ; launch_tasks_msg . envc = 0 ; -- GitLab