From e4591b07c76b5f049cda30aef73b93790298bb7b Mon Sep 17 00:00:00 2001
From: Mark Grondona <mgrondona@llnl.gov>
Date: Tue, 28 Jan 2003 20:24:23 +0000
Subject: [PATCH] o src/srun : reorganize many of the node allocation and job
 step creation functions into allocate.[ch]

o src/common/xsignal.[ch] : xsignal() now returns the old signal handler

o src/slurmd : remove the list of running threads and use a count of active
  threads instead (a terminating slurmd sleeps until all active threads have
  vanished)
---
 src/common/xsignal.c |  34 ++++-
 src/common/xsignal.h |   9 +-
 src/slurmd/req.c     |   2 -
 src/slurmd/shm.c     |  33 ++++-
 src/slurmd/slurmd.c  |  78 +++++------
 src/slurmd/slurmd.h  |   2 +-
 src/srun/Makefile.am |   2 +-
 src/srun/allocate.c  | 310 +++++++++++++++++++++++++++++++++++++++++++
 src/srun/allocate.h  |  80 +++++++++++
 src/srun/io.c        |  15 ++-
 src/srun/job.c       |  33 ++++-
 src/srun/job.h       |  33 ++++-
 src/srun/launch.c    |  10 +-
 src/srun/msg.c       |  10 +-
 src/srun/signals.c   |  26 ++--
 src/srun/srun.c      | 251 ++++------------------------------
 16 files changed, 625 insertions(+), 303 deletions(-)
 create mode 100644 src/srun/allocate.c
 create mode 100644 src/srun/allocate.h

diff --git a/src/common/xsignal.c b/src/common/xsignal.c
index a9ddde8247b..e09cb91eee6 100644
--- a/src/common/xsignal.c
+++ b/src/common/xsignal.c
@@ -33,18 +33,42 @@
 
 #include "src/common/log.h"
 #include "src/common/slurm_errno.h"
+#include "src/common/xsignal.h"
 
-void
-xsignal(int signo, void (*handler)(int))
+SigFunc *
+xsignal(int signo, SigFunc *f)
 {
	struct sigaction sa, old_sa;
-	sa.sa_handler = handler;
+
+	sa.sa_handler = f;
	sigemptyset(&sa.sa_mask);
	sigaddset(&sa.sa_mask, signo);
	sa.sa_flags = 0;
-	sigaction(signo, &sa, &old_sa);
-	return;
+	if (sigaction(signo, &sa, &old_sa) < 0)
+		error("xsignal(%d) failed: %m", signo);
+	return (old_sa.sa_handler);
+}
+
+int
+xsignal_unblock(int signo)
+{
+	sigset_t set;
+
+	if (sigemptyset(&set) < 0) {
+		error("sigemptyset: %m");
+		return SLURM_ERROR;
+	}
+
+	if (sigaddset(&set, signo) < 0) {
+		error("sigaddset: %m");
+		return SLURM_ERROR;
+	}
+
+	if (sigprocmask(SIG_UNBLOCK, &set, NULL) < 0) {
+		error("sigprocmask: %m");
+		return SLURM_ERROR;
+	}
+
+	return SLURM_SUCCESS;
 }
 
 int
diff --git a/src/common/xsignal.h b/src/common/xsignal.h
index 5b169d5d807..ca8b00f1d16 100644
--- a/src/common/xsignal.h
+++ b/src/common/xsignal.h
@@ -30,10 +30,17 @@
 
 #include <signal.h>
 
+typedef void SigFunc(int);
+
 /*
  * Install a signal handler in the POSIX way, but with BSD signal() semantics
  */
-struct sigaction *xsignal(int signo, void (*handler)(int));
+SigFunc *xsignal(int signo, SigFunc *);
+
+/*
+ * Unblock a single signal
+ */
+int xsignal_unblock(int signo);
 
 /*
  * Unblock all possible signals
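With the previous disposition returned, a caller can install a handler temporarily and put the old one back afterwards. A minimal standalone sketch of that save-and-restore pattern (plain POSIX, not the patch's code; install_handler and on_int are illustrative names):

    #include <signal.h>
    #include <unistd.h>

    typedef void SigFunc(int);

    static SigFunc *
    install_handler(int signo, SigFunc *f)
    {
            struct sigaction sa, old_sa;

            sa.sa_handler = f;
            sigemptyset(&sa.sa_mask);
            sigaddset(&sa.sa_mask, signo); /* block signo while handling it */
            sa.sa_flags = 0;               /* no SA_RESETHAND: BSD semantics */
            if (sigaction(signo, &sa, &old_sa) < 0)
                    return SIG_ERR;
            return old_sa.sa_handler;      /* previous disposition */
    }

    static void on_int(int signo) { /* async-signal-safe work only */ }

    int main(void)
    {
            SigFunc *old = install_handler(SIGINT, on_int);
            sleep(1);                      /* window with temporary handler */
            install_handler(SIGINT, old);  /* restore what was there before */
            return 0;
    }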
diff --git a/src/slurmd/req.c b/src/slurmd/req.c
index 5b73c90ee4f..09d824ca79b 100644
--- a/src/slurmd/req.c
+++ b/src/slurmd/req.c
@@ -144,7 +144,6 @@ _launch_batch_job(batch_job_launch_msg_t *req, slurm_addr *cli)
		break;
	case 0: /* child runs job */
		slurm_shutdown_msg_engine(conf->lfd);
-		list_destroy(conf->threads);
		destroy_credential_state_list(conf->cred_state_list);
		slurm_destroy_ssl_key_ctx(&conf->vctx);
		slurm_ssl_destroy();
@@ -175,7 +174,6 @@ _launch_tasks(launch_tasks_request_msg_t *req, slurm_addr *cli)
		break;
	case 0: /* child runs job */
		slurm_shutdown_msg_engine(conf->lfd);
-		list_destroy(conf->threads);
		destroy_credential_state_list(conf->cred_state_list);
		slurm_destroy_ssl_key_ctx(&conf->vctx);
		slurm_ssl_destroy();
diff --git a/src/slurmd/shm.c b/src/slurmd/shm.c
index 1db81f5dd2b..1b68bb13a62 100644
--- a/src/slurmd/shm.c
+++ b/src/slurmd/shm.c
@@ -78,7 +78,7 @@
 #define SHM_LOCKNAME	"/.slurm.lock"
 
 /* Increment SHM_VERSION if format changes */
-#define SHM_VERSION	1001
+#define SHM_VERSION	1004
 
 /* These macros convert shared memory pointers to local memory
  * pointers and back again. Pointers in shared memory are relative
@@ -188,12 +188,24 @@
 void 
 shm_cleanup(void)
 {
	char *s;
+	key_t key;
 
	if ((s = _create_ipc_name(SHM_LOCKNAME))) {
+		key = ftok(s, 1);
		info("request to destroy shm lock `%s'", s);
		if (sem_unlink(s) < 0)
			error("sem_unlink: %m");
		xfree(s);
+
+		/* This seems to be the only way to get a shared memory ID
+		 * given a key, if you don't already know the size of the
+		 * region. (Kept inside the block above so `key' is valid.)
+		 */
+		if ((shmid = shmget(key, 1, 0)) < 0)
+			error("Unable to get shmid: %m");
+
+		if ((shmid >= 0) && (shmctl(shmid, IPC_RMID, NULL) < 0))
+			error("Unable to destroy existing shm segment");
	}
 }
 
@@ -824,10 +837,26 @@
 static int
 _shm_attach()
 {
	int oflags = 0;
+	struct shmid_ds shmi;
	key_t key = ftok(lockname, 1);
 
-	if ((shmid = shmget(key, sizeof(slurmd_shm_t), oflags)) < 0)
+	if ((shmid = shmget(key, 1 /* sizeof(slurmd_shm_t) */, oflags)) < 0) {
+		error("shmget: %m");
		return SLURM_ERROR;
+	}
+
+	if (shmctl(shmid, IPC_STAT, &shmi) < 0) {
+		error("shmctl: unable to get info for shm id %d", shmid);
+	}
+
+	if (shmi.shm_segsz != sizeof(slurmd_shm_t)) {
+		error("size for shm segment id %d is %dK, expected %dK",
+		      shmid, (shmi.shm_segsz/1024),
+		      (sizeof(slurmd_shm_t)/1024));
+		error("You probably need to run with `-c' "
+		      "or just delete old segment.");
+		slurm_seterrno_ret(EINVAL);
+	}
 
	slurmd_shm = shmat(shmid, NULL, 0);
 
	if (slurmd_shm == (void *)-1 || slurmd_shm == NULL) {
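The `_shm_attach()` trick above, shmget() with a size of 1 so the call attaches regardless of the size the segment was created with, followed by IPC_STAT to compare shm_segsz against the expected layout, is a general way to detect a segment left over by an older, differently-sized slurmd_shm_t. A self-contained sketch of the same check (illustrative names; `struct layout` stands in for slurmd_shm_t):

    #include <sys/ipc.h>
    #include <sys/shm.h>
    #include <stdio.h>

    struct layout { int version; char data[4096]; }; /* stand-in layout */

    static int
    remove_if_stale(const char *path)
    {
            key_t key = ftok(path, 1);
            int id;
            struct shmid_ds ds;

            /* size 1: attach to whatever segment exists for this key */
            if (key == (key_t) -1 || (id = shmget(key, 1, 0)) < 0)
                    return -1;                /* nothing to clean up */

            if (shmctl(id, IPC_STAT, &ds) < 0)
                    return -1;

            if (ds.shm_segsz != sizeof(struct layout)) {
                    fprintf(stderr, "stale segment %d (%lu bytes)\n",
                            id, (unsigned long) ds.shm_segsz);
                    return shmctl(id, IPC_RMID, NULL);
            }
            return 0;                         /* size matches: keep it */
    }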
diff --git a/src/slurmd/slurmd.c b/src/slurmd/slurmd.c
index 93d82a86cc5..7b4137ec76d 100644
--- a/src/slurmd/slurmd.c
+++ b/src/slurmd/slurmd.c
@@ -66,6 +66,8 @@
 #  define MAXHOSTNAMELEN	64
 #endif
 
+#define MAX_THREADS	64
+
 #define DEFAULT_SPOOLDIR	"/tmp/slurmd"
 #define DEFAULT_PIDFILE		"/var/run/slurmd.pid"
 
@@ -74,6 +76,14 @@ typedef struct connection {
	slurm_addr *cli_addr;
 } conn_t;
 
+/*
+ * count of active threads
+ */
+static int             active_threads = 0;
+static pthread_mutex_t active_mutex   = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t  active_cond    = PTHREAD_COND_INITIALIZER;
+
+
 /*
  * static shutdown and reconfigure flags:
@@ -85,8 +95,6 @@
 static void      _term_handler(int);
 static void      _hup_handler(int);
 static void      _process_cmdline(int ac, char **av);
 static void      _create_msg_socket();
-static void      _tid_free(pthread_t *);
-static pthread_t *_tid_copy(pthread_t *);
 static void      _msg_engine();
 static int       _slurmd_init();
 static int       _slurmd_fini();
@@ -98,6 +106,8 @@
 static void      _kill_old_slurmd();
 static void      _list_recovered_creds(List list);
 static void      _reconfigure();
 static void      _restore_cred_state(List *list);
+static void      _increment_thd_count();
+static void      _decrement_thd_count();
 static void      _wait_for_all_threads();
 static void      _set_slurmd_spooldir(void);
 static void      _usage();
@@ -197,34 +207,34 @@ _msg_engine()
	return;
 }
 
-static pthread_t *
-_tid_copy(pthread_t *tid)
+static void
+_decrement_thd_count(void)
 {
-	pthread_t *id = xmalloc(sizeof(*id));
-	*id = *tid;
-	return id;
+	slurm_mutex_lock(&active_mutex);
+	active_threads--;
+	pthread_cond_signal(&active_cond);
+	slurm_mutex_unlock(&active_mutex);
 }
 
 static void
-_tid_free(pthread_t *tid)
+_increment_thd_count(void)
 {
-	xfree(tid);
+	slurm_mutex_lock(&active_mutex);
+	while (active_threads >= MAX_THREADS)
+		pthread_cond_wait(&active_cond, &active_mutex);
+	active_threads++;
+	slurm_mutex_unlock(&active_mutex);
 }
 
 static void 
 _wait_for_all_threads()
 {
-	ListIterator i;
-	pthread_t *ptid;
-
-	debug("Waiting for %d running threads", list_count(conf->threads));
-
-	i = list_iterator_create(conf->threads);
-	while ((ptid = list_next(i))) {
-		pthread_join(*ptid, NULL);
-		debug2("thread %d finished", *ptid);
+	slurm_mutex_lock(&active_mutex);
+	while (active_threads > 0) {
+		verbose("waiting on %d active threads", active_threads);
+		pthread_cond_wait(&active_cond, &active_mutex);
	}
-	list_iterator_destroy(i);
+	slurm_mutex_unlock(&active_mutex);
 }
 
 static void
@@ -240,31 +249,27 @@ _handle_connection(slurm_fd fd, slurm_addr *cli)
 
	if ((rc = pthread_attr_init(&attr)) != 0) {
		error("pthread_attr_init: %s", slurm_strerror(rc));
+		xfree(arg);
		return;
	}
 
-	/* 
-	 * Do not create a detached thread.
-	 *
-	 * rc = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-	 * if (rc != 0) {
-	 * 	error("Unable to set detachstate on attr: %s", 
-	 * 	      slurm_strerror(rc));
-	 * 	return;
-	 * }
-	 */
+	rc = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+	if (rc != 0) {
+		errno = rc;
+		xfree(arg);
+		error("Unable to set detachstate on attr: %m");
+		return;
+	}
 
	fd_set_close_on_exec(fd);
 
+	_increment_thd_count();
	rc = pthread_create(&id, &attr, &_service_connection, (void *) arg);
	if (rc != 0) {
		error("msg_engine: pthread_create: %s", slurm_strerror(rc));
-		_service_connection((void *) &arg);
+		_service_connection((void *) arg);
		return;
	}
-
-	list_append(conf->threads, (void *) _tid_copy(&id));
-	return;
 }
 
@@ -278,7 +283,6 @@
 static void *
 _service_connection(void *arg)
 {
	int rc;
-	pthread_t tid = pthread_self();
	conn_t *con = (conn_t *) arg;
	slurm_msg_t *msg = xmalloc(sizeof(*msg));
 
@@ -290,8 +294,10 @@ _service_connection(void *arg)
		slurmd_req(msg, con->cli_addr);
	}
	slurm_close_accepted_conn(con->fd);
+
	xfree(con);
-	list_delete_all(conf->threads, (ListFindF) _find_tid, &tid);
+	_decrement_thd_count();
+
	return NULL;
 }
 
@@ -539,7 +545,6 @@ _slurmd_init()
	slurm_ssl_init();
	slurm_init_verifier(&conf->vctx, conf->pubkey);
	_restore_cred_state(&conf->cred_state_list);
-	conf->threads = list_create((ListDelF) _tid_free);
	if (conf->shm_cleanup)
		shm_cleanup();
	if (shm_init() < 0)
@@ -609,7 +614,6 @@ static void _list_recovered_creds(List list)
 static int
 _slurmd_fini()
 {
-	list_destroy(conf->threads);
	save_cred_state(conf->cred_state_list);
	slurm_destroy_ssl_key_ctx(&conf->vctx);
	slurm_ssl_destroy();
diff --git a/src/slurmd/slurmd.h b/src/slurmd/slurmd.h
index 95441ae119a..b12788ad8c8 100644
--- a/src/slurmd/slurmd.h
+++ b/src/slurmd/slurmd.h
@@ -81,7 +81,7 @@ typedef struct slurmd_config {
	int             shm_cleanup:1;
 
	List            cred_state_list; /* credential state list        */
-	List            threads;	 /* list of active threads       */
+
	slurm_ssl_ctx   vctx;            /* ssl context for cred utils   */
	uid_t           slurm_user_id;	 /* UID that slurmctld runs as   */
	pthread_mutex_t config_mutex;	 /* lock for slurmd_config access */
diff --git a/src/srun/Makefile.am b/src/srun/Makefile.am
index bb02f35271d..5c8b2581636 100644
--- a/src/srun/Makefile.am
+++ b/src/srun/Makefile.am
@@ -16,7 +16,7 @@ bin_PROGRAMS = srun
 srun_SOURCES = srun.c opt.c env.c opt.h env.h job.c job.h net.c net.h \
	msg.c msg.h io.c io.h launch.h launch.c attach.h \
	reattach.c reattach.h fname.h fname.c srun.wrapper.c \
-	signals.c signals.h
+	signals.c signals.h allocate.c allocate.h
 
 srun_LDADD = $(top_builddir)/src/common/libcommon.la \
	$(top_builddir)/src/api/libslurm.la \
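The replacement for the thread list is just a counter guarded by a mutex and condition-variable pair: thread creation throttles at MAX_THREADS, thread exit decrements, and shutdown waits for the count to drain to zero. A standalone sketch of the scheme (illustrative names); note that the decrement path must signal the condition variable, since both the throttle in worker_enter() and the drain in wait_for_all() sleep on it:

    #include <pthread.h>

    #define MAX_WORKERS 64

    static int             nactive = 0;
    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;

    static void worker_enter(void)
    {
            pthread_mutex_lock(&lock);
            while (nactive >= MAX_WORKERS)   /* throttle new work */
                    pthread_cond_wait(&cond, &lock);
            nactive++;
            pthread_mutex_unlock(&lock);
    }

    static void worker_exit(void)
    {
            pthread_mutex_lock(&lock);
            nactive--;
            pthread_cond_signal(&cond);      /* wake throttler or drainer */
            pthread_mutex_unlock(&lock);
    }

    static void wait_for_all(void)
    {
            pthread_mutex_lock(&lock);
            while (nactive > 0)
                    pthread_cond_wait(&cond, &lock);
            pthread_mutex_unlock(&lock);
    }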
diff --git a/src/srun/allocate.c b/src/srun/allocate.c
new file mode 100644
index 00000000000..0adb797e0cc
--- /dev/null
+++ b/src/srun/allocate.c
@@ -0,0 +1,310 @@
+/*****************************************************************************\
+ *  src/srun/allocate.c - srun functions for managing node allocations
+ *  $Id$
+ *****************************************************************************
+ *  Copyright (C) 2002 The Regents of the University of California.
+ *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
+ *  Written by Mark Grondona <mgrondona@llnl.gov>.
+ *  UCRL-CODE-2002-040.
+ *
+ *  This file is part of SLURM, a resource management program.
+ *  For details, see <http://www.llnl.gov/linux/slurm/>.
+ *
+ *  SLURM is free software; you can redistribute it and/or modify it under
+ *  the terms of the GNU General Public License as published by the Free
+ *  Software Foundation; either version 2 of the License, or (at your option)
+ *  any later version.
+ *
+ *  SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
+ *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
+ *  details.
+ *
+ *  You should have received a copy of the GNU General Public License along
+ *  with SLURM; if not, write to the Free Software Foundation, Inc.,
+ *  59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.
+\*****************************************************************************/
+
+#if HAVE_CONFIG_H
+#  include "config.h"
+#endif
+
+#include <stdlib.h>
+
+#include "src/common/xsignal.h"
+#include "src/common/xmalloc.h"
+#include "src/common/log.h"
+#include "src/api/slurm.h"
+
+#include "src/srun/allocate.h"
+#include "src/srun/attach.h"
+#include "src/srun/opt.h"
+
+#define MAX_RETRIES 10
+
+/*
+ * Static Prototypes
+ */
+static void  _wait_for_resources(resource_allocation_response_msg_t **resp);
+static bool  _retry();
+static bool  _destroy_job(bool set);
+static void  _intr_handler(int signo);
+
+static job_step_create_request_msg_t * _step_req_create(job_t *j);
+static void _step_req_destroy(job_step_create_request_msg_t *r);
+
+
+resource_allocation_response_msg_t *
+allocate_nodes(void)
+{
+	int rc = 0;
+	resource_allocation_response_msg_t *resp = NULL;
+	job_desc_msg_t *j = job_desc_msg_create();
+
+	while (((rc = slurm_allocate_resources(j, &resp)) < 0) && _retry()) {;}
+
+	if ((rc == 0) && (resp->node_list == NULL))
+		_wait_for_resources(&resp);
+
+	job_desc_msg_destroy(j);
+
+	return resp;
+}
+
+/* Return jobid from environment
+ *
+ * Returns jobid if SLURM_JOBID was set in the user's environment,
+ * else returns 0.
+ */
+uint32_t
+jobid_from_env(void)
+{
+	char *p, *q;
+	uint32_t jobid;
+
+	if (!(p = getenv("SLURM_JOBID")))
+		return 0;
+
+	jobid = (uint32_t) strtoul(p, &q, 10);
+	if (*q != '\0') {
+		error ("Invalid value for SLURM_JOBID: `%s'", p);
+		return 0;
+	}
+
+	return jobid;
+}
+
+resource_allocation_response_msg_t *
+existing_allocation(void)
+{
+	old_job_alloc_msg_t job;
+	resource_allocation_response_msg_t *resp = NULL;
+
+	if ((job.job_id = jobid_from_env()) == 0)
+		return NULL;
+	job.uid = getuid();
+
+	if (slurm_confirm_allocation(&job, &resp) < 0) {
+		error("Unable to confirm resource allocation for job %u: %m",
+		      job.job_id);
+		exit(1);
+	}
+
+	return resp;
+}
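allocate_nodes() retries only while the controller reports ESLURM_ERROR_ON_DESC_TO_RECORD_COPY, logs the first failure at error level and later ones at debug, and sleeps one second longer on each attempt. A sketch of the same discipline with the SLURM calls abstracted away (request_once() and controller_busy() are hypothetical stand-ins):

    #include <stdbool.h>
    #include <unistd.h>

    #define MAX_RETRIES 10

    extern int  request_once(void);    /* hypothetical RPC attempt      */
    extern bool controller_busy(void); /* hypothetical errno inspection */

    static int request_with_retry(void)
    {
            int attempts = 0;
            int rc;

            while ((rc = request_once()) < 0) {
                    if (!controller_busy() || attempts >= MAX_RETRIES)
                            return -1;       /* hard error, or gave up */
                    sleep(++attempts);       /* back off a bit more each time */
            }
            return rc;
    }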
+
+
+static void
+_wait_for_resources(resource_allocation_response_msg_t **resp)
+{
+	SigFunc *old_handler;
+	old_job_alloc_msg_t old_job;
+
+	info ("job %u queued and waiting for resources", (*resp)->job_id);
+
+	xsignal_unblock(SIGINT);
+	old_handler = xsignal(SIGINT, _intr_handler);
+
+	old_job.job_id = (*resp)->job_id;
+	old_job.uid    = (uint32_t) getuid();
+	slurm_free_resource_allocation_response_msg (*resp);
+	sleep (2);
+
+	/* Keep polling until the job is allocated resources */
+	while (slurm_confirm_allocation(&old_job, resp) < 0) {
+
+		if (slurm_get_errno() == ESLURM_JOB_PENDING) {
+			debug3("Still waiting for allocation");
+			sleep(5);
+		} else {
+			error("Unable to confirm resource allocation for "
+			      "job %u: %m", old_job.job_id);
+			exit (1);
+		}
+
+		if (_destroy_job(0)) {
+			verbose("cancelling job %u", old_job.job_id);
+			slurm_complete_job(old_job.job_id, 0, 0);
+#ifdef HAVE_TOTALVIEW
+			tv_launch_failure();
+#endif
+			exit(0);
+		}
+
+	}
+
+	xsignal(SIGINT, SIG_IGN);
+}
+
+
+
+static bool
+_retry()
+{
+	static int  retries = 0;
+	static char *msg = "Slurm controller not responding, "
+		           "sleeping and retrying.";
+
+	if (errno == ESLURM_ERROR_ON_DESC_TO_RECORD_COPY) {
+		if (retries == 0)
+			error (msg);
+		else if (retries < MAX_RETRIES)
+			debug (msg);
+		else
+			return false;
+		sleep (++retries);
+	} else {
+		error("Unable to allocate resources: %m");
+		return false;
+	}
+
+	return true;
+}
+
+/* Returns true if the user requested immediate destruction of a pending job.
+ * _destroy_job() will return false until it has been called once with
+ * parameter `set' equal to true.
+ */
+static bool
+_destroy_job(bool set)
+{
+	static bool destroy = false;
+	return (set ? (destroy = true) : destroy);
+}
+
+/*
+ * SIGINT handler while waiting for resources to become available.
+ */
+static void
+_intr_handler(int signo)
+{
+	_destroy_job(true);
+}
+
+
+/*
+ * Create a job description structure based on srun options
+ * (see opt.h)
+ */
+job_desc_msg_t *
+job_desc_msg_create(void)
+{
+	job_desc_msg_t *j = xmalloc(sizeof(*j));
+
+	slurm_init_job_desc_msg(j);
+
+	j->contiguous   = opt.contiguous;
+	j->features     = opt.constraints;
+	j->immediate    = opt.immediate;
+	j->name         = opt.job_name;
+	j->req_nodes    = opt.nodelist;
+	j->exc_nodes    = opt.exc_nodes;
+	j->partition    = opt.partition;
+	j->min_nodes    = opt.min_nodes;
+	j->num_tasks    = opt.nprocs;
+	j->user_id      = opt.uid;
+
+	if (opt.max_nodes)
+		j->max_nodes    = opt.max_nodes;
+	if (opt.mincpus > -1)
+		j->min_procs    = opt.mincpus;
+	if (opt.realmem > -1)
+		j->min_memory   = opt.realmem;
+	if (opt.tmpdisk > -1)
+		j->min_tmp_disk = opt.tmpdisk;
+
+	if (opt.overcommit)
+		j->num_procs = opt.min_nodes;
+	else
+		j->num_procs = opt.nprocs * opt.cpus_per_task;
+
+	if (opt.no_kill)
+		j->kill_on_node_fail = 0;
+	if (opt.time_limit > -1)
+		j->time_limit = opt.time_limit;
+	if (opt.share)
+		j->shared = 1;
+
+	return (j);
+}
+
+void
+job_desc_msg_destroy(job_desc_msg_t *j)
+{
+	xfree(j);
+}
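_intr_handler() does nothing but flip a latch that the polling loop inspects between attempts, so the handler itself stays async-signal-safe. The patch hides the flag behind a static-local accessor; the sketch below uses the conventional volatile sig_atomic_t flag instead (illustrative, not the patch's code):

    #include <signal.h>
    #include <string.h>
    #include <stdio.h>
    #include <unistd.h>

    static volatile sig_atomic_t destroy_requested = 0;

    static void intr_handler(int signo)
    {
            destroy_requested = 1;      /* record the request, nothing else */
    }

    int main(void)
    {
            struct sigaction sa;

            memset(&sa, 0, sizeof(sa)); /* installed the sigaction way */
            sa.sa_handler = intr_handler;
            sigemptyset(&sa.sa_mask);
            sigaction(SIGINT, &sa, NULL);

            while (!destroy_requested)  /* stand-in for the confirm loop */
                    sleep(1);
            puts("cancelling pending job");
            return 0;
    }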
+
+static job_step_create_request_msg_t *
+_step_req_create(job_t *j)
+{
+	job_step_create_request_msg_t *r = xmalloc(sizeof(*r));
+
+	r->job_id     = j->jobid;
+	r->user_id    = opt.uid;
+	r->node_count = j->nhosts;
+	r->cpu_count  = opt.overcommit ? j->nhosts
+		                       : (opt.nprocs*opt.cpus_per_task);
+	r->num_tasks  = opt.nprocs;
+	r->node_list  = j->nodelist;
+	r->relative   = false;      /* XXX fix this one day */
+
+	switch (opt.distribution) {
+	case SRUN_DIST_UNKNOWN:
+		r->task_dist = (opt.nprocs <= j->nhosts) ? SLURM_DIST_CYCLIC
+			                                 : SLURM_DIST_BLOCK;
+		break;
+	case SRUN_DIST_CYCLIC:
+		r->task_dist = SLURM_DIST_CYCLIC;
+		break;
+	default: /* (opt.distribution == SRUN_DIST_BLOCK) */
+		r->task_dist = SLURM_DIST_BLOCK;
+		break;
+	}
+
+	return(r);
+}
+
+static void
+_step_req_destroy(job_step_create_request_msg_t *r)
+{
+	xfree(r);
+}
+
+void
+create_job_step(job_t *job)
+{
+	job_step_create_request_msg_t  *req  = NULL;
+	job_step_create_response_msg_t *resp = NULL;
+
+	if (!(req = _step_req_create(job)))
+		job_fatal (job, "Unable to allocate step request message");
+
+	if ((slurm_job_step_create(req, &resp) < 0) || (resp == NULL))
+		job_fatal (job, "Unable to create job step: %m");
+
+	job->stepid  = resp->job_step_id;
+	job->cred    = resp->credentials;
+#ifdef HAVE_LIBELAN3
+	job->qsw_job = resp->qsw_job;
+#endif
+	_step_req_destroy(req);
+}
diff --git a/src/srun/allocate.h b/src/srun/allocate.h
new file mode 100644
index 00000000000..c46791f7ad9
--- /dev/null
+++ b/src/srun/allocate.h
@@ -0,0 +1,80 @@
+/*****************************************************************************\
+ *  src/srun/allocate.h - node allocation functions for srun
+ *  $Id$
+ *****************************************************************************
+ *  Copyright (C) 2002 The Regents of the University of California.
+ *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
+ *  Written by Mark Grondona <mgrondona@llnl.gov>.
+ *  UCRL-CODE-2002-040.
+ *
+ *  This file is part of SLURM, a resource management program.
+ *  For details, see <http://www.llnl.gov/linux/slurm/>.
+ *
+ *  SLURM is free software; you can redistribute it and/or modify it under
+ *  the terms of the GNU General Public License as published by the Free
+ *  Software Foundation; either version 2 of the License, or (at your option)
+ *  any later version.
+ *
+ *  SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
+ *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
+ *  details.
+ *
+ *  You should have received a copy of the GNU General Public License along
+ *  with SLURM; if not, write to the Free Software Foundation, Inc.,
+ *  59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.
+\*****************************************************************************/
+
+#ifndef _HAVE_ALLOCATE_H
+#define _HAVE_ALLOCATE_H
+
+#include "src/api/slurm.h"
+#include "src/srun/job.h"
+
+/*
+ * Allocate nodes from the slurm controller -- retrying the attempt
+ * if the controller appears to be down, and optionally waiting for
+ * resources if none are currently available (see opt.immediate)
+ *
+ * Returns a pointer to a resource_allocation_response_msg which must
+ * be freed with slurm_free_resource_allocation_response_msg()
+ */
+resource_allocation_response_msg_t * allocate_nodes(void);
+
+/*
+ * Create a job_desc_msg_t object, filled in from the current srun options
+ * (see opt.h)
+ * The resulting memory must be freed with job_desc_msg_destroy()
+ */
+job_desc_msg_t * job_desc_msg_create(void);
+
+/*
+ * Destroy (free memory from) a job_desc_msg_t object allocated with
+ * job_desc_msg_create()
+ */
+void job_desc_msg_destroy(job_desc_msg_t *j);
+
+/*
+ * Check for SLURM_JOBID environment variable, and if it is a valid
+ * jobid, return a pseudo allocation response pointer.
+ *
+ * Returns NULL if SLURM_JOBID is not present or is invalid.
+ */
+resource_allocation_response_msg_t * existing_allocation(void);
+
+/*
+ * Return the jobid number stored in SLURM_JOBID env var
+ *
+ * Returns 0 if SLURM_JOBID is not set in current environment, or
+ * is invalid.
+ */
+uint32_t jobid_from_env(void);
+
+/*
+ * Create a job step given the job information stored in 'j'
+ * After returning, 'j' is filled in with information for the job step.
+ */
+void create_job_step(job_t *j);
+
+
+#endif /* !_HAVE_ALLOCATE_H */
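Taken together, the new header gives srun a short call sequence: find or obtain an allocation, wrap it in a job, then create the step. A condensed sketch of how srun.c composes these (error handling, the opt.allocate and batch paths, and the job->old_job = true flag set when reusing an allocation are all elided; get_job is an illustrative name):

    #include <stdlib.h>

    #include "src/srun/allocate.h"     /* the new interface above */
    #include "src/srun/job.h"

    static job_t *get_job(void)
    {
            resource_allocation_response_msg_t *resp;
            job_t *job;

            if (!(resp = existing_allocation()) && /* valid $SLURM_JOBID?   */
                !(resp = allocate_nodes()))        /* no: allocate, retrying */
                    exit(1);

            job = job_create_allocation(resp);     /* wrap the response      */
            create_job_step(job);                  /* fills stepid and cred  */
            slurm_free_resource_allocation_response_msg(resp);
            return job;
    }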
diff --git a/src/srun/io.c b/src/srun/io.c
index 89dd749faf2..a37351020ea 100644
--- a/src/srun/io.c
+++ b/src/srun/io.c
@@ -78,7 +78,7 @@
 static int	_do_task_output(int *fd, FILE *out, cbuf_t buf, int tasknum);
 static int 	_do_task_output_poll(fd_info_t *info);
 static int      _do_task_input(job_t *job, int taskid);
 static int 	_do_task_input_poll(job_t *job, fd_info_t *info);
-static inline bool _job_io_done(job_t *job);
+static inline bool _io_thr_done(job_t *job);
 static int	_handle_pollerr(fd_info_t *info);
 static char *	_host_state_name(host_state_t state_inx);
 static ssize_t	_readn(int fd, void *buf, size_t nbytes);
@@ -244,7 +244,7 @@ _io_thr_poll(void *job_arg)
	for (i = 0; i < job->niofds; i++)
		_poll_set_rd(fds[i], job->iofd[i]);
 
-	while (!_job_io_done(job)) {
+	while (!_io_thr_done(job)) {
		int eofcnt = 0;
		nfds = job->niofds; /* already have n ioport fds + stdin */
 
@@ -299,7 +299,7 @@ _io_thr_poll(void *job_arg)
			pthread_exit(0);
		}
 
-		while ((!_job_io_done(job)) &&
+		while ((!_io_thr_done(job)) &&
		       ((rc = poll(fds, nfds, POLL_TIMEOUT_MSEC)) <= 0)) {
			if (rc == 0) {	/* timeout */
				_do_poll_timeout(job);
@@ -420,9 +420,14 @@ static char *_host_state_name(host_state_t state_inx)
	}
 }
 
-static bool _job_io_done(job_t *job)
+static inline bool
+_io_thr_done(job_t *job)
 {
-	return (job->state >= SRUN_JOB_FORCETERM);
+	bool retval;
+	slurm_mutex_lock(&job->state_mutex);
+	retval = (job->state >= SRUN_JOB_FORCETERM);
+	slurm_mutex_unlock(&job->state_mutex);
+	return retval;
 }
 
 void report_task_status(job_t *job)
diff --git a/src/srun/job.c b/src/srun/job.c
index 17fd3c11293..109f81bda6f 100644
--- a/src/srun/job.c
+++ b/src/srun/job.c
@@ -100,6 +100,8 @@ _job_create_internal(allocation_info_t *info)
	 */
	_set_nprocs(info);
 
+	debug2("creating job with %d tasks", opt.nprocs);
+
	job = xmalloc(sizeof(*job));
 
	slurm_mutex_init(&job->state_mutex);
@@ -313,19 +315,42 @@ void
 job_fatal(job_t *job, const char *msg)
 {
	if (msg)
		error(msg);
 
-	job_destroy(job);
+	job_destroy(job, errno);
 
	exit(1);
 }
 
 void 
-job_destroy(job_t *job)
+job_destroy(job_t *job, int error)
 {
	if (job->old_job) {
		debug("cancelling job step %u.%u", job->jobid, job->stepid);
-		slurm_complete_job_step(job->jobid, job->stepid, 0, 0);
+		slurm_complete_job_step(job->jobid, job->stepid, 0, error);
	} else if (!opt.no_alloc) {
		debug("cancelling job %u", job->jobid);
-		slurm_complete_job(job->jobid, 0, 0);
+		slurm_complete_job(job->jobid, 0, error);
+	} else {
+		debug("no allocation to cancel");
+		return;
+	}
+
+#ifdef HAVE_TOTALVIEW
+	if (error) tv_launch_failure();
+#endif
+}
+
+int
+job_active_tasks_on_host(job_t *job, int hostid)
+{
+	int i;
+	int retval = 0;
+
+	slurm_mutex_lock(&job->task_mutex);
+	for (i = 0; i < job->ntask[hostid]; i++) {
+		uint32_t tid = job->tids[hostid][i];
+		if (job->task_state[tid] == SRUN_TASK_RUNNING)
+			retval++;
	}
+	slurm_mutex_unlock(&job->task_mutex);
+	return retval;
 }
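_io_thr_done() (and _sig_thr_done() below) now read job->state only while holding state_mutex, since the message thread updates the state under the same lock. A minimal sketch of the locked-predicate idiom (illustrative types, not SLURM's):

    #include <pthread.h>
    #include <stdbool.h>

    enum state { RUNNING, TERMINATED, FORCETERM };

    struct job {
            enum state      state;       /* written under state_mutex */
            pthread_mutex_t state_mutex;
    };

    static bool io_done(struct job *j)
    {
            bool done;

            pthread_mutex_lock(&j->state_mutex);
            done = (j->state >= FORCETERM); /* consistent, race-free read */
            pthread_mutex_unlock(&j->state_mutex);
            return done;
    }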
diff --git a/src/srun/job.h b/src/srun/job.h
index 59ba5a92f6d..95f05e8ee9c 100644
--- a/src/srun/job.h
+++ b/src/srun/job.h
@@ -1,5 +1,29 @@
-/* an srun "job" */
-
+/*****************************************************************************\
+ *  src/srun/job.h - specification of an srun "job"
+ *  $Id$
+ *****************************************************************************
+ *  Copyright (C) 2002 The Regents of the University of California.
+ *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
+ *  Written by Mark Grondona <mgrondona@llnl.gov>.
+ *  UCRL-CODE-2002-040.
+ *
+ *  This file is part of SLURM, a resource management program.
+ *  For details, see <http://www.llnl.gov/linux/slurm/>.
+ *
+ *  SLURM is free software; you can redistribute it and/or modify it under
+ *  the terms of the GNU General Public License as published by the Free
+ *  Software Foundation; either version 2 of the License, or (at your option)
+ *  any later version.
+ *
+ *  SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
+ *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
+ *  details.
+ *
+ *  You should have received a copy of the GNU General Public License along
+ *  with SLURM; if not, write to the Free Software Foundation, Inc.,
+ *  59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.
+\*****************************************************************************/
 
 #ifndef _HAVE_JOB_H
 #define _HAVE_JOB_H
@@ -62,6 +86,7 @@ typedef struct srun_job {
	int *cpus; 	/* number of processors on each host	*/
	int *ntask; 	/* number of tasks to run on each host	*/
	uint32_t **tids;	/* host id => task ids mapping	*/
+	uint32_t *hostid;	/* task id => host id mapping	*/
 
	slurm_addr *slurmd_addr;/* slurm_addr vector to slurmd's	*/
 
@@ -117,6 +142,8 @@ job_t * job_create_noalloc(void);
 job_t * job_create_allocation(resource_allocation_response_msg_t *resp);
 
 void    job_fatal(job_t *job, const char *msg);
-void    job_destroy(job_t *job);
+void    job_destroy(job_t *job, int error);
+
+int     job_active_tasks_on_host(job_t *job, int hostid);
 
 #endif /* !_HAVE_JOB_H */
diff --git a/src/srun/launch.c b/src/srun/launch.c
index f82eecd8cd5..ff4d3a6c147 100644
--- a/src/srun/launch.c
+++ b/src/srun/launch.c
@@ -79,7 +79,8 @@ _dist_block(job_t *job)
	for (i=0; ((i<job->nhosts) && (taskid<opt.nprocs)); i++) {
		for (j=0; (((j*opt.cpus_per_task)<job->cpus[i]) && 
			   (taskid<opt.nprocs)); j++) {
-			job->tids[i][j] = taskid++;
+			job->hostid[taskid]  = i;
+			job->tids[i][j]      = taskid++;
			job->ntask[i]++;
		}
	}
@@ -92,7 +93,8 @@ _dist_cyclic(job_t *job)
	for (j=0; (taskid<opt.nprocs); j++) {	/* cycle counter */
		for (i=0; ((i<job->nhosts) && (taskid<opt.nprocs)); i++) {
			if (j < job->cpus[i]) {
-				job->tids[i][j] = taskid++;
+				job->hostid[taskid]  = i;
+				job->tids[i][j]      = taskid++;
				job->ntask[i]++;
			}
		}
@@ -131,7 +133,8 @@ launch(void *arg)
	debug("sending to slurmd port %d", slurm_get_slurmd_port());
 
	/* Build task id list for each host */
-	job->tids = xmalloc(job->nhosts * sizeof(uint32_t *));
+	job->tids   = xmalloc(job->nhosts * sizeof(uint32_t *));
+	job->hostid = xmalloc(opt.nprocs  * sizeof(uint32_t));
	for (i = 0; i < job->nhosts; i++)
		job->tids[i] = xmalloc(job->cpus[i] * sizeof(uint32_t));
 
@@ -275,7 +278,6 @@ _send_msg_rc(slurm_msg_t *msg)
 {
	slurm_msg_t resp;
	return_code_msg_t *rcmsg = NULL;
-	int errcode = 0;
	int rc = 0;
 
	if ((rc = slurm_send_recv_node_msg(msg, &resp)) < 0)
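With the new hostid array, the forward map tids[host][slot] and the reverse map hostid[task] are filled in together during distribution. A toy example of the block layout over two hosts and five tasks (cyclic distribution just swaps the two loops, cycling over hosts first; cpus[] plays the role of job->cpus):

    #include <stdio.h>

    #define NHOSTS 2
    #define NPROCS 5

    int main(void)
    {
            int cpus[NHOSTS]  = { 3, 3 };
            int hostid[NPROCS];
            int ntask[NHOSTS] = { 0, 0 };
            int taskid = 0, i, j;

            /* block: fill host 0 first, then host 1 -> tasks 0,1,2 | 3,4 */
            for (i = 0; i < NHOSTS && taskid < NPROCS; i++)
                    for (j = 0; j < cpus[i] && taskid < NPROCS; j++) {
                            hostid[taskid] = i;   /* reverse map */
                            ntask[i]++;           /* tasks on this host */
                            taskid++;
                    }

            for (i = 0; i < NPROCS; i++)
                    printf("task %d -> host %d\n", i, hostid[i]);
            return 0;
    }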
debug2("updating %d running tasks for node %d", + job->ntask[nodeid], nodeid); slurm_mutex_lock(&job->task_mutex); for (i = 0; i < job->ntask[nodeid]; i++) { uint32_t tid = job->tids[nodeid][i]; @@ -314,10 +316,12 @@ _reattach_handler(job_t *job, slurm_msg_t *msg) /* * store global task id information as returned from slurmd */ - job->tids[resp->srun_node_id] = xmalloc(resp->ntasks * - sizeof(uint32_t)); - for (i = 0; i < resp->ntasks; i++) + job->tids[resp->srun_node_id] = xmalloc( resp->ntasks * + sizeof(uint32_t) ); + job->ntask[resp->srun_node_id] = resp->ntasks; + for (i = 0; i < resp->ntasks; i++) { job->tids[resp->srun_node_id][i] = resp->gids[i]; + } #if HAVE_TOTALVIEW if ((remote_argc == 0) && (resp->executable_name)) { diff --git a/src/srun/signals.c b/src/srun/signals.c index 54c9bdef11e..d143f4ce815 100644 --- a/src/srun/signals.c +++ b/src/srun/signals.c @@ -79,9 +79,14 @@ static void _p_fwd_signal(slurm_msg_t *, job_t *); static void * _p_signal_task(void *); -static bool _job_sig_done(job_t *job) +static inline bool +_sig_thr_done(job_t *job) { - return (job->state >= SRUN_JOB_DONE); + bool retval; + slurm_mutex_lock(&job->state_mutex); + retval = (job->state >= SRUN_JOB_DONE); + slurm_mutex_unlock(&job->state_mutex); + return retval; } int @@ -146,6 +151,9 @@ fwd_signal(job_t *job, int signo) continue; } + if (job_active_tasks_on_host(job, i) == 0) + continue; + req_array_ptr[i].msg_type = REQUEST_KILL_TASKS; req_array_ptr[i].data = &msg; memcpy(&req_array_ptr[i].address, @@ -220,7 +228,7 @@ _sig_thr(void *arg) time_t last_intr_sent = 0; int signo; - while (!_job_sig_done(job)) { + while (!_sig_thr_done(job)) { _sig_thr_setup(&set); @@ -263,12 +271,12 @@ static void _p_fwd_signal(slurm_msg_t *req_array_ptr, job_t *job) if (req_array_ptr[i].msg_type == 0) continue; /* inactive task */ - pthread_mutex_lock(&active_mutex); + slurm_mutex_lock(&active_mutex); while (active >= opt.max_threads) { pthread_cond_wait(&active_cond, &active_mutex); } active++; - pthread_mutex_unlock(&active_mutex); + slurm_mutex_unlock(&active_mutex); task_info_ptr = (task_info_t *)xmalloc(sizeof(task_info_t)); task_info_ptr->req_ptr = &req_array_ptr[i]; @@ -296,11 +304,11 @@ static void _p_fwd_signal(slurm_msg_t *req_array_ptr, job_t *job) } - pthread_mutex_lock(&active_mutex); + slurm_mutex_lock(&active_mutex); while (active > 0) { pthread_cond_wait(&active_cond, &active_mutex); } - pthread_mutex_unlock(&active_mutex); + slurm_mutex_unlock(&active_mutex); xfree(thread_ptr); } @@ -328,10 +336,10 @@ static void * _p_signal_task(void *args) slurm_free_return_code_msg(resp.data); } - pthread_mutex_lock(&active_mutex); + slurm_mutex_lock(&active_mutex); active--; pthread_cond_signal(&active_cond); - pthread_mutex_unlock(&active_mutex); + slurm_mutex_unlock(&active_mutex); xfree(args); return NULL; } diff --git a/src/srun/srun.c b/src/srun/srun.c index eed374c29d4..c273f06e654 100644 --- a/src/srun/srun.c +++ b/src/srun/srun.c @@ -62,6 +62,7 @@ #include "src/srun/launch.h" #include "src/srun/signals.h" #include "src/srun/reattach.h" +#include "src/srun/allocate.h" #include "src/srun/opt.h" #include "src/srun/net.h" @@ -74,30 +75,24 @@ #define MAX_RETRIES 20 -typedef resource_allocation_response_msg_t allocation_resp; -typedef resource_allocation_and_run_response_msg_t alloc_run_resp; - #define TYPE_NOT_TEXT 0 #define TYPE_TEXT 1 #define TYPE_SCRIPT 2 +typedef resource_allocation_response_msg_t allocation_resp; +typedef resource_allocation_and_run_response_msg_t alloc_run_resp; + /* * forward declaration of 
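_p_fwd_signal() uses the same counter-and-condvar idea as slurmd above, but as a fan-out throttle: at most opt.max_threads signal RPCs in flight at once, then drain to zero before returning. A compressed sketch of the driver (send_one() is a hypothetical stand-in for the per-node kill-tasks RPC):

    #include <pthread.h>

    extern void send_one(int node);      /* hypothetical per-node RPC */

    static int             active = 0;
    static pthread_mutex_t lk = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  cv = PTHREAD_COND_INITIALIZER;

    static void *task(void *arg)
    {
            send_one((int)(long) arg);
            pthread_mutex_lock(&lk);
            active--;
            pthread_cond_signal(&cv);    /* wake throttler or drainer */
            pthread_mutex_unlock(&lk);
            return NULL;
    }

    static void fan_out(int nnodes, int max_threads)
    {
            pthread_t id;
            int i;

            for (i = 0; i < nnodes; i++) {
                    pthread_mutex_lock(&lk);
                    while (active >= max_threads)  /* bound in-flight RPCs */
                            pthread_cond_wait(&cv, &lk);
                    active++;
                    pthread_mutex_unlock(&lk);
                    if (pthread_create(&id, NULL, task, (void *)(long) i) == 0)
                            pthread_detach(id);
            }
            pthread_mutex_lock(&lk);
            while (active > 0)                     /* drain before returning */
                    pthread_cond_wait(&cv, &lk);
            pthread_mutex_unlock(&lk);
    }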
diff --git a/src/srun/srun.c b/src/srun/srun.c
index eed374c29d4..c273f06e654 100644
--- a/src/srun/srun.c
+++ b/src/srun/srun.c
@@ -62,6 +62,7 @@
 #include "src/srun/launch.h"
 #include "src/srun/signals.h"
 #include "src/srun/reattach.h"
+#include "src/srun/allocate.h"
 #include "src/srun/opt.h"
 #include "src/srun/net.h"
 
@@ -74,30 +75,24 @@
 
 #define MAX_RETRIES 20
 
-typedef resource_allocation_response_msg_t         allocation_resp;
-typedef resource_allocation_and_run_response_msg_t alloc_run_resp;
-
 #define	TYPE_NOT_TEXT	0
 #define	TYPE_TEXT	1
 #define	TYPE_SCRIPT	2
 
+typedef resource_allocation_response_msg_t         allocation_resp;
+typedef resource_allocation_and_run_response_msg_t alloc_run_resp;
+
 /*
  * forward declaration of static funcs
  */
-static allocation_resp	*_allocate_nodes(void);
-static void  _print_job_information(allocation_resp *resp);
-static void  _create_job_step(job_t *job);
-static inline bool _job_all_done(job_t *job);
-static void  _sig_kill_alloc(int signum);
-static char *_build_script (char *pathname, int file_type);
-static char *_get_shell (void);
-static int   _is_file_text (char *, char**);
-static int   _run_batch_job (void);
-static allocation_resp *_existing_allocation(void);
-static void  _run_job_script(uint32_t jobid, uint32_t node_cnt);
-static int   _set_batch_script_env(uint32_t jobid, 
-				   uint32_t node_cnt);
+static void  _print_job_information(allocation_resp *resp);
+static char *_build_script (char *pathname, int file_type);
+static char *_get_shell (void);
+static int   _is_file_text (char *, char**);
+static int   _run_batch_job (void);
+static void  _run_job_script(uint32_t jobid, uint32_t node_cnt);
+static int   _set_batch_script_env(uint32_t jobid, uint32_t node_cnt);
 
 
 #ifdef HAVE_LIBELAN3
@@ -134,6 +129,8 @@ srun(int ac, char **av)
		log_alter(logopt, 0, NULL);
	}
 
+	sig_setup_sigmask();
+
	/* now global "opt" should be filled in and available,
	 * create a job from opt
	 */
@@ -149,24 +146,24 @@ srun(int ac, char **av)
		_qsw_standalone(job);
 #endif
 
-	} else if ( (resp = _existing_allocation()) ) {
+	} else if ( (resp = existing_allocation()) ) {
		if (opt.allocate) {
			error("job %u already has an allocation", 
			      resp->job_id);
			exit(1);
		}
		job = job_create_allocation(resp);
		job->old_job = true;
-		_create_job_step(job);
+		create_job_step(job);
		slurm_free_resource_allocation_response_msg(resp);
 
	} else if (opt.allocate) {
-		if ( !(resp = _allocate_nodes()) ) 
+		if ( !(resp = allocate_nodes()) ) 
			exit(1);
		if (_verbose)
			_print_job_information(resp);
		job = job_create_allocation(resp);
		_run_job_script(resp->job_id, resp->node_cnt);
-		slurm_complete_job(resp->job_id, 0, 0);
+		job_destroy(job, 0);
 
		debug ("Spawned srun shell terminated");
		exit (0);
@@ -176,20 +173,18 @@ srun(int ac, char **av)
		exit (0);
 
	} else {
-		if ( !(resp = _allocate_nodes()) ) 
+		if ( !(resp = allocate_nodes()) ) 
			exit(1);
		if (_verbose)
			_print_job_information(resp);
 
		job = job_create_allocation(resp);
-		_create_job_step(job);
+		create_job_step(job);
		slurm_free_resource_allocation_response_msg(resp);
	}
 
	/* job structure should now be filled in */
 
-	sig_setup_sigmask();
-
	if (msg_thr_create(job) < 0)
		job_fatal(job, "Unable to create msg thread");
 
@@ -202,21 +197,22 @@ srun(int ac, char **av)
	if (launch_thr_create(job) < 0)
		job_fatal(job, "Unable to create launch thread: %m");
 
-	/* wait for job to terminate */
+	/* wait for job to terminate 
+	 */
	slurm_mutex_lock(&job->state_mutex);
-	debug3("before main state loop: state = %d", job->state);
-	while (!_job_all_done(job)) 
+	while (job->state < SRUN_JOB_TERMINATED) 
		pthread_cond_wait(&job->state_cond, &job->state_mutex);
	slurm_mutex_unlock(&job->state_mutex);
 
-	/* job is now overdone, clean up */
+	/* job is now overdone, clean up 
+	 */
	if (job->state == SRUN_JOB_FAILED) {
		info("sending SIGINT to job");
		fwd_signal(job, SIGINT);
	}
 
	/* Tell slurmctld that job is done */
-	job_destroy(job);
+	job_destroy(job, 0);
 
	/* wait for launch thread */
	if (pthread_join(job->lid, NULL) < 0)
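The main loop above blocks on state_cond until job->state reaches SRUN_JOB_TERMINATED, which only works if every writer of job->state broadcasts the condition variable after advancing it. A minimal sketch of the writer/waiter pair (illustrative names):

    #include <pthread.h>

    struct job {
            int             state;      /* monotonically increasing */
            pthread_mutex_t state_mutex;
            pthread_cond_t  state_cond;
    };

    static void update_state(struct job *j, int new_state)
    {
            pthread_mutex_lock(&j->state_mutex);
            if (new_state > j->state) {
                    j->state = new_state;
                    pthread_cond_broadcast(&j->state_cond);
            }
            pthread_mutex_unlock(&j->state_mutex);
    }

    static void wait_for(struct job *j, int state)
    {
            pthread_mutex_lock(&j->state_mutex);
            while (j->state < state)    /* re-check after every wakeup */
                    pthread_cond_wait(&j->state_cond, &j->state_mutex);
            pthread_mutex_unlock(&j->state_mutex);
    }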
@@ -236,125 +232,6 @@ srun(int ac, char **av)
	return 0;
 }
 
-/*
- * allocate nodes from slurm controller via slurm api
- * will xmalloc memory for allocation response, which caller must free
- *	initialization if set
- */
-static allocation_resp *
-_allocate_nodes(void)
-{
-	int rc, retries;
-	job_desc_msg_t job;
-	resource_allocation_response_msg_t *resp;
-	old_job_alloc_msg_t old_job;
-
-	slurm_init_job_desc_msg(&job);
-
-	job.contiguous = opt.contiguous;
-	job.features = opt.constraints;
-	job.immediate = opt.immediate;
-	job.name = opt.job_name;
-	job.req_nodes = opt.nodelist;
-	job.exc_nodes = opt.exc_nodes;
-	job.partition = opt.partition;
-	job.min_nodes = opt.min_nodes;
-	if (opt.max_nodes)
-		job.max_nodes = opt.max_nodes;
-	job.num_tasks = opt.nprocs;
-	job.user_id = opt.uid;
-	if (opt.mincpus > -1)
-		job.min_procs = opt.mincpus;
-	if (opt.realmem > -1)
-		job.min_memory = opt.realmem;
-	if (opt.tmpdisk > -1)
-		job.min_tmp_disk = opt.tmpdisk;
-
-	if (opt.overcommit)
-		job.num_procs = opt.min_nodes;
-	else
-		job.num_procs = opt.nprocs * opt.cpus_per_task;
-
-	if (opt.no_kill)
-		job.kill_on_node_fail = 0;
-	if (opt.time_limit > -1)
-		job.time_limit = opt.time_limit;
-	if (opt.share)
-		job.shared = 1;
-
-	retries = 0;
-	while ((rc = slurm_allocate_resources(&job, &resp)) < 0) {
-		if ((slurm_get_errno() == ESLURM_ERROR_ON_DESC_TO_RECORD_COPY)
-		    && (retries < MAX_RETRIES)) {
-			char *msg = "Slurm controller not responding, "
-				    "sleeping and retrying";
-			if (retries == 0)
-				error (msg);
-			else
-				debug (msg);
-
-			sleep (++retries);
-		} else {
-			error("Unable to allocate resources: %m");
-			return NULL;
-		}
-	}
-
-	if ((rc == 0) && (resp->node_list == NULL)) {
-		struct sigaction action, old_action;
-		int fake_job_id = (0 - resp->job_id);
-		info ("Job %u queued and waiting for resources", resp->job_id);
-		_sig_kill_alloc(fake_job_id);
-		action.sa_handler = &_sig_kill_alloc;
-		/* action.sa_flags = SA_ONESHOT; */
-		sigaction(SIGINT, &action, &old_action);
-		old_job.job_id = resp->job_id;
-		old_job.uid = (uint32_t) getuid();
-		slurm_free_resource_allocation_response_msg (resp);
-		sleep (2);
-
-		/* Keep polling until the job is allocated resources */
-		while (slurm_confirm_allocation(&old_job, &resp) < 0) {
-			if (slurm_get_errno() == ESLURM_JOB_PENDING) {
-				debug3("Still waiting for allocation");
-				sleep (10);
-			} else {
-				error("Unable to confirm resource "
-				      "allocation for job %u: %m",
-				      old_job.job_id);
-				exit (1);
-			}
-		}
-		sigaction(SIGINT, &old_action, NULL);
-	}
-
-	return resp;
-}
-
-
-static void
-_sig_kill_alloc(int signum)
-{
-	static uint32_t job_id = 0;
-
-	if (signum == SIGINT) {		/* <Control-C> */
-		slurm_complete_job (job_id, 0, 0);
-#ifdef HAVE_TOTALVIEW
-		tv_launch_failure();
-#endif
-		exit (0);
-	} else if (signum < 0)
-		job_id = (uint32_t) (0 - signum);  /* kluge to pass job id */
-	else
-		fatal ("_sig_kill_alloc called with invalid argument",
-		       signum);
-}
-
-static bool _job_all_done(job_t *job)
-{
-	return (job->state >= SRUN_JOB_TERMINATED);
-}
-
 #ifdef HAVE_LIBELAN3
 
 static void
@@ -378,51 +255,6 @@ _qsw_standalone(job_t *job)
	}
 }
 #endif /* HAVE_LIBELAN3 */
 
-static void
-_create_job_step(job_t *job)
-{
-	job_step_create_request_msg_t req;
-	job_step_create_response_msg_t *resp;
-
-	req.job_id = job->jobid;
-	req.user_id = opt.uid;
-	req.node_count = job->nhosts;
-	if (opt.overcommit)
-		req.cpu_count = job->nhosts;
-	else
-		req.cpu_count = opt.nprocs * opt.cpus_per_task;
-	req.num_tasks = opt.nprocs;
-	req.node_list = job->nodelist;
-	req.relative = false;
-
-	if (opt.distribution == SRUN_DIST_UNKNOWN) {
-		if (opt.nprocs <= job->nhosts)
-			opt.distribution = SRUN_DIST_CYCLIC;
-		else
-			opt.distribution = SRUN_DIST_BLOCK;
-	}
-	if (opt.distribution == SRUN_DIST_BLOCK)
-		req.task_dist = SLURM_DIST_BLOCK;
-	else 	/* (opt.distribution == SRUN_DIST_CYCLIC) */
-		req.task_dist = SLURM_DIST_CYCLIC;
-
-	if (slurm_job_step_create(&req, &resp) || (resp == NULL)) {
-		error("unable to create job step: %s", slurm_strerror(errno));
-		slurm_complete_job(job->jobid, 0, errno);
-#ifdef HAVE_TOTALVIEW
-		tv_launch_failure();
-#endif
-		exit(1);
-	}
-
-	job->stepid = resp->job_step_id;
-	job->cred = resp->credentials;
-#ifdef HAVE_LIBELAN3
-	job->qsw_job= resp->qsw_job;
-#endif
-
-}
-
 
 static void 
 _print_job_information(allocation_resp *resp)
@@ -705,39 +537,6 @@ _build_script (char *fname, int file_type)
	return buffer;
 }
 
-/* If this is a valid job then return a (psuedo) allocation response pointer,
- * otherwise return NULL */
-static allocation_resp *
-_existing_allocation( void )
-{
-	char * jobid_str, *end_ptr;
-	uint32_t jobid_uint;
-	old_job_alloc_msg_t job;
-	allocation_resp * resp;
-
-	/* Load SLURM_JOBID environment variable */
-	jobid_str = getenv( "SLURM_JOBID" );
-	if (jobid_str == NULL)
-		return NULL;
-	jobid_uint = (uint32_t) strtoul( jobid_str, &end_ptr, 10 );
-	if (end_ptr[0] != '\0') {
-		error( "Invalid SLURM_JOBID environment variable: %s", 
-		       jobid_str );
-		exit( 1 );
-	}
-
-	/* Confirm that this job_id is legitimate */
-	job.job_id = jobid_uint;
-	job.uid = (uint32_t) getuid();
-
-	if (slurm_confirm_allocation(&job, &resp) == SLURM_FAILURE) {
-		error("Unable to confirm resource allocation for job %u: %s",
-		      jobid_uint, slurm_strerror(errno));
-		exit( 1 );
-	}
-
-	return resp;
-}
-
 
 static int
 _set_batch_script_env(uint32_t jobid, uint32_t node_cnt)
-- 
GitLab