diff --git a/NEWS b/NEWS index 4d513ac5f22cfe091a5591ec7ccd7023215faebd..e3c70d28624dd19bda7efd8981a93b71cf13d0a3 100644 --- a/NEWS +++ b/NEWS @@ -7,6 +7,7 @@ documents those changes that are of interest to users and admins. partition Shared parameter in slurm.conf (e.g. "Shared=FORCE:3" for two jobs to share the resources). From Chris Holmes, HP. -- Fix a bug in the processing of srun's --exclusive option for a job step. + -- Made salloc use api instead of local code for message handling * Changes in SLURM 1.3.0-pre5 ============================= diff --git a/src/salloc/Makefile.am b/src/salloc/Makefile.am index 5f65b5e67c8880e450ca314f95b77f7e96abd5b7..ee6286761834f2910d2a8dbef2100c706cde1d23 100644 --- a/src/salloc/Makefile.am +++ b/src/salloc/Makefile.am @@ -6,7 +6,7 @@ INCLUDES = -I$(top_srcdir) bin_PROGRAMS = salloc -salloc_SOURCES = salloc.c salloc.h opt.c opt.h msg.c msg.h +salloc_SOURCES = salloc.c salloc.h opt.c opt.h convenience_libs = $(top_builddir)/src/api/libslurmhelper.la diff --git a/src/salloc/Makefile.in b/src/salloc/Makefile.in index 682c279ee2b285a9c8919de30a00f9183aae054b..2273e747b19663a608bab6e87a4f176854fce9bf 100644 --- a/src/salloc/Makefile.in +++ b/src/salloc/Makefile.in @@ -69,7 +69,7 @@ CONFIG_CLEAN_FILES = am__installdirs = "$(DESTDIR)$(bindir)" binPROGRAMS_INSTALL = $(INSTALL_PROGRAM) PROGRAMS = $(bin_PROGRAMS) -am_salloc_OBJECTS = salloc.$(OBJEXT) opt.$(OBJEXT) msg.$(OBJEXT) +am_salloc_OBJECTS = salloc.$(OBJEXT) opt.$(OBJEXT) salloc_OBJECTS = $(am_salloc_OBJECTS) salloc_DEPENDENCIES = $(convenience_libs) salloc_LINK = $(LIBTOOL) --tag=CC $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) \ @@ -256,7 +256,7 @@ top_builddir = @top_builddir@ top_srcdir = @top_srcdir@ AUTOMAKE_OPTIONS = foreign INCLUDES = -I$(top_srcdir) -salloc_SOURCES = salloc.c salloc.h opt.c opt.h msg.c msg.h +salloc_SOURCES = salloc.c salloc.h opt.c opt.h convenience_libs = $(top_builddir)/src/api/libslurmhelper.la salloc_LDADD = \ $(convenience_libs) @@ -333,7 +333,6 @@ mostlyclean-compile: distclean-compile: -rm -f *.tab.c -@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/msg.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/opt.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/salloc.Po@am__quote@ diff --git a/src/salloc/msg.c b/src/salloc/msg.c deleted file mode 100644 index 12a6834e66ea8a9450ef151aa4dbabdf24275045..0000000000000000000000000000000000000000 --- a/src/salloc/msg.c +++ /dev/null @@ -1,322 +0,0 @@ -/*****************************************************************************\ - * src/salloc/msg.c - Message handler for salloc - ***************************************************************************** - * Copyright (C) 2006 The Regents of the University of California. - * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). - * Written by Christopher J. Morrone <morrone2@llnl.gov> - * UCRL-CODE-226842. - * - * This file is part of SLURM, a resource management program. - * For details, see <http://www.llnl.gov/linux/slurm/>. - * - * SLURM is free software; you can redistribute it and/or modify it under - * the terms of the GNU General Public License as published by the Free - * Software Foundation; either version 2 of the License, or (at your option) - * any later version. - * - * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY - * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS - * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more - * details. - * - * You should have received a copy of the GNU General Public License along - * with SLURM; if not, write to the Free Software Foundation, Inc., - * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. -\*****************************************************************************/ - -#if HAVE_CONFIG_H -# include "config.h" -#endif - -#include <stdio.h> -#include <stdlib.h> -#include <unistd.h> -#include <sys/un.h> -#include <sys/types.h> -#include <signal.h> -#include <pthread.h> - -#include <slurm/slurm.h> - -#include "src/common/slurm_protocol_defs.h" -#include "src/common/slurm_protocol_api.h" -#include "src/common/slurm_protocol_common.h" -#include "src/common/net.h" -#include "src/common/fd.h" -#include "src/common/forward.h" -#include "src/common/xmalloc.h" -#include "src/common/slurm_auth.h" -#include "src/common/eio.h" -#include "src/common/xsignal.h" - -#include "src/salloc/salloc.h" -#include "src/salloc/opt.h" -#include "src/salloc/msg.h" - -struct salloc_msg_thread { - eio_handle_t *handle; - pthread_t id; -}; - -static uid_t slurm_uid; -static void _handle_msg(slurm_msg_t *msg); -static bool _message_socket_readable(eio_obj_t *obj); -static int _message_socket_accept(eio_obj_t *obj, List objs); -static pthread_mutex_t msg_thr_start_lock = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t msg_thr_start_cond = PTHREAD_COND_INITIALIZER; -static struct io_operations message_socket_ops = { - readable: &_message_socket_readable, - handle_read: &_message_socket_accept -}; - -static void *_msg_thr_internal(void *arg) -{ - int signals[] = {SIGHUP, SIGINT, SIGQUIT, SIGPIPE, SIGTERM, - SIGUSR1, SIGUSR2, 0}; - - debug("Entering _msg_thr_internal"); - xsignal_block(signals); - pthread_mutex_lock(&msg_thr_start_lock); - pthread_cond_signal(&msg_thr_start_cond); - pthread_mutex_unlock(&msg_thr_start_lock); - eio_handle_mainloop((eio_handle_t *)arg); - debug("Leaving _msg_thr_internal"); - - return NULL; -} - -extern salloc_msg_thread_t *msg_thr_create(uint16_t *port) -{ - int sock = -1; - eio_obj_t *obj; - salloc_msg_thread_t *msg_thr = NULL; - - debug("Entering _msg_thr_create()"); - slurm_uid = (uid_t) slurm_get_slurm_user_id(); - msg_thr = (salloc_msg_thread_t *)xmalloc(sizeof(salloc_msg_thread_t)); - - if (net_stream_listen(&sock, (short *)port) < 0) { - error("unable to intialize step launch listening socket: %m"); - xfree(msg_thr); - return NULL; - } - debug("port from net_stream_listen is %hu", *port); - - obj = eio_obj_create(sock, &message_socket_ops, NULL); - - msg_thr->handle = eio_handle_create(); - eio_new_initial_obj(msg_thr->handle, obj); - pthread_mutex_lock(&msg_thr_start_lock); - if (pthread_create(&msg_thr->id, NULL, - _msg_thr_internal, (void *)msg_thr->handle) != 0) { - error("pthread_create of message thread: %m"); - eio_handle_destroy(msg_thr->handle); - xfree(msg_thr); - return NULL; - } - /* Wait until the message thread has blocked signals - before continuing. */ - pthread_cond_wait(&msg_thr_start_cond, &msg_thr_start_lock); - pthread_mutex_unlock(&msg_thr_start_lock); - - return msg_thr; -} - -extern void msg_thr_destroy(salloc_msg_thread_t *msg_thr) -{ - if (msg_thr == NULL) - return; - - eio_signal_shutdown(msg_thr->handle); - pthread_join(msg_thr->id, NULL); - eio_handle_destroy(msg_thr->handle); - xfree(msg_thr); -} - - -static bool _message_socket_readable(eio_obj_t *obj) -{ - debug3("Called _message_socket_readable"); - if (obj->shutdown == true) { - if (obj->fd != -1) { - debug2(" false, shutdown"); - close(obj->fd); - obj->fd = -1; - /*_wait_for_connections();*/ - } else { - debug2(" false"); - } - return false; - } else { - return true; - } -} - -static int _message_socket_accept(eio_obj_t *obj, List objs) -{ - int fd; - unsigned char *uc; - short port; - struct sockaddr_un addr; - slurm_msg_t *msg = NULL; - int len = sizeof(addr); - - debug3("Called _msg_socket_accept"); - - while ((fd = accept(obj->fd, (struct sockaddr *)&addr, - (socklen_t *)&len)) < 0) { - if (errno == EINTR) - continue; - if (errno == EAGAIN - || errno == ECONNABORTED - || errno == EWOULDBLOCK) { - return SLURM_SUCCESS; - } - error("Error on msg accept socket: %m"); - obj->shutdown = true; - return SLURM_SUCCESS; - } - - fd_set_close_on_exec(fd); - fd_set_blocking(fd); - - /* Should not call slurm_get_addr() because the IP may not be - in /etc/hosts. */ - uc = (unsigned char *)&((struct sockaddr_in *)&addr)->sin_addr.s_addr; - port = ((struct sockaddr_in *)&addr)->sin_port; - debug2("got message connection from %u.%u.%u.%u:%hu", - uc[0], uc[1], uc[2], uc[3], ntohs(port)); - fflush(stdout); - - msg = xmalloc(sizeof(slurm_msg_t)); - slurm_msg_t_init(msg); -again: - if(slurm_receive_msg(fd, msg, 0) != 0) { - printf("error on slurm_recieve_msg\n"); - fflush(stdout); - if (errno == EINTR) { - goto again; - } - error("slurm_receive_msg[%u.%u.%u.%u]: %m", - uc[0],uc[1],uc[2],uc[3]); - goto cleanup; - } - - _handle_msg(msg); /* handle_msg frees msg->data */ -cleanup: - if ((msg->conn_fd >= 0) && slurm_close_accepted_conn(msg->conn_fd) < 0) - error ("close(%d): %m", msg->conn_fd); - slurm_free_msg(msg); - - return SLURM_SUCCESS; -} - - -static void _handle_node_fail(slurm_msg_t *msg) -{ - srun_node_fail_msg_t *nf = (srun_node_fail_msg_t *)msg->data; - - error("Node failure on %s", nf->nodelist); - - slurm_free_srun_node_fail_msg(msg->data); -} - -/* - * Job has been notified of it's approaching time limit. - * Job will be killed shortly after timeout. - * This RPC can arrive multiple times with the same or updated timeouts. - */ -static void _handle_timeout(slurm_msg_t *msg) -{ - static time_t last_timeout = 0; - srun_timeout_msg_t *to = (srun_timeout_msg_t *)msg->data; - - debug3("received timeout message"); - if (to->timeout != last_timeout) { - last_timeout = to->timeout; - info("Job allocation time limit to be reached at %s", - ctime(&to->timeout)); - } - - slurm_free_srun_timeout_msg(msg->data); -} - -static void _handle_user_msg(slurm_msg_t *msg) -{ - srun_user_msg_t *um; - - um = msg->data; - info("%s", um->msg); - slurm_free_srun_user_msg(msg->data); -} - -static void _handle_job_complete(slurm_msg_t *msg) -{ - srun_job_complete_msg_t *comp = (srun_job_complete_msg_t *)msg->data; - debug3("job complete message received"); - - if (comp->step_id == NO_VAL) { - pthread_mutex_lock(&allocation_state_lock); - if (allocation_state != REVOKED) { - /* If the allocation_state is already REVOKED, then - * no need to print this message. We probably - * relinquished the allocation ourself. - */ - info("Job allocation %u has been revoked.", - comp->job_id); - } - if (allocation_state == GRANTED - && command_pid > -1 - && opt.kill_command_signal_set) { - verbose("Sending signal %d to command \"%s\", pid %d", - opt.kill_command_signal, - command_argv[0], command_pid); - kill(command_pid, opt.kill_command_signal); - } - allocation_state = REVOKED; - pthread_mutex_unlock(&allocation_state_lock); - } else { - verbose("Job step %u.%u is finished.", - comp->job_id, comp->step_id); - } - slurm_free_srun_job_complete_msg(msg->data); -} - -static void -_handle_msg(slurm_msg_t *msg) -{ - uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred); - uid_t uid = getuid(); - - if ((req_uid != slurm_uid) && (req_uid != 0) && (req_uid != uid)) { - error ("Security violation, slurm message from uid %u", - (unsigned int) req_uid); - return; - } - - switch (msg->msg_type) { - case SRUN_PING: - debug("received ping message"); - slurm_send_rc_msg(msg, SLURM_SUCCESS); - slurm_free_srun_ping_msg(msg->data); - break; - case SRUN_JOB_COMPLETE: - _handle_job_complete(msg); - break; - case SRUN_TIMEOUT: - _handle_timeout(msg); - break; - case SRUN_USER_MSG: - _handle_user_msg(msg); - break; - case SRUN_NODE_FAIL: - _handle_node_fail(msg); - break; - default: - error("received spurious message type: %d\n", - msg->msg_type); - break; - } - return; -} - diff --git a/src/salloc/msg.h b/src/salloc/msg.h deleted file mode 100644 index 71267db9e100d4159cbf94090ae629ed7fd21c4b..0000000000000000000000000000000000000000 --- a/src/salloc/msg.h +++ /dev/null @@ -1,39 +0,0 @@ -/*****************************************************************************\ - * src/salloc/msg.h - Message handler for salloc - * - * $Id: salloc.c 8570 2006-07-13 21:12:58Z morrone $ - ***************************************************************************** - * Copyright (C) 2006 The Regents of the University of California. - * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). - * Written by Christopher J. Morrone <morrone2@llnl.gov> - * UCRL-CODE-226842. - * - * This file is part of SLURM, a resource management program. - * For details, see <http://www.llnl.gov/linux/slurm/>. - * - * SLURM is free software; you can redistribute it and/or modify it under - * the terms of the GNU General Public License as published by the Free - * Software Foundation; either version 2 of the License, or (at your option) - * any later version. - * - * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY - * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS - * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more - * details. - * - * You should have received a copy of the GNU General Public License along - * with SLURM; if not, write to the Free Software Foundation, Inc., - * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. -\*****************************************************************************/ - -#ifndef _SALLOC_MSG_H -#define _SALLOC_MSG_H - -#include <stdint.h> - -typedef struct salloc_msg_thread salloc_msg_thread_t; - -extern salloc_msg_thread_t *msg_thr_create(uint16_t *port); -extern void msg_thr_destroy(salloc_msg_thread_t *msg_thr); - -#endif /* _SALLOC_MSG_H */ diff --git a/src/salloc/salloc.c b/src/salloc/salloc.c index 5555ebd7f419447c6cd6adcd6c8dd5e9612de81f..6bd5fdb7b2ffbdfefe2d75e2723893efcd47e754 100644 --- a/src/salloc/salloc.c +++ b/src/salloc/salloc.c @@ -49,7 +49,6 @@ #include "src/salloc/salloc.h" #include "src/salloc/opt.h" -#include "src/salloc/msg.h" #define MAX_RETRIES 3 @@ -70,6 +69,11 @@ static void _pending_callback(uint32_t job_id); static void _ignore_signal(int signo); static void _exit_on_signal(int signo); static void _signal_while_allocating(int signo); +static void _job_complete_handler(srun_job_complete_msg_t *msg); +static void _timeout_handler(srun_timeout_msg_t *msg); +static void _user_msg_handler(srun_user_msg_t *msg); +static void _ping_handler(srun_ping_msg_t *msg); +static void _node_fail_handler(srun_node_fail_msg_t *msg); int main(int argc, char *argv[]) { @@ -77,7 +81,7 @@ int main(int argc, char *argv[]) job_desc_msg_t desc; resource_allocation_response_msg_t *alloc; time_t before, after; - salloc_msg_thread_t *msg_thr; + allocation_msg_thread_t *msg_thr; char **env = NULL; int status = 0; int errnum = 0; @@ -86,6 +90,7 @@ int main(int argc, char *argv[]) pid_t rc_pid = 0; int rc = 0; static char *msg = "Slurm job queue full, sleeping and retrying."; + slurm_allocation_callbacks_t callbacks; log_init(xbasename(argv[0]), logopt, 0, NULL); if (initialize_and_process_args(argc, argv) < 0) { @@ -107,8 +112,13 @@ int main(int argc, char *argv[]) exit(1); } + callbacks.ping = _ping_handler; + callbacks.timeout = _timeout_handler; + callbacks.job_complete = _job_complete_handler; + callbacks.user_msg = _user_msg_handler; + callbacks.node_fail = _node_fail_handler; /* create message thread to handle pings and such from slurmctld */ - msg_thr = msg_thr_create(&desc.other_port); + msg_thr = slurm_allocation_msg_thr_create(&desc.other_port, &callbacks); xsignal(SIGHUP, _signal_while_allocating); xsignal(SIGINT, _signal_while_allocating); @@ -140,7 +150,7 @@ int main(int argc, char *argv[]) } else { error("Failed to allocate resources: %m"); } - msg_thr_destroy(msg_thr); + slurm_allocation_msg_thr_destroy(msg_thr); exit(1); } after = time(NULL); @@ -228,7 +238,7 @@ relinquish: pthread_mutex_unlock(&allocation_state_lock); slurm_free_resource_allocation_response_msg(alloc); - msg_thr_destroy(msg_thr); + slurm_allocation_msg_thr_destroy(msg_thr); /* * Figure out what return code we should use. If the user's command @@ -411,3 +421,67 @@ static void _exit_on_signal(int signo) { exit_flag = true; } + +/* This typically signifies the job was cancelled by scancel */ +static void _job_complete_handler(srun_job_complete_msg_t *comp) +{ + if (comp->step_id == NO_VAL) { + pthread_mutex_lock(&allocation_state_lock); + if (allocation_state != REVOKED) { + /* If the allocation_state is already REVOKED, then + * no need to print this message. We probably + * relinquished the allocation ourself. + */ + info("Job allocation %u has been revoked.", + comp->job_id); + } + if (allocation_state == GRANTED + && command_pid > -1 + && opt.kill_command_signal_set) { + verbose("Sending signal %d to command \"%s\", pid %d", + opt.kill_command_signal, + command_argv[0], command_pid); + kill(command_pid, opt.kill_command_signal); + } + allocation_state = REVOKED; + pthread_mutex_unlock(&allocation_state_lock); + } else { + verbose("Job step %u.%u is finished.", + comp->job_id, comp->step_id); + } +} + +/* + * Job has been notified of it's approaching time limit. + * Job will be killed shortly after timeout. + * This RPC can arrive multiple times with the same or updated timeouts. + * FIXME: We may want to signal the job or perform other action for this. + * FIXME: How much lead time do we want for this message? Some jobs may + * require tens of minutes to gracefully terminate. + */ +static void _timeout_handler(srun_timeout_msg_t *msg) +{ + static time_t last_timeout = 0; + + if (msg->timeout != last_timeout) { + last_timeout = msg->timeout; + verbose("Job allocation time limit to be reached at %s", + ctime(&msg->timeout)); + } +} + +static void _user_msg_handler(srun_user_msg_t *msg) +{ + info("%s", msg->msg); +} + +static void _ping_handler(srun_ping_msg_t *msg) +{ + /* the api will respond so there really isn't anything to do + here */ +} + +static void _node_fail_handler(srun_node_fail_msg_t *msg) +{ + error("Node failure on %s", msg->nodelist); +} diff --git a/testsuite/expect/test15.23 b/testsuite/expect/test15.23 index 3002d26b12ff7f8a534919cb0c9d3a75a65caa70..ed0aefe1d3154fbbf1b4d67bc88ab17487b202e6 100755 --- a/testsuite/expect/test15.23 +++ b/testsuite/expect/test15.23 @@ -53,7 +53,7 @@ set env(SALLOC_TIMELIMIT) 2 set matches 0 set salloc_pid [spawn $salloc -N1 $bin_bash] expect { - -re "debug: Entering _msg_thr_create" { + -re "debug: Entering _msg_thr_internal" { incr matches exp_continue }