diff --git a/src/common/log.c b/src/common/log.c index df54eda89f8aba7252fb4fa2c3c782c4a6dde811..47dafb86e4eba5924f792e40e4df7594859154b4 100644 --- a/src/common/log.c +++ b/src/common/log.c @@ -381,7 +381,7 @@ static void log_msg(log_level_t level, const char *fmt, va_list args) case LOG_LEVEL_DEBUG: priority = LOG_DEBUG; - pfx = "debug: "; + pfx = "debug: "; break; case LOG_LEVEL_DEBUG2: diff --git a/src/common/slurm_errno.c b/src/common/slurm_errno.c index 6604490a0a5071c67c4f3f0f062b1c80f7aba81c..02899b6e4fb074cb6325ae0094529ffe084147cb 100644 --- a/src/common/slurm_errno.c +++ b/src/common/slurm_errno.c @@ -230,6 +230,8 @@ static slurm_errtab_t slurm_errtab[] = { "Slurmd could not fork batch job" }, { ESLURMD_EXECVE_FAILED, "Slurmd could not execve batch job" }, + { ESLURMD_IO_ERROR, + "Slurmd could not connect IO" }, /* slurmd errors in user batch job */ { ESCRIPT_CHDIR_FAILED, diff --git a/src/common/slurm_errno.h b/src/common/slurm_errno.h index 50a05bdd786c370c13e8e4e1c7bad2ea03e3a5d7..0956c1da000a2abad93eb7a6f184a032a66eee30 100644 --- a/src/common/slurm_errno.h +++ b/src/common/slurm_errno.h @@ -137,6 +137,7 @@ enum { ESLURMD_CANNOT_SPAWN_IO_THREAD, ESLURMD_FORK_FAILED, ESLURMD_EXECVE_FAILED, + ESLURMD_IO_ERROR, /* slurmd errors in user batch job */ ESCRIPT_CHDIR_FAILED = 4100, diff --git a/src/common/slurm_protocol_defs.h b/src/common/slurm_protocol_defs.h index d5c52bb16518a805160c885e413110a209c4a517..a61de5d812958a495c0f7efa725b442f762b0930 100644 --- a/src/common/slurm_protocol_defs.h +++ b/src/common/slurm_protocol_defs.h @@ -249,6 +249,8 @@ typedef struct launch_tasks_request_msg { char *efname; char *ifname; + int32_t slurmd_debug; /* remote slurmd debug level */ + slurm_job_credential_t *credential; /* job credential */ #ifdef HAVE_LIBELAN3 diff --git a/src/common/slurm_protocol_pack.c b/src/common/slurm_protocol_pack.c index 043e17c8a093c39279ca4b25eaefcb2b3bb841d9..18d56643f22d0e68380dcc1f4dae5202df41d70c 100644 --- a/src/common/slurm_protocol_pack.c +++ b/src/common/slurm_protocol_pack.c @@ -2014,6 +2014,7 @@ _pack_launch_tasks_request_msg(launch_tasks_request_msg_t * msg, Buf buffer) packstr(msg->ofname, buffer); packstr(msg->efname, buffer); packstr(msg->ifname, buffer); + pack32(msg->slurmd_debug, buffer); pack32_array(msg->global_task_ids, msg->tasks_to_launch, buffer); #ifdef HAVE_LIBELAN3 @@ -2050,6 +2051,7 @@ _unpack_launch_tasks_request_msg(launch_tasks_request_msg_t ** safe_unpackstr_xmalloc(&msg->ofname, &uint16_tmp, buffer); safe_unpackstr_xmalloc(&msg->efname, &uint16_tmp, buffer); safe_unpackstr_xmalloc(&msg->ifname, &uint16_tmp, buffer); + safe_unpack32(&msg->slurmd_debug, buffer); safe_unpack32_array(&msg->global_task_ids, &uint32_tmp, buffer); if (msg->tasks_to_launch != uint32_tmp) goto unpack_error; diff --git a/src/slurmd/fname.c b/src/slurmd/fname.c index 9d6bc9d1cf1d3fbca21e456081027009ba99306d..349a9bd332df8890fb5d674d8e3092a1909e9cb2 100644 --- a/src/slurmd/fname.c +++ b/src/slurmd/fname.c @@ -174,10 +174,8 @@ fname_trunc_all(slurmd_job_t *job, const char *fmt) filei = list_iterator_create(files); while ((fname = list_next(filei))) { if ((rc = _trunc_file(fname)) < 0) - goto done; + break; } - - done: list_destroy(files); return rc; } diff --git a/src/slurmd/io.c b/src/slurmd/io.c index 2f980ce337c71294d1c54d56deb91d306114c13d..5d7117ccca44695f3d83c876ce34a96ab72a5700 100644 --- a/src/slurmd/io.c +++ b/src/slurmd/io.c @@ -113,7 +113,6 @@ static bool _isa_client(struct io_info *io); static bool _isa_task(struct io_info *io); static int 
_io_init_pipes(task_info_t *t); -static int _io_prepare_clients(slurmd_job_t *); static int _io_prepare_tasks(slurmd_job_t *); static void * _io_thr(void *); static int _io_write_header(struct io_info *, srun_info_t *); @@ -235,13 +234,7 @@ io_spawn_handler(slurmd_job_t *job) xassert(_validate_io_list(job->objs)); pthread_create(&job->ioid, &attr, &_io_thr, (void *)job); - - /* open 2*ntask initial connections or files for stdout/err - * append these to objs list - */ - if (_io_prepare_clients(job) < 0) - return SLURM_FAILURE; - + return 0; } @@ -278,11 +271,12 @@ _io_finalize(task_info_t *t) /* Need to close all stdin writers * * We effectively close these writers by - * forcing them to be unwritable. This will + * forcing them to be unreadable. This will * prevent the IO thread from hanging waiting * for stdin data. (While also not forcing the * close of a pipe that is also writable) */ + if (in->writers) { ListIterator i; @@ -306,6 +300,8 @@ io_close_all(slurmd_job_t *job) for (i = 0; i < job->ntasks; i++) _io_finalize(job->task[i]); + close(STDERR_FILENO); + /* Signal IO thread to close appropriate * client connections */ @@ -407,7 +403,7 @@ _obj_set_unwritable(io_obj_t *obj) obj->ops->writable = NULL; } -static void +static int _io_add_connecting(slurmd_job_t *job, task_info_t *t, srun_info_t *srun, slurmd_io_type_t type) { @@ -419,7 +415,7 @@ _io_add_connecting(slurmd_job_t *job, task_info_t *t, srun_info_t *srun, /* XXX retry or silently fail? * fail for now. */ - return; + return SLURM_ERROR; } fd_set_nonblocking(sock); @@ -438,6 +434,8 @@ _io_add_connecting(slurmd_job_t *job, task_info_t *t, srun_info_t *srun, } list_append(job->objs, (void *)obj); + + return SLURM_SUCCESS; } /* @@ -447,6 +445,12 @@ _io_add_connecting(slurmd_job_t *job, task_info_t *t, srun_info_t *srun, static int _io_prepare_one(slurmd_job_t *j, task_info_t *t, srun_info_t *s) { + /* Try hard to get stderr connected to something + */ + if ( (_open_output_file(j, t, s->efname, CLIENT_STDERR) < 0) + && (_io_add_connecting(j, t, s, CLIENT_STDERR) < 0) ) + return SLURM_FAILURE; + if (s->ofname) { if (_open_output_file(j, t, s->ofname, CLIENT_STDOUT) < 0) return SLURM_FAILURE; @@ -454,13 +458,6 @@ _io_prepare_one(slurmd_job_t *j, task_info_t *t, srun_info_t *s) _io_add_connecting(j, t, s, CLIENT_STDOUT); } - if (s->efname) { - if (_open_output_file(j, t, s->efname, CLIENT_STDERR) < 0) - return SLURM_FAILURE; - } else { - _io_add_connecting(j, t, s, CLIENT_STDERR); - } - if (s->ifname) { if (_open_stdin_file(j, t, s) < 0) return SLURM_FAILURE; @@ -477,8 +474,8 @@ _io_prepare_one(slurmd_job_t *j, task_info_t *t, srun_info_t *s) /* * create initial client objs for N tasks */ -static int -_io_prepare_clients(slurmd_job_t *job) +int +io_prepare_clients(slurmd_job_t *job) { int i; srun_info_t *srun; @@ -487,10 +484,12 @@ _io_prepare_clients(slurmd_job_t *job) xassert(srun != NULL); if (srun->ofname && (fname_trunc_all(job, srun->ofname) < 0)) - return SLURM_FAILURE; + goto error; - if (srun->efname && (fname_trunc_all(job, srun->efname) < 0)) - return SLURM_FAILURE; + if (srun->efname && (strcmp(srun->ofname, srun->efname) != 0)) { + if (fname_trunc_all(job, srun->efname) < 0) + goto error; + } if (srun->ioaddr.sin_addr.s_addr) { char host[256]; @@ -503,19 +502,28 @@ _io_prepare_clients(slurmd_job_t *job) * local file */ for (i = 0; i < job->ntasks; i++) { - _io_prepare_one(job, job->task[i], srun); + if (_io_prepare_one(job, job->task[i], srun) < 0) + return SLURM_FAILURE; /* kick IO thread */ pthread_kill(job->ioid, 
SIGHUP); } return SLURM_SUCCESS; + + error: + /* + * Try to open stderr connection for errors + */ + _io_add_connecting(job, job->task[0], srun, CLIENT_STDERR); + pthread_kill(job->ioid, SIGHUP); + return SLURM_FAILURE; } int io_new_clients(slurmd_job_t *job) { - return _io_prepare_clients(job); + return io_prepare_clients(job); } static int @@ -540,10 +548,14 @@ _open_output_file(slurmd_job_t *job, task_info_t *t, char *fmt, int fd = -1; io_obj_t *obj = NULL; int flags = O_APPEND|O_WRONLY; - char *fname = fname_create(job, fmt, t->gid); + char *fname ; + + if (fmt == NULL) + return SLURM_ERROR; xassert((type == CLIENT_STDOUT) || (type == CLIENT_STDERR)); + fname = fname_create(job, fmt, t->gid); if ((fd = _open_task_file(fname, flags)) > 0) { debug("opened `%s' for %s fd %d", fname, _io_str[type], fd); obj = _io_obj(job, t, fd, type); @@ -1494,6 +1506,7 @@ _validate_client_stderr(struct io_info *client) xassert(client->magic == IO_MAGIC); xassert(!client->readers); xassert(client->obj->ops->writable != NULL); + i = list_iterator_create(client->writers); while ((t = list_next(i))) { xassert(t->magic == IO_MAGIC); diff --git a/src/slurmd/io.h b/src/slurmd/io.h index 0262abf94165323db64111c00997ef192e0828bd..8442601f6d16bfc8d08ac87ee9a9ea926be9546a 100644 --- a/src/slurmd/io.h +++ b/src/slurmd/io.h @@ -56,6 +56,10 @@ int io_prepare_child(task_info_t *t); void io_close_all(slurmd_job_t *job); +/* + * Connect initial N tasks to their stdio + */ +int io_prepare_clients(slurmd_job_t *job); /* Notes: * diff --git a/src/slurmd/job.c b/src/slurmd/job.c index 04efba2cce6048b780c1a6d08b2506efd35a6932..ec1a50dddd97b6c80d967e8ca8546d341e8d6fcf 100644 --- a/src/slurmd/job.c +++ b/src/slurmd/job.c @@ -118,6 +118,7 @@ job_create(launch_tasks_request_msg_t *msg, slurm_addr *cli_addr) job->nnodes = msg->nnodes; job->nodeid = msg->srun_node_id; job->ntasks = msg->tasks_to_launch; + job->debug = msg->slurmd_debug; job->timelimit = msg->credential->expiration_time; diff --git a/src/slurmd/job.h b/src/slurmd/job.h index 6b561440d71e9873c0f8d40b50ec5e936399ed54..199b790313e1b25c6d46de1c317e700aa762b14a 100644 --- a/src/slurmd/job.h +++ b/src/slurmd/job.h @@ -90,6 +90,7 @@ typedef struct slurmd_job { uint32_t nprocs; uint32_t nodeid; uint32_t ntasks; + uint32_t debug; uint16_t envc; uint16_t argc; char **env; diff --git a/src/slurmd/mgr.c b/src/slurmd/mgr.c index 34cac33a2a9a0dc80a0724799fb9b3318a7b99b1..27290ed5b32157bf240176652d11abb895670173 100644 --- a/src/slurmd/mgr.c +++ b/src/slurmd/mgr.c @@ -63,7 +63,7 @@ static int _run_job(slurmd_job_t *job); static int _run_batch_job(slurmd_job_t *job); -static void _exec_all_tasks(slurmd_job_t *job); +static int _exec_all_tasks(slurmd_job_t *job); static void _task_exec(slurmd_job_t *job, int i, bool batch); static int _drop_privileges(struct passwd *pwd); static int _reclaim_privileges(struct passwd *pwd); @@ -71,6 +71,9 @@ static int _become_user(slurmd_job_t *job); static int _unblock_all_signals(void); static int _send_exit_msg(int rc, task_info_t *t); static int _complete_job(slurmd_job_t *job, int rc, int status); +static void _send_launch_resp(slurmd_job_t *job, int rc); +static void _wait_for_all_tasks(slurmd_job_t *job); +static void _slurmd_job_log_init(slurmd_job_t *job); static void _setargs(slurmd_job_t *job, char **argv, int argc) @@ -123,7 +126,6 @@ mgr_launch_tasks(launch_tasks_request_msg_t *msg, slurm_addr *cli) shm_fini(); return(SLURM_SUCCESS); error: - job_error(job, "cannot run job: %m"); shm_fini(); return(SLURM_ERROR); } @@ -284,7 +286,6 @@ 
static int _run_job(slurmd_job_t *job) { int rc = SLURM_SUCCESS; - int i = 0; struct passwd *spwd = getpwuid(geteuid()); /* Insert job info into shared memory */ @@ -294,39 +295,59 @@ _run_job(slurmd_job_t *job) job_error(job, "interconnect_init: %m"); rc = -2; /* shm_init(); */ - goto done; + goto fail; + } + + if ((rc = io_spawn_handler(job)) < 0) { + rc = ESLURMD_IO_ERROR; + goto fail1; } + /* connect job stderr to this node's task 0 stderr so + * user recieves error messages on stderr + */ + _slurmd_job_log_init(job); + /* * Temporarily drop permissions */ - if ((rc = _drop_privileges(job->pwd)) < 0) - goto done; + if ((rc = _drop_privileges(job->pwd)) < 0) + goto fail2; - /* Option: connect slurmd stderr to srun local task 0: stderr? */ - if (io_spawn_handler(job) == SLURM_ERROR) { - job_error(job, "unable to spawn io handler"); - rc = -3; - goto done; - } + /* Open input/output files and/or connections back to client + */ + rc = io_prepare_clients(job); if (_reclaim_privileges(spwd) < 0) error("sete{u/g}id(%ld/%ld): %m", spwd->pw_uid, spwd->pw_gid); - _exec_all_tasks(job); - job_debug2(job, "job complete, waiting on IO"); + if (rc < 0) { + rc = ESLURMD_IO_ERROR; + goto fail2; + } + + rc = _exec_all_tasks(job); + _send_launch_resp(job, rc); + _wait_for_all_tasks(job); + + job_debug2(job, "all tasks exited, waiting on IO"); io_close_all(job); pthread_join(job->ioid, NULL); job_debug2(job, "IO complete"); - done: interconnect_fini(job); /* ignore errors */ job_delete_shm(job); /* again, ignore errors */ - job_verbose(job, "job complete with rc = %d", rc); - if (rc < 0) { - for (i = 0; i < job->ntasks; i++) - _send_exit_msg(-rc, job->task[i]); - } + job_verbose(job, "job completed", rc); + return rc; + +fail2: + io_close_all(job); + pthread_join(job->ioid, NULL); +fail1: + interconnect_fini(job); +fail: + job_delete_shm(job); + _send_launch_resp(job, rc); return rc; } @@ -417,27 +438,24 @@ _run_batch_job(slurmd_job_t *job) pid_t sid, pid; struct passwd *spwd = getpwuid(getuid()); - /* Temporarily drop permissions to initiate - * IO thread. This will ensure that calling user - * has appropriate permissions to open output - * files, if any. 
- */ - if (_drop_privileges(job->pwd) < 0) { - error("seteuid(%ld) : %m", job->uid); - return ESLURMD_SET_UID_OR_GID_ERROR; - } - rc = io_spawn_handler(job); + if ((rc = io_spawn_handler(job)) < 0) { + return ESLURMD_IO_ERROR; + } - /* seteuid/gid back to saved uid/gid + /* + * Temporarily drop permissions */ - if (_reclaim_privileges(spwd) < 0) { - error("seteuid(%ld) : %m", spwd->pw_uid); + if ((rc = _drop_privileges(job->pwd)) < 0) return ESLURMD_SET_UID_OR_GID_ERROR; - } - /* Give up if we couldn't spawn IO handler for whatever reason + /* Open input/output files and/or connections back to client */ + rc = io_prepare_clients(job); + + if (_reclaim_privileges(spwd) < 0) + error("sete{u/g}id(%ld/%ld): %m", spwd->pw_uid, spwd->pw_gid); + if (rc < 0) return ESLURMD_CANNOT_SPAWN_IO_THREAD; @@ -635,7 +653,7 @@ _task_exec(slurmd_job_t *job, int i, bool batch) exit(errno); } -static void +static int _exec_all_tasks(slurmd_job_t *job) { pid_t sid; @@ -663,7 +681,7 @@ _exec_all_tasks(slurmd_job_t *job) if ((t.pid = fork()) < 0) { error("fork: %m"); - exit(1); + return 1; /* job_cleanup() */ } else if (t.pid == 0) /* child */ break; @@ -682,12 +700,12 @@ _exec_all_tasks(slurmd_job_t *job) } if (i == job->ntasks) - _wait_for_all_tasks(job); + return 0; /* _wait_for_all_tasks(job); */ else _task_exec(job, i, false); debug3("All tasks exited"); - return; + return 0; } static int @@ -733,3 +751,52 @@ _unblock_all_signals(void) } return SLURM_SUCCESS; } + +static void +_send_launch_resp(slurmd_job_t *job, int rc) +{ + int i; + slurm_msg_t resp_msg; + launch_tasks_response_msg_t resp; + srun_info_t *srun = list_peek(job->sruns); + + job_debug(job, "Sending launch resp rc=%d", rc); + + resp_msg.address = srun->resp_addr; + resp_msg.data = &resp; + resp_msg.msg_type = RESPONSE_LAUNCH_TASKS; + + resp.node_name = conf->hostname; + resp.srun_node_id = job->nodeid; + resp.return_code = rc; + resp.count_of_pids = job->ntasks; + + resp.local_pids = xmalloc(job->ntasks * sizeof(*resp.local_pids)); + for (i = 0; i < job->ntasks; i++) + resp.local_pids[i] = job->task[i]->pid; + + slurm_send_only_node_msg(&resp_msg); + + xfree(resp.local_pids); +} + +static void +_slurmd_job_log_init(slurmd_job_t *job) +{ + char argv0[64]; + log_options_t logopt = LOG_OPTS_STDERR_ONLY; + + /* Connect slurmd stderr to job's stderr */ + if (dup2(job->task[0]->perr[1], STDERR_FILENO) < 0) { + error("job_log_init: dup2(stderr): %m"); + return; + } + + logopt.stderr_level += job->debug; + + snprintf(argv0, sizeof(argv0), "slurmd[%s]", conf->hostname); + + /* reinitialize log to log on stderr */ + log_init(argv0, logopt, 0, NULL); +} + diff --git a/src/slurmd/req.c b/src/slurmd/req.c index 82033d59d75d558f7526321b7ccc5780ded78081..572673d1b960b00803dbbc18684de7e62182055b 100644 --- a/src/slurmd/req.c +++ b/src/slurmd/req.c @@ -177,10 +177,8 @@ _rpc_launch_tasks(slurm_msg_t *msg, slurm_addr *cli) uint16_t port; char host[MAXHOSTNAMELEN]; uid_t req_uid; - slurm_msg_t resp_msg; - launch_tasks_response_msg_t resp; + bool super_user = false; launch_tasks_request_msg_t *req = msg->data; - bool super_user = false; req_uid = slurm_auth_uid(msg->cred); if ((req_uid == conf->slurm_user_id) || (req_uid == 0)) @@ -207,22 +205,11 @@ _rpc_launch_tasks(slurm_msg_t *msg, slurm_addr *cli) return; } - rc = _launch_tasks(req, cli); - memcpy(&resp_msg.address, cli, sizeof(slurm_addr)); - slurm_set_addr(&resp_msg.address, req->resp_port, NULL); - - resp_msg.data = &resp; - resp_msg.msg_type = RESPONSE_LAUNCH_TASKS; - - resp.node_name = conf->hostname; - 
resp.srun_node_id = req->srun_node_id; - resp.return_code = rc; - resp.count_of_pids = 0; - resp.local_pids = NULL; /* array type of uint32_t */ - - slurm_send_only_node_msg(&resp_msg); + if ((rc = _launch_tasks(req, cli))) + slurm_send_rc_msg(msg, rc); } + static void _rpc_batch_job(slurm_msg_t *msg, slurm_addr *cli) { diff --git a/src/srun/io.c b/src/srun/io.c index 0c30ac2a4dc6f06b9c63787b9bee7ec2c7abfad3..956c911aad30824639f856bc965536e9d4eda975 100644 --- a/src/srun/io.c +++ b/src/srun/io.c @@ -344,15 +344,26 @@ _io_thr_poll(void *job_arg) static void _do_poll_timeout (job_t *job) { - int i, age; + int i, age, eofcnt = 0; static bool term_msg_sent = false; + for (i = 0; ((i < opt.nprocs) && (time_first_done == 0)); i++) { if ((job->task_state[i] == SRUN_TASK_FAILED) || (job->task_state[i] == SRUN_TASK_EXITED)) time_first_done = time(NULL); } + for (i = 0; i < opt.nprocs; i++) { + if ((job->err[i] == IO_DONE) && (job->out[i] == IO_DONE)) + eofcnt++; + } + + if (eofcnt == opt.nprocs) { + _flush_io(job); + pthread_exit(0); + } + age = time(NULL) - time_first_done; if (job->state == SRUN_JOB_FAILED) { debug3("job failed, exiting IO thread"); diff --git a/src/srun/job.h b/src/srun/job.h index 7a9a26dc912b67a73dd1ebffe151988d845e82bf..8886d14f8a6d1acdeb5be16a5b1df5e66bb10afb 100644 --- a/src/srun/job.h +++ b/src/srun/job.h @@ -56,6 +56,8 @@ typedef struct srun_job { char **host; /* hostname vector */ int *cpus; /* number of processors on each host */ int *ntask; /* number of tasks to run on each host */ + uint32_t **tids; /* host id => task ids mapping */ + slurm_addr *slurmd_addr;/* slurm_addr vector to slurmd's */ pthread_t sigid; /* signals thread tid */ diff --git a/src/srun/launch.c b/src/srun/launch.c index 73d8ed462a1d53560fb077234d8dbecffabb8626..091bb06ddb8781741e2b8b7dd0063f1bbdf77900 100644 --- a/src/srun/launch.c +++ b/src/srun/launch.c @@ -62,8 +62,8 @@ typedef struct task_info { job_t *job_ptr; } task_info_t; -static void _dist_block(job_t *job, uint32_t **task_ids); -static void _dist_cyclic(job_t *job, uint32_t **task_ids); +static void _dist_block(job_t *job); +static void _dist_cyclic(job_t *job); static void _p_launch(slurm_msg_t *req_array_ptr, job_t *job); static void * _p_launch_task(void *args); static void _print_launch_msg(launch_tasks_request_msg_t *msg, @@ -71,26 +71,26 @@ static void _print_launch_msg(launch_tasks_request_msg_t *msg, static int _envcount(char **env); static void -_dist_block(job_t *job, uint32_t **task_ids) +_dist_block(job_t *job) { int i, j, taskid = 0; for (i=0; ((i<job->nhosts) && (taskid<opt.nprocs)); i++) { for (j=0; (((j*opt.cpus_per_task)<job->cpus[i]) && (taskid<opt.nprocs)); j++) { - task_ids[i][j] = taskid++; + job->tids[i][j] = taskid++; job->ntask[i]++; } } } static void -_dist_cyclic(job_t *job, uint32_t **task_ids) +_dist_cyclic(job_t *job) { int i, j, taskid = 0; for (j=0; (taskid<opt.nprocs); j++) { /* cycle counter */ for (i=0; ((i<job->nhosts) && (taskid<opt.nprocs)); i++) { if (j < job->cpus[i]) { - task_ids[i][j] = taskid++; + job->tids[i][j] = taskid++; job->ntask[i]++; } } @@ -105,7 +105,6 @@ launch(void *arg) job_t *job = (job_t *) arg; int i, my_envc; char hostname[MAXHOSTNAMELEN]; - uint32_t **task_ids; update_job_state(job, SRUN_JOB_LAUNCHING); if (gethostname(hostname, MAXHOSTNAMELEN) < 0) @@ -115,10 +114,9 @@ launch(void *arg) debug("sending to slurmd port %d", slurm_get_slurmd_port()); /* Build task id list for each host */ - task_ids = (uint32_t **) xmalloc(job->nhosts * sizeof(uint32_t *)); + job->tids = 
xmalloc(job->nhosts * sizeof(uint32_t *)); for (i = 0; i < job->nhosts; i++) - task_ids[i] = (uint32_t *) xmalloc(job->cpus[i] * - sizeof(uint32_t)); + job->tids[i] = xmalloc(job->cpus[i] * sizeof(uint32_t)); if (opt.distribution == SRUN_DIST_UNKNOWN) { if (opt.nprocs <= job->nhosts) @@ -128,15 +126,12 @@ launch(void *arg) } if (opt.distribution == SRUN_DIST_BLOCK) - _dist_block(job, task_ids); + _dist_block(job); else - _dist_cyclic(job, task_ids); + _dist_cyclic(job); - msg_array_ptr = (launch_tasks_request_msg_t *) - xmalloc(sizeof(launch_tasks_request_msg_t) * - job->nhosts); - req_array_ptr = (slurm_msg_t *) - xmalloc(sizeof(slurm_msg_t) * job->nhosts); + msg_array_ptr = xmalloc(sizeof(launch_tasks_request_msg_t)*job->nhosts); + req_array_ptr = xmalloc(sizeof(slurm_msg_t) * job->nhosts); my_envc = _envcount(environ); for (i = 0; i < job->nhosts; i++) { launch_tasks_request_msg_t *r = &msg_array_ptr[i]; @@ -154,6 +149,7 @@ launch(void *arg) r->cwd = opt.cwd; r->nnodes = job->nhosts; r->nprocs = opt.nprocs; + r->slurmd_debug = opt.slurmd_debug; if (job->ofname->type == IO_PER_TASK) r->ofname = job->ofname->name; @@ -164,7 +160,7 @@ launch(void *arg) /* Node specific message contents */ r->tasks_to_launch = job->ntask[i]; - r->global_task_ids = task_ids[i]; + r->global_task_ids = job->tids[i]; r->srun_node_id = (uint32_t)i; r->io_port = ntohs(job->ioport[i%job->niofds]); r->resp_port = ntohs(job->jaddr[i%job->njfds].sin_port); @@ -185,9 +181,6 @@ launch(void *arg) _p_launch(req_array_ptr, job); - for (i = 0; i < job->nhosts; i++) - xfree(task_ids[i]); - xfree(task_ids); xfree(msg_array_ptr); xfree(req_array_ptr); diff --git a/src/srun/msg.c b/src/srun/msg.c index 3c03b1cfef5692cd549569c05d9ddb55528a1274..0aeb6241439d72cd8b3931f0554c8aa6b089ff76 100644 --- a/src/srun/msg.c +++ b/src/srun/msg.c @@ -107,33 +107,84 @@ _build_tv_list(launch_tasks_response_msg_t *msg) #endif static void -_launch_handler(job_t *job, slurm_msg_t *resp) +_process_launch_resp(job_t *job, launch_tasks_response_msg_t *msg) { - launch_tasks_response_msg_t *msg = - (launch_tasks_response_msg_t *) resp->data; + int i; + + if ( (msg->srun_node_id >= 0) + && (msg->srun_node_id < job->nhosts) ) { - debug2("received launch resp from %s nodeid=%d", msg->node_name, - msg->srun_node_id); - - if (msg->return_code != 0) { - error("recvd return code %d from %s", msg->return_code, - msg->node_name); - return; - } else { pthread_mutex_lock(&job->task_mutex); - if ((msg->srun_node_id >= 0) && - (msg->srun_node_id < job->nhosts)) { - job->host_state[msg->srun_node_id] = - SRUN_HOST_REPLIED; + job->host_state[msg->srun_node_id] = SRUN_HOST_REPLIED; + pthread_mutex_unlock(&job->task_mutex); #ifdef HAVE_TOTALVIEW - _build_tv_list(msg); + _build_tv_list(msg); #endif - } else - error("launch resp from %s has bad task_id %d", + if (_verbose) { + hostlist_t pids = hostlist_create(NULL); + char buf[1024]; + for (i = 0; i < msg->count_of_pids; i++) { + snprintf(buf, sizeof(buf), "pids:%d", + msg->local_pids[i]); + hostlist_push(pids, buf); + } + + hostlist_ranged_string(pids, sizeof(buf), buf); + verbose("%s: %s", msg->node_name, buf); + } + + } else + error("launch resp from %s has bad task_id %d", msg->node_name, msg->srun_node_id); - pthread_mutex_unlock(&job->task_mutex); +} + +static void +update_failed_tasks(job_t *job, uint32_t nodeid) +{ + int i; + slurm_mutex_lock(&job->task_mutex); + for (i = 0; i < job->ntask[nodeid]; i++) { + uint32_t tid = job->tids[nodeid][i]; + if (job->err[tid] == WAITING_FOR_IO) + job->err[tid] = IO_DONE; + 
if (job->out[tid] == WAITING_FOR_IO) + job->out[tid] = IO_DONE; + job->task_state[tid] = SRUN_TASK_FAILED; + tasks_exited++; } + slurm_mutex_unlock(&job->task_mutex); + + if (tasks_exited == opt.nprocs) { + debug2("all tasks exited"); + update_job_state(job, SRUN_JOB_OVERDONE); + } + +} + +static void +_launch_handler(job_t *job, slurm_msg_t *resp) +{ + launch_tasks_response_msg_t *msg = resp->data; + + debug2("received launch resp from %s nodeid=%d", msg->node_name, + msg->srun_node_id); + + if (msg->return_code != 0) { + + error("%s: launch failed: %s", + msg->node_name, slurm_strerror(msg->return_code)); + slurm_mutex_lock(&job->task_mutex); + job->host_state[msg->srun_node_id] = SRUN_HOST_REPLIED; + slurm_mutex_unlock(&job->task_mutex); + + if (!opt.no_kill) + update_job_state(job, SRUN_JOB_FAILED); + else + update_failed_tasks(job, msg->srun_node_id); + return; + } else + _process_launch_resp(job, msg); } /* _confirm_launch_complete @@ -203,7 +254,7 @@ _exit_handler(job_t *job, slurm_msg_t *exit_msg) taskid, _taskid2hostname(taskid, job), msg->return_code); else - verbose("task %d exited with status 0", taskid); + debug("task %d exited with status 0", taskid); slurm_mutex_lock(&job->task_mutex); job->tstatus[taskid] = msg->return_code; diff --git a/src/srun/opt.c b/src/srun/opt.c index 32a07e36116d7d2aefee8f984abe7dfab60c3747..0d978d7b2c2e522cf11755373e781b3d9cb6f679 100644 --- a/src/srun/opt.c +++ b/src/srun/opt.c @@ -177,8 +177,8 @@ struct poptOption runTable[] = { {"overcommit", 'O', POPT_ARG_NONE, &opt.overcommit, 0, "overcommit resources", }, - {"kill-off", 'k', POPT_ARG_NONE, &opt.fail_kill, 0, - "do not kill job on node failure", + {"no-kill", 'k', POPT_ARG_NONE, &opt.no_kill, 0, + "Do not kill job on node failure", }, {"share", 's', POPT_ARG_NONE, &opt.share, 0, "share node with other jobs", @@ -206,6 +206,8 @@ struct poptOption runTable[] = { "err"}, {"verbose", 'v', 0, 0, OPT_VERBOSE, "verbose operation (multiple -v's increase verbosity)", }, + {"slurmd-debug", 'd', POPT_ARG_INT, &opt.slurmd_debug, OPT_DEBUG, + "slurmd debug level", "value"}, {"threads", 'T', POPT_ARG_INT, &opt.max_threads, OPT_THREADS, "number of threads in srun", "threads"}, @@ -259,7 +261,8 @@ typedef struct env_vars { } env_vars_t; env_vars_t env_vars[] = { - {"SLURM_DEBUG", OPT_DEBUG, NULL, NULL}, + {"SLURM_DEBUG", OPT_VERBOSE, NULL, NULL}, + {"SLURMD_DEBUG", OPT_INT, &opt.slurmd_debug, NULL}, {"SLURM_NPROCS", OPT_INT, &opt.nprocs, &opt.nprocs_set}, {"SLURM_CPUS_PER_TASK", OPT_INT, &opt.cpus_per_task, &opt.cpus_set}, {"SLURM_NNODES", OPT_INT, &opt.nodes, &opt.nodes_set}, @@ -495,7 +498,7 @@ static void _opt_default() opt.overcommit = false; opt.batch = false; opt.share = false; - opt.fail_kill = false; + opt.no_kill = false; #ifdef HAVE_TOTALVIEW opt.totalview = _under_totalview(); #endif @@ -508,7 +511,7 @@ static void _opt_default() opt.max_wait = 0; _verbose = 0; - _debug = 0; + opt.slurmd_debug = LOG_LEVEL_QUIET; /* constraint default (-1 is no constraint) */ opt.mincpus = -1; @@ -566,7 +569,7 @@ static void _opt_env() case OPT_DEBUG: if (val != NULL) { - _debug = + _verbose = (int) strtol(val, &end, 10); if (!(end && *end == '\0')) { error("%s=%s invalid", @@ -689,10 +692,6 @@ static void _opt_args(int ac, char **av) _verbose++; break; - case OPT_DEBUG: - _debug++; - break; - case OPT_OUTPUT: opt.ofname = strdup(arg); break; diff --git a/src/srun/opt.h b/src/srun/opt.h index 92410f345e2cdddbad5e1bc45d21607f0005f49a..acdde91a3ad04865a4fe5761dc282d9e573e7d18 100644 --- a/src/srun/opt.h +++ 
b/src/srun/opt.h @@ -115,6 +115,7 @@ typedef struct srun_options { char *ifname; /* --input -i filename */ char *efname; /* --error, -e filename */ + int slurmd_debug; /* --slurmd-debug, -D */ char *core_format; /* --corefile-format=, -C type */ char *attach; /* --attach=id -a id */ bool join; /* --join, -j */ @@ -129,9 +130,9 @@ typedef struct srun_options { bool allocate; /* --allocate, -A */ bool overcommit; /* --overcommit, -O */ bool batch; /* --batch, -b */ - bool fail_kill; /* --kill, -k */ + bool no_kill; /* --no-kill, -k */ bool share; /* --share, -s */ - int max_wait; /* --wait, -w */ + int max_wait; /* --wait, -W */ #ifdef HAVE_TOTALVIEW bool totalview; /* srun controlled by TotalView */ #endif diff --git a/src/srun/srun.c b/src/srun/srun.c index 278d37812f993eaf56c8a4276f26a3ee23d57230..c301d2bfca59fdb8939781fa80209dcfcce74b11 100644 --- a/src/srun/srun.c +++ b/src/srun/srun.c @@ -317,7 +317,7 @@ _allocate_nodes(void) else job.num_procs = opt.nprocs * opt.cpus_per_task; - if (opt.fail_kill) + if (opt.no_kill) job.kill_on_node_fail = 0; if (opt.time_limit > -1) job.time_limit = opt.time_limit; @@ -494,7 +494,8 @@ _sig_thr(void *arg) { job_t *job = (job_t *)arg; sigset_t set; - time_t last_intr = 0; + time_t last_intr = 0; + time_t last_intr_sent = 0; int signo; while (1) { @@ -520,6 +521,12 @@ _sig_thr(void *arg) info("sending Ctrl-C to job"); last_intr = time(NULL); _fwd_signal(job, signo); + if ((time(NULL) - last_intr_sent) < 1) { + info("forcing termination"); + update_job_state(job, SRUN_JOB_OVERDONE); + pthread_kill(job->ioid, SIGTERM); + } + last_intr_sent = time(NULL); } else { info("forcing termination"); pthread_kill(job->ioid, SIGTERM); @@ -702,7 +709,7 @@ _run_batch_job(void) job.user_id = opt.uid; - if (opt.fail_kill) + if (opt.no_kill) job.kill_on_node_fail = 0; if (opt.time_limit > -1) job.time_limit = opt.time_limit;
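
The launch_tasks_request_msg changes above add a slurmd_debug field and pack/unpack it in matching positions (pack32 immediately after the ifname string on the sender, safe_unpack32 at the same spot on the receiver). The sketch below is only an illustration of why that ordering discipline matters; it uses a toy byte buffer and helpers (toybuf_t, put32, get32) rather than SLURM's Buf API.

	#include <arpa/inet.h>
	#include <assert.h>
	#include <stdint.h>
	#include <stdio.h>
	#include <string.h>

	typedef struct { unsigned char data[256]; size_t off; } toybuf_t;

	static void put32(toybuf_t *b, uint32_t v)      /* network byte order */
	{
		uint32_t nv = htonl(v);
		memcpy(b->data + b->off, &nv, sizeof(nv));
		b->off += sizeof(nv);
	}

	static uint32_t get32(toybuf_t *b)
	{
		uint32_t nv;
		memcpy(&nv, b->data + b->off, sizeof(nv));
		b->off += sizeof(nv);
		return ntohl(nv);
	}

	int main(void)
	{
		toybuf_t b = { {0}, 0 };

		/* sender: existing fields, then the new field, then the rest */
		put32(&b, 42);          /* e.g. a field already in the message */
		put32(&b, 3);           /* new slurmd_debug, inserted here      */
		put32(&b, 7);           /* e.g. tasks_to_launch                 */

		/* receiver: must read in exactly the same order, or every
		 * field after the insertion point is misinterpreted          */
		b.off = 0;
		assert(get32(&b) == 42);
		assert(get32(&b) == 3);
		assert(get32(&b) == 7);
		puts("fields round-tripped in matching order");
		return 0;
	}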
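
The new ESLURMD_IO_ERROR code is only useful because slurm_errno.c gains a matching slurm_errtab entry, which is what lets slurm_strerror() (used by the srun launch handler above) turn the code into "Slurmd could not connect IO". A stand-alone version of the same enum-plus-table idiom; the numeric base and the lookup function name here are arbitrary for the sketch.

	#include <stdio.h>

	enum {	ESLURMD_FORK_FAILED = 4000,     /* base arbitrary for sketch */
		ESLURMD_EXECVE_FAILED,
		ESLURMD_IO_ERROR };

	struct errtab { int code; const char *msg; };

	static const struct errtab tab[] = {
		{ ESLURMD_FORK_FAILED,   "Slurmd could not fork batch job"   },
		{ ESLURMD_EXECVE_FAILED, "Slurmd could not execve batch job" },
		{ ESLURMD_IO_ERROR,      "Slurmd could not connect IO"       },
	};

	static const char *toy_strerror(int code)
	{
		for (size_t i = 0; i < sizeof(tab) / sizeof(tab[0]); i++)
			if (tab[i].code == code)
				return tab[i].msg;
		return "Unknown error";
	}

	int main(void)
	{
		printf("%d: %s\n", ESLURMD_IO_ERROR, toy_strerror(ESLURMD_IO_ERROR));
		return 0;
	}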
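
The _io_prepare_one() hunk above wires up CLIENT_STDERR first and "tries hard": open the per-task file, and only if that fails (including the efname == NULL case now guarded inside _open_output_file) fall back to a socket back to the client, failing the launch only when both routes fail. A compilable sketch of that fallback shape; the helper names, the path, and the port below are placeholders, not the real io.c functions.

	#include <arpa/inet.h>
	#include <fcntl.h>
	#include <netinet/in.h>
	#include <stdio.h>
	#include <string.h>
	#include <sys/socket.h>
	#include <unistd.h>

	/* stand-in for _open_output_file(): append-only open, -1 on error */
	static int open_stderr_file(const char *fname)
	{
		if (fname == NULL)              /* same guard the patch adds */
			return -1;
		return open(fname, O_CREAT | O_APPEND | O_WRONLY, 0644);
	}

	/* stand-in for _io_add_connecting(): TCP connect back to client */
	static int connect_stderr_socket(const char *ip, unsigned short port)
	{
		struct sockaddr_in sa;
		int fd = socket(AF_INET, SOCK_STREAM, 0);

		if (fd < 0)
			return -1;
		memset(&sa, 0, sizeof(sa));
		sa.sin_family = AF_INET;
		sa.sin_port = htons(port);
		if (inet_pton(AF_INET, ip, &sa.sin_addr) != 1 ||
		    connect(fd, (struct sockaddr *)&sa, sizeof(sa)) < 0) {
			close(fd);
			return -1;
		}
		return fd;
	}

	/* "try hard to get stderr connected to something" */
	static int prepare_stderr(const char *fname, const char *ip,
				  unsigned short port)
	{
		int fd;

		if ((fd = open_stderr_file(fname)) >= 0)
			return fd;
		if ((fd = connect_stderr_socket(ip, port)) >= 0)
			return fd;
		return -1;      /* both failed: caller reports an IO error */
	}

	int main(void)
	{
		/* illustrative path and port only */
		int fd = prepare_stderr("/tmp/stderr-sketch.log", "127.0.0.1", 7070);

		printf("stderr fd: %d\n", fd);
		if (fd >= 0)
			close(fd);
		return 0;
	}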
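
The _run_job() rewrite above replaces the single done: exit with staged labels (fail, fail1, fail2) so that each failure unwinds only what has already been set up, and sends the launch response with the real error code instead of per-task exit messages. The skeleton below shows the shape of that pattern with stubbed-out steps; none of these helper names are the real mgr.c functions.

	#include <stdio.h>

	/* stubs standing in for interconnect, IO and task handling steps */
	static int  setup_interconnect(void)    { return 0; }
	static int  spawn_io_thread(void)       { return 0; }
	static int  prepare_client_io(void)     { return 0; }
	static int  exec_all_tasks(void)        { return 0; }
	static void wait_for_all_tasks(void)    { }
	static void stop_io_thread(void)        { }
	static void teardown_interconnect(void) { }
	static void delete_shm(void)            { }
	static void send_launch_resp(int rc)    { printf("launch resp rc=%d\n", rc); }

	static int run_job(void)
	{
		int rc;

		if ((rc = setup_interconnect()) < 0)
			goto fail;      /* nothing else to undo yet          */

		if ((rc = spawn_io_thread()) < 0)
			goto fail1;     /* undo the interconnect only        */

		/* drop privileges, open files/sockets, reclaim privileges  */
		if ((rc = prepare_client_io()) < 0)
			goto fail2;     /* also shut the IO thread down      */

		rc = exec_all_tasks();
		send_launch_resp(rc);   /* response now carries task pids    */
		wait_for_all_tasks();

		stop_io_thread();
		teardown_interconnect();
		delete_shm();
		return rc;

	fail2:
		stop_io_thread();
	fail1:
		teardown_interconnect();
	fail:
		delete_shm();
		send_launch_resp(rc);   /* client still gets the error code  */
		return rc;
	}

	int main(void) { return run_job(); }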
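
_slurmd_job_log_init() above makes the node daemon's own messages visible to the user by dup2()-ing task 0's stderr pipe onto STDERR_FILENO and re-initializing the logger with a stderr level raised by the forwarded slurmd_debug value. The stand-alone program below demonstrates only the redirection mechanism, using an ordinary local pipe and plain fprintf in place of log_init().

	#include <stdio.h>
	#include <unistd.h>

	int main(void)
	{
		int pfd[2];
		char buf[128];
		ssize_t n;

		if (pipe(pfd) < 0)
			return 1;

		/* In slurmd the write end would be task 0's stderr pipe;
		 * after this call anything the daemon prints on stderr flows
		 * to the same place as the task's stderr (and on to srun). */
		if (dup2(pfd[1], STDERR_FILENO) < 0)
			return 1;

		fprintf(stderr, "slurmd[node0]: error: something went wrong\n");
		fflush(stderr);

		n = read(pfd[0], buf, sizeof(buf) - 1);
		if (n > 0) {
			buf[n] = '\0';
			printf("client would receive: %s", buf);
		}
		return 0;
	}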
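
With the req.c and mgr.c changes above, slurmd's RPC handler no longer builds RESPONSE_LAUNCH_TASKS itself; it only answers with a return-code message when _launch_tasks() fails outright, and the job manager sends the real response, including every task's local pid, once the forks have happened. A sketch of assembling such a per-task pid response; the struct and function names are hypothetical stand-ins for launch_tasks_response_msg_t and slurm_send_only_node_msg().

	#include <stdint.h>
	#include <stdio.h>
	#include <stdlib.h>
	#include <sys/types.h>
	#include <unistd.h>

	struct launch_resp {            /* stand-in for the real message type */
		const char *node_name;
		int         return_code;
		uint32_t    ntasks;
		pid_t      *local_pids;
	};

	static void send_resp(const struct launch_resp *r)
	{
		/* the real code hands this to the message-send routine */
		printf("%s rc=%d ntasks=%u first_pid=%ld\n", r->node_name,
		       r->return_code, (unsigned)r->ntasks,
		       (long)r->local_pids[0]);
	}

	int main(void)
	{
		uint32_t i, ntasks = 4;
		struct launch_resp resp = { "node0", 0, ntasks, NULL };

		resp.local_pids = malloc(ntasks * sizeof(*resp.local_pids));
		if (!resp.local_pids)
			return 1;
		for (i = 0; i < ntasks; i++)
			resp.local_pids[i] = getpid() + i;  /* demo values */

		send_resp(&resp);
		free(resp.local_pids);  /* mirrors the xmalloc/xfree pairing */
		return 0;
	}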
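
The srun/io.c hunk above lets the client's IO thread exit as soon as both stdout and stderr have reached EOF for every task, instead of waiting out the poll timeout. A self-contained version of that completion test; IO_DONE and WAITING_FOR_IO are the state names used in the patch, while the enum, arrays and function here are simplifications of the real per-task bookkeeping.

	#include <stdbool.h>
	#include <stdio.h>

	enum io_state { WAITING_FOR_IO, IO_DONE };

	/* true once both output streams of every task have seen EOF */
	static bool all_io_done(const enum io_state *out,
				const enum io_state *err, int nprocs)
	{
		int i, eofcnt = 0;

		for (i = 0; i < nprocs; i++) {
			if (out[i] == IO_DONE && err[i] == IO_DONE)
				eofcnt++;
		}
		return eofcnt == nprocs;
	}

	int main(void)
	{
		enum io_state out[3] = { IO_DONE, IO_DONE, IO_DONE };
		enum io_state err[3] = { IO_DONE, IO_DONE, WAITING_FOR_IO };

		printf("%s\n", all_io_done(out, err, 3) ?
		       "flush and exit IO thread" : "keep polling");
		err[2] = IO_DONE;
		printf("%s\n", all_io_done(out, err, 3) ?
		       "flush and exit IO thread" : "keep polling");
		return 0;
	}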
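
The launch.c changes above move the host-to-task-id map into job->tids so that the response and failure handlers can later find which tasks live on which node; the distribution logic itself is unchanged. The stand-alone sketch below shows the two strategies over plain arrays (block fills each host up to its cpu count, cyclic deals tasks round-robin), assuming nprocs does not exceed the total cpu count and ignoring the cpus_per_task scaling of the real code.

	#include <stdio.h>

	#define NHOSTS 3
	#define MAXCPU 4

	static const int cpus[NHOSTS] = { 4, 2, 2 };

	static void dist_block(int nprocs, int tids[NHOSTS][MAXCPU], int *ntask)
	{
		int i, j, taskid = 0;

		for (i = 0; i < NHOSTS && taskid < nprocs; i++)
			for (j = 0; j < cpus[i] && taskid < nprocs; j++) {
				tids[i][j] = taskid++;
				ntask[i]++;
			}
	}

	static void dist_cyclic(int nprocs, int tids[NHOSTS][MAXCPU], int *ntask)
	{
		int i, j, taskid = 0;

		for (j = 0; taskid < nprocs; j++)       /* cycle counter */
			for (i = 0; i < NHOSTS && taskid < nprocs; i++)
				if (j < cpus[i]) {
					tids[i][j] = taskid++;
					ntask[i]++;
				}
	}

	static void show(const char *name, int tids[NHOSTS][MAXCPU],
			 const int *ntask)
	{
		int i, j;

		printf("%s:\n", name);
		for (i = 0; i < NHOSTS; i++) {
			printf("  host %d:", i);
			for (j = 0; j < ntask[i]; j++)
				printf(" %d", tids[i][j]);
			printf("\n");
		}
	}

	int main(void)
	{
		int tids[NHOSTS][MAXCPU] = { {0} };
		int ntask[NHOSTS] = { 0 };
		int tids2[NHOSTS][MAXCPU] = { {0} };
		int ntask2[NHOSTS] = { 0 };

		dist_block(6, tids, ntask);
		show("block", tids, ntask);

		dist_cyclic(6, tids2, ntask2);
		show("cyclic", tids2, ntask2);
		return 0;
	}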
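
update_failed_tasks() above is what makes the new --no-kill behaviour usable on the client side: when one node reports a launch failure, every task id recorded for that node in job->tids is marked failed and its stdout/stderr state forced to IO_DONE, so neither the exit accounting nor the IO thread waits forever for output that will never arrive. Simplified sketch, again over plain arrays and without the mutex locking and job-state update of the real code.

	#include <stdio.h>

	enum io_state   { WAITING_FOR_IO, IO_DONE };
	enum task_state { TASK_RUNNING, TASK_FAILED };

	#define NTASKS 4

	static enum io_state   out[NTASKS], err[NTASKS];
	static enum task_state task_state[NTASKS];
	static int tasks_exited;

	/* mark every task of the failed node as done-with-IO and failed */
	static void fail_node_tasks(const int *node_tids, int ntask_on_node)
	{
		int i;

		for (i = 0; i < ntask_on_node; i++) {
			int tid = node_tids[i];

			if (out[tid] == WAITING_FOR_IO)
				out[tid] = IO_DONE;
			if (err[tid] == WAITING_FOR_IO)
				err[tid] = IO_DONE;
			task_state[tid] = TASK_FAILED;
			tasks_exited++;
		}
	}

	int main(void)
	{
		int node1_tids[] = { 1, 3 };   /* tasks mapped to the node */

		fail_node_tasks(node1_tids, 2);
		printf("tasks_exited=%d task1=%s\n", tasks_exited,
		       task_state[1] == TASK_FAILED ? "FAILED" : "RUNNING");
		return 0;
	}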
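
The opt.c changes above repurpose SLURM_DEBUG to drive srun's own verbosity and add SLURMD_DEBUG (and --slurmd-debug) for the remote daemon's log level, both going through the same strtol-with-end-pointer validation already used for the existing options. That parsing idiom, stand-alone; the function and variable names are illustrative, not the opt.c originals.

	#include <stdio.h>
	#include <stdlib.h>

	/* parse a decimal environment value, rejecting trailing junk */
	static int parse_level(const char *name, int *out)
	{
		const char *val = getenv(name);
		char *end;
		long n;

		if (val == NULL)
			return 0;               /* not set: keep default */
		n = strtol(val, &end, 10);
		if (end == val || *end != '\0') {
			fprintf(stderr, "%s=%s invalid\n", name, val);
			return -1;
		}
		*out = (int)n;
		return 1;
	}

	int main(void)
	{
		int slurmd_debug = 0;

		if (parse_level("SLURMD_DEBUG", &slurmd_debug) >= 0)
			printf("slurmd_debug=%d\n", slurmd_debug);
		return 0;
	}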
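
Finally, the srun.c signal-thread change above escalates to forced termination when a second Ctrl-C arrives less than a second after the previous one was forwarded, rather than forwarding interrupts indefinitely. A small sketch of that timing rule; last_intr_sent is the name used in the patch, everything else here is illustrative.

	#include <signal.h>
	#include <stdio.h>
	#include <time.h>

	static time_t last_intr_sent;           /* name taken from the patch */

	/* called from the signal thread for each SIGINT */
	static void handle_interrupt(void (*forward)(int),
				     void (*force_term)(void))
	{
		forward(SIGINT);        /* pass Ctrl-C through to the tasks  */

		/* a second Ctrl-C within one second forces termination      */
		if ((time(NULL) - last_intr_sent) < 1)
			force_term();

		last_intr_sent = time(NULL);
	}

	static void fwd(int sig) { printf("forwarding signal %d\n", sig); }
	static void force(void)  { printf("forcing termination\n"); }

	int main(void)
	{
		handle_interrupt(fwd, force);   /* first Ctrl-C: forwarded   */
		handle_interrupt(fwd, force);   /* immediate second: forces  */
		return 0;
	}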