diff --git a/src/common/Makefile.am b/src/common/Makefile.am index 5cb8948b025ae5855b08566ae6fdc5a78e712da2..2fefcfff4f022f47d851177defcff3d5449aac8e 100644 --- a/src/common/Makefile.am +++ b/src/common/Makefile.am @@ -16,69 +16,75 @@ elan_sources = endif if WITH_AUTHD -authd_sources = slurm_auth_authd.c +auth_sources = slurm_auth_authd.c else -authd_sources = slurm_auth_authd.c +auth_sources = slurm_auth_authd.c endif noinst_LTLIBRARIES = libcommon.la libdaemonize.la libcred.la CPPFLAGS = $(SSL_CPPFLAGS) -libcommon_la_SOURCES = xmalloc.c \ - xassert.c \ - xstring.c \ - strlcpy.c \ - list.c \ - fd.c \ - log.c \ - eio.c \ - cbuf.c \ - safeopen.c \ - bitstring.c \ - pack.c \ - hostlist.c \ - parse_spec.c \ - slurm_errno.c \ - slurm_protocol_api.c \ - slurm_protocol_pack.c \ - slurm_protocol_util.c \ - slurm_protocol_socket_implementation.c \ - slurm_protocol_defs.c \ - $(elan_sources) \ - $(authd_sources) +libcommon_la_SOURCES = \ + xmalloc.c \ + xassert.c \ + xstring.c \ + xsignal.c \ + strlcpy.c \ + list.c \ + fd.c \ + log.c \ + eio.c \ + cbuf.c \ + safeopen.c \ + bitstring.c \ + pack.c \ + hostlist.c \ + parse_spec.c \ + slurm_errno.c \ + slurm_protocol_api.c \ + slurm_protocol_pack.c \ + slurm_protocol_util.c \ + slurm_protocol_socket_implementation.c \ + slurm_protocol_defs.c \ + $(elan_sources) \ + $(auth_sources) -noinst_HEADERS = xmalloc.h \ - xassert.h \ - xstring.h \ - macros.h \ - list.h \ - log.h \ - eio.h \ - cbuf.h \ - safeopen.h \ - strlcpy.h \ - bitstring.h \ - pack.h \ - hostlist.h \ - parse_spec.h \ - slurm_protocol_api.h \ - slurm_protocol_pack.h \ - slurm_protocol_util.h \ - slurm_protocol_defs.h \ - slurm_protocol_common.h \ - slurm_protocol_socket_common.h \ - slurm_protocol_mongo_common.h \ - slurm_protocol_interface.h \ - slurm_errno.h \ - qsw.h +noinst_HEADERS = \ + xmalloc.h \ + xassert.h \ + xstring.h \ + xsignal.h \ + macros.h \ + list.h \ + log.h \ + eio.h \ + cbuf.h \ + safeopen.h \ + strlcpy.h \ + bitstring.h \ + pack.h \ + hostlist.h \ + parse_spec.h \ + slurm_protocol_api.h \ + slurm_protocol_pack.h \ + slurm_protocol_util.h \ + slurm_protocol_defs.h \ + slurm_protocol_common.h \ + slurm_protocol_socket_common.h \ + slurm_protocol_mongo_common.h \ + slurm_protocol_interface.h \ + slurm_errno.h \ + qsw.h -libdaemonize_la_SOURCES = \ - daemonize.c -libcred_la_SOURCES = credential_utils.c signature_utils.c list.c +libdaemonize_la_SOURCES = \ + daemonize.c +libcred_la_SOURCES = \ + credential_utils.c \ + signature_utils.c EXTRA_libcommon_la_SOURCES = \ - qsw.c - -libcred_la_LIBADD = $(SSL_LIBS) $(AUTHD_LIBS) + qsw.c +libcommon_la_LIBADD = $(AUTHD_LIBS) $(SSL_LIBS) +libcred_la_LIBADD = $(SSL_LIBS) libcred_la_LDFLAGS = $(SSL_LDFLAGS) diff --git a/src/common/cbuf.c b/src/common/cbuf.c index 125c38af476e1234ebd5f37f661cfe1bc3e4bce2..f24ddf85a81347415c9b54884a43fb5135565a6d 100644 --- a/src/common/cbuf.c +++ b/src/common/cbuf.c @@ -1,39 +1,41 @@ /*****************************************************************************\ * $Id$ ***************************************************************************** - * Copyright (C) 2001-2002 The Regents of the University of California. + * Copyright (C) 2002 The Regents of the University of California. * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). * Written by Chris Dunlap <cdunlap@llnl.gov>. - * UCRL-CODE-2002-009. * - * This file is part of ConMan, a remote console management program. - * For details, see <http://www.llnl.gov/linux/conman/>. + * This file is from LSD-Tools, the LLNL Software Development Toolbox. * - * ConMan is free software; you can redistribute it and/or modify it under + * LSD-Tools is free software; you can redistribute it and/or modify it under * the terms of the GNU General Public License as published by the Free * Software Foundation; either version 2 of the License, or (at your option) * any later version. * - * ConMan is distributed in the hope that it will be useful, but WITHOUT ANY - * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS - * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more - * details. + * LSD-Tools is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. * * You should have received a copy of the GNU General Public License along - * with ConMan; if not, write to the Free Software Foundation, Inc., + * with LSD-Tools; if not, write to the Free Software Foundation, Inc., * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. ***************************************************************************** * Refer to "cbuf.h" for documentation on public functions. \*****************************************************************************/ +/* FIXME: Revisit locations of cbuf_is_valid() calls. + */ + + #ifdef HAVE_CONFIG_H # include "config.h" #endif /* HAVE_CONFIG_H */ #ifdef WITH_PTHREADS # include <pthread.h> -# include <stdio.h> +# include <syslog.h> #endif /* WITH_PTHREADS */ #include <assert.h> @@ -98,17 +100,17 @@ typedef int (*cbuf_iof) (void *cbuf_data, void *arg, int len); * Prototypes * ****************/ -static int cbuf_find_nl (cbuf_t cb); +static int cbuf_find_line (cbuf_t cb); -static int cbuf_get_fd (void *srcbuf, int *pdstfd, int len); -static int cbuf_get_mem (void *srcbuf, unsigned char **pdstbuf, int len); -static int cbuf_put_fd (void *dstbuf, int *psrcfd, int len); -static int cbuf_put_mem (void *dstbuf, unsigned char **psrcbuf, int len); +static int cbuf_get_fd (void *dstbuf, int *psrcfd, int len); +static int cbuf_get_mem (void *dstbuf, unsigned char **psrcbuf, int len); +static int cbuf_put_fd (void *srcbuf, int *pdstfd, int len); +static int cbuf_put_mem (void *srcbuf, unsigned char **pdstbuf, int len); -static int cbuf_peeker (cbuf_t cb, int len, cbuf_iof getf, void *dst); -static int cbuf_reader (cbuf_t cb, int len, cbuf_iof getf, void *dst); -static int cbuf_replayer (cbuf_t cb, int len, cbuf_iof getf, void *dst); -static int cbuf_writer (cbuf_t cb, int len, cbuf_iof putf, void *src, +static int cbuf_dropper (cbuf_t cb, int len); +static int cbuf_reader (cbuf_t cb, int len, cbuf_iof putf, void *dst); +static int cbuf_replayer (cbuf_t cb, int len, cbuf_iof putf, void *dst); +static int cbuf_writer (cbuf_t cb, int len, cbuf_iof getf, void *src, int *pdropped); static int cbuf_grow (cbuf_t cb, int n); @@ -132,49 +134,44 @@ static int cbuf_is_valid (cbuf_t cb); #endif /* !MIN */ #ifdef WITH_PTHREADS - -# define cbuf_mutex_init(mutex) \ +/* + * FIXME: Replace syslog/abort jazz with macro that can be overridden. + */ +# define cbuf_mutex_init(cb) \ do { \ - int cbuf_errno; \ - if ((cbuf_errno = pthread_mutex_init(mutex, NULL)) != 0) { \ - fprintf(stderr, "ERROR: pthread_mutex_init() failed: %s\n", \ - strerror(cbuf_errno)); exit(1); \ - } \ + int e = pthread_mutex_init(&cb->mutex, NULL); \ + if (e) errno = e, syslog(LOG_ERR, "cbuf mutex init: %m"), abort(); \ } while (0) -# define cbuf_mutex_lock(mutex) \ +# define cbuf_mutex_lock(cb) \ do { \ - int cbuf_errno; \ - if ((cbuf_errno = pthread_mutex_lock(mutex)) != 0) { \ - fprintf(stderr, "ERROR: pthread_mutex_lock() failed: %s\n", \ - strerror(cbuf_errno)); exit(1); \ - } \ + int e = pthread_mutex_lock(&cb->mutex); \ + if (e) errno = e, syslog(LOG_ERR, "cbuf mutex lock: %m"), abort(); \ } while (0) -# define cbuf_mutex_unlock(mutex) \ +# define cbuf_mutex_unlock(cb) \ do { \ - int cbuf_errno; \ - if ((cbuf_errno = pthread_mutex_unlock(mutex)) != 0) { \ - fprintf(stderr, "ERROR: pthread_mutex_unlock() failed: %s\n", \ - strerror(cbuf_errno)); exit(1); \ - } \ + int e = pthread_mutex_unlock(&cb->mutex); \ + if (e) errno = e, syslog(LOG_ERR, "cbuf mutex unlock: %m"), abort(); \ } while (0) -# define cbuf_mutex_destroy(mutex) \ +# define cbuf_mutex_destroy(cb) \ do { \ - int cbuf_errno; \ - if ((cbuf_errno = pthread_mutex_destroy(mutex)) != 0) { \ - fprintf(stderr, "ERROR: pthread_mutex_destroy() failed: %s\n", \ - strerror(cbuf_errno)); exit(1); \ - } \ + int e = pthread_mutex_destroy(&cb->mutex); \ + if (e) errno = e, syslog(LOG_ERR, "cbuf mutex destroy: %m"), abort();\ } while (0) +# ifndef NDEBUG + static int cbuf_mutex_is_locked (cbuf_t cb); +# endif /* !NDEBUG */ + #else /* !WITH_PTHREADS */ -# define cbuf_mutex_init(mutex) -# define cbuf_mutex_lock(mutex) -# define cbuf_mutex_unlock(mutex) -# define cbuf_mutex_destroy(mutex) +# define cbuf_mutex_init(cb) +# define cbuf_mutex_lock(cb) +# define cbuf_mutex_unlock(cb) +# define cbuf_mutex_destroy(cb) +# define cbuf_mutex_is_locked(cb) (1) #endif /* !WITH_PTHREADS */ @@ -212,7 +209,7 @@ cbuf_create(int minsize, int maxsize) errno = ENOMEM; return(out_of_memory()); } - cbuf_mutex_init(&cb->mutex); + cbuf_mutex_init(cb); cb->size = minsize; cb->minsize = minsize; cb->maxsize = (maxsize > minsize) ? maxsize : minsize; @@ -233,9 +230,12 @@ cbuf_create(int minsize, int maxsize) */ memcpy(cb->data - CBUF_MAGIC_LEN, (void *) &cb->magic, CBUF_MAGIC_LEN); memcpy(cb->data + cb->size + 1, (void *) &cb->magic, CBUF_MAGIC_LEN); -#endif /* !NDEBUG */ + cbuf_mutex_lock(cb); assert(cbuf_is_valid(cb)); + cbuf_mutex_unlock(cb); +#endif /* !NDEBUG */ + return(cb); } @@ -244,7 +244,7 @@ void cbuf_destroy(cbuf_t cb) { assert(cb != NULL); - cbuf_mutex_lock(&cb->mutex); + cbuf_mutex_lock(cb); assert(cbuf_is_valid(cb)); #ifndef NDEBUG @@ -258,8 +258,8 @@ cbuf_destroy(cbuf_t cb) #endif /* !NDEBUG */ free(cb->data); - cbuf_mutex_unlock(&cb->mutex); - cbuf_mutex_destroy(&cb->mutex); + cbuf_mutex_unlock(cb); + cbuf_mutex_destroy(cb); free(cb); return; } @@ -269,12 +269,12 @@ void cbuf_flush(cbuf_t cb) { assert(cb != NULL); - cbuf_mutex_lock(&cb->mutex); + cbuf_mutex_lock(cb); assert(cbuf_is_valid(cb)); cb->used = 0; cb->i_in = cb->i_out = 0; assert(cbuf_is_valid(cb)); - cbuf_mutex_unlock(&cb->mutex); + cbuf_mutex_unlock(cb); return; } @@ -285,10 +285,10 @@ cbuf_is_empty(cbuf_t cb) int used; assert(cb != NULL); - cbuf_mutex_lock(&cb->mutex); + cbuf_mutex_lock(cb); assert(cbuf_is_valid(cb)); used = cb->used; - cbuf_mutex_unlock(&cb->mutex); + cbuf_mutex_unlock(cb); return(used == 0); } @@ -299,10 +299,10 @@ cbuf_size(cbuf_t cb) int size; assert(cb != NULL); - cbuf_mutex_lock(&cb->mutex); + cbuf_mutex_lock(cb); assert(cbuf_is_valid(cb)); size = cb->size; - cbuf_mutex_unlock(&cb->mutex); + cbuf_mutex_unlock(cb); return(size); } @@ -313,10 +313,10 @@ cbuf_free(cbuf_t cb) int free; assert(cb != NULL); - cbuf_mutex_lock(&cb->mutex); + cbuf_mutex_lock(cb); assert(cbuf_is_valid(cb)); free = cb->size - cb->used; - cbuf_mutex_unlock(&cb->mutex); + cbuf_mutex_unlock(cb); return(free); } @@ -327,10 +327,10 @@ cbuf_used(cbuf_t cb) int used; assert(cb != NULL); - cbuf_mutex_lock(&cb->mutex); + cbuf_mutex_lock(cb); assert(cbuf_is_valid(cb)); used = cb->used; - cbuf_mutex_unlock(&cb->mutex); + cbuf_mutex_unlock(cb); return(used); } @@ -357,16 +357,15 @@ cbuf_drop(cbuf_t cb, int len) errno = EINVAL; return(-1); } - cbuf_mutex_lock(&cb->mutex); + if (len == 0) { + return(0); + } + cbuf_mutex_lock(cb); assert(cbuf_is_valid(cb)); - n = MIN(len, cb->used); - if (n > 0) { - cb->used -= n; - cb->i_out = (cb->i_out + n) % (cb->size + 1); - } + cbuf_dropper(cb, n); assert(cbuf_is_valid(cb)); - cbuf_mutex_unlock(&cb->mutex); + cbuf_mutex_unlock(cb); return(n); } @@ -389,8 +388,8 @@ cbuf_move(cbuf_t src, cbuf_t dst, int len) /* XXX: What about deadlock? Yow! * Grab the locks in order of the lowest memory address. */ - cbuf_mutex_lock(&src->mutex); - cbuf_mutex_lock(&dst->mutex); + cbuf_mutex_lock(src); + cbuf_mutex_lock(dst); assert(cbuf_is_valid(src)); assert(cbuf_is_valid(dst)); /* @@ -398,8 +397,8 @@ cbuf_move(cbuf_t src, cbuf_t dst, int len) */ assert(cbuf_is_valid(dst)); assert(cbuf_is_valid(src)); - cbuf_mutex_unlock(&dst->mutex); - cbuf_mutex_unlock(&src->mutex); + cbuf_mutex_unlock(dst); + cbuf_mutex_unlock(src); return(0); } #endif @@ -408,9 +407,21 @@ cbuf_move(cbuf_t src, cbuf_t dst, int len) int cbuf_peek(cbuf_t cb, void *dstbuf, int len) { - /* XXX: NOT IMPLEMENTED. - */ - return(0); + int n; + + assert(cb != NULL); + + if ((dstbuf == NULL) || (len < 0)) { + errno = EINVAL; + return(-1); + } + if (len == 0) { + return(0); + } + cbuf_mutex_lock(cb); + n = cbuf_reader(cb, len, (cbuf_iof) cbuf_put_mem, &dstbuf); + cbuf_mutex_unlock(cb); + return(n); } @@ -428,9 +439,12 @@ cbuf_read(cbuf_t cb, void *dstbuf, int len) if (len == 0) { return(0); } - cbuf_mutex_lock(&cb->mutex); - n = cbuf_reader(cb, len, (cbuf_iof) cbuf_get_mem, &dstbuf); - cbuf_mutex_unlock(&cb->mutex); + cbuf_mutex_lock(cb); + n = cbuf_reader(cb, len, (cbuf_iof) cbuf_put_mem, &dstbuf); + if (n > 0) { + cbuf_dropper(cb, n); + } + cbuf_mutex_unlock(cb); return(n); } @@ -449,15 +463,15 @@ cbuf_replay(cbuf_t cb, void *dstbuf, int len) if (len == 0) { return(0); } - cbuf_mutex_lock(&cb->mutex); - n = cbuf_replayer(cb, len, (cbuf_iof) cbuf_get_mem, &dstbuf); - cbuf_mutex_unlock(&cb->mutex); + cbuf_mutex_lock(cb); + n = cbuf_replayer(cb, len, (cbuf_iof) cbuf_put_mem, &dstbuf); + cbuf_mutex_unlock(cb); return(n); } int -cbuf_write(cbuf_t cb, void *srcbuf, int len, int *dropped) +cbuf_write(cbuf_t cb, const void *srcbuf, int len, int *dropped) { int n; @@ -467,12 +481,15 @@ cbuf_write(cbuf_t cb, void *srcbuf, int len, int *dropped) errno = EINVAL; return(-1); } + if (dropped) { + *dropped = 0; + } if (len == 0) { return(0); } - cbuf_mutex_lock(&cb->mutex); - n = cbuf_writer(cb, len, (cbuf_iof) cbuf_put_mem, &srcbuf, dropped); - cbuf_mutex_unlock(&cb->mutex); + cbuf_mutex_lock(cb); + n = cbuf_writer(cb, len, (cbuf_iof) cbuf_get_mem, &srcbuf, dropped); + cbuf_mutex_unlock(cb); return(n); } @@ -480,9 +497,23 @@ cbuf_write(cbuf_t cb, void *srcbuf, int len, int *dropped) int cbuf_peek_to_fd(cbuf_t cb, int dstfd, int len) { - /* XXX: NOT IMPLEMENTED. - */ - return(0); + int n = 0; + + assert(cb != NULL); + + if ((dstfd < 0) || (len < -1)) { + errno = EINVAL; + return(-1); + } + cbuf_mutex_lock(cb); + if (len == -1) { + len = cb->used; + } + if (len > 0) { + n = cbuf_reader(cb, len, (cbuf_iof) cbuf_put_fd, &dstfd); + } + cbuf_mutex_unlock(cb); + return(n); } @@ -497,12 +528,17 @@ cbuf_read_to_fd(cbuf_t cb, int dstfd, int len) errno = EINVAL; return(-1); } - cbuf_mutex_lock(&cb->mutex); - if (len == -1) + cbuf_mutex_lock(cb); + if (len == -1) { len = cb->used; - if (len > 0) - n = cbuf_reader(cb, len, (cbuf_iof) cbuf_get_fd, &dstfd); - cbuf_mutex_unlock(&cb->mutex); + } + if (len > 0) { + n = cbuf_reader(cb, len, (cbuf_iof) cbuf_put_fd, &dstfd); + if (n > 0) { + cbuf_dropper(cb, n); + } + } + cbuf_mutex_unlock(cb); return(n); } @@ -518,12 +554,14 @@ cbuf_replay_to_fd(cbuf_t cb, int dstfd, int len) errno = EINVAL; return(-1); } - cbuf_mutex_lock(&cb->mutex); - if (len == -1) + cbuf_mutex_lock(cb); + if (len == -1) { len = cb->size - cb->used; - if (len > 0) - n = cbuf_reader(cb, len, (cbuf_iof) cbuf_get_fd, &dstfd); - cbuf_mutex_unlock(&cb->mutex); + } + if (len > 0) { + n = cbuf_replayer(cb, len, (cbuf_iof) cbuf_put_fd, &dstfd); + } + cbuf_mutex_unlock(cb); return(n); } @@ -539,61 +577,93 @@ cbuf_write_from_fd(cbuf_t cb, int srcfd, int len, int *dropped) errno = EINVAL; return(-1); } - cbuf_mutex_lock(&cb->mutex); - if (len == -1) + if (dropped) { + *dropped = 0; + } + cbuf_mutex_lock(cb); + /* + * XXX: Is the current -1 len behavior such a good idea? + * This prevents the buffer from both wrapping and growing. + */ + if (len == -1) { len = cb->size - cb->used; - if (len > 0) - n = cbuf_writer(cb, len, (cbuf_iof) cbuf_put_fd, &srcfd, dropped); - cbuf_mutex_unlock(&cb->mutex); + } + if (len > 0) { + n = cbuf_writer(cb, len, (cbuf_iof) cbuf_get_fd, &srcfd, dropped); + } + cbuf_mutex_unlock(cb); return(n); } int -cbuf_gets(cbuf_t cb, char *dst, int len) +cbuf_get_line(cbuf_t cb, char *dst, int len) { - /* XXX: Pro'ly best to do this as cbuf_peek + cbuf_drop. - */ + int n, m; + char *pdst; + assert(cb != NULL); - if ((dst == NULL) || (len <= 0)) { + if (((dst == NULL) && (len != 0)) || (len < 0)) { errno = EINVAL; return(-1); } - /* XXX: NOT IMPLEMENTED. - */ - return(-1); + pdst = dst; + cbuf_mutex_lock(cb); + n = cbuf_find_line(cb); + if (n > 0) { + if (len > 0) { + m = MIN(n, len - 1); + if (m > 0) { + n = cbuf_reader(cb, m, (cbuf_iof) cbuf_put_mem, &pdst); + assert(n == m); + } + dst[m] = '\0'; + } + cbuf_dropper(cb, n); + } + cbuf_mutex_unlock(cb); + return(n); } int -cbuf_peeks(cbuf_t cb, char *dst, int len) +cbuf_peek_line(cbuf_t cb, char *dst, int len) { - /* XXX: Use cbuf_find_nl. - */ + int n, m; + char *pdst; + assert(cb != NULL); - if ((dst == NULL) || (len <= 0)) { + if (((dst == NULL) && (len != 0)) || (len < 0)) { errno = EINVAL; return(-1); } - /* XXX: NOT IMPLEMENTED. - */ - return(-1); + pdst = dst; + cbuf_mutex_lock(cb); + n = cbuf_find_line(cb); + if (n > 0) { + if (len > 0) { + m = MIN(n, len - 1); + if (m > 0) { + n = cbuf_reader(cb, m, (cbuf_iof) cbuf_put_mem, &pdst); + assert(n == m); + } + dst[m] = '\0'; + } + } + cbuf_mutex_unlock(cb); + return(n); } int -cbuf_puts(cbuf_t cb, char *src, int *dropped) +cbuf_put_line(cbuf_t cb, const char *src, int *dropped) { - /* XXX: Handle case where src string exceeds buffer size. - * But cannot simply advance src ptr, since writer() - * may be able to grow cbuf if needed. Ugh! - * Pro'ly best to wrap it in a loop. Sigh. - * Should always return strlen(src). - */ - int n; int len; + int nget, ngot, ndropped, n, d; + const char *psrc; + const char *newline = "\n"; assert(cb != NULL); @@ -602,31 +672,51 @@ cbuf_puts(cbuf_t cb, char *src, int *dropped) return(-1); } len = strlen(src); - if (len == 0) { - return(0); + nget = len; + ngot = 0; + ndropped = 0; + psrc = src; + cbuf_mutex_lock(cb); + while (nget > 0) { + n = cbuf_writer(cb, nget, (cbuf_iof) cbuf_get_mem, &psrc, &d); + assert (n > 0); + nget -= n; + ngot += n; + ndropped += d; } - cbuf_mutex_lock(&cb->mutex); - n = cbuf_writer(cb, len, (cbuf_iof) cbuf_put_mem, src, dropped); - cbuf_mutex_unlock(&cb->mutex); - return(n); + if (src[len - 1] != '\n') { /* append newline if needed */ + n = cbuf_writer(cb, 1, (cbuf_iof) cbuf_get_mem, &newline, &d); + ngot++; + ndropped += d; + } + cbuf_mutex_unlock(cb); + assert((ngot == len) || (ngot == len + 1)); + if (dropped) { + *dropped = ndropped; + } + return(ngot); } static int -cbuf_find_nl(cbuf_t cb) +cbuf_find_line(cbuf_t cb) { -/* Returns the number of bytes up to and including the next newline. - * Assumes 'cb' is locked upon entry. +/* Returns the number of bytes up to and including the next newline or NUL. */ int i, n; + unsigned char c; assert(cb != NULL); + assert(cbuf_mutex_is_locked(cb)); i = cb->i_out; n = 1; while (i != cb->i_in) { - if (cb->data[i] == '\n') + c = cb->data[i]; + if ((c == '\n') || (c == '\0')) { + assert(n <= cb->used); return(n); + } i = (i + 1) % (cb->size + 1); n++; } @@ -635,174 +725,185 @@ cbuf_find_nl(cbuf_t cb) static int -cbuf_get_fd(void *srcbuf, int *pdstfd, int len) +cbuf_get_fd(void *dstbuf, int *psrcfd, int len) { -/* Copies data from cbuf's 'srcbuf' into the file referenced - * by the file descriptor pointed at by 'pdstfd'. - * Returns the number of bytes written to the fd, or <0 on error. +/* Copies data from the file referenced by the file descriptor + * pointed at by 'psrcfd' into cbuf's 'dstbuf'. + * Returns the number of bytes read from the fd, 0 on EOF, or <0 on error. */ int n; - assert(srcbuf != NULL); - assert(pdstfd != NULL); - assert(*pdstfd >= 0); + assert(dstbuf != NULL); + assert(psrcfd != NULL); + assert(*psrcfd >= 0); assert(len > 0); do { - n = write(*pdstfd, srcbuf, len); + n = read(*psrcfd, dstbuf, len); } while ((n < 0) && (errno == EINTR)); return(n); } static int -cbuf_get_mem(void *srcbuf, unsigned char **pdstbuf, int len) +cbuf_get_mem(void *dstbuf, unsigned char **psrcbuf, int len) { -/* Copies data from cbuf's 'srcbuf' into the buffer pointed at by 'pdstbuf'. +/* Copies data from the buffer pointed at by 'psrcbuf' into cbuf's 'dstbuf'. * Returns the number of bytes copied. */ - assert(srcbuf != NULL); - assert(pdstbuf != NULL); - assert(*pdstbuf != NULL); + assert(dstbuf != NULL); + assert(psrcbuf != NULL); + assert(*psrcbuf != NULL); assert(len > 0); - memcpy(*pdstbuf, srcbuf, len); - *pdstbuf += len; + memcpy(dstbuf, *psrcbuf, len); + *psrcbuf += len; return(len); } static int -cbuf_put_fd(void *dstbuf, int *psrcfd, int len) +cbuf_put_fd(void *srcbuf, int *pdstfd, int len) { -/* Copies data from the file referenced by the file descriptor - * pointed at by 'psrcfd' into cbuf's 'dstbuf'. - * Returns the number of bytes read from the fd, 0 on EOF, or <0 on error. +/* Copies data from cbuf's 'srcbuf' into the file referenced + * by the file descriptor pointed at by 'pdstfd'. + * Returns the number of bytes written to the fd, or <0 on error. */ int n; - assert(dstbuf != NULL); - assert(psrcfd != NULL); - assert(*psrcfd >= 0); + assert(srcbuf != NULL); + assert(pdstfd != NULL); + assert(*pdstfd >= 0); assert(len > 0); do { - n = read(*psrcfd, dstbuf, len); + n = write(*pdstfd, srcbuf, len); } while ((n < 0) && (errno == EINTR)); return(n); } static int -cbuf_put_mem(void *dstbuf, unsigned char **psrcbuf, int len) +cbuf_put_mem(void *srcbuf, unsigned char **pdstbuf, int len) { -/* Copies data from the buffer pointed at by 'psrcbuf' into cbuf's 'dstbuf'. +/* Copies data from cbuf's 'srcbuf' into the buffer pointed at by 'pdstbuf'. * Returns the number of bytes copied. */ - assert(dstbuf != NULL); - assert(psrcbuf != NULL); - assert(*psrcbuf != NULL); + assert(srcbuf != NULL); + assert(pdstbuf != NULL); + assert(*pdstbuf != NULL); assert(len > 0); - memcpy(dstbuf, *psrcbuf, len); - *psrcbuf += len; + memcpy(*pdstbuf, srcbuf, len); + *pdstbuf += len; return(len); } static int -cbuf_peeker(cbuf_t cb, int len, cbuf_iof getf, void *dst) +cbuf_dropper(cbuf_t cb, int len) { - /* XXX: NOT IMPLEMENTED. +/* Discards up to 'len' bytes of unread data from 'cb'. + * Returns the number of bytes dropped. + */ + assert(cb != NULL); + assert(len > 0); + assert(len <= cb->used); + assert(cbuf_mutex_is_locked(cb)); + + cb->used -= len; + cb->i_out = (cb->i_out + len) % (cb->size + 1); + assert(cbuf_is_valid(cb)); + + /* Attempt to shrink buffer if possible. */ - return(0); + if ((cb->size - cb->used > CBUF_CHUNK) && (cb->size > cb->minsize)) { + cbuf_shrink(cb); + } + /* Don't call me clumsy, don't call me a fool. + * When things fall down on me, I'm following the rule. + */ + return(len); } static int -cbuf_reader(cbuf_t cb, int len, cbuf_iof getf, void *dst) +cbuf_reader(cbuf_t cb, int len, cbuf_iof putf, void *dst) { /* XXX: DOCUMENT ME. */ int nget, ngot; - int shortget; + int shortput; int n, m; assert(cb != NULL); assert(len > 0); - assert(getf != NULL); + assert(putf != NULL); assert(dst != NULL); + assert(cbuf_mutex_is_locked(cb)); assert(cbuf_is_valid(cb)); nget = MIN(len, cb->used); ngot = 0; - shortget = 0; + shortput = 0; n = MIN(nget, (cb->size + 1) - cb->i_out); if (n > 0) { - m = getf(&cb->data[cb->i_out], dst, n); - if (m <= 0) + m = putf(&cb->data[cb->i_out], dst, n); + if (m <= 0) { return(m); - if (m != n) - shortget = 1; + } + if (m != n) { + shortput = 1; + } ngot += m; } n = nget - ngot; - if ((n > 0) && (!shortget)) { - m = getf(&cb->data[0], dst, n); + if ((n > 0) && (!shortput)) { + m = putf(&cb->data[0], dst, n); if (m > 0) { ngot += m; } } - - if (ngot > 0) { - cb->used -= ngot; - cb->i_out = (cb->i_out + ngot) % (cb->size + 1); - } - - assert(cbuf_is_valid(cb)); - - /* Attempt to shrink buffer if possible. - */ - if ((cb->size - cb->used > CBUF_CHUNK) && (cb->size > cb->minsize)) { - cbuf_shrink(cb); - } return(ngot); } static int -cbuf_replayer(cbuf_t cb, int len, cbuf_iof getf, void *dst) +cbuf_replayer(cbuf_t cb, int len, cbuf_iof putf, void *dst) { /* XXX: DOCUMENT ME. */ assert(cb != NULL); assert(len > 0); - assert(getf != NULL); + assert(putf != NULL); assert(dst != NULL); + assert(cbuf_mutex_is_locked(cb)); assert(cbuf_is_valid(cb)); /* XXX: NOT IMPLEMENTED. */ - return(0); + errno = ENOSYS; + return(-1); } static int -cbuf_writer(cbuf_t cb, int len, cbuf_iof putf, void *src, int *pdropped) +cbuf_writer(cbuf_t cb, int len, cbuf_iof getf, void *src, int *pdropped) { /* XXX: DOCUMENT ME. */ int free; int nget, ngot; - int shortput; + int shortget; int n, m; assert(cb != NULL); assert(len > 0); - assert(putf != NULL); + assert(getf != NULL); assert(src != NULL); + assert(cbuf_mutex_is_locked(cb)); assert(cbuf_is_valid(cb)); /* Attempt to grow buffer if necessary. @@ -811,42 +912,45 @@ cbuf_writer(cbuf_t cb, int len, cbuf_iof putf, void *src, int *pdropped) if ((len > free) && (cb->size < cb->maxsize)) { free += cbuf_grow(cb, len - free); } - /* Compute total number of bytes to attempt to write into buffer. */ nget = MIN(len, cb->size); ngot = 0; - shortput = 0; + shortget = 0; + if (pdropped) { + *pdropped = 0; + } /* Copy first chunk of data (ie, up to the end of the buffer). */ n = MIN(nget, (cb->size + 1) - cb->i_in); if (n > 0) { - m = putf(&cb->data[cb->i_in], src, n); - if (m <= 0) - return(m); - if (m != n) - shortput = 1; + m = getf(&cb->data[cb->i_in], src, n); + if (m <= 0) { + return(m); /* got ERR or EOF */ + } + if (m != n) { + shortget = 1; + } cb->i_in += m; cb->i_in %= (cb->size + 1); /* the hokey-pokey cbuf wrap-around */ ngot += m; } /* Copy second chunk of data (ie, from the beginning of the buffer). - * If the first putf() was short, the second putf() is not attempted. - * If the second putf() returns EOF/ERR, it will be masked by the success - * of the first putf(). This only occurs with put_fd, and the EOF/ERR + * If the first getf() was short, the second getf() is not attempted. + * If the second getf() returns EOF/ERR, it will be masked by the success + * of the first getf(). This only occurs with get_fd, and the EOF/ERR * condition should be returned on the next invocation. */ n = nget - ngot; - if ((n > 0) && (!shortput)) { - m = putf(&cb->data[0], src, n); + if ((n > 0) && (!shortget)) { + m = getf(&cb->data[0], src, n); if (m > 0) { cb->i_in += m; /* hokey-pokey not needed here */ ngot += m; } } - /* Check to see if any data in the circular-buffer was overwritten. */ if (ngot > 0) { @@ -872,7 +976,6 @@ cbuf_grow(cbuf_t cb, int n) /* Attempts to grow the circular buffer 'cb' by at least 'n' bytes. * Returns the number of bytes by which the buffer has grown (which may be * less-than, equal-to, or greater-than the number of bytes requested). - * Assumes 'cb' is locked upon entry. */ unsigned char *data; /* tmp ptr to data buffer */ int size_old; /* size of buffer upon func entry */ @@ -881,6 +984,7 @@ cbuf_grow(cbuf_t cb, int n) assert(cb != NULL); assert(n > 0); + assert(cbuf_mutex_is_locked(cb)); if (cb->size == cb->maxsize) { return(0); @@ -896,8 +1000,9 @@ cbuf_grow(cbuf_t cb, int n) m = MIN(m, (cb->maxsize + size_meta)); assert(m > cb->alloc); + data = cb->data; #ifndef NDEBUG - data = cb->data - CBUF_MAGIC_LEN; /* jump back to what malloc returned */ + data -= CBUF_MAGIC_LEN; /* jump back to what malloc returned */ #endif /* !NDEBUG */ if (!(data = realloc(data, m))) { @@ -942,9 +1047,9 @@ static int cbuf_shrink(cbuf_t cb) { /* XXX: DOCUMENT ME. - * Assumes 'cb' is locked upon entry. */ assert(cb != NULL); + assert(cbuf_mutex_is_locked(cb)); if (cb->size == cb->minsize) { return(0); @@ -959,16 +1064,34 @@ cbuf_shrink(cbuf_t cb) } +#ifndef NDEBUG +#ifdef WITH_PTHREADS +static int +cbuf_mutex_is_locked(cbuf_t cb) +{ +/* Returns true if the mutex is locked; o/w, returns false. + */ + int rc; + + assert(cb != NULL); + rc = pthread_mutex_trylock(&cb->mutex); + return(rc == EBUSY ? 1 : 0); +} +#endif /* WITH_PTHREADS */ +#endif /* !NDEBUG */ + + #ifndef NDEBUG static int cbuf_is_valid(cbuf_t cb) { /* Validates the data structure. All invariants should be tested here. * Returns true if everything is valid; o/w, aborts due to assertion failure. - * Assumes 'cb' is locked upon entry. */ int free; + assert(cb != NULL); + assert(cbuf_mutex_is_locked(cb)); assert(cb->data != NULL); assert(cb->magic == CBUF_MAGIC); assert(memcmp(cb->data - CBUF_MAGIC_LEN, @@ -990,12 +1113,15 @@ cbuf_is_valid(cbuf_t cb) assert(cb->i_out >= 0); assert(cb->i_out <= cb->size); - if (cb->i_in == cb->i_out) + if (cb->i_in == cb->i_out) { free = cb->size; - else if (cb->i_in < cb->i_out) + } + else if (cb->i_in < cb->i_out) { free = (cb->i_out - cb->i_in) - 1; - else + } + else { free = ((cb->size + 1) - cb->i_in) + cb->i_out - 1; + } assert(cb->size - cb->used == free); return(1); diff --git a/src/common/cbuf.h b/src/common/cbuf.h index e62a71b37bc6a373d00afbcc44d28be416eeb541..20f38a9f74c61cf832d524558b3064e71499eee3 100644 --- a/src/common/cbuf.h +++ b/src/common/cbuf.h @@ -1,26 +1,24 @@ /*****************************************************************************\ * $Id$ ***************************************************************************** - * Copyright (C) 2001-2002 The Regents of the University of California. + * Copyright (C) 2002 The Regents of the University of California. * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). * Written by Chris Dunlap <cdunlap@llnl.gov>. - * UCRL-CODE-2002-009. * - * This file is part of ConMan, a remote console management program. - * For details, see <http://www.llnl.gov/linux/conman/>. + * This file is from LSD-Tools, the LLNL Software Development Toolbox. * - * ConMan is free software; you can redistribute it and/or modify it under + * LSD-Tools is free software; you can redistribute it and/or modify it under * the terms of the GNU General Public License as published by the Free * Software Foundation; either version 2 of the License, or (at your option) * any later version. * - * ConMan is distributed in the hope that it will be useful, but WITHOUT ANY - * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS - * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more - * details. + * LSD-Tools is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. * * You should have received a copy of the GNU General Public License along - * with ConMan; if not, write to the Free Software Foundation, Inc., + * with LSD-Tools; if not, write to the Free Software Foundation, Inc., * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. \*****************************************************************************/ @@ -47,6 +45,9 @@ * linker will expect to find an external Out-Of-Memory Function. * * If WITH_PTHREADS is defined, these routines will be thread-safe. + * + * XXX: Buffer shrinking not implemented yet. + * XXX: Adaptive chunksize not implemented yet. */ @@ -116,6 +117,7 @@ int cbuf_peek (cbuf_t cb, void *dstbuf, int len); /* * Reads up to 'len' bytes of data from 'cb' into the buffer 'dstbuf', * but does not consume the data read from the cbuf. + * The "peek" can be committed to the cbuf via a call to cbuf_drop(). * Returns the number of bytes read, or <0 on error (with errno set). */ @@ -130,9 +132,10 @@ int cbuf_replay (cbuf_t cb, void *dstbuf, int len); * Replays up to 'len' bytes of previously read data from 'cb' into the * buffer 'dstbuf'. * Returns the number of bytes replayed, or <0 on error (with errno set). + * XXX: Not implemented yet. */ -int cbuf_write (cbuf_t cb, void *srcbuf, int len, int *dropped); +int cbuf_write (cbuf_t cb, const void *srcbuf, int len, int *dropped); /* * Writes up to 'len' bytes of data from the buffer 'srcbuf' into 'cb'. * Returns the number of bytes written, or <0 on error (with errno set). @@ -144,6 +147,7 @@ int cbuf_peek_to_fd (cbuf_t cb, int dstfd, int len); * Reads up to 'len' bytes of data from 'cb' into the file referenced by the * file descriptor 'dstfd', but does not consume the data read from the * cbuf. If 'len' is -1, it will be set to cbuf_used(). + * The "peek" can be committed to the cbuf via a call to cbuf_drop(). * Returns the number of bytes read, or <0 on error (with errno set). */ @@ -160,6 +164,7 @@ int cbuf_replay_to_fd (cbuf_t cb, int dstfd, int len); * referenced by the file descriptor 'dstfd'. If 'len' is -1, it will be * set to the maximum number of bytes available for replay. * Returns the number of bytes replayed, or <0 on error (with errno set). + * XXX: Not implemented yet. */ int cbuf_write_from_fd (cbuf_t cb, int srcfd, int len, int *dropped); @@ -171,29 +176,33 @@ int cbuf_write_from_fd (cbuf_t cb, int srcfd, int len, int *dropped); * Sets 'dropped' (if not NULL) to the number of bytes of data overwritten. */ -int cbuf_gets (cbuf_t cb, char *dst, int len); +int cbuf_get_line (cbuf_t cb, char *dst, int len); /* * Reads a line of data from 'cb' into the buffer 'dst'. Reading stops after - * a newline which is also stored in the 'dst' buffer. The buffer will - * always be NUL-terminated and contain at most 'len - 1' characters. - * Returns the strlen of the line on success; truncation occurred if >= 'len'. - * Returns 0 if a newline is not found; no data is consumed in this case. - * Returns <0 on error (with errno set). + * a newline or NUL which is also written into the 'dst' buffer. The buffer + * will be NUL-terminated and contain at most 'len - 1' characters. If + * 'len' is 0, the line will be discarded from the cbuf and nothing will be + * written into the 'dst' buffer. + * Returns the line strlen on success; if >= 'len', truncation occurred and + * the excess line data was discarded. Returns 0 if a newline is not found; + * no data is consumed in this case. Returns <0 on error (with errno set). */ -int cbuf_peeks (cbuf_t cb, char *dst, int len); +int cbuf_peek_line (cbuf_t cb, char *dst, int len); /* * Reads a line of data from 'cb' into the buffer 'dst', but does not consume - * the data read from the cbuf. Reading stops after a newline which is also - * stored in the 'dst' buffer. The buffer will always be NUL-terminated and - * contain at most 'len - 1' characters. + * the data read from the cbuf. Reading stops after a newline or NUL which + * is also written into the 'dst' buffer. The buffer will be NUL-terminated + * and contain at most 'len - 1' characters. + * The "peek" can be committed to the cbuf via a call to cbuf_drop(). * Returns the strlen of the line on success; truncation occurred if >= 'len'. * Returns 0 if a newline is not found, or <0 on error (with errno set). */ -int cbuf_puts (cbuf_t cb, char *src, int *dropped); +int cbuf_put_line (cbuf_t cb, const char *src, int *dropped); /* - * Writes the NUL-terminated string 'src' into 'cb'. + * Writes the NUL-terminated string 'src' into 'cb'. A newline will be + * appended to the cbuf if 'src' does not contain a trailing newline. * Returns the number of characters written, or <0 or error (with errno set). * Sets 'dropped' (if not NULL) to the number of bytes of data overwritten. */ diff --git a/src/common/eio.c b/src/common/eio.c index 18695216be39e941bd28fa662cd9b98c9609c923..0d3048b61440c1d4bff16f798423ca7b3d5a18e4 100644 --- a/src/common/eio.c +++ b/src/common/eio.c @@ -95,7 +95,6 @@ _poll_internal(struct pollfd *pfds, unsigned int nfds) return -1; } } - verbose("poll returned %d"); return n; } diff --git a/src/common/slurm_protocol_defs.h b/src/common/slurm_protocol_defs.h index 3b5be940bdff6059cfbae30c80ead0e514c0b415..d96cdb918685d9e9f6e3cf167d2bffb782ec8749 100644 --- a/src/common/slurm_protocol_defs.h +++ b/src/common/slurm_protocol_defs.h @@ -221,7 +221,8 @@ typedef struct { uid_t user_id; char *node_list; time_t expiration_time; - char signature[SLURM_SSL_SIGNATURE_LENGTH]; /* What are we going to do here? */ + char signature[SLURM_SSL_SIGNATURE_LENGTH]; + /* What are we going to do here? */ } slurm_job_credential_t; typedef struct { @@ -429,25 +430,26 @@ typedef struct last_update_msg { } last_update_msg_t; typedef struct launch_tasks_request_msg { - uint32_t job_id; - uint32_t job_step_id; - uint32_t nnodes; /* number of nodes in this job step */ - uint32_t nprocs; /* number of processes in this job step */ - uint32_t uid; - uint32_t srun_node_id; /* node id of this node (relative to job) */ + uint32_t job_id; + uint32_t job_step_id; + uint32_t nnodes; /* number of nodes in this job step */ + uint32_t nprocs; /* number of processes in this job step */ + uint32_t uid; + uint32_t srun_node_id; /* node id of this node (relative to job) */ + uint32_t tasks_to_launch; + uint16_t envc; + uint16_t argc; + char **env; + char **argv; + char *cwd; + uint16_t resp_port; + uint16_t io_port; + uint32_t *global_task_ids; slurm_job_credential_t *credential; /* job credential */ - uint32_t tasks_to_launch; - uint16_t envc; - char **env; - char *cwd; - uint16_t argc; - char **argv; - slurm_addr response_addr; - slurm_addr streams; - uint32_t *global_task_ids; + #ifdef HAVE_LIBELAN3 - qsw_jobinfo_t qsw_job; /* Elan3 switch context, opaque data structure */ + qsw_jobinfo_t qsw_job; /* Elan3 switch context */ #endif } launch_tasks_request_msg_t; diff --git a/src/common/slurm_protocol_implementation.c b/src/common/slurm_protocol_implementation.c index 990af169a622b33cbd85619d302ab4927fc6bf4b..5ff5fdd386d189d146394592e896559e4300f74a 100644 --- a/src/common/slurm_protocol_implementation.c +++ b/src/common/slurm_protocol_implementation.c @@ -1,5 +1,5 @@ #ifndef _SLURM_PROTOCOL_IMPLEMENTATION_C -#define _SLURM_PROTOCOL_IMPLEMENTATION_C_ +#define _SLURM_PROTOCOL_IMPLEMENTATION_C #if MONG_IMPLEMENTATION # include <src/common/slurm_protocol_mongo_implementation.h> diff --git a/src/common/slurm_protocol_pack.c b/src/common/slurm_protocol_pack.c index f5db36ef1320599102f3ea512d0f173b6de6a023..862f08eb87f104360f2f61423c13b71f6fbf9e49 100644 --- a/src/common/slurm_protocol_pack.c +++ b/src/common/slurm_protocol_pack.c @@ -1366,9 +1366,9 @@ int unpack_task_exit_msg ( task_exit_msg_t ** msg_ptr , Buf buffer ) void pack_launch_tasks_response_msg ( launch_tasks_response_msg_t * msg , Buf buffer ) { - pack32 ( msg -> return_code , buffer ) ; + pack32 ( msg -> return_code , buffer ) ; packstr ( msg -> node_name , buffer ) ; - pack32 ( msg -> srun_node_id , buffer ) ; + pack32 ( msg -> srun_node_id , buffer ) ; } int unpack_launch_tasks_response_msg ( launch_tasks_response_msg_t ** msg_ptr , Buf buffer ) @@ -1403,9 +1403,9 @@ void pack_launch_tasks_request_msg ( launch_tasks_request_msg_t * msg , Buf buff packstring_array ( msg -> env , msg -> envc , buffer ) ; packstr ( msg -> cwd , buffer ) ; packstring_array ( msg -> argv , msg -> argc , buffer ) ; - slurm_pack_slurm_addr ( & msg -> response_addr , buffer ) ; - slurm_pack_slurm_addr ( & msg -> streams , buffer ) ; - pack32_array ( msg -> global_task_ids , ( uint16_t ) msg -> tasks_to_launch , buffer ) ; + pack16 ( msg -> resp_port , buffer ) ; + pack16 ( msg -> io_port , buffer ) ; + pack32_array ( msg -> global_task_ids , (uint16_t) msg -> tasks_to_launch , buffer ) ; #ifdef HAVE_LIBELAN3 qsw_pack_jobinfo( msg -> qsw_job , buffer ) ; #endif @@ -1417,8 +1417,7 @@ int unpack_launch_tasks_request_msg ( launch_tasks_request_msg_t ** msg_ptr , Bu launch_tasks_request_msg_t * msg ; msg = xmalloc ( sizeof ( launch_tasks_request_msg_t ) ) ; - if (msg == NULL) - { + if (msg == NULL) { *msg_ptr = NULL ; return ENOMEM ; } @@ -1434,8 +1433,8 @@ int unpack_launch_tasks_request_msg ( launch_tasks_request_msg_t ** msg_ptr , Bu unpackstring_array ( & msg -> env , & msg -> envc , buffer ) ; unpackstr_xmalloc ( & msg -> cwd , & uint16_tmp , buffer ) ; unpackstring_array ( & msg -> argv , & msg->argc , buffer ) ; - slurm_unpack_slurm_addr_no_alloc ( & msg -> response_addr , buffer ) ; - slurm_unpack_slurm_addr_no_alloc ( & msg -> streams , buffer ) ; + unpack16 ( & msg -> resp_port , buffer ) ; + unpack16 ( & msg -> io_port , buffer ) ; unpack32_array ( & msg -> global_task_ids , & uint16_tmp , buffer ) ; #ifdef HAVE_LIBELAN3 diff --git a/src/common/slurm_protocol_socket_implementation.c b/src/common/slurm_protocol_socket_implementation.c index 945bd1a48adc760af74f1769b733eee2ef6ec9b2..245212014cf5b4039766f5634757a9e08f72235b 100644 --- a/src/common/slurm_protocol_socket_implementation.c +++ b/src/common/slurm_protocol_socket_implementation.c @@ -774,13 +774,17 @@ void _slurm_set_addr ( slurm_addr * slurm_address , uint16_t port , char * host } void _slurm_set_addr_char ( slurm_addr * slurm_address , uint16_t port , char * host ) { - struct hostent * host_info = gethostbyname ( host ) ; - if (host_info == NULL) { - error ("gethostbyname failure on %s", host); - slurm_address->sin_family = 0; - slurm_address->sin_port = 0; + struct hostent * host_info; + if (host != NULL) { + host_info = gethostbyname ( host ) ; + if (host_info == NULL) { + error ("gethostbyname failure on %s", host); + slurm_address->sin_family = 0; + slurm_address->sin_port = 0; + } + memcpy ( & slurm_address -> sin_addr . s_addr , + host_info -> h_addr , host_info -> h_length ) ; } - memcpy ( & slurm_address -> sin_addr . s_addr , host_info -> h_addr , host_info -> h_length ) ; slurm_address -> sin_family = AF_SLURM ; slurm_address -> sin_port = htons ( port ) ; } diff --git a/src/common/slurm_protocol_util.c b/src/common/slurm_protocol_util.c index 8538b9c1301ae96523604c9110f5ccb1efdf2e73..3e6bc1bf6d7acc43a2f77c535fc404c0cbfd41fd 100644 --- a/src/common/slurm_protocol_util.c +++ b/src/common/slurm_protocol_util.c @@ -194,7 +194,6 @@ void slurm_print_job_credential(FILE * stream, void slurm_print_launch_task_msg(launch_tasks_request_msg_t * msg) { int i; - char addrbuf[256]; debug3("job_id: %i", msg->job_id); debug3("job_step_id: %i", msg->job_step_id); debug3("uid: %i", msg->uid); @@ -209,11 +208,8 @@ void slurm_print_launch_task_msg(launch_tasks_request_msg_t * msg) for (i = 0; i < msg->argc; i++) { debug3("argv[%i]: %s", i, msg->argv[i]); } - slurm_print_slurm_addr(&msg->response_addr, addrbuf, - sizeof(addrbuf)); - debug3("msg -> response_addr = %s", addrbuf); - slurm_print_slurm_addr(&msg->streams, addrbuf, sizeof(addrbuf)); - debug3("msg -> streams = %s", addrbuf); + debug3("msg -> resp_port = %d", msg->resp_port); + debug3("msg -> io_port = %d", msg->io_port); for (i = 0; i < msg->tasks_to_launch; i++) { debug3("global_task_id[%i]: %i ", i, diff --git a/src/slurmd/Makefile.am b/src/slurmd/Makefile.am index 8e4add04636c18512639bbebd69c1ce8a7a4272c..d64594722600d17e0f9532251c28a88a25fd8383 100644 --- a/src/slurmd/Makefile.am +++ b/src/slurmd/Makefile.am @@ -26,10 +26,13 @@ slurmd_LDADD = $(LDADD) $(interconnect_lib) common_sources = \ - slurmd.c \ - mgr.c \ + slurmd.c slurmd.h \ + req.c req.h \ + mgr.c mgr.h \ get_mach_stat.c \ + get_mach_stat.h \ read_proc.c \ + read_proc.h \ job.c job.h \ io.c io.h \ semaphore.c semaphore.h \ diff --git a/src/slurmd/get_mach_stat.c b/src/slurmd/get_mach_stat.c index 25e097b6cea25bb70b54b85364b25f98a5ddaf70..058a8c5ecba367d9bffedced24e38112f52dbc60 100644 --- a/src/slurmd/get_mach_stat.c +++ b/src/slurmd/get_mach_stat.c @@ -50,6 +50,7 @@ #include <src/slurmd/get_mach_stat.h> #define BUF_SIZE 1024 +#define DEFAULT_TMP_FS "/tmp" char *get_tmp_fs_name (void); diff --git a/src/slurmd/io.c b/src/slurmd/io.c index 3b32729cadc3ad8e4523504175b9f506de22bfca..9517e9d2437ab58147cd40e6847e59e1916beeac 100644 --- a/src/slurmd/io.c +++ b/src/slurmd/io.c @@ -204,7 +204,6 @@ static void * _io_thr(void *arg) { slurmd_job_t *job = (slurmd_job_t *) arg; - log_reinit(); io_handle_events(job->objs); verbose("IO handler exited"); return (void *)1; @@ -721,7 +720,7 @@ _write(io_obj_t *obj, List objs) if (io->disconnected) return 0; - verbose("Need to write %ld bytes to %s %d", + debug3("Need to write %ld bytes to %s %d", cbuf_used(io->buf), _io_str[io->type], io->id); @@ -748,7 +747,7 @@ _write(io_obj_t *obj, List objs) return -1; } - verbose("Wrote %d bytes to %s %d", + debug3("Wrote %d bytes to %s %d", n, _io_str[io->type], io->id); return 0; @@ -802,11 +801,11 @@ _task_read(io_obj_t *obj, List objs) t->id, obj->fd, errno); return -1; } - verbose("read %d bytes from %s %d", + debug3("read %d bytes from %s %d", n, _io_str[t->type], t->id); if (n == 0) { /* got eof */ - verbose("got eof on task %ld", t->id); + debug3("got eof on task %ld", t->id); _shutdown_task_obj(t); close(obj->fd); obj->fd = -1; @@ -819,7 +818,7 @@ _task_read(io_obj_t *obj, List objs) i = list_iterator_create(t->readers); while((r = list_next(i))) { n = cbuf_write(r->buf, (void *) buf, n, NULL); - verbose("wrote %ld bytes into %s buf", n, + debug3("wrote %ld bytes into %s buf", n, _io_str[r->type]); } list_iterator_destroy(i); @@ -870,7 +869,7 @@ _client_read(io_obj_t *obj, List objs) client->id); if (n == 0) { /* got eof, disconnect this client */ - verbose("client %d closed connection", client->id); + debug3("client %d closed connection", client->id); if (!client->disconnected) _io_disconnect_client(client); xassert(_validate_io_list(objs)); diff --git a/src/slurmd/job.c b/src/slurmd/job.c index 0b61cde6765cee65fd17ac05e03fc358defcd3e1..40624584da909bb47cc928540d055688266d6d3f 100644 --- a/src/slurmd/job.c +++ b/src/slurmd/job.c @@ -33,6 +33,8 @@ # include <string.h> #endif +#include <signal.h> + #include <src/common/xmalloc.h> #include <src/common/xassert.h> #include <src/common/xstring.h> @@ -53,13 +55,20 @@ static void _job_init_task_info(slurmd_job_t *job, /* create a slurmd job structure from a launch tasks message */ slurmd_job_t * -job_create(launch_tasks_request_msg_t *msg) +job_create(launch_tasks_request_msg_t *msg, slurm_addr *cli_addr) { slurmd_job_t *job; srun_info_t *srun; + slurm_addr resp_addr; + slurm_addr io_addr; xassert(msg != NULL); + memcpy(&resp_addr, cli_addr, sizeof(slurm_addr)); + memcpy(&io_addr, cli_addr, sizeof(slurm_addr)); + slurm_set_addr(&resp_addr, msg->resp_port, NULL); + slurm_set_addr(&io_addr, msg->io_port, NULL); + job = xmalloc(sizeof(*job)); job->jobid = msg->job_id; @@ -85,10 +94,7 @@ job_create(launch_tasks_request_msg_t *msg) job->objs = list_create((ListDelF) io_obj_destroy); - srun = srun_info_create( (void *)msg->credential->signature, - msg->response_addr, - msg->streams - ); + srun = srun_info_create((void *)msg->credential->signature, resp_addr, io_addr); job->sruns = list_create((ListDelF) _srun_info_destructor); @@ -115,10 +121,24 @@ _job_init_task_info(slurmd_job_t *job, launch_tasks_request_msg_t *msg) } } +void +job_signal_tasks(slurmd_job_t *job, int signal) +{ + int n = job->ntasks; + while (--n >= 0) { + if (kill(job->task[n]->pid, signal) < 0) { + if (errno != EEXIST) { + error("job %d.%d: kill task %d: %m", + job->jobid, job->stepid, n); + } + } + } +} + /* remove job from shared memory, kill initiated tasks, etc */ void -job_kill(slurmd_job_t *job) +job_kill(slurmd_job_t *job, int rc) { job_state_t *state; @@ -128,19 +148,19 @@ job_kill(slurmd_job_t *job) return; if (*state > SLURMD_JOB_STARTING) { - /* singnal all tasks on step->task_list + /* signal all tasks on step->task_list * This will result in task exit msgs being sent to srun * XXX IMPLEMENT */ - /* job_signal_tasks(job, SIGKILL); */ + job_signal_tasks(job, SIGKILL); } *state = SLURMD_JOB_ENDING; shm_unlock_step_state(job->jobid, job->stepid); - /* forward remaining task_exit messages? */ - /* send_exit_codes() */ + return; } + void job_destroy(slurmd_job_t *job) { @@ -259,8 +279,10 @@ job_update_shm(slurmd_job_t *job) s.sw_id = 0; - if (shm_insert_step(&s) == SLURM_FAILURE) + if (shm_insert_step(&s) < 0) error("Updating shmem with new step info: %m"); + + verbose("shm_insert job %d.%d", job->jobid, job->stepid); } void diff --git a/src/slurmd/job.h b/src/slurmd/job.h index d284c20d2ef6069e26165c2d96ec2de54c66dce9..885575b9a0c444b8a3436b14d4c33d1f6ee515dc 100644 --- a/src/slurmd/job.h +++ b/src/slurmd/job.h @@ -101,9 +101,9 @@ typedef struct slurmd_job { } slurmd_job_t; -slurmd_job_t * job_create(launch_tasks_request_msg_t *msg); +slurmd_job_t * job_create(launch_tasks_request_msg_t *msg, slurm_addr *client); -void job_kill(slurmd_job_t *job); +void job_kill(slurmd_job_t *job, int signal); void job_destroy(slurmd_job_t *job); @@ -120,4 +120,34 @@ void job_update_shm(slurmd_job_t *job); void job_delete_shm(slurmd_job_t *job); +#define job_error(j, fmt, args...) \ + do { \ + error("%d.%d: " fmt, (j)->jobid, (j)->stepid, ## args); \ + } while (0) + +#define job_verbose(j, fmt, args...) \ + do { \ + verbose("%d.%d: " fmt, (j)->jobid, (j)->stepid, ## args); \ + } while (0) + +#define job_debug(j, fmt, args...) \ + do { \ + debug("%d.%d: " fmt, (j)->jobid, (j)->stepid, ## args); \ + } while (0) + +#define job_debug2(j, fmt, args...) \ + do { \ + debug2("%d.%d: " fmt, (j)->jobid, (j)->stepid, ## args); \ + } while (0) + +#define job_debug3(j, fmt, args...) \ + do { \ + debug3("%d.%d: " fmt, (j)->jobid, (j)->stepid, ## args); \ + } while (0) + +#define job_info(j, fmt, args...) \ + do { \ + info("%d.%d: " fmt, (j)->jobid, (j)->stepid, ## args); \ + } while (0) + #endif /* !_JOB_H */ diff --git a/src/slurmd/mgr.c b/src/slurmd/mgr.c index 47bfcdb88f9038c704372debb5a0660426bb455a..38e25b2023c1b26e5aa00ea1ef3ccb014e80ab18 100644 --- a/src/slurmd/mgr.c +++ b/src/slurmd/mgr.c @@ -56,21 +56,19 @@ static int _send_exit_msg(int rc, task_info_t *t); /* Launch a job step on this node */ int -mgr_launch_tasks(launch_tasks_request_msg_t *msg) +mgr_launch_tasks(launch_tasks_request_msg_t *msg, slurm_addr *cli) { slurmd_job_t *job; - log_reinit(); - /* New process, so we must reinit shm */ if (shm_init() < 0) return SLURM_ERROR; - if (!(job = job_create(msg))) + if (!(job = job_create(msg, cli))) return SLURM_ERROR; slurmd_run_job(job); - debug2("%ld returned from slurmd_run_job()", getpid()); + job_debug2(job, "%ld returned from slurmd_run_job()", getpid()); shm_fini(); exit(0); return 0; /* not reached */ @@ -101,7 +99,7 @@ slurmd_run_job(slurmd_job_t *job) shm_fini(); if (interconnect_init(job) == SLURM_ERROR) { - error("interconnect_init failed"); + job_error(job, "interconnect_init failed"); rc = 2; shm_init(); goto done; @@ -109,8 +107,9 @@ slurmd_run_job(slurmd_job_t *job) /* Reattach to shared memory after interconnect is initialized */ + job_verbose(job, "%ld reattaching to shm", getpid()); if (shm_init() < 0) { - error("shm_init: %m"); + job_error(job, "unable to reattach to shm: %m"); rc = 1; goto done; } @@ -120,16 +119,16 @@ slurmd_run_job(slurmd_job_t *job) */ /* Option: connect slurmd stderr to srun local task 0: stderr? */ if (io_spawn_handler(job) == SLURM_ERROR) { - error("unable to spawn io handler"); + job_error(job, "unable to spawn io handler"); rc = 3; goto done; } job_launch_tasks(job); - verbose("job %d.%d complete, waiting on IO", job->jobid, job->stepid); + job_verbose(job, "job complete, waiting on IO"); io_close_all(job); pthread_join(job->ioid, NULL); - verbose("job %d.%d IO complete", job->jobid, job->stepid); + job_verbose(job, "IO complete"); done: interconnect_fini(job); /* ignore errors */ @@ -164,7 +163,7 @@ _wait_for_all_tasks(slurmd_job_t *job) int status; pid_t pid = waitpid(0, &status, 0); if (pid < (pid_t) 0) { - error("waitpid: %m"); + job_error(job, "waitpid: %m"); /* job_cleanup() */ } for (i = 0; i < job->ntasks; i++) { @@ -249,22 +248,22 @@ job_launch_tasks(slurmd_job_t *job) pid_t sid; int i; - debug3("%ld entered job_launch_tasks", getpid()); + job_debug3(job, "%ld entered job_launch_tasks", getpid()); xsignal(SIGPIPE, SIG_IGN); - if ((sid = setsid()) < (pid_t) 0) - error("setsid: %m"); + if ((sid = setsid()) < (pid_t) 0) { + job_error(job, "setsid: %m"); + } if (shm_update_step_sid(job->jobid, job->stepid, sid) < 0) - error("shm_update_step_sid: %m"); + job_error(job, "shm_update_step_sid: %m"); - debug2("invoking %d tasks for job %d.%d", job->ntasks, job->jobid, - job->stepid); + job_debug2(job, "invoking %d tasks", job->ntasks); for (i = 0; i < job->ntasks; i++) { task_t t; - verbose("going to fork task %d", i); + job_debug2(job, "going to fork task %d", i); t.id = i; t.global_id = job->task[i]->gid; t.ppid = getpid(); @@ -280,12 +279,12 @@ job_launch_tasks(slurmd_job_t *job) job->task[i]->pid = t.pid; - debug2("%ld: forked child process %ld for task %d", + job_debug2(job, "%ld: forked child process %ld for task %d", getpid(), (long) t.pid, i); - debug2("going to add task %d to shm", i); + job_debug2(job, "going to add task %d to shm", i); if (shm_add_task(job->jobid, job->stepid, &t) < 0) - error("shm_add_task: %m"); - debug2("task %d added to shm", i); + job_error(job, "shm_add_task: %m"); + job_debug2(job, "task %d added to shm", i); } diff --git a/src/slurmd/mgr.h b/src/slurmd/mgr.h index 9c426ccea3f37bdef218c16365459bcb72cec999..d1fc62c890937627dd185e8bef81877ba4175a5a 100644 --- a/src/slurmd/mgr.h +++ b/src/slurmd/mgr.h @@ -36,7 +36,7 @@ /* Launch a job step on this node */ -int mgr_launch_tasks(launch_tasks_request_msg_t *msg); +int mgr_launch_tasks(launch_tasks_request_msg_t *msg, slurm_addr *client); /* Instance of a slurmd "job" or job step: * We run: diff --git a/src/slurmd/shm.c b/src/slurmd/shm.c index 22df98297a05be7d0017b2ddd99eb0e0290bf98b..6f11376bb115b1ad31ddc6963c9bc9a07fac17a1 100644 --- a/src/slurmd/shm.c +++ b/src/slurmd/shm.c @@ -73,23 +73,46 @@ #define SHM_LOCKNAME "/.slurm.lock" /* Increment SHM_VERSION if format changes */ -#define SHM_VERSION 0x1001 +#define SHM_VERSION 1001 + +/* These macros convert shared memory pointers to local memory + * pointers and back again. Pointers in shared memory are relative + * to the address at which the shared memory is attached + * + * These routines must be used to convert a pointer in shared memory + * back to a "real" pointer in local memory. e.g. t = _taskp(t->next) + */ +#define _laddr(p) \ + ((p) ? (((size_t)(p)) + ((size_t)slurmd_shm)) : (size_t)NULL) +#define _offset(p) \ + ((p) ? (((size_t)(p)) - ((size_t)slurmd_shm)) : (size_t)NULL) + +#define _taskp(__p) (task_t *) _laddr(__p) +#define _toff(__p) (task_t *) _offset(__p) +#define _stepp(__p) (job_step_t *) _laddr(__p) +#define _soff(__p) (job_step_t *) _offset(__p) typedef struct shmem_struct { - int version; - int users; - job_step_t step[MAX_JOB_STEPS]; - task_t task[MAX_TASKS]; + int version; + int users; + job_step_t step[MAX_JOB_STEPS]; + task_t task[MAX_TASKS]; } slurmd_shm_t; -/* static variables: */ -static sem_t *shm_lock; -static char *lockname; -static int shmid; +/* + * static variables: + * */ +static sem_t *shm_lock; +static char *lockname; static slurmd_shm_t *slurmd_shm; +static int shmid; +static pid_t attach_pid = (pid_t) 0; -/* static function prototypes: */ + +/* + * static function prototypes: + */ static int _is_valid_ipc_name(const char *name); static char *_create_ipc_name(const char *name); static int _shm_unlink_lock(void); @@ -98,18 +121,21 @@ static void _shm_lock(void); static void _shm_unlock(void); static void _shm_initialize(void); static void _shm_prepend_task_to_step(job_step_t *, task_t *); +static void _shm_prepend_task_to_step_internal(job_step_t *, task_t *); static void _shm_task_copy(task_t *, task_t *); static void _shm_step_copy(job_step_t *, job_step_t *); static void _shm_clear_task(task_t *); static void _shm_clear_step(job_step_t *); static int _shm_find_step(uint32_t, uint32_t); -static task_t * _shm_alloc_task(void); -static task_t * _shm_find_task_in_step(job_step_t *s, int taskid); +static task_t * _shm_alloc_task(void); +static task_t * _shm_find_task_in_step(job_step_t *s, int taskid); +static job_step_t * _shm_copy_step(job_step_t *j); -/* initialize shared memory: - * Attach if shared region already exists, otherwise create and attach -*/ + +/* initialize shared memory: Attach to memory if shared region + * already exists - otherwise create and attach + */ int shm_init(void) { @@ -123,7 +149,11 @@ shm_fini(void) int destroy = 0; xassert(slurmd_shm != NULL); _shm_lock(); - if (--slurmd_shm->users == 0) + + verbose("%ld calling shm_fini() on %ld", getpid(), attach_pid); + xassert(attach_pid == getpid()); + + if ((attach_pid == getpid()) && (--slurmd_shm->users == 0)) destroy = 1; /* detach segment from local memory */ @@ -158,9 +188,24 @@ shm_cleanup(void) error("sem_unlink: %m"); xfree(s); } +} - - +List +shm_get_steps(void) +{ + List l; + int i; + xassert(slurmd_shm != NULL); + l = list_create((ListDelF) &shm_free_step); + _shm_lock(); + for (i = 0; i < MAX_JOB_STEPS; i++) { + if (slurmd_shm->step[i].state > SLURMD_JOB_UNUSED) { + job_step_t *s = _shm_copy_step(&slurmd_shm->step[i]); + if (s) list_append(l, (void *) s); + } + } + _shm_unlock(); + return l; } static int @@ -215,7 +260,7 @@ _create_ipc_name(const char *name) static int _shm_unlink_lock() { - debug3("process %ld removing shm lock", getpid()); + verbose("process %ld removing shm lock", getpid()); if (sem_unlink(lockname) == -1) return 0; xfree(lockname); @@ -270,14 +315,15 @@ shm_insert_step(job_step_t *step) } for (i = 0; i < MAX_JOB_STEPS; i++) { - if (slurmd_shm->step[i].state == SLURMD_JOB_UNUSED) + if (slurmd_shm->step[i].state <= SLURMD_JOB_UNUSED) break; } if (i == MAX_JOB_STEPS) { _shm_unlock(); slurm_seterrno_ret(ENOSPC); - } else + } else { _shm_step_copy(&slurmd_shm->step[i], step); + } _shm_unlock(); return SLURM_SUCCESS; @@ -292,6 +338,7 @@ shm_delete_step(uint32_t jobid, uint32_t stepid) _shm_unlock(); slurm_seterrno_ret(ESRCH); } + debug3("shm: found step %d.%d at %d", jobid, stepid, i); _shm_clear_step(&slurmd_shm->step[i]); _shm_unlock(); return 0; @@ -317,20 +364,17 @@ shm_signal_step(uint32_t jobid, uint32_t stepid, uint32_t signal) int retval = SLURM_SUCCESS; int i; job_step_t *s; - task_t *t, *tlast; + task_t *t; _shm_lock(); if ((i = _shm_find_step(jobid, stepid)) >= 0) { s = &slurmd_shm->step[i]; - tlast = NULL; - for (t = s->task_list; t; t = t->next) { - xassert(t != tlast); + for (t = _taskp(s->task_list); t; t = _taskp(t->next)) { if (t->pid > 0 && kill(t->pid, signo) < 0) { error("kill %d.%d task %d pid %ld: %m", jobid, stepid, t->id, (long)t->pid); retval = errno; } - tlast = t; } } else retval = ESRCH; @@ -342,40 +386,50 @@ shm_signal_step(uint32_t jobid, uint32_t stepid, uint32_t signal) return SLURM_SUCCESS; } +static job_step_t * +_shm_copy_step(job_step_t *j) +{ + job_step_t *s; + task_t *t; + + s = xmalloc(sizeof(*s)); + _shm_step_copy(s, j); + + for (t = _taskp(j->task_list); t; t = _taskp(t->next)) { + task_t *u = xmalloc(sizeof(*u)); + _shm_task_copy(u, t); + _shm_prepend_task_to_step(s, u); + } + return s; +} + job_step_t * shm_get_step(uint32_t jobid, uint32_t stepid) { int i; - job_step_t *s = NULL; - task_t *t; - + job_step_t *s; _shm_lock(); - if ((i = _shm_find_step(jobid, stepid)) >= 0) { - s = xmalloc(sizeof(job_step_t)); - _shm_step_copy(s, &slurmd_shm->step[i]); - for (t = slurmd_shm->step[i].task_list; t; t = t->next) { - task_t *u = xmalloc(sizeof(task_t)); - _shm_task_copy(u, t); - _shm_prepend_task_to_step(s, u); - } - - } + if ((i = _shm_find_step(jobid, stepid)) >= 0) + s = _shm_copy_step(&slurmd_shm->step[i]); _shm_unlock(); return s; } +/* + * Free a job step structure in local memory + */ void shm_free_step(job_step_t *step) { - task_t *p, *t; - if ((t = step->task_list)) { - do { - p = t->next; - xfree(t); - } while ((t = p)); - } + task_t *p, *t = step->task_list; xfree(step); + if (!t) + return; + do { + p = t->next; + xfree(t); + } while ((t = p)); } int @@ -547,41 +601,57 @@ shm_add_task(uint32_t jobid, uint32_t stepid, task_t *task) int i; job_step_t *s; task_t *t; + xassert(task != NULL); + _shm_lock(); if ((i = _shm_find_step(jobid, stepid)) < 0) { _shm_unlock(); slurm_seterrno_ret(ESRCH); } s = &slurmd_shm->step[i]; + debug2("adding task %d to step %d.%d", task->id, jobid, stepid); + if (_shm_find_task_in_step(s, task->id)) { _shm_unlock(); slurm_seterrno_ret(EEXIST); } + if (!(t = _shm_alloc_task())) { _shm_unlock(); slurm_seterrno_ret(ENOMEM); } + _shm_task_copy(t, task); - _shm_prepend_task_to_step(s, t); + _shm_prepend_task_to_step_internal(s, t); _shm_unlock(); + return 0; } +static void +_shm_prepend_task_to_step_internal(job_step_t *s, task_t *task) +{ + task->next = (s->task_list); + s->task_list = _toff(task); + task->job_step = _soff(s); +} + static void _shm_prepend_task_to_step(job_step_t *s, task_t *task) { - task->next = s->task_list; - s->task_list = task; + task->next = s->task_list; + s->task_list = task; task->job_step = s; } + static task_t * _shm_find_task_in_step(job_step_t *s, int taskid) { task_t *t = NULL; - for (t = s->task_list; t && t->used; t = t->next) { + for (t = _taskp(s->task_list); t && t->used; t = _taskp(t->next)) { if (t->id == taskid) return t; } @@ -593,8 +663,8 @@ _shm_alloc_task(void) { int i; for (i = 0; i < MAX_TASKS; i++) { - if (!slurmd_shm->task[i].used) { - slurmd_shm->task[i].used = true; + if (slurmd_shm->task[i].used == 0) { + slurmd_shm->task[i].used = 1; return &slurmd_shm->task[i]; } } @@ -606,7 +676,7 @@ _shm_task_copy(task_t *to, task_t *from) { *to = *from; /* next and step are not valid for copying */ - to->used = true; + to->used = 1; to->next = NULL; to->job_step = NULL; } @@ -616,7 +686,9 @@ _shm_step_copy(job_step_t *to, job_step_t *from) { *to = *from; to->state = SLURMD_JOB_ALLOCATED; - to->task_list = NULL; /* addition of tasks is another step */ + + /* addition of tasks is another step */ + to->task_list = NULL; } static void @@ -628,30 +700,25 @@ _shm_clear_task(task_t *t) static void _shm_clear_step(job_step_t *s) { - task_t *p, *t = s->task_list; - for (t = s->task_list; t; t = t->next) - _shm_clear_task(t); - + task_t *p, *t = _taskp(s->task_list); memset(s, 0, sizeof(*s)); + if (!t) + return; + do { + p = _taskp(t->next); + debug3("going to clear task %d", t->id); + _shm_clear_task(t); + } while ((t = p)); } - static int _shm_create() { int oflags = IPC_CREAT | IPC_EXCL | 0600; - key_t key = ftok(".", 'a'); - - if ((shmid = shmget(key, sizeof(slurmd_shm_t), oflags)) < 0) { - if ((shmid = shmget(key, sizeof(slurmd_shm_t), 0600)) < 0) { - if (errno == EINVAL) { - error("shm_init: Existing shm invalid. " - "Please remove."); - } else - error("shmget: %m"); - return SLURM_ERROR; - } - } + key_t key = ftok(lockname, 1); + + if ((shmid = shmget(key, sizeof(slurmd_shm_t), oflags)) < 0) + return SLURM_ERROR; slurmd_shm = shmat(shmid, NULL, 0); if (slurmd_shm == (void *)-1 || slurmd_shm == NULL) { @@ -661,29 +728,30 @@ _shm_create() _shm_initialize(); - return 1; + return SLURM_SUCCESS; } static int _shm_attach() { int oflags = 0; - key_t key = ftok(".", 'a'); + key_t key = ftok(lockname, 1); if ((shmid = shmget(key, sizeof(slurmd_shm_t), oflags)) < 0) return SLURM_ERROR; slurmd_shm = shmat(shmid, NULL, 0); - if (slurmd_shm == (void *)-1 || !slurmd_shm) + if (slurmd_shm == (void *)-1 || slurmd_shm == NULL) { + error("shmat: %m"); return SLURM_ERROR; + } return SLURM_SUCCESS; } /* - * Create shared memory region if it doesn't exist, if it does exist, - * reinitialize it. - * + * Attempt to create a new shared segment. If exclusive create fails, + * attach to existing segment and reinitialize it. */ static int _shm_new() @@ -692,12 +760,18 @@ _shm_new() error("shm_attach: %m"); return SLURM_FAILURE; } - _shm_initialize(); + attach_pid = getpid(); slurmd_shm->users = 1; _shm_unlock(); return SLURM_SUCCESS; } +/* + * Reattach to existing shared memory segment. If shared segment does + * not exist, create and initialize it. + * Here we assume that create of new semaphore failed, so we attach to + * the existing semaphore. + */ static int _shm_reopen() { @@ -715,13 +789,18 @@ _shm_reopen() } - /* Lock and unlock semaphore to ensure data is initialized */ + /* + * Lock and unlock semaphore to ensure data is initialized + */ _shm_lock(); if (slurmd_shm->version != SHM_VERSION) { error("shm_init: Wrong version in shared memory"); retval = SLURM_FAILURE; - } else + } else { slurmd_shm->users++; + attach_pid = getpid(); + } + _shm_unlock(); return retval; @@ -737,13 +816,18 @@ _shm_lock_and_initialize() if (slurmd_shm && slurmd_shm->version == SHM_VERSION) { /* we've already opened shared memory */ _shm_lock(); - slurmd_shm->users++; + if (attach_pid != getpid()) { + attach_pid = getpid(); + slurmd_shm->users++; + } _shm_unlock(); return SLURM_SUCCESS; } shm_lock = _sem_open(SHM_LOCKNAME, O_CREAT|O_EXCL, S_IRUSR|S_IWUSR, 0); + debug3("Initial open of semaphore: %m"); + if (shm_lock != SEM_FAILED) /* lock didn't exist. Create shmem */ return _shm_new(); else /* lock exists. Attach to shared memory */ diff --git a/src/slurmd/shm.h b/src/slurmd/shm.h index 74eb5788c0f003abe9ef173a9d1111190d5fb279..d54a8c1e58314950ea8087b57685f6267c594812 100644 --- a/src/slurmd/shm.h +++ b/src/slurmd/shm.h @@ -48,6 +48,7 @@ #endif #include <src/common/slurm_protocol_api.h> +#include <src/common/list.h> #include <src/slurmd/job.h> @@ -113,6 +114,14 @@ int shm_fini(void); */ void shm_cleanup(void); +/* + * Returns a list of job_step_t's currently recorded in shared memory. Presumably, + * these job steps are still running or have abnormally terminated. + * + * Caller must free the resulting list with list_destroy() + */ +List shm_get_steps(void); + /* * Insert a new step into shared memory, the step passed in by address * should be filled in with the appropriate values, excepting the diff --git a/src/slurmd/slurmd.c b/src/slurmd/slurmd.c index 2436c02c4b9dbb65a90a7d7cce3b0107322bdb1a..e6f3c3f7f35e46f3755e21ad649bac31813d6f49 100644 --- a/src/slurmd/slurmd.c +++ b/src/slurmd/slurmd.c @@ -1,9 +1,10 @@ /*****************************************************************************\ - * slurmd.c - main server machine daemon for slurm + * src/slurmd/slurmd.c - main slurm node server daemon + * $Id$ ***************************************************************************** * Copyright (C) 2002 The Regents of the University of California. * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). - * Written by Kevin Tew <tew1@llnl.gov> et. al. + * Written by Mark Grondona <mgrondona@llnl.gov>. * UCRL-CODE-2002-040. * * This file is part of SLURM, a resource management program. @@ -24,925 +25,488 @@ * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. \*****************************************************************************/ -#ifdef HAVE_CONFIG_H +#if HAVE_CONFIG_H # include <config.h> #endif -#include <errno.h> -#include <stdio.h> -#include <stdlib.h> #include <string.h> -#include <getopt.h> -#include <stdio.h> -#include <time.h> -#include <signal.h> -#include <unistd.h> #include <pthread.h> -#include <src/common/hostlist.h> -#include <src/common/parse_spec.h> +#include <sys/param.h> + +#include <src/common/log.h> #include <src/common/xmalloc.h> #include <src/common/xstring.h> -#include <src/common/list.h> #include <src/common/slurm_protocol_api.h> -#include <src/common/log.h> +#include <src/common/xsignal.h> +#include <src/common/credential_utils.h> +#include <src/common/signature_utils.h> +#include <src/common/parse_spec.h> +#include <src/common/hostlist.h> #include <src/common/fd.h> -#include <src/slurmd/batch_mgr.h> -#include <src/slurmd/get_mach_stat.h> #include <src/slurmd/slurmd.h> -#include <src/slurmd/mgr.h> +#include <src/slurmd/req.h> #include <src/slurmd/shm.h> -#include <src/common/signature_utils.h> -#include <src/common/credential_utils.h> +#include <src/slurmd/get_mach_stat.h> -#define BUF_SIZE 1024 -#define MAX_NAME_LEN 1024 -#define PTHREAD_IMPL - -/* global variables */ -typedef struct slurmd_config { - log_options_t log_opts; - char *slurm_conf; - int daemonize; - slurm_fd serverfd; -} slurmd_config_t; - -typedef struct connection_arg { - int newsockfd; -} connection_arg_t; - -time_t init_time; -pid_t slurmd_pid; -time_t shutdown_time = (time_t) 0; -char hostname[MAX_NAME_LEN]; -slurm_ssl_key_ctx_t verify_ctx; -List credential_state_list; -slurmd_config_t slurmd_conf; - -/* function prototypes */ -static char *public_cert_filename(); -inline static void reset_cwd(void); -inline static void reset_addr(slurm_msg_t * msg); -inline static char *state_save_location (void); -static void slurmd_req(slurm_msg_t * msg); -static void *slurmd_msg_engine(void *args); -inline static int send_node_registration_status_msg(); - -inline static void slurm_rpc_kill_tasks(slurm_msg_t * msg); -inline static void slurm_rpc_launch_batch_job(slurm_msg_t * msg); -inline static void slurm_rpc_launch_tasks(slurm_msg_t * msg); -inline static void slurm_rpc_ping(slurm_msg_t * msg); -inline static void slurm_rpc_reattach_tasks_streams(slurm_msg_t * msg); -inline static void slurm_rpc_revoke_credential(slurm_msg_t * msg); -inline static void slurmd_rpc_shutdown_slurmd(slurm_msg_t * msg); - -inline static int -fill_in_node_registration_status_msg(slurm_node_registration_status_msg_t * node_reg_msg); -static void *service_connection(void *arg); -static void *slurmd_handle_signals(void *args); -inline int slurmd_init(); -inline int slurmd_destroy(); - -inline static int parse_commandline_args(int argc, char **argv, - slurmd_config_t * slurmd_config); -inline static int slurmd_shutdown(); - -int main(int argc, char *argv[]) -{ - int rc; - pthread_t sigthr; - char node_name[MAX_NAME_LEN]; - log_options_t log_opts_def = LOG_OPTS_STDERR_ONLY; +#define GETOPT_ARGS "L:f:Dvhc" - init_time = time(NULL); - slurmd_conf.log_opts = log_opts_def; - slurmd_conf.daemonize = false; +#ifndef MAXHOSTNAMELEN +#define MAXHOSTNAMELEN 64 +#endif +typedef struct connection { + slurm_fd fd; + slurm_addr *cli_addr; +} conn_t; - if (parse_commandline_args(argc, argv, &slurmd_conf)) - exit (1); - log_init(argv[0], slurmd_conf.log_opts, SYSLOG_FACILITY_DAEMON, NULL); - if (slurmd_conf.daemonize == true) { - daemon(false, true); - reset_cwd(); +/* + * static shutdown and reconfigure flags: + */ +static sig_atomic_t shutdown = 0; +static sig_atomic_t reconfig = 0; + +static void _term_handler(int); +static void _hup_handler(int); +static void _process_cmdline(int ac, char **av); +static void _create_msg_socket(); +static void _tid_free(pthread_t *); +static pthread_t *_tid_copy(pthread_t *); +static void _msg_engine(); +static int _slurmd_init(); +static int _slurmd_fini(); +static void _create_conf(); +static void _init_conf(); +static void _read_config(); +static void _usage(); +static void _handle_connection(slurm_fd fd, slurm_addr *client); +static void *_service_connection(void *); + +static void _fill_registration_msg(slurm_node_registration_status_msg_t *); + + +int +main (int argc, char *argv[]) +{ + _create_conf(); + _init_conf(); + _process_cmdline(argc, argv); + log_init(argv[0], conf->log_opts, LOG_DAEMON, conf->logfile); + _read_config(); + _create_msg_socket(); + + if (conf->daemonize) { + daemon(0,0); + chdir("/tmp"); } - /* shared memory init */ - if (slurmd_init() < 0) - exit (1); - - if ((rc = getnodename(node_name, MAX_NAME_LEN))) - fatal("getnodename: %m"); + conf->pid = getpid(); + + if (_slurmd_init() < 0) + exit(1); - strncpy(hostname, node_name, MAX_NAME_LEN); + if (send_registration_msg() < 0) + error("Unable to register with slurm controller"); - /* send registration message to slurmctld */ - send_node_registration_status_msg(); + xsignal(SIGTERM, &_term_handler); + xsignal(SIGINT, &_term_handler); + xsignal(SIGHUP, &_hup_handler ); - /* create attached thread to handle signals */ - { /* XXX fix this properly */ - pthread_attr_t attr; - pthread_attr_init(&attr); - if (pthread_create(&sigthr, &attr, &slurmd_handle_signals, - (void *)NULL) != 0) - fatal("pthread_create: %m"); - } + _msg_engine(); - slurmd_msg_engine((void *)NULL); + _slurmd_fini(); - slurmd_destroy(); - return SLURM_SUCCESS; + return 0; } -void *slurmd_handle_signals(void *args) +static void +_msg_engine() { - sigset_t set; - int error_code; - int sig; - - /* just watch for select signals */ - if (sigemptyset(&set)) - error("sigemptyset error: %m"); - if (sigaddset(&set, SIGHUP)) - error("sigaddset error on SIGHUP: %m"); - if (sigaddset(&set, SIGINT)) - error("sigaddset error on SIGINT: %m"); - if (sigaddset(&set, SIGTERM)) - error("sigaddset error on SIGTERM: %m"); - if (sigaddset(&set, SIGABRT)) - error("sigaddset error on SIGABRT: %m"); - - if (sigprocmask(SIG_BLOCK, &set, NULL) != 0) - fatal("sigprocmask error: %m"); + slurm_fd sock; + slurm_addr cli; while (1) { - if ((error_code = sigwait(&set, &sig))) - error("sigwait errno %d\n", error_code); - - switch (sig) { - case SIGINT: /* kill -2 or <CTRL-C> */ - case SIGTERM: /* kill -15 */ - info("Terminate signal (SIGINT or SIGTERM) received\n"); - shutdown_time = time(NULL); - /* send REQUEST_SHUTDOWN_IMMEDIATE RPC */ - slurmd_shutdown(); - /* pthread_join(thread_id_rpc, NULL); */ - pthread_exit((void *) 0); - break; - case SIGHUP: /* kill -1 */ - info("Reconfigure signal (SIGHUP) received\n"); - if (slurmd_conf.daemonize == true) - reset_cwd(); + if (shutdown) break; - default: - error("Invalid signal (%d) received", sig); - } - } -} - -int slurmd_init() -{ - slurmd_pid = getpid(); - shm_init(); - slurm_ssl_init(); - slurm_init_verifier(&verify_ctx, public_cert_filename()); - initialize_credential_state_list(&credential_state_list); - return SLURM_SUCCESS; -} - -static char *public_cert_filename() -{ - int i, j, error_code, line_num = 0; - FILE *slurm_spec_file; /* pointer to input data file */ - char in_line[BUF_SIZE]; /* input line */ - char *job_credential_public_certificate = NULL; - - slurm_spec_file = fopen(SLURM_CONFIG_FILE, "r"); - if (slurm_spec_file == NULL) - fatal("public_cert_filename error %d opening file %s", - errno, SLURM_CONFIG_FILE); - - while (fgets(in_line, BUF_SIZE, slurm_spec_file) != NULL) { - line_num++; - if (strlen(in_line) >= (BUF_SIZE - 1)) { - fatal("public_cert_filename line %d, " - "of input file %s too long", line_num, - SLURM_CONFIG_FILE); - } - - /* everything after a non-escaped "#" is a comment */ - /* replace comment flag "#" with an end of string (NULL) */ - for (i = 0; i < BUF_SIZE; i++) { - if (in_line[i] == (char) NULL) - break; - if (in_line[i] != '#') - continue; - if ((i > 0) && (in_line[i - 1] == '\\')) { /* escaped "#" */ - for (j = i; j < BUF_SIZE; j++) { - in_line[j - 1] = in_line[j]; + again: + if ((sock = slurm_accept_msg_conn(conf->lfd, &cli)) < 0) { + if (errno == EINTR) { + if (shutdown) { + verbose("got shutdown request"); + break; } - continue; + if (reconfig) { + /* _reconfigure(); */ + verbose("got reconfigure request"); + } + goto again; } - in_line[i] = (char) NULL; - break; + error("accept: %m"); + continue; } - - /* parse what is left */ - error_code = slurm_parser(in_line, - "JobCredentialPublicCertificate=", - 's', - &job_credential_public_certificate, - "END"); - if (error_code || job_credential_public_certificate) - break; + if (sock > 0) + _handle_connection(sock, &cli); } - fclose(slurm_spec_file); - return job_credential_public_certificate; + slurm_shutdown_msg_engine(conf->lfd); + return; } -int slurmd_destroy() +static pthread_t * +_tid_copy(pthread_t *tid) { - destroy_credential_state_list(credential_state_list); - shm_fini(); - slurm_destroy_ssl_key_ctx(&verify_ctx); - slurm_ssl_destroy(); - return SLURM_SUCCESS; + pthread_t *id = xmalloc(sizeof(*id)); + *id = *tid; + return id; } -/* sends a node_registration_status_msg to the slurmctld upon boot - * announcing availibility for computation */ -int send_node_registration_status_msg() +static void +_tid_free(pthread_t *tid) { - slurm_msg_t request_msg; - slurm_msg_t response_msg; - slurm_node_registration_status_msg_t node_reg_msg; - - fill_in_node_registration_status_msg(&node_reg_msg); - - request_msg.msg_type = MESSAGE_NODE_REGISTRATION_STATUS; - request_msg.data = &node_reg_msg; - - slurm_send_recv_controller_msg(&request_msg, &response_msg); - return SLURM_SUCCESS; + xfree(tid); } -/* calls machine dependent system info calls to fill structure - * node_reg_msg - structure to fill with system info - * returns - return code - */ -int -fill_in_node_registration_status_msg(slurm_node_registration_status_msg_t * - node_reg_msg) +static void +_handle_connection(slurm_fd fd, slurm_addr *cli) { - /* fill in data structure */ - node_reg_msg->timestamp = time(NULL); - node_reg_msg->node_name = xstrdup(hostname); - - get_procs(&node_reg_msg->cpus); - get_memory(&node_reg_msg->real_memory_size); - get_tmp_disk(&node_reg_msg->temporary_disk_space); - - /* FIXME: Need to set correct count of currently running job - * steps and their ID's below */ - /* This is needed to more reliably recover from restarts of daemons */ - - node_reg_msg->job_count = 0; - node_reg_msg->job_id = NULL; - node_reg_msg->step_id = NULL; - info("Configuration name=%s cpus=%u real_memory=%u, " - "tmp_disk=%u, job_count=%u", - hostname, node_reg_msg->cpus, - node_reg_msg->real_memory_size, - node_reg_msg->temporary_disk_space, - node_reg_msg->job_count); - return SLURM_SUCCESS; -} + int rc; + pthread_attr_t attr; + pthread_t id; + conn_t *arg = xmalloc(sizeof(*arg)); -/* accept thread for incomming slurm messages - * args - do nothing right now */ -void *slurmd_msg_engine(void *args) -{ - int rc; - slurm_fd newsockfd; - slurm_fd sockfd; - slurm_addr cli_addr; - pthread_t request_thread_id; - pthread_attr_t thread_attr; + arg->fd = fd; + arg->cli_addr = cli; - if ((rc = read_slurm_port_config()) != 0) - fatal("error code %d reading config file", rc); - - if ((sockfd = slurm_init_msg_engine_port(slurm_get_slurmd_port())) - == SLURM_SOCKET_ERROR) - fatal("slurm_init_msg_engine_port: %m"); - - fd_set_close_on_exec((int) sockfd); - slurmd_conf.serverfd = sockfd; - - if ((rc = pthread_attr_init(&thread_attr))) - error("pthread_attr_init returned %d", rc); - - if ((rc = pthread_attr_setdetachstate(&thread_attr, - PTHREAD_CREATE_DETACHED))) { - error("pthread_attr_setdetachstate return %d", rc); + if ((rc = pthread_attr_init(&attr)) != 0) { + error("pthread_attr_init: %s", slurm_strerror(rc)); + return; } - while (true) { - connection_arg_t *conn_arg = - xmalloc(sizeof(connection_arg_t)); + rc = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + if (rc != 0) { + error("Unable to set detachstate on attr: %s", + slurm_strerror(rc)); + return; + } - /* accept needed for stream implementation - * is a no-op in mongo implementation that just passes - * sockfd to newsockfd - */ - if ((newsockfd = slurm_accept_msg_conn(sockfd, &cli_addr)) == - SLURM_SOCKET_ERROR) { - error("slurmd_msg_engine: accept: %m"); - continue; - } + fd_set_close_on_exec(fd); - /* receive message call that must occur before thread - * spawn because in message implementation their is no - * connection and the message is the sign of a new connection - */ - conn_arg->newsockfd = newsockfd; + rc = pthread_create(&id, &attr, &_service_connection, (void *) arg); + if (rc != 0) { + error("msg_engine: pthread_create: %s", slurm_strerror(rc)); + _service_connection((void *) &arg); + return; + } - if (shutdown_time) { - service_connection((void *) conn_arg); - break; - } + list_append(conf->threads, (void *) _tid_copy(&id)); - if ((rc = pthread_create(&request_thread_id, - &thread_attr, - service_connection, - (void *) conn_arg ))) { - /* Do without threads on failure */ - error("slurmd_msg_engine: pthread_create: %m"); - service_connection((void *) conn_arg); - } + return; +} - } - slurm_shutdown_msg_engine(sockfd); - return NULL; +static int +_find_tid(pthread_t *tid, pthread_t *key) +{ + return (*tid == *key); } -/* worker thread method for accepted message connections - * arg - a slurm_msg_t representing the accepted incomming message - * returns - nothing, void * because of pthread def - */ -void *service_connection(void *arg) +static void * +_service_connection(void *arg) { int rc; - slurm_fd newsockfd = ((connection_arg_t *) arg)->newsockfd; - slurm_msg_t *msg = NULL; - - msg = xmalloc(sizeof(slurm_msg_t)); + pthread_t tid = pthread_self(); + conn_t *con = (conn_t *) arg; + slurm_msg_t *msg = xmalloc(sizeof(*msg)); - if ((rc = slurm_receive_msg(newsockfd, msg)) == SLURM_SOCKET_ERROR) { - error("service_connection: accept: %m"); + if ((rc = slurm_receive_msg(con->fd, msg)) < 0) { + error("slurm_recieve_msg: %m"); slurm_free_msg(msg); } else { - msg->conn_fd = newsockfd; - slurmd_req(msg); /* process the request */ + msg->conn_fd = con->fd; + slurmd_req(msg, con->cli_addr); } - - /* close should only be called when the stream implementation is being used - * the following call will be a no-op in the message implementation */ - slurm_close_accepted_conn(newsockfd); /* close the new socket */ - xfree(arg); + slurm_close_accepted_conn(con->fd); + xfree(con); + list_delete_all(conf->threads, (ListFindF) _find_tid, &tid); return NULL; } -/* multiplexing message handler - * msg - incomming request message - */ -void slurmd_req(slurm_msg_t * msg) +int +send_registration_msg() { + slurm_msg_t req; + slurm_msg_t resp; + slurm_node_registration_status_msg_t msg; - switch (msg->msg_type) { - case REQUEST_BATCH_JOB_LAUNCH: - slurm_rpc_launch_batch_job(msg); - slurm_free_job_launch_msg(msg->data); - break; - case REQUEST_LAUNCH_TASKS: - slurm_rpc_launch_tasks(msg); - slurm_free_launch_tasks_request_msg(msg->data); - break; - case REQUEST_KILL_TASKS: - slurm_rpc_kill_tasks(msg); - slurm_free_kill_tasks_msg(msg->data); - break; - case REQUEST_REATTACH_TASKS_STREAMS: - slurm_rpc_reattach_tasks_streams(msg); - slurm_free_reattach_tasks_streams_msg(msg->data); - break; - case REQUEST_REVOKE_JOB_CREDENTIAL: - slurm_rpc_revoke_credential(msg); - slurm_free_revoke_credential_msg(msg->data); - break; - case REQUEST_SHUTDOWN: - case REQUEST_SHUTDOWN_IMMEDIATE: - slurmd_rpc_shutdown_slurmd(msg); - break; - case REQUEST_NODE_REGISTRATION_STATUS: - /* Treat as ping (for slurmctld agent) */ - slurm_rpc_ping(msg); - /* Then initiate a separate node registration */ - send_node_registration_status_msg(); - break; - case REQUEST_PING: - slurm_rpc_ping(msg); - break; - default: - error("slurmd_req: invalid request msg type %d\n", - msg->msg_type); - slurm_send_rc_msg(msg, EINVAL); - break; - } - slurm_free_msg(msg); -} - + _fill_registration_msg(&msg); -/******************************/ -/* rpc methods */ -/******************************/ + req.msg_type = MESSAGE_NODE_REGISTRATION_STATUS; + req.data = &msg; -static int _launch_tasks(launch_tasks_request_msg_t *req) -{ - pid_t pid; - - switch ((pid = fork())) { - case -1: - error("launch_tasks: fork: %m"); - return SLURM_ERROR; - break; - case 0: /* child runs job */ - slurm_shutdown_msg_engine(slurmd_conf.serverfd); - destroy_credential_state_list(credential_state_list); - slurm_destroy_ssl_key_ctx(&verify_ctx); - slurm_ssl_destroy(); - mgr_launch_tasks(req); - break; - default: - verbose("created process %ld for job %d.%d", - pid, req->job_id, req->job_step_id); - break; + if (slurm_send_recv_controller_msg(&req, &resp) < 0) { + error("Unable to register: %m"); + return SLURM_FAILURE; } + /* XXX look at response msg + */ + return SLURM_SUCCESS; } -/* Launches tasks */ -void slurm_rpc_launch_tasks(slurm_msg_t * msg) +static void +_fill_registration_msg(slurm_node_registration_status_msg_t *msg) { - /* init */ - int rc = SLURM_SUCCESS; - clock_t start_time; - slurm_msg_t resp_msg; - launch_tasks_response_msg_t task_resp; - launch_tasks_request_msg_t *req = - (launch_tasks_request_msg_t *) msg->data; - - start_time = clock(); - info("slurmd_req: launch tasks message received"); - slurm_print_launch_task_msg(req); - - /* do RPC call */ - /* test credentials */ - /* rc = */ verify_credential(&verify_ctx, req->credential, - credential_state_list); - - if (rc == SLURM_SUCCESS) { - reset_addr(msg); - /* slurm_print_launch_task_msg(req); */ - rc = _launch_tasks(req); - } - task_resp.node_name = hostname; - task_resp.srun_node_id = req->srun_node_id; - - resp_msg.address = req->response_addr; - resp_msg.data = &task_resp; - resp_msg.msg_type = RESPONSE_LAUNCH_TASKS; - - task_resp.return_code = rc; - - /* return result */ - if (rc) { - error("slurmd_req: launch tasks error %d", rc); - slurm_send_only_node_msg(&resp_msg); - } else { - info("slurmd_req: launch authorization completed " - "successfully, time=%ld", (long) (clock() - start_time)); - slurm_send_only_node_msg(&resp_msg); + List steps; + ListIterator i; + job_step_t *s; + int n; + + msg->node_name = conf->hostname; + + get_procs(&msg->cpus); + get_memory(&msg->real_memory_size); + get_tmp_disk(&msg->temporary_disk_space); + + steps = shm_get_steps(); + msg->job_count = list_count(steps); + msg->job_id = xmalloc(msg->job_count * sizeof(*msg->job_id)); + msg->step_id = xmalloc(msg->job_count * sizeof(*msg->step_id)); + + i = list_iterator_create(steps); + n = 0; + while ((s = list_next(i))) { + debug("found currently running job %d.%d", + s->jobid, s->stepid); + msg->job_id[n] = s->jobid; + msg->step_id[n] = s->stepid; + n++; } -} + list_iterator_destroy(i); + list_destroy(steps); -/* reset_addr - the sender does not necesarily know its communication - * path, so it just sends its hostname in the return address. For example, - * the node's name might be "lx123", but it might use the ethernet port to - * communicate. In that case, the return address should be changed to - * something like "elx123". This function will get the source of the - * transmission and reset the return addresses imbedded within the message - */ -void reset_addr(slurm_msg_t * msg) -{ - launch_tasks_request_msg_t *req = - (launch_tasks_request_msg_t *) msg->data; - slurm_addr slurm_address; - char buf[128]; + msg->timestamp = time(NULL); - if (slurm_get_peer_addr(msg->conn_fd, &slurm_address)) { - error("slurm_get_peer_addr: %m"); - return; - } - slurm_print_slurm_addr(&slurm_address, buf, 128); - debug("peer_addr(host:port)=%s",buf); - reset_slurm_addr(&req->response_addr, slurm_address); - reset_slurm_addr(&req->streams, slurm_address); return; } -/* Just respond to ping */ -void slurm_rpc_ping(slurm_msg_t * msg) +static inline int +_free_and_set(char **confvar, char *newval) { - slurm_send_rc_msg(msg, SLURM_SUCCESS); + if (newval) { + if (*confvar) + xfree(*confvar); + *confvar = newval; + return 1; + } else + return 0; } -/* Kills Launched Tasks */ -void slurm_rpc_kill_tasks(slurm_msg_t * msg) +static void +_read_config() { - int rc; - kill_tasks_msg_t *req = (kill_tasks_msg_t *) msg->data; - - rc = shm_signal_step(req->job_id, req->job_step_id, req->signal); - - /* return result */ - if (rc) { - error("slurmd_req: kill tasks error %d", rc); - slurm_send_rc_msg(msg, rc); - } else { - verbose("slurmd_req: kill tasks completed"); - slurm_send_rc_msg(msg, SLURM_SUCCESS); + int rc, i, j; + int line = 0; + char in[BUFSIZ]; + char *epilog, *prolog, *tmpfs, *savedir, *pubkey; + FILE *fp; + + if ((fp = fopen(conf->conffile, "r")) == NULL) { + error("Unable to open config file `%s': %m", conf->conffile); + exit(1); } -} - -void slurm_rpc_reattach_tasks_streams(slurm_msg_t * msg) -{ - /* init */ - int error_code = SLURM_SUCCESS; - clock_t start_time; - reattach_tasks_streams_msg_t *reattach_tasks_steams_msg = - (reattach_tasks_streams_msg_t *) msg->data; - - start_time = clock(); - - /* do RPC call */ - /* error_code = reattach_tasks_streams(reattach_tasks_steams_msg);*/ - - /* return result */ - if (error_code) { - error("slurmd_req: reattach streams error %d, time=%ld", - error_code, (long) (clock() - start_time)); - slurm_send_rc_msg(msg, error_code); - } else { - info("slurmd_req: reattach_streams completed successfully, time=%ld", (long) (clock() - start_time)); - slurm_send_rc_msg(msg, SLURM_SUCCESS); - } - -} - -void slurm_rpc_revoke_credential(slurm_msg_t * msg) -{ - /* init */ - int rc = SLURM_SUCCESS; - clock_t start_time; - revoke_credential_msg_t *req = (revoke_credential_msg_t *) msg->data; - - start_time = clock(); - - /* do RPC call */ - rc = revoke_credential(req, credential_state_list); - - /* return result */ - if (rc) { - error("slurmd_req: error %m errno %d, time=%ld", - rc, (long) (clock() - start_time)); - slurm_send_rc_msg(msg, errno); - } else { - info("slurmd_req: completed successfully, time=%ld", - (long) (clock() - start_time)); - slurm_send_rc_msg(msg, SLURM_SUCCESS); - } - -} - -/* slurmd_rpc_shutdown_slurmd - process RPC to shutdown slurmd */ -void slurmd_rpc_shutdown_slurmd(slurm_msg_t * msg) -{ - /* do RPC call */ - /* must be user root */ - if (shutdown_time) - debug3("slurm_rpc_shutdown_controller again"); - else { - kill(slurmd_pid, SIGTERM); /* tell master to clean-up */ - info("slurm_rpc_shutdown_controller completed successfully"); - } - - /* return result */ - slurm_send_rc_msg(msg, SLURM_SUCCESS); -} + + line = 0; + while ((fgets(in, BUFSIZ, fp))) { + line++; + if (strlen(in) == BUFSIZ - 1) { + info("Warning: %s:line %d may be too long", + conf->conffile, line); + } + /* XXX this needs work, but it is how the rest of + * slurm reads config file so we keep it the same + * for now + */ + for (i = 0; i < BUFSIZ; i++) { + if (in[i] == (char) NULL) + break; + if (in[i] != '#') + continue; + if ((i > 0) && (in[i - 1] == '\\')) { + for (j = i; j < BUFSIZ; j++) { + in[j - 1] = in[j]; + } + continue; + } + in[i] = '\0'; + break; + } -/* slurm_shutdown - issue RPC to have slurmctld shutdown, knocks loose an accept() */ -int slurmd_shutdown() -{ - int rc; - slurm_msg_t request_msg; - slurm_msg_t response_msg; - return_code_msg_t *slurm_rc_msg; - slurm_addr slurmd_addr; - - /* kill_all_tasks();*/ - - /* init message connection for message communication with controller */ - slurm_set_addr_char(&slurmd_addr, slurm_get_slurmd_port(), - "localhost"); - - /* send request message */ - request_msg.address = slurmd_addr; - request_msg.msg_type = REQUEST_SHUTDOWN_IMMEDIATE; - - if ((rc = - slurm_send_recv_node_msg(&request_msg, - &response_msg)) == - SLURM_SOCKET_ERROR) { - error("slurm_send_recv_node_only_msg error"); - return SLURM_SOCKET_ERROR; + rc = slurm_parser(in, + "Epilog=", 's', &epilog, + "Prolog=", 's', &prolog, + "TmpFS=", 's', &tmpfs, + "JobCredentialPublicCertificate=", 's', &pubkey, + "StateSaveLocation=", 's', &savedir, + "HeartbeatInterval=", 'd', &conf->hbeat, + "SlurmdPort=", 'd', &conf->port, + "END"); } - switch (response_msg.msg_type) { - case RESPONSE_SLURM_RC: - slurm_rc_msg = (return_code_msg_t *) response_msg.data; - rc = slurm_rc_msg->return_code; - slurm_free_return_code_msg(slurm_rc_msg); - if (rc) { - error("slurm_shutdown_msg_conn error (%d)", rc); - return SLURM_PROTOCOL_ERROR; - } - break; - default: - error("slurm_shutdown_msg_conn type bad (%d)", - response_msg.msg_type); - return SLURM_UNEXPECTED_MSG_ERROR; - break; + debug3("Epilog = `%s'", epilog ); + _free_and_set(&conf->epilog, epilog ); + debug3("Prolog = `%s'", prolog ); + _free_and_set(&conf->prolog, prolog ); + debug3("TmpFS = `%s'", tmpfs ); + _free_and_set(&conf->tmpfs, tmpfs ); + debug3("Public Cert = `%s'", pubkey ); + _free_and_set(&conf->pubkey, pubkey ); + debug3("Save dir = `%s'", savedir); + _free_and_set(&conf->savedir, savedir); + + if (fclose(fp) < 0) { + error("Closing slurm log file `%s': %m", conf->conffile); } - - return SLURM_PROTOCOL_SUCCESS; } -void slurm_rpc_launch_batch_job(slurm_msg_t * msg) +static void +_create_conf() { - int rc; - batch_job_launch_msg_t *req = (batch_job_launch_msg_t *) msg->data ; - - rc = SLURM_SUCCESS; /* launch_batch_job(req); */ - - if (rc) { - error("slurmd_req: error %d", rc); - slurm_send_rc_msg(msg, rc); - } else { - info("slurmd_req: completed successfully"); - slurm_send_rc_msg(msg, SLURM_SUCCESS); - } + conf = xmalloc(sizeof(*conf)); } -void slurm_rpc_slurmd_template(slurm_msg_t * msg) +static void +_init_conf() { - /* init */ - int error_code = SLURM_SUCCESS; - clock_t start_time; - /*_msg_t * _msg = ( _msg_t * ) msg->data ; */ - - start_time = clock(); + char host[MAXHOSTNAMELEN]; + log_options_t lopts = LOG_OPTS_STDERR_ONLY; - /* do RPC call */ - - /*error_code = (); */ - - /* return result */ - if (error_code) { - error("slurmd_req: error %d, time=%ld", - error_code, (long) (clock() - start_time)); - slurm_send_rc_msg(msg, error_code); - } else { - info("slurmd_req: completed successfully, time=%ld", - (long) (clock() - start_time)); - slurm_send_rc_msg(msg, SLURM_SUCCESS); + if (getnodename(host, MAXHOSTNAMELEN) < 0) { + error("Unable to get my hostname: %m"); + exit(1); } - -} - -void usage(char *prog_name) -{ - printf("%s [OPTIONS]\n", prog_name); - printf(" -e <errlev> Set stderr logging to the specified level\n"); - printf(" -f <file> Use specified configuration file name\n"); - printf(" -d daemonize\n"); - printf(" -h Print a help message describing usage\n"); - printf(" -l <errlev> Set logfile logging to the specified level\n"); - printf(" -s <errlev> Set syslog logging to the specified level\n"); - printf("<errlev> is an integer between 0 and 7 with higher numbers providing more detail.\n"); + conf->hostname = xstrdup(host); + if (read_slurm_port_config() < 0) + error("Unable to get slurmd listen port"); + conf->conffile = xstrdup(SLURM_CONFIG_FILE); + conf->port = slurm_get_slurmd_port(); + conf->savedir = NULL; + conf->pubkey = NULL; + conf->prolog = NULL; + conf->epilog = NULL; + conf->daemonize = 0; + conf->lfd = -1; + conf->log_opts = lopts; + return; } -int parse_commandline_args(int argc, char **argv, - slurmd_config_t * slurmd_config) +static void +_process_cmdline(int ac, char **av) { int c; - int digit_optind = 0; - int errlev; - opterr = 0; - - while (1) { - int this_option_optind = optind ? optind : 1; - int option_index = 0; - static struct option long_options[] = { - {"error_level", 1, 0, 'e'}, - {"help", 0, 0, 'h'}, - {"daemonize", 0, 0, 'd'}, - {"config_file", 1, 0, 'f'}, - {"log_level", 1, 0, 'l'}, - {"syslog_level", 1, 0, 's'}, - {0, 0, 0, 0} - }; - - c = getopt_long(argc, argv, "cde:hf:l:s:", long_options, - &option_index); - if (c == -1) - break; + conf->prog = xbasename(av[0]); + while ((c = getopt(ac, av, GETOPT_ARGS)) > 0) { switch (c) { - case 'e': - errlev = strtol(optarg, (char **) NULL, 10); - if ((errlev < LOG_LEVEL_QUIET) || - (errlev > LOG_LEVEL_DEBUG3)) { - fprintf(stderr, - "invalid errlev argument\n"); - usage(argv[0]); - exit(1); - } - slurmd_config->log_opts.stderr_level = errlev; + case 'D': + conf->daemonize = 0; break; - case 'd': - slurmd_config->daemonize = true; + case 'v': + conf->log_opts.stderr_level++; break; case 'h': - usage(argv[0]); + _usage(); exit(0); break; case 'f': - slurmd_config->slurm_conf = optarg; - printf("slurmctrld.slurm_conf = %s\n", - slurmd_config->slurm_conf); + conf->conffile = xstrdup(optarg); break; - case 'l': - errlev = strtol(optarg, (char **) NULL, 10); - if ((errlev < LOG_LEVEL_QUIET) || - (errlev > LOG_LEVEL_DEBUG3)) { - fprintf(stderr, - "invalid errlev argument\n"); - usage(argv[0]); - exit(1); - } - slurmd_config->log_opts.logfile_level = errlev; - break; - case 's': - errlev = strtol(optarg, (char **) NULL, 10); - if ((errlev < LOG_LEVEL_QUIET) || - (errlev > LOG_LEVEL_DEBUG3)) { - fprintf(stderr, - "invalid errlev argument\n"); - usage(argv[0]); - exit(1); - } - slurmd_config->log_opts.syslog_level = errlev; + case 'L': + conf->logfile = xstrdup(optarg); break; case 'c': shm_cleanup(); break; - case 0: - info("option %s", long_options[option_index].name); - if (optarg) { - info(" with arg %s", optarg); - } - break; - - case '0': - case '1': - case '2': - if (digit_optind != 0 - && digit_optind != this_option_optind) { - info("digits occur in two different argv-elements."); - } - digit_optind = this_option_optind; - info("option %c\n", c); - break; default: - info("unknown option %c", c); - usage(argv[0]); + _usage(c); exit(1); + break; } + } +} - if (optind < argc) { - printf("non-option ARGV-elements: "); - while (optind < argc) { - printf("%s ", argv[optind++]); - } - printf("\n"); - usage(argv[0]); - exit(1); - } + +static void +_create_msg_socket() +{ + slurm_fd ld = slurm_init_msg_engine_port(conf->port); + + if (ld < 0) { + error("Unable to bind listen port (%d): %m", conf->port); + exit(1); } + + fd_set_close_on_exec(ld); + + conf->lfd = ld; + + return; +} + + +static int +_slurmd_init() +{ + slurm_ssl_init(); + slurm_init_verifier(&conf->vctx, conf->pubkey); + initialize_credential_state_list(&conf->cred_state_list); + conf->threads = list_create((ListDelF) _tid_free); + if (shm_init() < 0) + return SLURM_FAILURE; return SLURM_SUCCESS; } -/* reset_cwd - reset the current working directory per slurm configuration file - * this makes the core file go to StateSaveLocation if a daemon */ -void -reset_cwd(void) +static int +_slurmd_fini() { - char *dir; - - dir = state_save_location (); - if (dir == NULL) - error ("No state save location specified in configuration file"); - else { - if (chdir (dir)) - error ("chdir to %s error %m", dir); - debug ("chdir %s", dir); - xfree (dir); - } + list_destroy(conf->threads); + destroy_credential_state_list(conf->cred_state_list); + slurm_destroy_ssl_key_ctx(&conf->vctx); + slurm_ssl_destroy(); + shm_fini(); + return SLURM_SUCCESS; } -/* state_save_location - returns the value of StateSaveLocation from the slurm configuration file - * NOTE: The caller must xfree the return value */ -char * -state_save_location (void) +static void +_term_handler(int signum) { - FILE *slurm_spec_file; - char in_line[BUF_SIZE]; /* input line */ - char *dir = NULL; - int i, j, error_code, line_num = 0; - - slurm_spec_file = fopen (SLURM_CONFIG_FILE, "r"); - if (slurm_spec_file == NULL) { - error ( "state_save_location error %d opening file %s: %m", - errno, SLURM_CONFIG_FILE); - return NULL ; - } + if (signum == SIGTERM || signum == SIGINT) + shutdown = 1; +} - while (fgets (in_line, BUF_SIZE, slurm_spec_file) != NULL) { - line_num++; - if (strlen (in_line) >= (BUF_SIZE - 1)) { - error ("state_save_location line %d, of input file %s too long\n", - line_num, SLURM_CONFIG_FILE); - fclose (slurm_spec_file); - return NULL; - } - - /* everything after a non-escaped "#" is a comment */ - /* replace comment flag "#" with an end of string (NULL) */ - for (i = 0; i < BUF_SIZE; i++) { - if (in_line[i] == (char) NULL) - break; - if (in_line[i] != '#') - continue; - if ((i > 0) && (in_line[i - 1] == '\\')) { /* escaped "#" */ - for (j = i; j < BUF_SIZE; j++) { - in_line[j - 1] = in_line[j]; - } - continue; - } - in_line[i] = (char) NULL; - break; - } - - /* parse what is left */ - /* overall slurm configuration parameters */ - error_code = slurm_parser(in_line, - "StateSaveLocation=", 's', &dir, - "END"); - if (error_code) { - error ("error parsing configuration file input line %d", line_num); - fclose (slurm_spec_file); - return NULL; - } - - if ( dir ) { - fclose (slurm_spec_file); - return dir; - } - } - return NULL; +static void +_hup_handler(int signum) +{ + if (signum == SIGHUP) + reconfig = 1; +} + + +static void +_usage() +{ + fprintf(stderr, "Usage: %s [OPTIONS]\n", conf->prog); + fprintf(stderr, " -f file " + "\tUse `file' as slurmd config file.\n"); + fprintf(stderr, " -L logfile " + "\tLog messages to the file `logfile'\n"); + fprintf(stderr, " -v " + "\tVerbose mode. Multiple -v's increase verbosity.\n"); + fprintf(stderr, " -D " + "\tRun daemon in forground.\n"); + fprintf(stderr, " -c " + "\tForce cleanup of slurmd shared memory.\n"); + fprintf(stderr, " -h " + "\tPrint this help message.\n"); } diff --git a/src/slurmd/slurmd.h b/src/slurmd/slurmd.h index 752e4d4f11341ba5786f5b60a80b5f44bfb832af..fae5892e2b44d34410f9e188e7e6dfead88b1387 100644 --- a/src/slurmd/slurmd.h +++ b/src/slurmd/slurmd.h @@ -1,3 +1,29 @@ +/*****************************************************************************\ + * src/slurmd/slurmd.h - header for slurmd + * $Id$ + ***************************************************************************** + * Copyright (C) 2002 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Mark Grondona <mgrondona@llnl.gov>. + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ #ifndef _SLURMD_H #define _SLURMD_H @@ -14,4 +40,39 @@ # include <inttypes.h> #endif /* HAVE_CONFIG_H */ +#include <src/common/log.h> + +/* + * Global config type + */ +typedef slurm_ssl_key_ctx_t slurm_ssl_ctx; +typedef struct slurmd_config { + char *prog; /* Program basename */ + char *hostname; /* local hostname */ + char *conffile; /* config filename */ + char *logfile; /* slurmd logfile, if any */ + char *savedir; /* SaveStateLocation */ + char *nodename; /* this node's hostname */ + char *tmpfs; /* directory of tmp FS */ + char *pubkey; /* location of job cred public key */ + char *epilog; /* Path to Epilog script */ + char *prolog; /* Path to prolog script */ + int port; /* local slurmd port */ + int hbeat; /* heartbeat interval */ + slurm_fd lfd; /* slurmd listen file descriptor */ + pid_t pid; /* server pid */ + log_options_t log_opts; /* current logging options */ + int daemonize:1; /* daemonize flag */ + + List cred_state_list; /* credential stat list */ + List threads; /* list of active threads */ + slurm_ssl_ctx vctx; /* ssl context for cred utils */ +} slurmd_conf_t; + +slurmd_conf_t * conf; + +/* Send node registration message to controller + */ +int send_registration_msg(void); + #endif /* !_SLURMD_H */ diff --git a/src/srun/job.c b/src/srun/job.c index 5941e6caee9493a874e1b640e478d238bfe43f2d..f1edb8fc63a7f3ae4e3411651826f4cd9cd5a592 100644 --- a/src/srun/job.c +++ b/src/srun/job.c @@ -67,7 +67,8 @@ job_create(resource_allocation_response_msg_t *resp) job->nodelist = xstrdup(opt.nodelist); debug("nodelist=%s",job->nodelist); hl = hostlist_create(opt.nodelist); - job->jobid = 1; + srand48(getpid()); + job->jobid = (uint32_t) (lrand48() % 65550L + 1L); ncpu = 1; if (opt.nprocs <= 1) opt.nprocs = hostlist_count(hl); diff --git a/src/srun/job.h b/src/srun/job.h index 9048392aecbce71825fc18595c9a52ea7db059b3..78a86c5be4e52c6d7752dda23cdd26ca5633fab9 100644 --- a/src/srun/job.h +++ b/src/srun/job.h @@ -56,7 +56,7 @@ typedef struct srun_job { pthread_t jtid; /* job control thread id */ int njfds; /* number of job control info fds */ slurm_fd *jfd; /* job control info fd */ - slurm_addr *jaddr; /* job control info port */ + slurm_addr *jaddr; /* job control info ports */ pthread_t ioid; /* stdio thread id */ int niofds; /* Number of IO fds */ diff --git a/src/srun/launch.c b/src/srun/launch.c index 4773babae99b04a84ea0a78b4e96c4987dd66ac1..fd11c1d22110e7ba2d432bdaafd91427004fc9e6 100644 --- a/src/srun/launch.c +++ b/src/srun/launch.c @@ -73,7 +73,7 @@ launch(void *arg) if (gethostname(hostname, MAXHOSTNAMELEN) < 0) error("gethostname: %m"); - debug("going to launch %d tasks on %d hosts\n", opt.nprocs, job->nhosts); + debug("going to launch %d tasks on %d hosts", opt.nprocs, job->nhosts); /* thr = (launch_thr_t *) xmalloc(opt.nprocs * sizeof(*thr)); */ @@ -122,17 +122,12 @@ launch(void *arg) } for (i = 0; i < job->nhosts; i++) { - unsigned short port; msg.tasks_to_launch = job->ntask[i]; msg.global_task_ids = task_ids[i]; - msg.srun_node_id = (uint32_t)i; - - port = ntohs(job->ioport[i%job->niofds]); - slurm_set_addr_char(&msg.streams, port, hostname); - - port = ntohs(job->jaddr[i%job->njfds].sin_port); - slurm_set_addr_char(&msg.response_addr, port, hostname); + msg.srun_node_id = (uint32_t)i; + msg.io_port = ntohs(job->ioport[i%job->niofds]); + msg.resp_port = ntohs(job->jaddr[i%job->njfds].sin_port); memcpy(&req.address, &job->slurmd_addr[i], sizeof(slurm_addr)); @@ -142,12 +137,11 @@ launch(void *arg) error("%s: %m", job->host[i]); job->host_state[i] = SRUN_HOST_UNREACHABLE; } + xfree(task_ids[i]); - xfree(msg.global_task_ids); } xfree(task_ids); - update_job_state(job, SRUN_JOB_STARTING); return(void *)(0); @@ -158,14 +152,15 @@ launch(void *arg) static void print_launch_msg(launch_tasks_request_msg_t *msg) { int i; - debug("jobid = %ld" , msg->job_id); - debug("stepid = %ld" , msg->job_step_id); - debug("uid = %ld" , msg->uid); - debug("ntasks = %ld" , msg->tasks_to_launch); - debug("envc = %d" , msg->envc); - debug("cwd = `%s'", msg->cwd); - for (i = 0; i < msg->tasks_to_launch; i++) - debug("global_task_id[%d] = %d\n", i, msg->global_task_ids[i]); + char buf[4096]; + int len = 0; + + len = snprintf(buf, 4096, "%d.%d uid:%ld n:%ld `%s' %d [%d-%d]", + msg->job_id, msg->job_step_id, msg->uid, + msg->tasks_to_launch, msg->cwd, msg->srun_node_id, + msg->global_task_ids[0], + msg->global_task_ids[msg->tasks_to_launch-1]); + debug3("%s", buf); } static int diff --git a/src/srun/net.c b/src/srun/net.c index 4881eba31f653eb7890acb5d30ebc2cd91eb9691..56c05205d36c580e3c75edf21759fd604c09e633 100644 --- a/src/srun/net.c +++ b/src/srun/net.c @@ -118,7 +118,8 @@ int readn(int fd, void *buf, size_t nbytes) size_t nleft = nbytes; while (nleft > 0) { - if ((n = read(fd, (void *)pbuf, nleft)) < 0 && (errno != EINTR)) { + if ((n = read(fd, (void *)pbuf, nleft)) < 0 + && (errno != EINTR)) { /* eof */ return(0); } @@ -128,3 +129,13 @@ int readn(int fd, void *buf, size_t nbytes) return(n); } +int net_set_low_water(int sock, size_t size) +{ + if (setsockopt(sock, SOL_SOCKET, SO_RCVLOWAT, + (const void *) &size, sizeof(size)) < 0) { + error("Unable to set low water socket option: %m"); + return -1; + } + + return 0; +} diff --git a/src/srun/net.h b/src/srun/net.h index c5cec16b6ab356ec8358e185a6e2e94818c57c09..9f023b3868de4b3170473f43e2b996cbcd914aca 100644 --- a/src/srun/net.h +++ b/src/srun/net.h @@ -12,5 +12,9 @@ int net_stream_listen(int *fd, int *port); */ int net_accept_stream(int fd); +/* set low water mark on socket + */ +int net_set_low_water(int sock, size_t size); + #endif /* !_NET_H */ diff --git a/src/srun/srun.c b/src/srun/srun.c index 52cc1bd4cd6ee030a2186ec8d617daafc3297db9..0209d8c8b151209cc9bb6a08c578520c44db9af5 100644 --- a/src/srun/srun.c +++ b/src/srun/srun.c @@ -193,6 +193,7 @@ main(int ac, char **av) fatal("unable to initialize stdio server port: %m"); debug("initialized stdio server port %d\n", ntohs(job->ioport[i])); + net_set_low_water(job->iofd[i], 140); } pthread_attr_init(&attr); @@ -245,7 +246,7 @@ main(int ac, char **av) } /* kill signal thread */ - pthread_kill(job->sigid, SIGTERM); + pthread_cancel(job->sigid); /* wait for stdio */ n = 0; @@ -476,11 +477,13 @@ sig_thr(void *arg) int signo; while (1) { - sigemptyset(&set); - sigaddset(&set, SIGABRT); - sigaddset(&set, SIGSEGV); - sigaddset(&set, SIGQUIT); - sigaddset(&set, SIGINT); + sigfillset(&set); + sigdelset(&set, SIGABRT); + sigdelset(&set, SIGSEGV); + sigdelset(&set, SIGQUIT); + sigdelset(&set, SIGUSR1); + sigdelset(&set, SIGUSR2); + pthread_sigmask(SIG_BLOCK, &set, NULL); sigwait(&set, &signo); debug2("recvd signal %d", signo); switch (signo) {