From 0332b3e5c88b907abf19f5106314c0fcea333806 Mon Sep 17 00:00:00 2001 From: Moe Jette <jette1@llnl.gov> Date: Wed, 22 Jan 2003 21:43:57 +0000 Subject: [PATCH] scancel accepts additional signals now. slurmctld accepts step kill RPC and forwards the signals to slurmd(s). --- src/slurmctld/agent.h | 2 +- src/slurmctld/controller.c | 22 ++++----- src/slurmctld/slurmctld.h | 13 ++++++ src/slurmctld/step_mgr.c | 94 +++++++++++++++++++++++++++++++------- 4 files changed, 102 insertions(+), 29 deletions(-) diff --git a/src/slurmctld/agent.h b/src/slurmctld/agent.h index 3512d850867..dbb0b3330a9 100644 --- a/src/slurmctld/agent.h +++ b/src/slurmctld/agent.h @@ -34,7 +34,7 @@ #define AGENT_IS_THREAD 1 /* set if agent itself a thread of * slurmctld, 0 for function call */ -#define AGENT_THREAD_COUNT 20 /* maximum active agent threads */ +#define AGENT_THREAD_COUNT 10 /* maximum active agent threads */ #define COMMAND_TIMEOUT 5 /* seconds */ typedef struct agent_arg { diff --git a/src/slurmctld/controller.c b/src/slurmctld/controller.c index b6c5b2ad291..5a87e746ebe 100644 --- a/src/slurmctld/controller.c +++ b/src/slurmctld/controller.c @@ -121,7 +121,7 @@ inline static void _slurm_rpc_dump_conf(slurm_msg_t * msg); inline static void _slurm_rpc_dump_nodes(slurm_msg_t * msg); inline static void _slurm_rpc_dump_partitions(slurm_msg_t * msg); inline static void _slurm_rpc_dump_jobs(slurm_msg_t * msg); -inline static void _slurm_rpc_job_step_cancel(slurm_msg_t * msg); +inline static void _slurm_rpc_job_step_kill(slurm_msg_t * msg); inline static void _slurm_rpc_job_step_complete(slurm_msg_t * msg); inline static void _slurm_rpc_job_step_create(slurm_msg_t * msg); inline static void _slurm_rpc_job_step_get_info(slurm_msg_t * msg); @@ -697,7 +697,7 @@ static void _slurmctld_req (slurm_msg_t * msg) slurm_free_job_desc_msg(msg->data); break; case REQUEST_CANCEL_JOB_STEP: - _slurm_rpc_job_step_cancel(msg); + _slurm_rpc_job_step_kill(msg); slurm_free_job_step_kill_msg(msg->data); 
break; case REQUEST_COMPLETE_JOB_STEP: @@ -918,9 +918,9 @@ static void _slurm_rpc_dump_partitions(slurm_msg_t * msg) } } -/* _slurm_rpc_job_step_cancel - process RPC to cancel an entire job or +/* _slurm_rpc_job_step_kill - process RPC to cancel an entire job or * an individual job step */ -static void _slurm_rpc_job_step_cancel(slurm_msg_t * msg) +static void _slurm_rpc_job_step_kill(slurm_msg_t * msg) { /* init */ int error_code = 0; @@ -946,13 +946,13 @@ static void _slurm_rpc_job_step_cancel(slurm_msg_t * msg) /* return result */ if (error_code) { info( - "_slurm_rpc_job_step_cancel error %d for %u, time=%ld", + "_slurm_rpc_job_step_kill error %d for %u, time=%ld", error_code, job_step_kill_msg->job_id, (long) (clock() - start_time)); slurm_send_rc_msg(msg, error_code); } else { info( - "_slurm_rpc_job_step_cancel success for JobId=%u, time=%ld", + "_slurm_rpc_job_step_kill success for JobId=%u, time=%ld", job_step_kill_msg->job_id, (long) (clock() - start_time)); slurm_send_rc_msg(msg, SLURM_SUCCESS); @@ -963,29 +963,27 @@ static void _slurm_rpc_job_step_cancel(slurm_msg_t * msg) } } else { - error_code = job_step_cancel(job_step_kill_msg->job_id, + error_code = job_step_signal(job_step_kill_msg->job_id, job_step_kill_msg->job_step_id, + job_step_kill_msg->signal, uid); unlock_slurmctld(job_write_lock); /* return result */ if (error_code) { info( - "_slurm_rpc_job_step_cancel error %d for %u.%u, time=%ld", + "_slurm_rpc_job_step_kill error %d for %u.%u, time=%ld", error_code, job_step_kill_msg->job_id, job_step_kill_msg->job_step_id, (long) (clock() - start_time)); slurm_send_rc_msg(msg, error_code); } else { info( - "_slurm_rpc_job_step_cancel success for %u.%u, time=%ld", + "_slurm_rpc_job_step_kill success for %u.%u, time=%ld", job_step_kill_msg->job_id, job_step_kill_msg->job_step_id, (long) (clock() - start_time)); slurm_send_rc_msg(msg, SLURM_SUCCESS); - - /* Below function provides its own locking */ - (void) dump_all_job_state(); } } } diff --git 
a/src/slurmctld/slurmctld.h b/src/slurmctld/slurmctld.h index 95ffc8d0bbb..3e190b2af36 100644 --- a/src/slurmctld/slurmctld.h +++ b/src/slurmctld/slurmctld.h @@ -627,6 +627,19 @@ extern int job_complete (uint32_t job_id, uid_t uid, bool requeue, extern int job_step_complete (uint32_t job_id, uint32_t job_step_id, uid_t uid, bool requeue, uint32_t job_return_code); +/* + * job_step_signal - signal the specified job step + * IN job_id - id of the job to be cancelled + * IN step_id - id of the job step to be cancelled + * IN signal - signal number to send to the job step + * IN uid - user id of user issuing the RPC + * RET 0 on success, otherwise ESLURM error code + * global: job_list - pointer global job list + * last_job_update - time of last job table update + */ +extern int job_step_signal(uint32_t job_id, uint32_t step_id, + uint16_t signal, uid_t uid); + /* * job_time_limit - terminate jobs which have exceeded their time limit * global: job_list - pointer global job list diff --git a/src/slurmctld/step_mgr.c b/src/slurmctld/step_mgr.c index b8a97df2e58..6d94444814d 100644 --- a/src/slurmctld/step_mgr.c +++ b/src/slurmctld/step_mgr.c @@ -44,12 +44,14 @@ #include "src/common/bitstring.h" #include "src/common/slurm_errno.h" +#include "src/slurmctld/agent.h" #include "src/slurmctld/locks.h" #include "src/slurmctld/slurmctld.h" static void _pack_ctld_job_step_info(struct step_record *step, Buf buffer); static bitstr_t * _pick_step_nodes (struct job_record *job_ptr, step_specs *step_spec ); +static void _signal_step_tasks(struct step_record *step_ptr, uint16_t signal); /* * create_step_record - create an empty step_record for the specified job. 
@@ -192,18 +194,20 @@ find_step_record(struct job_record *job_ptr, uint16_t step_id) /* - * job_step_cancel - cancel the specified job step + * job_step_signal - signal the specified job step * IN job_id - id of the job to be cancelled * IN step_id - id of the job step to be cancelled + * IN signal - signal number to send to the job step * IN uid - user id of user issuing the RPC * RET 0 on success, otherwise ESLURM error code * global: job_list - pointer global job list * last_job_update - time of last job table update */ -int job_step_cancel(uint32_t job_id, uint32_t step_id, uid_t uid) +int job_step_signal(uint32_t job_id, uint32_t step_id, + uint16_t signal, uid_t uid) { struct job_record *job_ptr; - int error_code; + struct step_record *step_ptr; job_ptr = find_job_record(job_id); if (job_ptr == NULL) { @@ -224,23 +228,81 @@ int job_step_cancel(uint32_t job_id, uint32_t step_id, uid_t uid) return ESLURM_USER_ID_MISSING; } - if (job_ptr->job_state == JOB_RUNNING) { - last_job_update = time(NULL); - error_code = delete_step_record(job_ptr, step_id); - if (error_code == ENOENT) { - info("job_step_cancel step %u.%u not found", - job_id, step_id); - return ESLURM_ALREADY_DONE; - } + step_ptr = find_step_record(job_ptr, (uint16_t)step_id); + if (step_ptr == NULL) { + info("job_step_cancel step %u.%u not found", + job_id, step_id); + return ESLURM_ALREADY_DONE; + } + + _signal_step_tasks(step_ptr, signal); + return SLURM_SUCCESS; - job_ptr->time_last_active = time(NULL); - return SLURM_SUCCESS; +} + +static void _signal_step_tasks(struct step_record *step_ptr, uint16_t signal) +{ + int i; + kill_tasks_msg_t *kill_tasks_msg; + agent_arg_t *agent_args; + pthread_attr_t attr_agent; + pthread_t thread_agent; + int buf_rec_size = 0; + + agent_args = xmalloc(sizeof(agent_arg_t)); + agent_args->msg_type = REQUEST_KILL_TASKS; + agent_args->retry = 1; + kill_tasks_msg = xmalloc(sizeof(kill_tasks_msg_t)); + kill_tasks_msg->job_id = step_ptr->job_ptr->job_id; + 
kill_tasks_msg->job_step_id = step_ptr->step_id; + kill_tasks_msg->signal = signal; + + for (i = 0; i < node_record_count; i++) { + if (bit_test(step_ptr->step_node_bitmap, i) == 0) + continue; + if ((agent_args->node_count + 1) > buf_rec_size) { + buf_rec_size += 32; + xrealloc((agent_args->slurm_addr), + (sizeof(struct sockaddr_in) * + buf_rec_size)); + xrealloc((agent_args->node_names), + (MAX_NAME_LEN * buf_rec_size)); + } + agent_args->slurm_addr[agent_args->node_count] = + node_record_table_ptr[i].slurm_addr; + strncpy(&agent_args-> + node_names[MAX_NAME_LEN * agent_args->node_count], + node_record_table_ptr[i].name, MAX_NAME_LEN); + agent_args->node_count++; } - info("job_step_cancel: step %u.%u can't be cancelled from state=%s", - job_id, step_id, job_state_string(job_ptr->job_state)); - return ESLURM_TRANSITION_STATE_NO_UPDATE; + if (agent_args->node_count == 0) { + xfree(kill_tasks_msg); + xfree(agent_args); + return; + } + agent_args->msg_args = kill_tasks_msg; + debug("Spawning signal agent"); + if (pthread_attr_init(&attr_agent)) + fatal("pthread_attr_init error %m"); + if (pthread_attr_setdetachstate + (&attr_agent, PTHREAD_CREATE_DETACHED)) + error("pthread_attr_setdetachstate error %m"); +#ifdef PTHREAD_SCOPE_SYSTEM + if (pthread_attr_setscope(&attr_agent, PTHREAD_SCOPE_SYSTEM)) + error("pthread_attr_setscope error %m"); +#endif + if (pthread_create + (&thread_agent, &attr_agent, agent, (void *) agent_args)) { + error("pthread_create error %m"); + sleep(1); /* sleep and try once more */ + if (pthread_create + (&thread_agent, &attr_agent, agent, + (void *) agent_args)) + fatal("pthread_create error %m"); + } + return; } -- GitLab