From 6d86358315664b5f59b7dd2d1dc7d6a0d038d9ec Mon Sep 17 00:00:00 2001 From: "Christopher J. Morrone" <morrone2@llnl.gov> Date: Fri, 18 Aug 2006 19:25:29 +0000 Subject: [PATCH] First pass at handling signals in slaunch and aborting steps in the step_launch api code. --- slurm/slurm.h.in | 18 ++++++++-- src/api/step_ctx.h | 5 +-- src/api/step_launch.c | 78 +++++++++++++++++++++++++++++++++---------- src/slaunch/slaunch.c | 27 ++++++++++++--- 4 files changed, 102 insertions(+), 26 deletions(-) diff --git a/slurm/slurm.h.in b/slurm/slurm.h.in index 9200fc9f4c9..1fcf6329776 100644 --- a/slurm/slurm.h.in +++ b/slurm/slurm.h.in @@ -562,12 +562,16 @@ typedef struct { bool multi_prog; uint32_t slurmd_debug; /* remote slurmd debug level */ bool parallel_debug; - void (*task_start_callback)(launch_tasks_response_msg_t *); - void (*task_finish_callback)(task_exit_msg_t *); char *task_prolog; char *task_epilog; } slurm_job_step_launch_t; +typedef struct { + void (*task_start)(launch_tasks_response_msg_t *); + void (*task_finish)(task_exit_msg_t *); +} slurm_job_step_launch_callbacks_t; + + typedef struct { uint32_t job_id; /* job ID */ uint16_t step_id; /* step ID */ @@ -1077,7 +1081,8 @@ extern void slurm_job_step_launch_t_init PARAMS((slurm_job_step_launch_t *ptr)); * RET SLURM_SUCCESS or SLURM_ERROR (with errno set) */ extern int slurm_step_launch PARAMS((slurm_step_ctx ctx, - const slurm_job_step_launch_t *params)); + const slurm_job_step_launch_t *params, + const slurm_job_step_launch_callbacks_t *callbacks)); /* * Block until all tasks have started. @@ -1089,6 +1094,13 @@ extern int slurm_step_launch_wait_start PARAMS((slurm_step_ctx ctx)); */ extern void slurm_step_launch_wait_finish PARAMS((slurm_step_ctx ctx)); +/* + * Abort an in-progress launch, or terminate the fully launched job step. + * + * Can be called from a signal handler. + */ +void slurm_step_launch_abort PARAMS((slurm_step_ctx ctx)); + /*****************************************************************************\ * SLURM CONTROL CONFIGURATION READ/PRINT/UPDATE FUNCTIONS \*****************************************************************************/ diff --git a/src/api/step_ctx.h b/src/api/step_ctx.h index a142b5083ae..e139a7d2482 100644 --- a/src/api/step_ctx.h +++ b/src/api/step_ctx.h @@ -51,6 +51,8 @@ struct step_launch_state { int tasks_requested; bitstr_t *tasks_started; /* or attempted to start, but failed */ bitstr_t *tasks_exited; /* or never started correctly */ + bool abort; + bool abort_action_taken; /* message thread variables */ eio_handle_t *msg_handle; @@ -64,8 +66,7 @@ struct step_launch_state { step_resp, do not free */ /* user registered callbacks */ - void (*task_start_callback)(launch_tasks_response_msg_t *); - void (*task_finish_callback)(task_exit_msg_t *); + slurm_job_step_launch_callbacks_t callback; }; struct slurm_step_ctx_struct { diff --git a/src/api/step_launch.c b/src/api/step_launch.c index 88de63e9307..0226e5e93c9 100644 --- a/src/api/step_launch.c +++ b/src/api/step_launch.c @@ -119,19 +119,44 @@ void slurm_job_step_launch_t_init (slurm_job_step_launch_t *ptr) ptr->multi_prog = false; ptr->slurmd_debug = 0; ptr->parallel_debug = false; - ptr->task_start_callback = NULL; - ptr->task_finish_callback = NULL; ptr->task_prolog = NULL; ptr->task_epilog = NULL; } +/* + * Initialize launch state structure inside of the step context. + */ +static void _init_launch_state(slurm_step_ctx ctx, + const slurm_job_step_launch_callbacks_t *cb) +{ + ctx->launch_state = xmalloc(sizeof(struct step_launch_state)); + pthread_mutex_init(&ctx->launch_state->lock, NULL); + pthread_cond_init(&ctx->launch_state->cond, NULL); + ctx->launch_state->tasks_requested = ctx->step_req->num_tasks; + ctx->launch_state->tasks_started = bit_alloc(ctx->step_req->num_tasks); + ctx->launch_state->tasks_exited = bit_alloc(ctx->step_req->num_tasks); + ctx->launch_state->layout = ctx->step_resp->step_layout; + if (cb != NULL) { + /* copy the user specified callback pointers */ + memcpy(&(ctx->launch_state->callback), cb, + sizeof(slurm_job_step_launch_callbacks_t)); + } else { + /* set all callbacks to NULL */ + memset(&(ctx->launch_state->callback), 0, + sizeof(slurm_job_step_launch_callbacks_t)); + } + ctx->launch_state->abort = false; + ctx->launch_state->abort_action_taken = false; +} + /* * slurm_step_launch - launch a parallel job step * IN ctx - job step context generated by slurm_step_ctx_create * RET SLURM_SUCCESS or SLURM_ERROR (with errno set) */ int slurm_step_launch (slurm_step_ctx ctx, - const slurm_job_step_launch_t *params) + const slurm_job_step_launch_t *params, + const slurm_job_step_launch_callbacks_t *callbacks) { launch_tasks_request_msg_t launch; int i; @@ -145,20 +170,11 @@ int slurm_step_launch (slurm_step_ctx ctx, return SLURM_ERROR; } - /* Initialize launch state structure */ - ctx->launch_state = xmalloc(sizeof(struct step_launch_state)); + _init_launch_state(ctx, callbacks); if (ctx->launch_state == NULL) { error("Failed to allocate memory for step launch state: %m"); return SLURM_ERROR; } - pthread_mutex_init(&ctx->launch_state->lock, NULL); - pthread_cond_init(&ctx->launch_state->cond, NULL); - ctx->launch_state->tasks_requested = ctx->step_req->num_tasks; - ctx->launch_state->tasks_started = bit_alloc(ctx->step_req->num_tasks); - ctx->launch_state->tasks_exited = bit_alloc(ctx->step_req->num_tasks); - ctx->launch_state->task_start_callback = params->task_start_callback; - ctx->launch_state->task_finish_callback = params->task_finish_callback; - ctx->launch_state->layout = ctx->step_resp->step_layout; /* Create message receiving sockets and handler thread */ _msg_thr_create(ctx->launch_state, ctx->step_req->node_count, @@ -258,6 +274,14 @@ int slurm_step_launch_wait_start(slurm_step_ctx ctx) pthread_mutex_lock(&sls->lock); while (bit_set_count(sls->tasks_started) < sls->tasks_requested) { pthread_cond_wait(&sls->cond, &sls->lock); + if (sls->abort && !sls->abort_action_taken) { + slurm_kill_job_step(ctx->job_id, + ctx->step_resp->job_step_id, + SIGKILL); + sls->abort_action_taken = true; + pthread_mutex_unlock(&sls->lock); + return 0; + } } pthread_mutex_unlock(&sls->lock); return 1; @@ -274,6 +298,13 @@ void slurm_step_launch_wait_finish(slurm_step_ctx ctx) pthread_mutex_lock(&sls->lock); while (bit_set_count(sls->tasks_exited) < sls->tasks_requested) { pthread_cond_wait(&sls->cond, &sls->lock); + if (sls->abort && !sls->abort_action_taken) { + slurm_kill_job_step(ctx->job_id, + ctx->step_resp->job_step_id, + SIGKILL); + sls->abort_action_taken = true; + break; + } } /* Then shutdown the message handler thread */ @@ -293,6 +324,19 @@ void slurm_step_launch_wait_finish(slurm_step_ctx ctx) xfree(sls->resp_port); } +/* + * Abort an in-progress launch, or terminate the fully launched job step. + * + * Can be called from a signal handler. + */ +void slurm_step_launch_abort(slurm_step_ctx ctx) +{ + struct step_launch_state *sls = ctx->launch_state; + + sls->abort = true; + pthread_cond_signal(&sls->cond); +} + /********************************************************************** * Message handler functions **********************************************************************/ @@ -459,8 +503,8 @@ _launch_handler(struct step_launch_state *sls, slurm_msg_t *resp) bit_set(sls->tasks_started, msg->task_ids[i]); } - if (sls->task_start_callback != NULL) - (sls->task_start_callback)(msg); + if (sls->callback.task_start != NULL) + (sls->callback.task_start)(msg); pthread_cond_signal(&sls->cond); pthread_mutex_unlock(&sls->lock); @@ -480,8 +524,8 @@ _exit_handler(struct step_launch_state *sls, slurm_msg_t *exit_msg) bit_set(sls->tasks_exited, msg->task_id_list[i]); } - if (sls->task_finish_callback != NULL) - (sls->task_finish_callback)(msg); + if (sls->callback.task_finish != NULL) + (sls->callback.task_finish)(msg); pthread_cond_signal(&sls->cond); pthread_mutex_unlock(&sls->lock); diff --git a/src/slaunch/slaunch.c b/src/slaunch/slaunch.c index eec60c89150..11be446e226 100644 --- a/src/slaunch/slaunch.c +++ b/src/slaunch/slaunch.c @@ -119,18 +119,28 @@ static void _mpir_init(int num_tasks); static void _mpir_cleanup(void); static void _mpir_set_executable_names(const char *executable_name); static void _mpir_dump_proctable(void); +static void _ignore_signal(int signo); +static void _exit_on_signal(int signo); int slaunch(int argc, char **argv) { log_options_t logopt = LOG_OPTS_STDERR_ONLY; job_step_create_request_msg_t step_req; slurm_job_step_launch_t params; + slurm_job_step_launch_callbacks_t callbacks; int rc; uint32_t *hostids = NULL; log_init(xbasename(argv[0]), logopt, 0, NULL); char **env; int i, j; + xsignal(SIGHUP, _exit_on_signal); + xsignal(SIGINT, _exit_on_signal); + xsignal(SIGQUIT, _ignore_signal); + xsignal(SIGPIPE, _ignore_signal); + xsignal(SIGTERM, _exit_on_signal); + xsignal(SIGUSR1, _ignore_signal); + xsignal(SIGUSR2, _ignore_signal); /* Initialize plugin stack, read options from plugins, etc. */ if (spank_init(NULL) < 0) @@ -244,12 +254,12 @@ int slaunch(int argc, char **argv) _setup_local_fds(¶ms.local_fds, (int)step_ctx->job_id, (int)step_ctx->step_resp->job_step_id, hostids); params.parallel_debug = opt.parallel_debug ? true : false; - params.task_start_callback = _task_start; - params.task_finish_callback = _task_finish; + callbacks.task_start = _task_start; + callbacks.task_finish = _task_finish; _mpir_init(step_req.num_tasks); - rc = slurm_step_launch(step_ctx, ¶ms); + rc = slurm_step_launch(step_ctx, ¶ms, &callbacks); if (rc != SLURM_SUCCESS) { error("Application launch failed: %m"); goto cleanup; @@ -558,7 +568,7 @@ _terminate_job_step(slurm_step_ctx ctx) slurm_step_ctx_get(step_ctx, SLURM_STEP_CTX_JOBID, &job_id); slurm_step_ctx_get(step_ctx, SLURM_STEP_CTX_STEPID, &step_id); - info("Terminating job step"); + info("Terminating job step %u.%u"); slurm_kill_job_step(job_id, step_id, SIGKILL); } @@ -753,3 +763,12 @@ _mpir_dump_proctable() } } +static void _ignore_signal(int signo) +{ + /* do nothing */ +} + +static void _exit_on_signal(int signo) +{ + slurm_step_launch_abort(step_ctx); +} -- GitLab