diff --git a/src/sattach/opt.c b/src/sattach/opt.c index e148276b23e0e4dd44c3dd6ef9a6b9503a4e32b9..e503c99c6ca8970947f2e46a755d30cccf2e5960 100644 --- a/src/sattach/opt.c +++ b/src/sattach/opt.c @@ -72,6 +72,9 @@ #include "src/common/mpi.h" +/* generic getopt_long flags, integers and *not* valid characters */ +#define LONG_OPT_INFO_ONLY 0x100 + /*---- global variables, defined in opt.h ----*/ opt_t opt; @@ -177,6 +180,7 @@ static void _opt_default() opt.labelio = false; opt.ctrl_comm_ifhn = xshort_hostname(); memcpy(&opt.fds, &fds, sizeof(fds)); + opt.info_only = false; } /*---[ env var processing ]-----------------------------------------------*/ @@ -240,14 +244,16 @@ void set_options(const int argc, char **argv) { int opt_char, option_index = 0; static struct option long_options[] = { - {"help", no_argument, 0, 'h'}, - {"quiet", no_argument, 0, 'q'}, - {"usage", no_argument, 0, 'u'}, - {"verbose", no_argument, 0, 'v'}, - {"version", no_argument, 0, 'V'}, + {"help", no_argument, 0, 'h'}, + {"label", no_argument, 0, 'l'}, + {"quiet", no_argument, 0, 'q'}, + {"usage", no_argument, 0, 'u'}, + {"verbose", no_argument, 0, 'v'}, + {"version", no_argument, 0, 'V'}, + {"info", no_argument, 0, LONG_OPT_INFO_ONLY}, {NULL} }; - char *opt_string = "+hquvV"; + char *opt_string = "+hlquvV"; opt.progname = xbasename(argv[0]); optind = 0; @@ -263,6 +269,9 @@ void set_options(const int argc, char **argv) case 'h': _help(); exit(0); + case 'l': + opt.labelio = true; + break; case 'q': opt.quiet++; break; @@ -276,6 +285,9 @@ void set_options(const int argc, char **argv) _print_version(); exit(0); break; + case LONG_OPT_INFO_ONLY: + opt.info_only = true; + break; default: fatal("Unrecognized command line parameter %c", opt_char); diff --git a/src/sattach/opt.h b/src/sattach/opt.h index f68e12bcce297ea53ec81c0a370f75cf38961bf7..c6e97d56ed59ddc9f18d8c8cc9997c9aade5a12c 100644 --- a/src/sattach/opt.h +++ b/src/sattach/opt.h @@ -65,6 +65,7 @@ typedef struct sbatch_options { char *ctrl_comm_ifhn; bool labelio; slurm_step_io_fds_t fds; + bool info_only; } opt_t; extern opt_t opt; diff --git a/src/sattach/sattach.c b/src/sattach/sattach.c index 1d622a19742ce194bafe40bcd3c84b1bf6a7b15f..da23c60851f71f2e5dfe424589c984711dc7dbf0 100644 --- a/src/sattach/sattach.c +++ b/src/sattach/sattach.c @@ -41,6 +41,7 @@ #include <netinet/in.h> #include <sys/socket.h> #include <sys/un.h> +#include <pthread.h> #include <slurm/slurm.h> @@ -75,7 +76,7 @@ static int _attach_to_tasks(uint32_t jobid, /********************************************************************** * Message handler declarations **********************************************************************/ -struct message_thread_state { +typedef struct message_thread_state { pthread_mutex_t lock; pthread_cond_t cond; int tasks_requested; @@ -90,16 +91,12 @@ struct message_thread_state { /* set to -1 if slaunch message handler should not attempt to handle */ uint16_t num_resp_port; uint16_t *resp_port; /* array of message response ports */ - - /* user registered callbacks */ - slurm_job_step_launch_callbacks_t callback; -}; - -static int _msg_thr_create(struct message_thread_state *sls, int num_nodes); -static void _handle_msg(struct message_thread_state *sls, slurm_msg_t *msg); +} message_thread_state_t; +static message_thread_state_t *_msg_thr_create(int num_nodes, int num_tasks); +static void _msg_thr_destroy(message_thread_state_t *mts); +static void _handle_msg(message_thread_state_t *mts, slurm_msg_t *msg); static bool _message_socket_readable(eio_obj_t *obj); static int _message_socket_accept(eio_obj_t *obj, List objs); - static struct io_operations message_socket_ops = { readable: &_message_socket_readable, handle_read: &_message_socket_accept @@ -113,7 +110,7 @@ int main(int argc, char *argv[]) log_options_t logopt = LOG_OPTS_STDERR_ONLY; slurm_step_layout_t *layout; slurm_cred_t fake_cred; - struct message_thread_state mts; + message_thread_state_t *mts; client_io_t *io; log_init(xbasename(argv[0]), logopt, 0, NULL); @@ -133,31 +130,30 @@ int main(int argc, char *argv[]) error("Could not get job step info: %m"); return 1; } + if (opt.info_only) { + print_layout_info(layout); + exit(0); + } fake_cred = _generate_fake_cred(opt.jobid, opt.stepid, opt.uid, layout->node_list); - memset(&mts, 0, sizeof(struct message_thread_state)); - _msg_thr_create(&mts, layout->node_cnt); + mts = _msg_thr_create(layout->node_cnt, layout->task_cnt); io = client_io_handler_create(opt.fds, layout->task_cnt, layout->node_cnt, fake_cred, opt.labelio); - - print_layout_info(layout); - client_io_handler_start(io); _attach_to_tasks(opt.jobid, opt.stepid, layout, fake_cred, - mts.num_resp_port, mts.resp_port, + mts->num_resp_port, mts->resp_port, io->num_listen, io->listenport); - sleep(300); + _msg_thr_destroy(mts); slurm_job_step_layout_free(layout); client_io_handler_finish(io); client_io_handler_destroy(io); - return 0; } @@ -166,16 +162,18 @@ static void print_layout_info(slurm_step_layout_t *layout) hostlist_t nl; int i, j; - info("node count = %d", layout->node_cnt); - info("total task count = %d", layout->task_cnt); - info("node names = \"%s\"", layout->node_list); + printf("Job step layout:\n"); + printf("\t%d tasks, %d nodes (%s)\n\n", + layout->task_cnt, layout->node_cnt, layout->node_list); nl = hostlist_create(layout->node_list); for (i = 0; i < layout->node_cnt; i++) { char *name = hostlist_nth(nl, i); - info("%s: node %d, tasks %d", name, i, layout->tasks[i]); + printf("\tNode %d (%s), %d task(s): ", + i, name, layout->tasks[i]); for (j = 0; j < layout->tasks[i]; j++) { - info("\ttask %d", layout->tids[i][j]); + printf("%d ", layout->tids[i][j]); } + printf("\n"); free(name); } } @@ -313,7 +311,6 @@ static int _attach_to_tasks(uint32_t jobid, int timeout; reattach_tasks_request_msg_t reattach_msg; - debug("Entering _attach_to_tasks"); slurm_msg_t_init(&msg); slurm_msg_t_init(&first_node_resp); @@ -379,14 +376,12 @@ static int _attach_to_tasks(uint32_t jobid, } - - /********************************************************************** * Message handler functions **********************************************************************/ static void *_msg_thr_internal(void *arg) { - struct message_thread_state *mts = (struct message_thread_state *)arg; + message_thread_state_t *mts = (message_thread_state_t *)arg; eio_handle_mainloop(mts->msg_handle); @@ -401,22 +396,28 @@ _estimate_nports(int nclients, int cli_per_port) return d.rem > 0 ? d.quot + 1 : d.quot; } -static int _msg_thr_create(struct message_thread_state *mts, int num_nodes) +static message_thread_state_t *_msg_thr_create(int num_nodes, int num_tasks) { int sock = -1; short port = -1; eio_obj_t *obj; int i; + message_thread_state_t *mts; debug("Entering _msg_thr_create()"); - + mts = (message_thread_state_t *)xmalloc(sizeof(message_thread_state_t)); + pthread_mutex_init(&mts->lock, NULL); + pthread_cond_init(&mts->cond, NULL); + mts->tasks_started = bit_alloc(num_tasks); + mts->tasks_exited = bit_alloc(num_tasks); mts->msg_handle = eio_handle_create(); mts->num_resp_port = _estimate_nports(num_nodes, 48); mts->resp_port = xmalloc(sizeof(uint16_t) * mts->num_resp_port); for (i = 0; i < mts->num_resp_port; i++) { if (net_stream_listen(&sock, &port) < 0) { - error("unable to intialize step launch listening socket: %m"); - return SLURM_ERROR; + error("unable to intialize step launch" + " listening socket: %m"); + goto fail; } mts->resp_port[i] = port; obj = eio_obj_create(sock, &message_socket_ops, (void *)mts); @@ -426,10 +427,26 @@ static int _msg_thr_create(struct message_thread_state *mts, int num_nodes) if (pthread_create(&mts->msg_thread, NULL, _msg_thr_internal, (void *)mts) != 0) { error("pthread_create of message thread: %m"); - return SLURM_ERROR; + goto fail; } - return SLURM_SUCCESS; + return mts; +fail: + eio_handle_destroy(mts->msg_handle); + xfree(mts->resp_port); + xfree(mts); + return NULL; +} + +static void _msg_thr_destroy(message_thread_state_t *mts) +{ + eio_signal_shutdown(mts->msg_handle); + pthread_join(mts->msg_thread, NULL); + eio_handle_destroy(mts->msg_handle); + pthread_mutex_destroy(&mts->lock); + pthread_cond_destroy(&mts->cond); + bit_free(mts->tasks_started); + bit_free(mts->tasks_exited); } static bool _message_socket_readable(eio_obj_t *obj) @@ -451,7 +468,7 @@ static bool _message_socket_readable(eio_obj_t *obj) static int _message_socket_accept(eio_obj_t *obj, List objs) { - struct message_thread_state *mts = (struct message_thread_state *)obj->arg; + message_thread_state_t *mts = (message_thread_state_t *)obj->arg; int fd; unsigned char *uc; @@ -524,7 +541,7 @@ cleanup: } static void -_launch_handler(struct message_thread_state *mts, slurm_msg_t *resp) +_launch_handler(message_thread_state_t *mts, slurm_msg_t *resp) { launch_tasks_response_msg_t *msg = resp->data; int i; @@ -535,19 +552,17 @@ _launch_handler(struct message_thread_state *mts, slurm_msg_t *resp) bit_set(mts->tasks_started, msg->task_ids[i]); } - if (mts->callback.task_start != NULL) - (mts->callback.task_start)(msg); - pthread_cond_signal(&mts->cond); pthread_mutex_unlock(&mts->lock); } static void -_exit_handler(struct message_thread_state *mts, slurm_msg_t *exit_msg) +_exit_handler(message_thread_state_t *mts, slurm_msg_t *exit_msg) { task_exit_msg_t *msg = (task_exit_msg_t *) exit_msg->data; int i; + int rc; pthread_mutex_lock(&mts->lock); @@ -556,15 +571,31 @@ _exit_handler(struct message_thread_state *mts, slurm_msg_t *exit_msg) bit_set(mts->tasks_exited, msg->task_id_list[i]); } - if (mts->callback.task_finish != NULL) - (mts->callback.task_finish)(msg); + verbose("%d tasks finished (rc=%u)", + msg->num_tasks, msg->return_code); + if (WIFEXITED(msg->return_code)) { + rc = WEXITSTATUS(msg->return_code); + if (rc != 0) { + for (i = 0; i < msg->num_tasks; i++) { + error("task %u exited with exit code %d", + msg->task_id_list[i], rc); + } + } + } else if (WIFSIGNALED(msg->return_code)) { + for (i = 0; i < msg->num_tasks; i++) { + verbose("task %u killed by signal %d", + msg->task_id_list[i], + WTERMSIG(msg->return_code)); + } + rc = 1; + } pthread_cond_signal(&mts->cond); pthread_mutex_unlock(&mts->lock); } static void -_handle_msg(struct message_thread_state *mts, slurm_msg_t *msg) +_handle_msg(message_thread_state_t *mts, slurm_msg_t *msg) { uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred); static uid_t slurm_uid;