diff --git a/src/srun/launch.c b/src/srun/launch.c
index a3e3c7e9150c8af2e92bf21c5665720e0c027ce3..42111fa40ad1743989e3da55322cf29592d4c111 100644
--- a/src/srun/launch.c
+++ b/src/srun/launch.c
@@ -194,7 +194,7 @@ static void p_launch(slurm_msg_t *req_array_ptr, job_t *job)
 				(void *) task_info_ptr) ) {
 			error ("pthread_create error %m");
 			/* just run it under this thread */
-			p_launch_task(task_info_ptr);
+			p_launch_task((void *) task_info_ptr);
 		}
 	}
 
@@ -213,10 +213,10 @@ static void * p_launch_task(void *args)
 	job_t *job_ptr = task_info_ptr->job_ptr;
 	int host_inx = msg_ptr->srun_node_id;
 
-	debug2("launching on host %s", job_ptr->host[host_inx]);
+	debug3("launching on host %s", job_ptr->host[host_inx]);
 	print_launch_msg(msg_ptr);
 	if (slurm_send_only_node_msg(req_ptr) < 0) {	/* Already handles timeout */
-		error("%s: %m", job_ptr->host[host_inx]);
+		error("launch %s: %m", job_ptr->host[host_inx]);
 		job_ptr->host_state[host_inx] = SRUN_HOST_UNREACHABLE;
 	}
 
diff --git a/src/srun/srun.c b/src/srun/srun.c
index 34935373947b955964db6725706ef5aef94b0359..de930e989dc5f89ec1982d4582a40ccc4f5cac49 100644
--- a/src/srun/srun.c
+++ b/src/srun/srun.c
@@ -71,6 +71,25 @@ typedef resource_allocation_response_msg_t allocation_resp;
 #define TYPE_TEXT	1
 #define TYPE_SCRIPT	2
 
+/* number of active threads */
+static pthread_mutex_t active_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t active_cond = PTHREAD_COND_INITIALIZER;
+static int active = 0;
+
+typedef enum {DSH_NEW, DSH_ACTIVE, DSH_DONE, DSH_FAILED} state_t;
+
+typedef struct thd {
+	pthread_t thread;	/* thread ID */
+	pthread_attr_t attr;	/* thread attributes */
+	state_t state;		/* thread state */
+} thd_t;
+
+typedef struct task_info {
+	slurm_msg_t *req_ptr;
+	job_t *job_ptr;
+	int host_inx;
+} task_info_t;
+
 /*
  * forward declaration of static funcs
  */
@@ -80,13 +99,15 @@ static void create_job_step(job_t *job);
 static void sigterm_handler(int signum);
 static void sig_kill_alloc(int signum);
 void * sig_thr(void *arg);
-void fwd_signal(job_t *job, int signo);
 static char *build_script (char *pathname, int file_type);
 static char *get_shell (void);
 static int is_file_text (char *fname, char** shell_ptr);
 static int run_batch_job (void);
 static allocation_resp *existing_allocation(void);
 static void run_job_script(uint32_t job_id);
+static void fwd_signal(job_t *job, int signo);
+static void p_fwd_signal(slurm_msg_t *req_array_ptr, job_t *job);
+static void *p_signal_task(void *args);
 
 #if HAVE_LIBELAN3
 #  include <src/common/qsw.h>
@@ -519,38 +540,113 @@ sig_thr(void *arg)
 	pthread_exit(0);
 }
 
-void
+static void
 fwd_signal(job_t *job, int signo)
 {
 	int i;
-	slurm_msg_t req;
-	slurm_msg_t resp;
+	slurm_msg_t *req_array_ptr;
 	kill_tasks_msg_t msg;
 
 	debug("forward signal %d to job", signo);
 
-	req.msg_type = REQUEST_KILL_TASKS;
-	req.data = &msg;
-
+	/* common to all tasks */
 	msg.job_id = job->jobid;
 	msg.job_step_id = job->stepid;
 	msg.signal = (uint32_t) signo;
 
+	req_array_ptr = (slurm_msg_t *)
+			xmalloc(sizeof(slurm_msg_t) * job->nhosts);
 	for (i = 0; i < job->nhosts; i++) {
 		if (job->host_state[i] != SRUN_HOST_REPLIED) {
 			debug2("%s has not yet replied\n", job->host[i]);
 			continue;
 		}
-		memcpy(&req.address, &job->slurmd_addr[i], sizeof(slurm_addr));
-		debug("sending kill req to %s", job->host[i]);
-		if (slurm_send_recv_node_msg(&req, &resp) < 0)
-			error("Unable to send signal to host %s",
-			      job->host[i]);
+		req_array_ptr[i].msg_type = REQUEST_KILL_TASKS;
+		req_array_ptr[i].data = &msg;
+		memcpy(&req_array_ptr[i].address,
+		       &job->slurmd_addr[i], sizeof(slurm_addr));
+	}
+
+	p_fwd_signal(req_array_ptr, job);
+
+	debug("All tasks have been signalled");
+	xfree(req_array_ptr);
+}
+
+/* p_fwd_signal - parallel (multi-threaded) task signaller */
+static void p_fwd_signal(slurm_msg_t *req_array_ptr, job_t *job)
+{
+	int i;
+	task_info_t *task_info_ptr;
+	thd_t *thread_ptr;
+
+	if (opt.max_threads > job->nhosts)	/* no need for more threads than nodes */
+		opt.max_threads = job->nhosts;
+
+	thread_ptr = xmalloc (job->nhosts * sizeof (thd_t));
+	for (i = 0; i < job->nhosts; i++) {
+		if (req_array_ptr[i].msg_type == 0)
+			continue;	/* no request built for this node */
+
+		pthread_mutex_lock(&active_mutex);
+		while (active >= opt.max_threads) {
+			pthread_cond_wait(&active_cond, &active_mutex);
+		}
+		active++;
+		pthread_mutex_unlock(&active_mutex);
+
+		task_info_ptr = (task_info_t *)xmalloc(sizeof(task_info_t));
+		task_info_ptr->req_ptr = &req_array_ptr[i];
+		task_info_ptr->job_ptr = job;
+		task_info_ptr->host_inx = i;
+
+		if (pthread_attr_init (&thread_ptr[i].attr))
+			fatal ("pthread_attr_init error %m");
+		if (pthread_attr_setdetachstate (&thread_ptr[i].attr, PTHREAD_CREATE_DETACHED))
+			error ("pthread_attr_setdetachstate error %m");
+#ifdef PTHREAD_SCOPE_SYSTEM
+		if (pthread_attr_setscope (&thread_ptr[i].attr, PTHREAD_SCOPE_SYSTEM))
+			error ("pthread_attr_setscope error %m");
+#endif
+		while ( pthread_create (&thread_ptr[i].thread,
+					&thread_ptr[i].attr,
+					p_signal_task,
+					(void *) task_info_ptr) ) {
+			error ("pthread_create error %m");
+			/* just run it under this thread */
+			p_signal_task((void *) task_info_ptr);
+		}
 	}
 
+	pthread_mutex_lock(&active_mutex);
+	while (active > 0) {
+		pthread_cond_wait(&active_cond, &active_mutex);
+	}
+	pthread_mutex_unlock(&active_mutex);
+	xfree(thread_ptr);
 }
 
+/* p_signal_task - parallelized signal of a specific task */
+static void * p_signal_task(void *args)
+{
+	task_info_t *task_info_ptr = (task_info_t *)args;
+	slurm_msg_t *req_ptr = task_info_ptr->req_ptr;
+	job_t *job_ptr = task_info_ptr->job_ptr;
+	int host_inx = task_info_ptr->host_inx;
+	slurm_msg_t resp;
+
+	debug3("sending signal to host %s", job_ptr->host[host_inx]);
+	if (slurm_send_recv_node_msg(req_ptr, &resp) < 0)	/* Handles its own timeout */
+		error("signal %s: %m", job_ptr->host[host_inx]);
+	else if (resp.msg_type == RESPONSE_SLURM_RC)
+		slurm_free_return_code_msg(resp.data);
+
+	pthread_mutex_lock(&active_mutex);
+	active--;
+	pthread_cond_signal(&active_cond);
+	pthread_mutex_unlock(&active_mutex);
+	xfree(args);
+	return NULL;
+}
+
 /* submit a batch job and return error code */
 static int
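The launch path and the new signal path share one throttling idiom: a mutex-protected counter of in-flight threads plus a condition variable, with the dispatcher blocking while the counter is at opt.max_threads, and each detached worker decrementing the counter and signalling the condition when it finishes. The following stand-alone sketch shows the same pattern in isolation; it is not part of the patch, and NWORK, MAX_THREADS and worker() are illustrative placeholders rather than srun symbols.

/*
 * Illustrative sketch only -- not srun code.  Caps the number of
 * concurrent detached worker threads with a counter guarded by a
 * mutex/condition-variable pair, the same idiom used by p_launch()
 * and p_fwd_signal() above.  Build with: cc -pthread sketch.c
 */
#include <pthread.h>
#include <stdio.h>
#include <string.h>

#define NWORK		16	/* total work items (placeholder) */
#define MAX_THREADS	4	/* concurrency cap (placeholder) */

static pthread_mutex_t active_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  active_cond  = PTHREAD_COND_INITIALIZER;
static int active = 0;		/* number of worker threads in flight */

static void *worker(void *arg)
{
	printf("work item %d done\n", *(int *) arg);

	/* release our slot and wake the dispatcher */
	pthread_mutex_lock(&active_mutex);
	active--;
	pthread_cond_signal(&active_cond);
	pthread_mutex_unlock(&active_mutex);
	return NULL;
}

int main(void)
{
	static int id[NWORK];
	pthread_attr_t attr;
	pthread_t tid;
	int i, rc;

	for (i = 0; i < NWORK; i++) {
		id[i] = i;

		/* block until a thread slot is free */
		pthread_mutex_lock(&active_mutex);
		while (active >= MAX_THREADS)
			pthread_cond_wait(&active_cond, &active_mutex);
		active++;
		pthread_mutex_unlock(&active_mutex);

		pthread_attr_init(&attr);
		pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
		rc = pthread_create(&tid, &attr, worker, &id[i]);
		pthread_attr_destroy(&attr);
		if (rc) {
			fprintf(stderr, "pthread_create: %s\n", strerror(rc));
			worker(&id[i]);	/* fall back to running it inline */
		}
	}

	/* drain: wait, with the mutex held, until every worker has finished */
	pthread_mutex_lock(&active_mutex);
	while (active > 0)
		pthread_cond_wait(&active_cond, &active_mutex);
	pthread_mutex_unlock(&active_mutex);
	return 0;
}

Because the workers are detached, the dispatcher never joins them; completion is tracked only through the shared counter, which is why the final drain loop must also hold the mutex while waiting on the condition variable.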