diff --git a/NEWS b/NEWS index 8371723ba5f1802dc5c4ce909ae13e9a61a97279..fe6a9aefc0ee981536eb085ab21086c70a9a2cce 100644 --- a/NEWS +++ b/NEWS @@ -29,6 +29,13 @@ documents those changes that are of interest to users and admins. formatted in XYZxXYZ format. -- Add support for partition configuration "Shared=exclusive". This is equivalent to "srun --exclusive" when select/cons_res is configured. + -- In sched/wiki2, report the reason for a node being unavailable for the + GETNODES command using the CAT="<reason>" field. + -- In sched/wiki2 with select/linear, duplicate hostnames in HOSTLIST, one + per allocated processor. + -- Fix bug in scancel with specific signal and job lacks active steps. + -- In sched/wiki2, add support for NOTIFYJOB ARG=<jobid> MSG=<message>. + This sends a message to an active srun command. * Changes in SLURM 1.2.11 ========================= diff --git a/doc/man/man1/scancel.1 b/doc/man/man1/scancel.1 index e4248988c764c733302687da6503a1ce3f0bd99c..691cdacf935e128c1654bc096e999a0bc639b2df 100644 --- a/doc/man/man1/scancel.1 +++ b/doc/man/man1/scancel.1 @@ -15,7 +15,7 @@ be printed and the job will not be signaled. .TP \fB\-b\fR, \fB\-\-batch\fR -Signal only the batch job shell. +Signal the batch job shell and its child processes. .TP \fB\-\-help\fR @@ -61,17 +61,22 @@ Print additional logging. Multiple v's increase logging detail. This option is incompatible with the \fB\-\-quiet\fR option. .TP \fB\-V\fR, \fB\-\-Version\fR -Print the version number of the scontrol command. +Print the version number of the scancel command. + .TP ARGUMENTS .TP \fIjob_id\fP -The Slurm job ID of the job to have one or more of its steps signaled. +The Slurm job ID to be signaled. +If no step_id is specified and the \fB\-\-batch\fR option is not used, +then all job steps associated with the provided job_id will be signaled. +If no job steps are active then no processes will be signalled. 
+Any batch script which spawned the job steps will only be signalled if +the \fB\-\-batch\fR option is used. .TP \fIstep_id\fP -The step ID of the job step to be signaled. If none is provided and the -\fB\-\-batch\fR option is not used, then all jobs steps associated with -the provided job_id will be signaled. +The step ID of the job step to be signaled. + .SH "ENVIRONMENT VARIABLES" .PP Some \fBscancel\fR options may be set via environment variables. These diff --git a/src/common/slurm_protocol_defs.c b/src/common/slurm_protocol_defs.c index c2350f3b64d7e26d50a094ef6dc855b1c67e3b55..6efdb2ff79dd96103acaea120087898fd0cfd6bc 100644 --- a/src/common/slurm_protocol_defs.c +++ b/src/common/slurm_protocol_defs.c @@ -500,6 +500,14 @@ void inline slurm_free_srun_timeout_msg(srun_timeout_msg_t * msg) xfree(msg); } +void inline slurm_free_srun_user_msg(srun_user_msg_t * user_msg) +{ + if (user_msg) { + xfree(user_msg->msg); + xfree(user_msg); + } +} + void inline slurm_free_checkpoint_msg(checkpoint_msg_t *msg) { xfree(msg); diff --git a/src/common/slurm_protocol_defs.h b/src/common/slurm_protocol_defs.h index b3576721939495df6ca0508844fdf6707814bda7..8d72e4c3a90cea6d028e676cfe86b8c238640ecc 100644 --- a/src/common/slurm_protocol_defs.h +++ b/src/common/slurm_protocol_defs.h @@ -183,6 +183,7 @@ typedef enum { SRUN_TIMEOUT, SRUN_NODE_FAIL, SRUN_JOB_COMPLETE, + SRUN_USER_MSG, PMI_KVS_PUT_REQ = 7201, PMI_KVS_PUT_RESP, @@ -593,6 +594,11 @@ typedef struct suspend_msg { uint32_t job_id; /* slurm job_id */ } suspend_msg_t; +typedef struct srun_user_msg { + uint32_t job_id; /* slurm job_id */ + char *msg; /* message to user's srun */ +} srun_user_msg_t; + typedef struct kvs_get_msg { uint16_t task_id; /* job step's task id */ uint16_t size; /* count of tasks in job */ @@ -743,6 +749,7 @@ void inline slurm_free_srun_job_complete_msg(srun_job_complete_msg_t * msg); void inline slurm_free_srun_ping_msg(srun_ping_msg_t * msg); void inline 
slurm_free_srun_node_fail_msg(srun_node_fail_msg_t * msg); void inline slurm_free_srun_timeout_msg(srun_timeout_msg_t * msg); +void inline slurm_free_srun_user_msg(srun_user_msg_t * msg); void inline slurm_free_checkpoint_msg(checkpoint_msg_t *msg); void inline slurm_free_checkpoint_comp_msg(checkpoint_comp_msg_t *msg); void inline slurm_free_checkpoint_resp_msg(checkpoint_resp_msg_t *msg); diff --git a/src/common/slurm_protocol_pack.c b/src/common/slurm_protocol_pack.c index 4a51f9f9591f28690130b11a3c3a54123c4c2091..0a24f87bcf7b9696ed4f34d768d33f1dfa15a25a 100644 --- a/src/common/slurm_protocol_pack.c +++ b/src/common/slurm_protocol_pack.c @@ -280,8 +280,11 @@ static int _unpack_srun_node_fail_msg(srun_node_fail_msg_t ** msg_ptr, Buf buffer); static void _pack_srun_timeout_msg(srun_timeout_msg_t * msg, Buf buffer); -static int -_unpack_srun_timeout_msg(srun_timeout_msg_t ** msg_ptr, Buf buffer); +static int _unpack_srun_timeout_msg(srun_timeout_msg_t ** msg_ptr, + Buf buffer); + +static void _pack_srun_user_msg(srun_user_msg_t * msg, Buf buffer); +static int _unpack_srun_user_msg(srun_user_msg_t ** msg_ptr, Buf buffer); static void _pack_checkpoint_msg(checkpoint_msg_t *msg, Buf buffer); static int _unpack_checkpoint_msg(checkpoint_msg_t **msg_ptr, Buf buffer); @@ -622,6 +625,9 @@ pack_msg(slurm_msg_t const *msg, Buf buffer) case SRUN_TIMEOUT: _pack_srun_timeout_msg((srun_timeout_msg_t *)msg->data, buffer); break; + case SRUN_USER_MSG: + _pack_srun_user_msg((srun_user_msg_t *)msg->data, buffer); + break; case REQUEST_CHECKPOINT: _pack_checkpoint_msg((checkpoint_msg_t *)msg->data, buffer); break; @@ -933,6 +939,10 @@ unpack_msg(slurm_msg_t * msg, Buf buffer) rc = _unpack_srun_timeout_msg((srun_timeout_msg_t **) & msg->data, buffer); break; + case SRUN_USER_MSG: + rc = _unpack_srun_user_msg((srun_user_msg_t **) + & msg->data, buffer); + break; case REQUEST_CHECKPOINT: rc = _unpack_checkpoint_msg((checkpoint_msg_t **) & msg->data, buffer); @@ -3745,6 +3755,33 @@ 
unpack_error: return SLURM_ERROR; } +static void +_pack_srun_user_msg(srun_user_msg_t * msg, Buf buffer) +{ + xassert ( msg != NULL ); + + pack32((uint32_t)msg->job_id, buffer); + packstr(msg->msg, buffer); +} + +static int +_unpack_srun_user_msg(srun_user_msg_t ** msg_ptr, Buf buffer) +{ + uint16_t uint16_tmp; + srun_user_msg_t * msg_user; + xassert ( msg_ptr != NULL ); + + msg_user = xmalloc(sizeof (srun_user_msg_t)) ; + *msg_ptr = msg_user; + + safe_unpack32(&msg_user->job_id, buffer); + safe_unpackstr_xmalloc(&msg_user->msg, &uint16_tmp, buffer); + return SLURM_SUCCESS; + +unpack_error: + return SLURM_ERROR; +} + static void _pack_suspend_msg(suspend_msg_t *msg, Buf buffer) { xassert ( msg != NULL ); diff --git a/src/common/slurm_protocol_socket_implementation.c b/src/common/slurm_protocol_socket_implementation.c index b712719544e3e4b421fe2d0ca1aafc43c4b88f9d..d8443f680a3f0fe074daf15acfb7647abfb42f4e 100644 --- a/src/common/slurm_protocol_socket_implementation.c +++ b/src/common/slurm_protocol_socket_implementation.c @@ -622,6 +622,9 @@ again: rc = poll(&ufds, 1, 5000); done: fcntl(__fd, F_SETFL, flags); + /* NOTE: Connection refused is typically reported for + * non-responsive nodes plus attempts to communicate + * with terminated srun commands. 
*/ if (err) { slurm_seterrno(err); debug2("_slurm_connect failed: %m"); diff --git a/src/plugins/sched/wiki2/Makefile.am b/src/plugins/sched/wiki2/Makefile.am index 3aedd1c33b3283ab40e2381fab5f39469930966e..22cbc7e54e7d3726c4769705f818a02f56d58e04 100644 --- a/src/plugins/sched/wiki2/Makefile.am +++ b/src/plugins/sched/wiki2/Makefile.am @@ -20,6 +20,7 @@ sched_wiki2_la_SOURCES = \ initialize.c \ job_add_task.c \ job_modify.c \ + job_notify.c \ job_release_task.c \ job_requeue.c \ job_signal.c \ diff --git a/src/plugins/sched/wiki2/Makefile.in b/src/plugins/sched/wiki2/Makefile.in index a2a139a837e13870769861e5d3b2d5e0c32d476c..e70dde31a1e7dc3207107c97fe356f94c66605ae 100644 --- a/src/plugins/sched/wiki2/Makefile.in +++ b/src/plugins/sched/wiki2/Makefile.in @@ -77,9 +77,9 @@ LTLIBRARIES = $(pkglib_LTLIBRARIES) sched_wiki2_la_LIBADD = am_sched_wiki2_la_OBJECTS = cancel_job.lo crypto.lo event.lo \ get_jobs.lo get_nodes.lo initialize.lo job_add_task.lo \ - job_modify.lo job_release_task.lo job_requeue.lo job_signal.lo \ - job_will_run.lo msg.lo resume_job.lo sched_wiki.lo \ - start_job.lo suspend_job.lo + job_modify.lo job_notify.lo job_release_task.lo job_requeue.lo \ + job_signal.lo job_will_run.lo msg.lo resume_job.lo \ + sched_wiki.lo start_job.lo suspend_job.lo sched_wiki2_la_OBJECTS = $(am_sched_wiki2_la_OBJECTS) sched_wiki2_la_LINK = $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) \ $(LIBTOOLFLAGS) --mode=link $(CCLD) $(AM_CFLAGS) $(CFLAGS) \ @@ -274,6 +274,7 @@ sched_wiki2_la_SOURCES = \ initialize.c \ job_add_task.c \ job_modify.c \ + job_notify.c \ job_release_task.c \ job_requeue.c \ job_signal.c \ @@ -363,6 +364,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/initialize.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/job_add_task.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/job_modify.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/job_notify.Plo@am__quote@ @AMDEP_TRUE@@am__include@ 
@am__quote@./$(DEPDIR)/job_release_task.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/job_requeue.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/job_signal.Plo@am__quote@ diff --git a/src/plugins/sched/wiki2/get_jobs.c b/src/plugins/sched/wiki2/get_jobs.c index 8ad1e613c240f03c754a4cb5ace0d9b59272e921..2b2ee272babe3baeeca963a8abf743ca79a13707 100644 --- a/src/plugins/sched/wiki2/get_jobs.c +++ b/src/plugins/sched/wiki2/get_jobs.c @@ -239,12 +239,7 @@ static char * _dump_job(struct job_record *job_ptr, int state_info) } } else if (!IS_JOB_FINISHED(job_ptr)) { char *hosts; - if (job_ptr->cr_enabled) { - hosts = _full_task_list(job_ptr); - } else { - hosts = bitmap2wiki_node_name( - job_ptr->node_bitmap); - } + hosts = _full_task_list(job_ptr); xstrcat(buf, "TASKLIST="); xstrcat(buf, hosts); xstrcat(buf, ";"); diff --git a/src/plugins/sched/wiki2/get_nodes.c b/src/plugins/sched/wiki2/get_nodes.c index fda7575eaf1278c9ec83ac64427203b9908ca37e..90076e7090ee200bee25f76096f4658eb393be35 100644 --- a/src/plugins/sched/wiki2/get_nodes.c +++ b/src/plugins/sched/wiki2/get_nodes.c @@ -62,6 +62,7 @@ static char * _get_node_state(struct node_record *node_ptr); * CDISK=<MB>; MB of disk space on node * CPROCS=<cpus>; CPU count on node * [FEATURE=<feature>;] features associated with node, if any, + * [CAT=<reason>]; Reason for a node being down or drained * colon separator * [#<NODEID>:...]; */ @@ -163,6 +164,10 @@ static char * _dump_node(struct node_record *node_ptr, int state_info) node_ptr->name, _get_node_state(node_ptr)); xstrcat(buf, tmp); + if (node_ptr->reason) { + snprintf(tmp, sizeof(tmp), "CAT=\"%s\";", node_ptr->reason); + xstrcat(buf, tmp); + } if (state_info == SLURM_INFO_STATE) return buf; diff --git a/src/plugins/sched/wiki2/job_modify.c b/src/plugins/sched/wiki2/job_modify.c index a35fb7447bdd2c48ee7eb8d6d574ac41d870b2ae..82f17550ae70e03b45e6d48329bee1232fa9a7c5 100644 --- a/src/plugins/sched/wiki2/job_modify.c +++ 
b/src/plugins/sched/wiki2/job_modify.c @@ -82,7 +82,7 @@ static int _job_modify(uint32_t jobid, char *bank_ptr, return ESLURM_INVALID_JOB_ID; } if (IS_JOB_FINISHED(job_ptr)) { - error("wiki: MODIFYJOB jobid %d is finished", jobid); + error("wiki: MODIFYJOB jobid %u is finished", jobid); return ESLURM_DISABLED; } @@ -144,7 +144,10 @@ static int _job_modify(uint32_t jobid, char *bank_ptr, return SLURM_SUCCESS; } -/* RET 0 on success, -1 on failure */ +/* Modify a job: + * CMD=MODIFYJOB ARG=<jobid> PARTITION=<name> NODES=<number> + * DEPEND=afterany:<jobid> TIMELIMIT=<seconds> BANK=<name> + * RET 0 on success, -1 on failure */ extern int job_modify_wiki(char *cmd_ptr, int *err_code, char **err_msg) { char *arg_ptr, *bank_ptr, *depend_ptr, *nodes_ptr; diff --git a/src/plugins/sched/wiki2/job_notify.c b/src/plugins/sched/wiki2/job_notify.c new file mode 100644 index 0000000000000000000000000000000000000000..ac7ed5217faf57e78248676b8440e6e881eeb713 --- /dev/null +++ b/src/plugins/sched/wiki2/job_notify.c @@ -0,0 +1,106 @@ +/*****************************************************************************\ + * job_notify.c - Process Wiki job notify request + ***************************************************************************** + * Copyright (C) 2006-2007 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Morris Jette <jette1@llnl.gov> + * UCRL-CODE-226842. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. 
+ * + * In addition, as a special exception, the copyright holders give permission + * to link the code of portions of this program with the OpenSSL library under + * certain conditions as described in each individual source file, and + * distribute linked combinations including the two. You must obey the GNU + * General Public License in all respects for all of the code used other than + * OpenSSL. If you modify file(s) with this exception, you may extend this + * exception to your version of the file(s), but you are not obligated to do + * so. If you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files in + * the program, then also delete it here. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
+\*****************************************************************************/ + +#include "./msg.h" +#include <strings.h> +#include "src/slurmctld/locks.h" +#include "src/slurmctld/slurmctld.h" +#include "src/slurmctld/srun_comm.h" + +static int _job_notify(uint32_t jobid, char *msg_ptr) +{ + struct job_record *job_ptr; + + job_ptr = find_job_record(jobid); + if (job_ptr == NULL) { + error("wiki: NOTIFYJOB has invalid jobid %u", jobid); + return ESLURM_INVALID_JOB_ID; + } + if (IS_JOB_FINISHED(job_ptr)) { + error("wiki: NOTIFYJOB jobid %u is finished", jobid); + return ESLURM_INVALID_JOB_ID; + } + srun_user_message(job_ptr, msg_ptr); + return SLURM_SUCCESS; +} + +/* Notify a job via arbitrary message: + * CMD=NOTIFYJOB ARG=<jobid> MSG=<string> + * RET 0 on success, -1 on failure */ +extern int job_notify_wiki(char *cmd_ptr, int *err_code, char **err_msg) +{ + char *arg_ptr, *msg_ptr; + int slurm_rc; + uint32_t jobid; + static char reply_msg[128]; + /* Locks: read job */ + slurmctld_lock_t job_read_lock = { + NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK }; + + arg_ptr = strstr(cmd_ptr, "ARG="); + if (arg_ptr == NULL) { + *err_code = -300; + *err_msg = "NOTIFYJOB lacks ARG="; + error("wiki: NOTIFYJOB lacks ARG="); + return -1; + } + arg_ptr += 4; + jobid = atol(arg_ptr); + msg_ptr = strstr(cmd_ptr, "MSG="); + if (msg_ptr == NULL) { + *err_code = -300; + *err_msg = "NOTIFYJOB lacks MSG="; + error("wiki: NOTIFYJOB lacks MSG="); + return -1; + } + msg_ptr += 4; + + lock_slurmctld(job_read_lock); + slurm_rc = _job_notify(jobid, msg_ptr); + unlock_slurmctld(job_read_lock); + if (slurm_rc != SLURM_SUCCESS) { + *err_code = -700; + *err_msg = slurm_strerror(slurm_rc); + error("wiki: Failed to notify job %u (%m)", jobid); + return -1; + } + + snprintf(reply_msg, sizeof(reply_msg), + "job %u notified successfully", jobid); + *err_msg = reply_msg; + return 0; +} diff --git a/src/plugins/sched/wiki2/msg.c b/src/plugins/sched/wiki2/msg.c index 
16fefe4bfa7b811c1093d7f9a8500f4ebcd6309f..e3abfb9227ff3a2e8f305eb6fbe305d5e86f4259 100644 --- a/src/plugins/sched/wiki2/msg.c +++ b/src/plugins/sched/wiki2/msg.c @@ -560,6 +560,8 @@ static void _proc_msg(slurm_fd new_fd, char *msg) job_will_run(cmd_ptr, &err_code, &err_msg); } else if (strncmp(cmd_ptr, "MODIFYJOB", 9) == 0) { job_modify_wiki(cmd_ptr, &err_code, &err_msg); + } else if (strncmp(cmd_ptr, "NOTIFYJOB", 9) == 0) { + job_notify_wiki(cmd_ptr, &err_code, &err_msg); } else if (strncmp(cmd_ptr, "SIGNALJOB", 9) == 0) { job_signal_wiki(cmd_ptr, &err_code, &err_msg); } else if (strncmp(cmd_ptr, "INITIALIZE", 10) == 0) { diff --git a/src/plugins/sched/wiki2/msg.h b/src/plugins/sched/wiki2/msg.h index ac4e096b5f8337ba06684fe51033124fc74a53c6..264cbb3ae568cce7ae0fd903f927116d460a0a7e 100644 --- a/src/plugins/sched/wiki2/msg.h +++ b/src/plugins/sched/wiki2/msg.h @@ -107,6 +107,7 @@ extern int get_nodes(char *cmd_ptr, int *err_code, char **err_msg); extern int initialize_wiki(char *cmd_ptr, int *err_code, char **err_msg); extern int job_add_task(char *cmd_ptr, int *err_code, char **err_msg); extern int job_modify_wiki(char *cmd_ptr, int *err_code, char **err_msg); +extern int job_notify_wiki(char *cmd_ptr, int *err_code, char **err_msg); extern int job_release_task(char *cmd_ptr, int *err_code, char **err_msg); extern int job_requeue_wiki(char *cmd_ptr, int *err_code, char **err_msg); extern int job_signal_wiki(char *cmd_ptr, int *err_code, char **err_msg); diff --git a/src/salloc/msg.c b/src/salloc/msg.c index 01c44ad7c457d08a6776998215d06c3068b48f81..12a6834e66ea8a9450ef151aa4dbabdf24275045 100644 --- a/src/salloc/msg.c +++ b/src/salloc/msg.c @@ -241,6 +241,15 @@ static void _handle_timeout(slurm_msg_t *msg) slurm_free_srun_timeout_msg(msg->data); } +static void _handle_user_msg(slurm_msg_t *msg) +{ + srun_user_msg_t *um; + + um = msg->data; + info("%s", um->msg); + slurm_free_srun_user_msg(msg->data); +} + static void _handle_job_complete(slurm_msg_t *msg) { 
srun_job_complete_msg_t *comp = (srun_job_complete_msg_t *)msg->data; @@ -297,6 +306,9 @@ _handle_msg(slurm_msg_t *msg) case SRUN_TIMEOUT: _handle_timeout(msg); break; + case SRUN_USER_MSG: + _handle_user_msg(msg); + break; case SRUN_NODE_FAIL: _handle_node_fail(msg); break; diff --git a/src/slurmctld/agent.c b/src/slurmctld/agent.c index 5b6722df9ae75d4b3ddf0522a35ec65f3346ed5f..689dcd995be84a6b77afcd38691f0e3cfd618d1e 100644 --- a/src/slurmctld/agent.c +++ b/src/slurmctld/agent.c @@ -217,7 +217,7 @@ void *agent(void *args) /* info("I am here and agent_cnt is %d of %d with type %d", */ /* agent_cnt, MAX_AGENT_CNT, agent_arg_ptr->msg_type); */ slurm_mutex_lock(&agent_cnt_mutex); - while (1) { + while (slurmctld_config.shutdown_time == 0) { if (agent_cnt < MAX_AGENT_CNT) { agent_cnt++; break; @@ -226,6 +226,8 @@ void *agent(void *args) } } slurm_mutex_unlock(&agent_cnt_mutex); + if (slurmctld_config.shutdown_time) + return NULL; /* basic argument value tests */ begin_time = time(NULL); @@ -379,6 +381,7 @@ static agent_info_t *_make_agent_info(agent_arg_t *agent_arg_ptr) && (agent_arg_ptr->msg_type != REQUEST_RECONFIGURE) && (agent_arg_ptr->msg_type != SRUN_TIMEOUT) && (agent_arg_ptr->msg_type != SRUN_NODE_FAIL) + && (agent_arg_ptr->msg_type != SRUN_USER_MSG) && (agent_arg_ptr->msg_type != SRUN_JOB_COMPLETE)) { agent_info_ptr->get_reply = true; span = set_span(agent_arg_ptr->node_count, 0); @@ -494,6 +497,7 @@ static void *_wdog(void *args) if ( (agent_ptr->msg_type == SRUN_JOB_COMPLETE) || (agent_ptr->msg_type == SRUN_PING) || (agent_ptr->msg_type == SRUN_TIMEOUT) + || (agent_ptr->msg_type == SRUN_USER_MSG) || (agent_ptr->msg_type == RESPONSE_RESOURCE_ALLOCATION) || (agent_ptr->msg_type == SRUN_NODE_FAIL) ) srun_agent = true; @@ -577,7 +581,8 @@ static void _notify_slurmctld_jobs(agent_info_t *agent_ptr) *agent_ptr->msg_args_pptr; job_id = msg->job_id; step_id = NO_VAL; - } else if (agent_ptr->msg_type == SRUN_JOB_COMPLETE) { + } else if ((agent_ptr->msg_type == 
SRUN_JOB_COMPLETE) + || (agent_ptr->msg_type == SRUN_USER_MSG)) { return; /* no need to note srun response */ } else if (agent_ptr->msg_type == SRUN_NODE_FAIL) { return; /* no need to note srun response */ @@ -753,9 +758,6 @@ static void *_thread_per_group_rpc(void *args) /* Locks: Write job, write node */ slurmctld_lock_t job_write_lock = { NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK }; - /* Locks: Read job */ - slurmctld_lock_t job_read_lock = { - NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK }; #endif xassert(args != NULL); xsignal(SIGUSR1, _sig_handler); @@ -765,41 +767,12 @@ static void *_thread_per_group_rpc(void *args) srun_agent = ( (msg_type == SRUN_PING) || (msg_type == SRUN_JOB_COMPLETE) || (msg_type == SRUN_TIMEOUT) || + (msg_type == SRUN_USER_MSG) || (msg_type == RESPONSE_RESOURCE_ALLOCATION) || (msg_type == SRUN_NODE_FAIL) ); thread_ptr->start_time = time(NULL); -#if AGENT_IS_THREAD - if (srun_agent) { - uint32_t job_id = 0; - enum job_states state = JOB_END; - struct job_record *job_ptr = NULL; - - if ((msg_type == SRUN_PING) - || (msg_type == SRUN_JOB_COMPLETE)) { - srun_ping_msg_t *msg = task_ptr->msg_args_ptr; - job_id = msg->job_id; - } else if (msg_type == SRUN_TIMEOUT) { - srun_timeout_msg_t *msg = task_ptr->msg_args_ptr; - job_id = msg->job_id; - } else if (msg_type == SRUN_NODE_FAIL) { - srun_node_fail_msg_t *msg = task_ptr->msg_args_ptr; - job_id = msg->job_id; - } else if (msg_type == RESPONSE_RESOURCE_ALLOCATION) { - resource_allocation_response_msg_t *msg = - task_ptr->msg_args_ptr; - job_id = msg->job_id; - } - lock_slurmctld(job_read_lock); - if (job_id) - job_ptr = find_job_record(job_id); - if (job_ptr) - state = job_ptr->job_state; - unlock_slurmctld(job_read_lock); - } -#endif - slurm_mutex_lock(thread_mutex_ptr); thread_ptr->state = DSH_ACTIVE; thread_ptr->end_time = thread_ptr->start_time + COMMAND_TIMEOUT; @@ -1303,6 +1276,8 @@ static void _purge_agent_args(agent_arg_t *agent_arg_ptr) else if ((agent_arg_ptr->msg_type == 
REQUEST_TERMINATE_JOB) || (agent_arg_ptr->msg_type == REQUEST_KILL_TIMELIMIT)) slurm_free_kill_job_msg(agent_arg_ptr->msg_args); + else if (agent_arg_ptr->msg_type == SRUN_USER_MSG) + slurm_free_srun_user_msg(agent_arg_ptr->msg_args); else xfree(agent_arg_ptr->msg_args); } diff --git a/src/slurmctld/controller.c b/src/slurmctld/controller.c index 6661434c3f73bef845bcd0d9ceb613a146f805c7..5330a2e6aa63d225e0b60b7c41ae1ad225b1c547 100644 --- a/src/slurmctld/controller.c +++ b/src/slurmctld/controller.c @@ -391,18 +391,19 @@ int main(int argc, char *argv[]) { /* This should purge all allocated memory, *\ \* Anything left over represents a leak. */ - int i; + int i, cnt; - /* Give running agents a chance to complete and purge */ - sleep(1); - agent_purge(); - for (i=0; i<4; i++) { - if (get_agent_count() == 0) - break; - sleep(5); + /* Give running agents a chance to complete and free memory. + * Wait up to 30 seconds (3 seconds * 10) */ + for (i=0; i<10; i++) { agent_purge(); + sleep(3); + cnt = get_agent_count(); + if (cnt == 0) + break; } - + if (i >= 10) + error("Left %d agent threads active", cnt); /* Purge our local data structures */ job_fini(); diff --git a/src/slurmctld/node_scheduler.c b/src/slurmctld/node_scheduler.c index d5eae790a9a52e3dcdcbfa04272c8d36c67fbe11..c24e75ea74d22a88da80cd6f36fbcc588a957586 100644 --- a/src/slurmctld/node_scheduler.c +++ b/src/slurmctld/node_scheduler.c @@ -1405,13 +1405,11 @@ extern void build_node_details(struct job_record *job_ptr) xrealloc(job_ptr->node_addr, (sizeof(slurm_addr) * job_ptr->node_cnt)); - job_ptr->alloc_lps_cnt = 0; - xfree(job_ptr->alloc_lps); - if (job_ptr->cr_enabled) { - cr_enabled = job_ptr->cr_enabled; - job_ptr->alloc_lps = xmalloc(job_ptr->node_cnt * sizeof(uint32_t)); - job_ptr->alloc_lps_cnt = job_ptr->node_cnt; - } + job_ptr->alloc_lps_cnt = job_ptr->node_cnt; + xrealloc(job_ptr->alloc_lps, + (sizeof(uint32_t) * job_ptr->node_cnt)); + if (job_ptr->cr_enabled) + cr_enabled = job_ptr->cr_enabled; 
while ((this_node_name = hostlist_shift(host_list))) { node_ptr = find_node_record(this_node_name); @@ -1435,15 +1433,13 @@ extern void build_node_details(struct job_record *job_ptr) node_ptr, job_ptr, SELECT_AVAIL_CPUS, &usable_lps); if (error_code == SLURM_SUCCESS) { - if (cr_enabled && job_ptr->alloc_lps) { + if (job_ptr->alloc_lps) { job_ptr->alloc_lps[cr_count++] = usable_lps; } } else { - if (cr_enabled) { - xfree(job_ptr->alloc_lps); - job_ptr->alloc_lps_cnt = 0; - } + xfree(job_ptr->alloc_lps); + job_ptr->alloc_lps_cnt = 0; error("Unable to get extra jobinfo " "from JobId=%u", job_ptr->job_id); } diff --git a/src/slurmctld/proc_req.c b/src/slurmctld/proc_req.c index 222cfbca100d022f16a735d49aee2e5846d0cb19..48aafd4a902cb9520c0cd7542e7ff98d5a2492a2 100644 --- a/src/slurmctld/proc_req.c +++ b/src/slurmctld/proc_req.c @@ -434,9 +434,10 @@ static int _make_step_cred(struct step_record *step_rec, else cred_arg.alloc_lps_cnt = step_rec->job_ptr->alloc_lps_cnt; if (cred_arg.alloc_lps_cnt > 0) { - cred_arg.alloc_lps = xmalloc(cred_arg.alloc_lps_cnt * sizeof(int)); + cred_arg.alloc_lps = xmalloc(cred_arg.alloc_lps_cnt * + sizeof(uint32_t)); memcpy(cred_arg.alloc_lps, step_rec->job_ptr->alloc_lps, - cred_arg.alloc_lps_cnt*sizeof(int)); + cred_arg.alloc_lps_cnt*sizeof(uint32_t)); } else cred_arg.alloc_lps = NULL; diff --git a/src/slurmctld/slurmctld.h b/src/slurmctld/slurmctld.h index 43269818985b7dd16b8c8c58b376825cf57df27e..ca95be7bc42162a38180b0f604bd5ce1dbadae22 100644 --- a/src/slurmctld/slurmctld.h +++ b/src/slurmctld/slurmctld.h @@ -374,11 +374,11 @@ struct job_record { * linear plugins * 0 if cr is NOT enabled, * 1 if cr is enabled */ - uint32_t alloc_lps_cnt; /* number of hosts in alloc_lps - or 0 if alloc_lps is not needed - for the credentials */ - uint32_t *alloc_lps; /* number of logical processors - * allocated for this job */ + uint32_t alloc_lps_cnt; /* number of hosts in alloc_lps + * or 0 if alloc_lps is not needed + * for the credentials */ + 
uint32_t *alloc_lps; /* number of logical processors + * allocated for this job */ uint16_t mail_type; /* see MAIL_JOB_* in slurm.h */ char *mail_user; /* user to get e-mail notification */ uint32_t requid; /* requester user ID */ diff --git a/src/slurmctld/srun_comm.c b/src/slurmctld/srun_comm.c index 4b00204abcd00646e7bb546a628214cabfc680df..a43fd99fa524fab78fb4a8a420d260d249fd97b2 100644 --- a/src/slurmctld/srun_comm.c +++ b/src/slurmctld/srun_comm.c @@ -243,6 +243,32 @@ extern void srun_timeout (struct job_record *job_ptr) list_iterator_destroy(step_iterator); } + +/* + * srun_user_message - Send arbitrary message to an srun job (no job steps) + */ +extern void srun_user_message(struct job_record *job_ptr, char *msg) +{ + slurm_addr * addr; + srun_user_msg_t *msg_arg; + + xassert(job_ptr); + if ((job_ptr->job_state != JOB_PENDING) + && (job_ptr->job_state != JOB_RUNNING)) + return; + + if (job_ptr->other_port + && job_ptr->other_host && job_ptr->other_host[0]) { + addr = xmalloc(sizeof(struct sockaddr_in)); + slurm_set_addr(addr, job_ptr->other_port, job_ptr->other_host); + msg_arg = xmalloc(sizeof(srun_user_msg_t)); + msg_arg->job_id = job_ptr->job_id; + msg_arg->msg = xstrdup(msg); + _srun_agent_launch(addr, job_ptr->other_host, SRUN_USER_MSG, + msg_arg); + } +} + /* * srun_complete - notify srun of a job's termination * IN job_ptr - pointer to the slurmctld job record diff --git a/src/slurmctld/srun_comm.h b/src/slurmctld/srun_comm.h index 8fc06d0dc312075773dbbdc882899d06dc735ce1..e4602e6dd516a1ed458036eebe100b3119c5950a 100644 --- a/src/slurmctld/srun_comm.h +++ b/src/slurmctld/srun_comm.h @@ -78,4 +78,9 @@ extern void srun_response(uint32_t job_id, uint32_t step_id); */ extern void srun_timeout (struct job_record *job_ptr); +/* + * srun_user_message - Send arbitrary message to an srun job (no job steps) + */ +extern void srun_user_message(struct job_record *job_ptr, char *msg); + #endif /* !_HAVE_SRUN_COMM_H */ diff --git a/src/slurmd/slurmd/req.c 
b/src/slurmd/slurmd/req.c index c9c895e329f66b9b873c97acba0ed53c58003650..0ed3737f959e0930d852b0c2b116443b06fb8e91 100644 --- a/src/slurmd/slurmd/req.c +++ b/src/slurmd/slurmd/req.c @@ -128,7 +128,7 @@ static int _waiter_complete (uint32_t jobid); static bool _steps_completed_now(uint32_t jobid); static void _wait_state_completed(uint32_t jobid, int max_delay); -static uid_t _get_job_uid(uint32_t jobid); +static long _get_job_uid(uint32_t jobid); static gids_t *_gids_cache_lookup(char *user, gid_t gid); @@ -1131,7 +1131,7 @@ _rpc_stat_jobacct(slurm_msg_t *msg) stat_jobacct_msg_t *resp = NULL; int fd; uid_t req_uid; - uid_t job_uid; + long job_uid; debug3("Entering _rpc_stat_jobacct"); /* step completion messages are only allowed from other slurmstepd, @@ -1139,20 +1139,28 @@ _rpc_stat_jobacct(slurm_msg_t *msg) req_uid = g_slurm_auth_get_uid(msg->auth_cred); job_uid = _get_job_uid(req->job_id); + if (job_uid < 0) { + error("stat_jobacct for invalid job_id: %u", + req->job_id); + if (msg->conn_fd >= 0) + slurm_send_rc_msg(msg, ESLURM_INVALID_JOB_ID); + return ESLURM_INVALID_JOB_ID; + } + /* * check that requesting user ID is the SLURM UID or root */ if ((req_uid != job_uid) && (!_slurm_authorized_user(req_uid))) { error("stat_jobacct from uid %ld for job %u " "owned by uid %ld", - (long) req_uid, req->job_id, - (long) job_uid); + (long) req_uid, req->job_id, job_uid); if (msg->conn_fd >= 0) { slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING); return ESLURM_USER_ID_MISSING;/* or bad in this case */ } - } + } + resp = xmalloc(sizeof(stat_jobacct_msg_t)); slurm_msg_t_copy(&resp_msg, msg); resp->job_id = req->job_id; @@ -1463,7 +1471,7 @@ done: slurm_free_reattach_tasks_response_msg(resp); } -static uid_t +static long _get_job_uid(uint32_t jobid) { List steps; @@ -1471,7 +1479,7 @@ _get_job_uid(uint32_t jobid) step_loc_t *stepd; slurmstepd_info_t *info = NULL; int fd; - uid_t uid = 0; + long uid = -1; steps = stepd_available(conf->spooldir, conf->node_name); i = 
list_iterator_create(steps); @@ -1496,7 +1504,7 @@ _get_job_uid(uint32_t jobid) stepd->jobid, stepd->stepid); continue; } - uid = (uid_t)info->uid; + uid = (long)info->uid; break; } list_iterator_destroy(i); @@ -1745,7 +1753,7 @@ _rpc_signal_job(slurm_msg_t *msg) { signal_job_msg_t *req = msg->data; uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred); - uid_t job_uid; + long job_uid; List steps; ListIterator i; step_loc_t *stepd = NULL; @@ -1768,6 +1776,9 @@ _rpc_signal_job(slurm_msg_t *msg) debug("_rpc_signal_job, uid = %d, signal = %d", req_uid, req->signal); job_uid = _get_job_uid(req->job_id); + if (job_uid < 0) + goto no_job; + /* * check that requesting user ID is the SLURM UID or root */ @@ -1798,8 +1809,10 @@ _rpc_signal_job(slurm_msg_t *msg) continue; } - if (stepd->stepid == SLURM_BATCH_SCRIPT) + if (stepd->stepid == SLURM_BATCH_SCRIPT) { + debug2("batch script itself not signalled"); continue; + } step_cnt++; @@ -1819,9 +1832,12 @@ _rpc_signal_job(slurm_msg_t *msg) } list_iterator_destroy(i); list_destroy(steps); - if (step_cnt == 0) + + no_job: + if (step_cnt == 0) { debug2("No steps in jobid %u to send signal %d", req->job_id, req->signal); + } /* * At this point, if connection still open, we send controller @@ -1844,7 +1860,7 @@ _rpc_suspend_job(slurm_msg_t *msg) { suspend_msg_t *req = msg->data; uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred); - uid_t job_uid; + long job_uid; List steps; ListIterator i; step_loc_t *stepd; @@ -1859,6 +1875,8 @@ _rpc_suspend_job(slurm_msg_t *msg) debug("_rpc_suspend_job jobid=%u uid=%d", req->job_id, req_uid); job_uid = _get_job_uid(req->job_id); + if (job_uid < 0) + goto no_job; /* * check that requesting user ID is the SLURM UID or root */ @@ -1908,8 +1926,12 @@ _rpc_suspend_job(slurm_msg_t *msg) } list_iterator_destroy(i); list_destroy(steps); - if (step_cnt == 0) - debug2("No steps in jobid %u to suspend/resume", req->job_id); + + no_job: + if (step_cnt == 0) { + debug2("No steps in jobid %u to 
suspend/resume", + req->job_id); + } /* * At this point, if connection still open, we send controller diff --git a/src/srun/allocate.c b/src/srun/allocate.c index 64497f22e02d17e0544133888c999225467c9771..f41388882e371294fbf398b315404ce11379ab8a 100644 --- a/src/srun/allocate.c +++ b/src/srun/allocate.c @@ -316,6 +316,7 @@ _handle_msg(slurm_msg_t *msg, resource_allocation_response_msg_t **resp) uid_t slurm_uid = (uid_t) slurm_get_slurm_user_id(); int rc = 0; srun_timeout_msg_t *to; + srun_user_msg_t *um; if ((req_uid != slurm_uid) && (req_uid != 0) && (req_uid != uid)) { error ("Security violation, slurm message from uid %u", @@ -346,6 +347,11 @@ _handle_msg(slurm_msg_t *msg, resource_allocation_response_msg_t **resp) timeout_handler(to->timeout); slurm_free_srun_timeout_msg(msg->data); break; + case SRUN_USER_MSG: + um = msg->data; + info("%s", um->msg); + slurm_free_srun_user_msg(msg->data); + break; default: error("received spurious message type: %d\n", msg->msg_type); diff --git a/src/srun/msg.c b/src/srun/msg.c index 7c1b648527dd3a333fcb4f76e8bb880c324f4c23..c96034999ef8da0b48e54598a37e8b1c4e5bf3e2 100644 --- a/src/srun/msg.c +++ b/src/srun/msg.c @@ -549,6 +549,7 @@ _handle_msg(srun_job_t *job, slurm_msg_t *msg) int rc; srun_timeout_msg_t *to; srun_node_fail_msg_t *nf; + srun_user_msg_t *um; if ((req_uid != slurm_uid) && (req_uid != 0) && (req_uid != uid)) { error ("Security violation, slurm message from uid %u", @@ -584,6 +585,11 @@ _handle_msg(srun_job_t *job, slurm_msg_t *msg) timeout_handler(to->timeout); slurm_free_srun_timeout_msg(msg->data); break; + case SRUN_USER_MSG: + um = msg->data; + info("%s", um->msg); + slurm_free_srun_user_msg(msg->data); + break; case SRUN_NODE_FAIL: verbose("node_fail received"); nf = msg->data; diff --git a/testsuite/expect/test7.7.prog.c b/testsuite/expect/test7.7.prog.c index 3ca3fac2e03f7d6d79c5e6ce0393e14faf94db10..4c7d1e4cae7b44c756d911c3eeb2a53101e5b51f 100644 --- a/testsuite/expect/test7.7.prog.c +++ 
b/testsuite/expect/test7.7.prog.c @@ -336,6 +336,19 @@ static void _modify_job(long my_job_id) (uint32_t) now, my_job_id); _xmit(out_msg); } + +static void _notify_job(long my_job_id) +{ + time_t now = time(NULL); + char out_msg[256]; + + snprintf(out_msg, sizeof(out_msg), + "TS=%u AUTH=root DT=CMD=NOTIFYJOB ARG=%ld " + "MSG=this is a test", + (uint32_t) now, my_job_id); + _xmit(out_msg); +} + static void _resume_job(long my_job_id) { time_t now = time(NULL); @@ -402,6 +415,7 @@ int main(int argc, char * argv[]) _get_nodes(); _job_will_run(job_id); _modify_job(job_id); + /* _notify_job(65544); */ _get_jobs(); _start_job(job_id); _get_jobs();