diff --git a/src/slurmd/mgr.c b/src/slurmd/mgr.c index f9f84a21989ad2067c71b48b8d4b57518604540d..f74d54337c201a42f85d8249ba195d24535170f6 100644 --- a/src/slurmd/mgr.c +++ b/src/slurmd/mgr.c @@ -458,6 +458,8 @@ _run_batch_job(slurmd_job_t *job) pid_t sid, pid; struct passwd *spwd = getpwuid(getuid()); + _block_most_signals(); + if ((rc = io_spawn_handler(job)) < 0) { return ESLURMD_IO_ERROR; } @@ -510,10 +512,10 @@ _run_batch_job(slurmd_job_t *job) } while ((pid = waitpid(0, &status, 0)) < 0 && (pid != t.pid)) { - if ((pid < 0) && (errno == EINTR)) { + if (errno == EINTR) { _handle_attach_req(job); continue; - } else if (pid < 0) + } else error("waitpid: %m"); } @@ -541,8 +543,10 @@ _wait_for_all_tasks(slurmd_job_t *job) int status; pid_t pid = waitpid(0, &status, 0); if ((pid < (pid_t) 0)) { - if (errno == EINTR) + if (errno == EINTR) { _handle_attach_req(job); + continue; + } error("waitpid: %m"); /* job_cleanup() */ } diff --git a/src/slurmd/req.c b/src/slurmd/req.c index 6d3d5184fbecd0490debc58d219e16b86f1e76ea..d2bae00a9e5527a89b9f6117baaf4c31b974cc5b 100644 --- a/src/slurmd/req.c +++ b/src/slurmd/req.c @@ -271,21 +271,27 @@ _rpc_kill_tasks(slurm_msg_t *msg, slurm_addr *cli_addr) if (!(step = shm_get_step(req->job_id, req->job_step_id))) { debug("kill for nonexistent job %d.%d requested", req->job_id, req->job_step_id); - rc = EEXIST; + rc = ESLURM_INVALID_JOB_ID; goto done; } req_uid = slurm_auth_uid(msg->cred); if ((req_uid != step->uid) && (req_uid != 0)) { debug("kill req from uid %ld for job %d.%d owned by uid %ld", - req_uid, step->jobid, step->stepid, step->uid); + req_uid, req->job_id, req->job_step_id, step->uid); rc = ESLURM_USER_ID_MISSING; /* or bad in this case */ goto done; } + verbose("Successful request to send signal %d to %d.%d", + req->signal, req->job_id, req->job_step_id); + + if (killpg(step->sid, req->signal) < 0) + rc = errno; shm_free_step(step); - rc = shm_signal_step(req->job_id, req->job_step_id, req->signal); + + /* rc = 
shm_signal_step(req->job_id, req->job_step_id, req->signal); */ done: slurm_send_rc_msg(msg, rc); diff --git a/src/slurmd/shm.c b/src/slurmd/shm.c index 6e46648e094d78407cceb1e7a4406790bdd52abe..dda1eb6e69d1c40f92307cd838eeba5b02872f40 100644 --- a/src/slurmd/shm.c +++ b/src/slurmd/shm.c @@ -452,6 +452,7 @@ shm_get_step(uint32_t jobid, uint32_t stepid) if ((i = _shm_find_step(jobid, stepid)) >= 0) s = _shm_copy_step(&slurmd_shm->step[i]); _shm_unlock(); + xassert(!s || ((s->stepid == stepid) && (s->jobid == jobid))); return s; } diff --git a/src/srun/Makefile.am b/src/srun/Makefile.am index 1aa928c589e672456163a06c6e1ae99429fdae96..2fc933f95f3c131f026a773027f4fe71323ca16e 100644 --- a/src/srun/Makefile.am +++ b/src/srun/Makefile.am @@ -12,7 +12,9 @@ bin_PROGRAMS = srun srun_SOURCES = srun.c opt.c env.c opt.h env.h job.c job.h net.c net.h \ msg.c msg.h io.c io.h launch.h launch.c attach.h \ - reattach.c reattach.h fname.h fname.c srun.wrapper.c + reattach.c reattach.h fname.h fname.c srun.wrapper.c \ + signals.c + srun_LDADD = $(top_builddir)/src/common/libcommon.la \ $(top_builddir)/src/api/libslurm.la \ $(POPT_LIBS) diff --git a/src/srun/job.c b/src/srun/job.c index 9a4861c964b42d7bff0eabd0878ce65df5644262..cd4f0e2d87253cdb743e850b2ce0f8b95d590d79 100644 --- a/src/srun/job.c +++ b/src/srun/job.c @@ -201,6 +201,9 @@ _job_fake_cred(job_t *job) job->cred->user_id = opt.uid; job->cred->expiration_time = 0x7fffffff; read(fd, job->cred->signature, SLURM_SSL_SIGNATURE_LENGTH); + + if (close(fd) < 0) + error ("close(/dev/random): %m"); } job_t * @@ -263,8 +266,19 @@ update_job_state(job_t *job, job_state_t state) void job_force_termination(job_t *job) { - info ("forcing job termination"); - update_job_state(job, SRUN_JOB_OVERDONE); + if (mode == MODE_ATTACH) { + info ("forcing detach"); + update_job_state(job, SRUN_JOB_DETACHED); + } else { + info ("forcing job termination"); + update_job_state(job, SRUN_JOB_OVERDONE); + } + pthread_kill(job->ioid, SIGTERM); } + +void 
+job_destroy(job_t *job, const char *msg) +{ +} diff --git a/src/srun/job.h b/src/srun/job.h index 6b37374146949b52ab53ad2b18d0299d31406b8b..12a653ba356f3c11e5f88507347c3c9d28db63a1 100644 --- a/src/srun/job.h +++ b/src/srun/job.h @@ -23,7 +23,8 @@ typedef enum { SRUN_JOB_RUNNING, SRUN_JOB_FAILED, SRUN_JOB_TERMINATING, - SRUN_JOB_OVERDONE + SRUN_JOB_OVERDONE, + SRUN_JOB_DETACHED } job_state_t; typedef enum { diff --git a/src/srun/launch.c b/src/srun/launch.c index d009fbddb877b82dacc5a1a859a58f129bac9c88..5456decaa90ad9360b1226934a29d03169e22a42 100644 --- a/src/srun/launch.c +++ b/src/srun/launch.c @@ -97,6 +97,21 @@ _dist_cyclic(job_t *job) } } +int +launch_thr_create(job_t *job) +{ + int e; + pthread_attr_t attr; + + pthread_attr_init(&attr); + if ((e = pthread_create(&job->lid, &attr, &launch, (void *) job))) + slurm_seterrno_ret(e); + + debug("Started launch thread (%d)", job->lid); + + return SLURM_SUCCESS; +} + void * launch(void *arg) { diff --git a/src/srun/launch.h b/src/srun/launch.h index b6609d3e6b1f153a6b1d455cede1d171d1280c08..8acf6ed4a2fd7eb239824a6a93cc570b2e18cefe 100644 --- a/src/srun/launch.h +++ b/src/srun/launch.h @@ -1,4 +1,28 @@ -/* */ +/*****************************************************************************\ + * src/srun/launch.h - header for srun launch thread + ***************************************************************************** + * Copyright (C) 2002 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Mark Grondona <mgrondona@llnl.gov>. + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. 
+ * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ #ifndef _HAVE_LAUNCH_H #define _HAVE_LAUNCH_H @@ -15,6 +39,7 @@ #include "src/common/slurm_protocol_api.h" #include "src/srun/opt.h" +#include "src/srun/job.h" typedef struct launch_thr { pthread_t thread; @@ -25,7 +50,7 @@ typedef struct launch_thr { int i; /* temporary index into array */ } launch_thr_t; - +int launch_thr_create(job_t *job); void * launch(void *arg); #endif /* !_HAVE_LAUNCH_H */ diff --git a/src/srun/msg.c b/src/srun/msg.c index 35d13f59ae97e68fbee4ea33c0785c7c0b8af5e8..1f938f05693de3577ae54695c3c5ab8e331c706b 100644 --- a/src/srun/msg.c +++ b/src/srun/msg.c @@ -314,8 +314,12 @@ _reattach_handler(job_t *job, slurm_msg_t *msg) } } + update_running_tasks(job, resp->srun_node_id); + + /* if (job->stepid == NO_VAL) update_job_state(job, SRUN_JOB_OVERDONE); + */ } diff --git a/src/srun/opt.c b/src/srun/opt.c index 1cf1a38fc0bda04397138b7e1a0cb779f47e30a1..14f0b3d20d033b0a43a60f96d504d6bc56df1e73 100644 --- a/src/srun/opt.c +++ b/src/srun/opt.c @@ -122,6 +122,9 @@ struct poptOption attachTable[] = { {"attach", 'a', POPT_ARG_STRING, &opt.attach, OPT_ATTACH, "attach to running job with job id = id", "id"}, + {"join", 'j', POPT_ARG_NONE, &opt.join, OPT_JOIN, + "When used with --attach, allow forwarding of signals and stdin", + }, POPT_TABLEEND }; diff --git a/src/srun/reattach.c b/src/srun/reattach.c index 3927d3f29ca67d2f166783b6b9a47322722384c2..720ab6d1e0a1bb8121aa5f60089892766711630b 100644 --- a/src/srun/reattach.c +++ 
b/src/srun/reattach.c @@ -48,6 +48,7 @@ #include "src/srun/opt.h" #include "src/srun/io.h" #include "src/srun/msg.h" +#include "src/srun/signals.h" /* number of active threads */ @@ -293,6 +294,8 @@ _attach_to_job(job_t *job) req = xmalloc(job->nhosts * sizeof(*req)); msg = xmalloc(job->nhosts * sizeof(*msg)); + + debug("Going to attach to job %d.%d", job->jobid, job->stepid); for (i = 0; i < job->nhosts; i++) { @@ -418,28 +421,14 @@ p_reattach_task(void *arg) } -/* -static sig_atomic_t interrupt = 0; - -static void -_int_handler(int signum) -{ - interrupt = 1; -} -*/ - -static void -_term_handler(int sig) -{ - pthread_exit(0); -} - static void _complete_job(job_t *job) { - if (job->stepid != NO_VAL) - slurm_complete_job_step(job->jobid, job->stepid, 0, 0); - slurm_complete_job(job->jobid, 0, 0); + if (opt.join) { + if (job->stepid != NO_VAL) + slurm_complete_job_step(job->jobid, job->stepid, 0, 0); + slurm_complete_job(job->jobid, 0, 0); + } } int reattach() @@ -459,7 +448,8 @@ int reattach() _get_attach_info(s); - opt.ifname = "none"; + if (!opt.join) + opt.ifname = "none"; if ((opt.nodelist = s->nodes) == NULL) exit(1); @@ -473,6 +463,9 @@ int reattach() job->jobid = s->jobid; job->stepid = s->stepid; + if (opt.join) + sig_setup_sigmask(); + if (msg_thr_create(job) < 0) { error("Unable to create msg thread: %m"); exit(1); @@ -483,13 +476,16 @@ int reattach() exit(1); } - _attach_to_job(job); + if (opt.join && sig_thr_create(job) < 0) { + error("Unable to create signals thread: %m"); + } - xsignal(SIGTERM, &_term_handler); + _attach_to_job(job); slurm_mutex_lock(&job->state_mutex); while ( (job->state != SRUN_JOB_OVERDONE) - && (job->state != SRUN_JOB_FAILED )) + && (job->state != SRUN_JOB_FAILED ) + && (job->state != SRUN_JOB_DETACHED) ) pthread_cond_wait(&job->state_cond, &job->state_mutex); slurm_mutex_unlock(&job->state_mutex); @@ -500,7 +496,7 @@ int reattach() pthread_kill(job->jtid, SIGTERM); pthread_join(job->ioid, NULL); - _complete_job(job); + /* 
_complete_job(job); */ exit(0); } diff --git a/src/srun/signals.c b/src/srun/signals.c new file mode 100644 index 0000000000000000000000000000000000000000..a33d8b7f0a9e86d18a14a9d7024f0bc49862e153 --- /dev/null +++ b/src/srun/signals.c @@ -0,0 +1,325 @@ +/*****************************************************************************\ + * src/srun/signals.c - signal handling for srun + ***************************************************************************** + * Copyright (C) 2002 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Mark Grondona <mgrondona@llnl.gov>, and + * Moe Jette <jette@llnl.gov> + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. 
+\*****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#if HAVE_PTHREAD +#include <pthread.h> +#endif + +#include <signal.h> +#include <string.h> + +#include "src/common/log.h" +#include "src/common/xmalloc.h" +#include "src/common/xsignal.h" +#include "src/common/slurm_errno.h" +#include "src/common/slurm_protocol_defs.h" +#include "src/common/slurm_protocol_api.h" + +#include "src/srun/job.h" +#include "src/srun/io.h" + + +/* number of active threads */ +static pthread_mutex_t active_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t active_cond = PTHREAD_COND_INITIALIZER; +static int active = 0; + +typedef enum {DSH_NEW, DSH_ACTIVE, DSH_DONE, DSH_FAILED} state_t; + +typedef struct thd { + pthread_t thread; /* thread ID */ + pthread_attr_t attr; /* thread attributes */ + state_t state; /* thread state */ +} thd_t; + +typedef struct task_info { + slurm_msg_t *req_ptr; + job_t *job_ptr; + int host_inx; +} task_info_t; + + +/* + * Static prototypes + */ +static void _sigterm_handler(int); +static void _handle_intr(job_t *, time_t *, time_t *); +static void _sig_thr_setup(sigset_t *set); +static void * _sig_thr(void *); +static void _p_fwd_signal(slurm_msg_t *, job_t *); +static void * _p_signal_task(void *); + + +int +sig_setup_sigmask(void) +{ + sigset_t sigset; + + /* block most signals in all threads, except sigterm */ + sigemptyset(&sigset); + sigaddset(&sigset, SIGINT); + sigaddset(&sigset, SIGQUIT); + sigaddset(&sigset, SIGTSTP); + sigaddset(&sigset, SIGSTOP); + + if (sigprocmask(SIG_BLOCK, &sigset, NULL) != 0) { + error("sigprocmask: %m"); + return SLURM_ERROR; + } + + xsignal(SIGTERM, &_sigterm_handler); + + return SLURM_SUCCESS; +} + +int +sig_thr_create(job_t *job) +{ + int e; + pthread_attr_t attr; + + pthread_attr_init(&attr); + + if ((e = pthread_create(&job->sigid, &attr, &_sig_thr, job)) != 0) + slurm_seterrno_ret(e); + + debug("Started signals thread (%d)", 
job->sigid); + + return SLURM_SUCCESS; +} + +void +fwd_signal(job_t *job, int signo) +{ + int i; + slurm_msg_t *req_array_ptr; + kill_tasks_msg_t msg; + + debug("forward signal %d to job", signo); + + /* common to all tasks */ + msg.job_id = job->jobid; + msg.job_step_id = job->stepid; + msg.signal = (uint32_t) signo; + + req_array_ptr = (slurm_msg_t *) + xmalloc(sizeof(slurm_msg_t) * job->nhosts); + for (i = 0; i < job->nhosts; i++) { + if (job->host_state[i] != SRUN_HOST_REPLIED) { + debug2("%s has not yet replied\n", job->host[i]); + continue; + } + + req_array_ptr[i].msg_type = REQUEST_KILL_TASKS; + req_array_ptr[i].data = &msg; + memcpy(&req_array_ptr[i].address, + &job->slurmd_addr[i], sizeof(slurm_addr)); + } + + _p_fwd_signal(req_array_ptr, job); + + debug("All tasks have been signalled"); + xfree(req_array_ptr); +} + + +static void +_sigterm_handler(int signum) +{ + if (signum == SIGTERM) { + pthread_exit(0); + } +} + +static void +_handle_intr(job_t *job, time_t *last_intr, time_t *last_intr_sent) +{ + + if ((time(NULL) - *last_intr) > 1) { + info("interrupt (one more within 1 sec to abort)"); + if (mode != MODE_ATTACH) + report_task_status(job); + *last_intr = time(NULL); + } else { /* second Ctrl-C in half as many seconds */ + + /* terminate job */ + if (job->state != SRUN_JOB_OVERDONE) { + + info("sending Ctrl-C to job"); + *last_intr = time(NULL); + fwd_signal(job, SIGINT); + + if ((time(NULL) - *last_intr_sent) < 1) + job_force_termination(job); + else + *last_intr_sent = time(NULL); + } else { + job_force_termination(job); + } + } +} + +static void +_sig_thr_setup(sigset_t *set) +{ + int rc; + + sigemptyset(set); + sigaddset(set, SIGINT); + sigaddset(set, SIGQUIT); + sigaddset(set, SIGTSTP); + sigaddset(set, SIGSTOP); + if ((rc = pthread_sigmask(SIG_BLOCK, set, NULL)) != 0) + error ("pthread_sigmask: %s", slurm_strerror(rc)); +} + +/* simple signal handling thread */ +static void * +_sig_thr(void *arg) +{ + job_t *job = (job_t *)arg; + sigset_t set; 
+ time_t last_intr = 0; + time_t last_intr_sent = 0; + int signo; + + while (1) { + + _sig_thr_setup(&set); + + sigwait(&set, &signo); + debug2("recvd signal %d", signo); + switch (signo) { + case SIGINT: + _handle_intr(job, &last_intr, &last_intr_sent); + break; + case SIGSTOP: + case SIGTSTP: + debug3("Ignoring SIGSTOP"); + break; + case SIGQUIT: + info("Quit"); + job_force_termination(job); + break; + default: + break; + } + } + + pthread_exit(0); +} + +/* _p_fwd_signal - parallel (multi-threaded) task signaller */ +static void _p_fwd_signal(slurm_msg_t *req_array_ptr, job_t *job) +{ + int i; + task_info_t *task_info_ptr; + thd_t *thread_ptr; + + thread_ptr = xmalloc (job->nhosts * sizeof (thd_t)); + for (i = 0; i < job->nhosts; i++) { + if (req_array_ptr[i].msg_type == 0) + continue; /* inactive task */ + + pthread_mutex_lock(&active_mutex); + while (active >= opt.max_threads) { + pthread_cond_wait(&active_cond, &active_mutex); + } + active++; + pthread_mutex_unlock(&active_mutex); + + task_info_ptr = (task_info_t *)xmalloc(sizeof(task_info_t)); + task_info_ptr->req_ptr = &req_array_ptr[i]; + task_info_ptr->job_ptr = job; + task_info_ptr->host_inx = i; + + if (pthread_attr_init (&thread_ptr[i].attr)) + error ("pthread_attr_init error %m"); + if (pthread_attr_setdetachstate (&thread_ptr[i].attr, + PTHREAD_CREATE_DETACHED)) + error ("pthread_attr_setdetachstate error %m"); +#ifdef PTHREAD_SCOPE_SYSTEM + if (pthread_attr_setscope (&thread_ptr[i].attr, + PTHREAD_SCOPE_SYSTEM)) + error ("pthread_attr_setscope error %m"); +#endif + if ( pthread_create (&thread_ptr[i].thread, + &thread_ptr[i].attr, + _p_signal_task, + (void *) task_info_ptr) ) { + error ("pthread_create error %m"); + /* run it once under this thread; a retry would reuse freed args */ + _p_signal_task((void *) task_info_ptr); + } + } + + + pthread_mutex_lock(&active_mutex); + while (active > 0) { + pthread_cond_wait(&active_cond, &active_mutex); + } + pthread_mutex_unlock(&active_mutex); + xfree(thread_ptr); +} + +/* 
_p_signal_task - parallelized signal of a specific task */ +static void * _p_signal_task(void *args) +{ + task_info_t *info = (task_info_t *)args; + slurm_msg_t *req = info->req_ptr; + job_t *job = info->job_ptr; + int host_inx = info->host_inx; + slurm_msg_t resp; + + debug3("sending signal to host %s", job->host[host_inx]); + if (slurm_send_recv_node_msg(req, &resp) < 0) /* Has timeout */ + error("signal %s: %m", job->host[host_inx]); + else if (resp.msg_type == RESPONSE_SLURM_RC) { + /* only an RC reply is read as a return_code_msg_t payload */ + return_code_msg_t *rc = resp.data; + if (rc->return_code != 0) { + error("%s: Unable to fwd signal: %s", + job->host[host_inx], + slurm_strerror(rc->return_code)); + } + + slurm_free_return_code_msg(resp.data); + } + + pthread_mutex_lock(&active_mutex); + active--; + pthread_cond_signal(&active_cond); + pthread_mutex_unlock(&active_mutex); + xfree(args); + return NULL; +} + + diff --git a/src/srun/signals.h b/src/srun/signals.h new file mode 100644 index 0000000000000000000000000000000000000000..3a35d50e2ad6f66d6155843ee565694b400a57c3 --- /dev/null +++ b/src/srun/signals.h @@ -0,0 +1,36 @@ +/*****************************************************************************\ + * src/srun/signals.h - srun signal handling + ***************************************************************************** + * Copyright (C) 2002 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Mark Grondona <mgrondona@llnl.gov>. + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. 
+ * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ + +#ifndef _SIGNALS_H +#define _SIGNALS_H + +#include "src/srun/job.h" + +int sig_setup_sigmask(void); +int sig_thr_create(job_t *job); +void fwd_signal(job_t *job, int signal); + +#endif /* !_SIGNALS_H */ diff --git a/src/srun/srun.c b/src/srun/srun.c index f0e429f10c8ebc0365942408cd68f49294eb3dc0..ec2302dd9ce067617f16ba849fd95c6e7bb6b315 100644 --- a/src/srun/srun.c +++ b/src/srun/srun.c @@ -60,6 +60,7 @@ #include "src/srun/io.h" #include "src/srun/job.h" #include "src/srun/launch.h" +#include "src/srun/signals.h" #include "src/srun/reattach.h" #include "src/srun/opt.h" @@ -80,24 +81,6 @@ typedef resource_allocation_and_run_response_msg_t alloc_run_resp; #define TYPE_TEXT 1 #define TYPE_SCRIPT 2 -/* number of active threads */ -static pthread_mutex_t active_mutex = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t active_cond = PTHREAD_COND_INITIALIZER; -static int active = 0; - -typedef enum {DSH_NEW, DSH_ACTIVE, DSH_DONE, DSH_FAILED} state_t; - -typedef struct thd { - pthread_t thread; /* thread ID */ - pthread_attr_t attr; /* thread attributes */ - state_t state; /* thread state */ -} thd_t; - -typedef struct task_info { - slurm_msg_t *req_ptr; - job_t *job_ptr; - int host_inx; -} task_info_t; /* * forward declaration of static funcs @@ -105,18 +88,13 @@ typedef struct task_info { static allocation_resp *_allocate_nodes(void); static void _print_job_information(allocation_resp *resp); static void _create_job_step(job_t *job); -static 
void _sigterm_handler(int signum); static void _sig_kill_alloc(int signum); -static void * _sig_thr(void *arg); static char *_build_script (char *pathname, int file_type); static char *_get_shell (void); static int _is_file_text (char *fname, char** shell_ptr); static int _run_batch_job (void); static allocation_resp *_existing_allocation(void); static void _run_job_script(uint32_t jobid); -static void _fwd_signal(job_t *job, int signo); -static void _p_fwd_signal(slurm_msg_t *req_array_ptr, job_t *job); -static void *_p_signal_task(void *args); static int _set_batch_script_env(uint32_t jobid); #define die(msg, args...) do { \ @@ -132,11 +110,8 @@ static int _set_batch_script_env(uint32_t jobid); int srun(int ac, char **av) { - sigset_t sigset; allocation_resp *resp; job_t *job; - pthread_attr_t attr; - struct sigaction action; bool old_job = false; struct rlimit rlim; @@ -210,38 +185,21 @@ srun(int ac, char **av) slurm_free_resource_allocation_response_msg(resp); } - /* block most signals in all threads, except sigterm */ - sigemptyset(&sigset); - sigaddset(&sigset, SIGINT); - sigaddset(&sigset, SIGQUIT); - sigaddset(&sigset, SIGTSTP); - sigaddset(&sigset, SIGSTOP); - if (sigprocmask(SIG_BLOCK, &sigset, NULL) != 0) - die("sigprocmask: %m"); - action.sa_handler = &_sigterm_handler; - action.sa_flags = 0; - sigaction(SIGTERM, &action, NULL); - /* job structure should now be filled in */ + sig_setup_sigmask(); + if (msg_thr_create(job) < 0) die("Unable to create msg thread"); if (io_thr_create(job) < 0) die("failed to initialize IO"); - pthread_attr_init(&attr); - /* spawn signal thread */ - if ((errno = pthread_create(&job->sigid, &attr, &_sig_thr, - (void *) job)) != 0) - die("Unable to create signal thread. 
%m"); - debug("Started signals thread (%d)", job->sigid); + if (sig_thr_create(job) < 0) + die("Unable to create signals thread: %m"); - - /* launch jobs */ - if ((errno = pthread_create(&job->lid, &attr, &launch, (void *) job))) - die("Unable to create launch thread. %m"); - debug("Started launch thread (%d)", job->lid); + if (launch_thr_create(job) < 0) + die("Unable to create launch thread: %m"); /* wait for job to terminate */ pthread_mutex_lock(&job->state_mutex); @@ -255,7 +213,7 @@ srun(int ac, char **av) /* job is now overdone, clean up */ if (job->state == SRUN_JOB_FAILED) { info("sending SIGINT to job"); - _fwd_signal(job, SIGINT); + fwd_signal(job, SIGINT); } /* kill launch thread */ @@ -491,202 +449,6 @@ _print_job_information(allocation_resp *resp) } -static void -_sigterm_handler(int signum) -{ - if (signum == SIGTERM) { - pthread_exit(0); - } -} - -static void -_handle_intr(job_t *job, time_t *last_intr, time_t *last_intr_sent) -{ - - if ((time(NULL) - *last_intr) > 1) { - info("interrupt (one more within 1 sec to abort)"); - report_task_status(job); - *last_intr = time(NULL); - } else { /* second Ctrl-C in half as many seconds */ - - /* terminate job */ - if (job->state != SRUN_JOB_OVERDONE) { - - info("sending Ctrl-C to job"); - *last_intr = time(NULL); - _fwd_signal(job, SIGINT); - - if ((time(NULL) - *last_intr_sent) < 1) - job_force_termination(job); - else - *last_intr_sent = time(NULL); - } else { - job_force_termination(job); - } - } -} - -static void -_sig_thr_setup(sigset_t *set) -{ - int rc; - - sigemptyset(set); - sigaddset(set, SIGINT); - sigaddset(set, SIGQUIT); - sigaddset(set, SIGTSTP); - sigaddset(set, SIGSTOP); - if ((rc = pthread_sigmask(SIG_BLOCK, set, NULL)) != 0) - error ("pthread_sigmask: %s", slurm_strerror(rc)); -} - - -/* simple signal handling thread */ -static void * -_sig_thr(void *arg) -{ - job_t *job = (job_t *)arg; - sigset_t set; - time_t last_intr = 0; - time_t last_intr_sent = 0; - int signo; - - while (1) { - - 
_sig_thr_setup(&set); - - sigwait(&set, &signo); - debug2("recvd signal %d", signo); - switch (signo) { - case SIGINT: - _handle_intr(job, &last_intr, &last_intr_sent); - break; - case SIGSTOP: - case SIGTSTP: - debug3("Ignoring SIGSTOP"); - break; - case SIGQUIT: - info("Quit"); - job_force_termination(job); - break; - default: - break; - } - } - - pthread_exit(0); -} - -static void -_fwd_signal(job_t *job, int signo) -{ - int i; - slurm_msg_t *req_array_ptr; - kill_tasks_msg_t msg; - - debug("forward signal %d to job", signo); - - /* common to all tasks */ - msg.job_id = job->jobid; - msg.job_step_id = job->stepid; - msg.signal = (uint32_t) signo; - - req_array_ptr = (slurm_msg_t *) - xmalloc(sizeof(slurm_msg_t) * job->nhosts); - for (i = 0; i < job->nhosts; i++) { - if (job->host_state[i] != SRUN_HOST_REPLIED) { - debug2("%s has not yet replied\n", job->host[i]); - continue; - } - - req_array_ptr[i].msg_type = REQUEST_KILL_TASKS; - req_array_ptr[i].data = &msg; - memcpy(&req_array_ptr[i].address, - &job->slurmd_addr[i], sizeof(slurm_addr)); - } - - _p_fwd_signal(req_array_ptr, job); - - debug("All tasks have been signalled"); - xfree(req_array_ptr); -} - -/* _p_fwd_signal - parallel (multi-threaded) task signaller */ -static void _p_fwd_signal(slurm_msg_t *req_array_ptr, job_t *job) -{ - int i; - task_info_t *task_info_ptr; - thd_t *thread_ptr; - - thread_ptr = xmalloc (job->nhosts * sizeof (thd_t)); - for (i = 0; i < job->nhosts; i++) { - if (req_array_ptr[i].msg_type == 0) - continue; /* inactive task */ - - pthread_mutex_lock(&active_mutex); - while (active >= opt.max_threads) { - pthread_cond_wait(&active_cond, &active_mutex); - } - active++; - pthread_mutex_unlock(&active_mutex); - - task_info_ptr = (task_info_t *)xmalloc(sizeof(task_info_t)); - task_info_ptr->req_ptr = &req_array_ptr[i]; - task_info_ptr->job_ptr = job; - task_info_ptr->host_inx = i; - - if (pthread_attr_init (&thread_ptr[i].attr)) - error ("pthread_attr_init error %m"); - if 
(pthread_attr_setdetachstate (&thread_ptr[i].attr, - PTHREAD_CREATE_DETACHED)) - error ("pthread_attr_setdetachstate error %m"); -#ifdef PTHREAD_SCOPE_SYSTEM - if (pthread_attr_setscope (&thread_ptr[i].attr, - PTHREAD_SCOPE_SYSTEM)) - error ("pthread_attr_setscope error %m"); -#endif - while ( pthread_create (&thread_ptr[i].thread, - &thread_ptr[i].attr, - _p_signal_task, - (void *) task_info_ptr) ) { - error ("pthread_create error %m"); - /* just run it under this thread */ - _p_signal_task((void *) task_info_ptr); - } - } - - - pthread_mutex_lock(&active_mutex); - while (active > 0) { - pthread_cond_wait(&active_cond, &active_mutex); - } - pthread_mutex_unlock(&active_mutex); - xfree(thread_ptr); -} - -/* _p_signal_task - parallelized signal of a specific task */ -static void * _p_signal_task(void *args) -{ - task_info_t *task_info_ptr = (task_info_t *)args; - slurm_msg_t *req_ptr = task_info_ptr->req_ptr; - job_t *job_ptr = task_info_ptr->job_ptr; - int host_inx = task_info_ptr->host_inx; - slurm_msg_t resp; - - debug3("sending signal to host %s", job_ptr->host[host_inx]); - if (slurm_send_recv_node_msg(req_ptr, &resp) < 0) /* Has timeout */ - error("signal %s: %m", job_ptr->host[host_inx]); - else if (resp.msg_type == RESPONSE_SLURM_RC) - slurm_free_return_code_msg(resp.data); - - pthread_mutex_lock(&active_mutex); - active--; - pthread_cond_signal(&active_cond); - pthread_mutex_unlock(&active_mutex); - xfree(args); - return NULL; -} - /* submit a batch job and return error code */ static int _run_batch_job(void)