diff --git a/src/common/global_srun.c b/src/common/global_srun.c new file mode 100644 index 0000000000000000000000000000000000000000..5af987e38e9607bbb238fc4eba5f3e2a80572567 --- /dev/null +++ b/src/common/global_srun.c @@ -0,0 +1,221 @@ +/*****************************************************************************\ + * src/common/global_srun.c - functions needed by more than just srun + ***************************************************************************** + * Copyright (C) 2002 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Mark Grondona <mgrondona@llnl.gov>, and + * Moe Jette <jette@llnl.gov> + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#if HAVE_PTHREAD +#include <pthread.h> +#endif + +#include <signal.h> +#include <string.h> + +#include <slurm/slurm_errno.h> + +#include "src/common/log.h" +#include "src/common/macros.h" +#include "src/common/slurm_protocol_api.h" +#include "src/common/slurm_protocol_defs.h" +#include "src/common/xmalloc.h" +#include "src/common/xsignal.h" +#include "src/common/global_srun.h" + +/* + * Static list of signals to block in srun: + */ +static int srun_sigarray[] = { + SIGINT, SIGQUIT, SIGTSTP, SIGCONT, SIGTERM, + SIGALRM, SIGUSR1, SIGUSR2, SIGPIPE, 0 +}; + +/* number of active threads */ +static pthread_mutex_t active_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t active_cond = PTHREAD_COND_INITIALIZER; +static int active = 0; + +typedef enum {DSH_NEW, DSH_ACTIVE, DSH_DONE, DSH_FAILED} state_t; + +typedef struct thd { + pthread_t thread; /* thread ID */ + pthread_attr_t attr; /* thread attributes */ + state_t state; /* thread state */ +} thd_t; + +typedef struct task_info { + slurm_msg_t *req_ptr; + srun_job_t *job_ptr; + int host_inx; +} task_info_t; + + +/* + * Static prototypes + */ +static void _p_fwd_signal(slurm_msg_t *, srun_job_t *); +static void * _p_signal_task(void *); + +void +fwd_signal(srun_job_t *job, int signo) +{ + int i; + slurm_msg_t *req; + kill_tasks_msg_t msg; + static pthread_mutex_t sig_mutex = PTHREAD_MUTEX_INITIALIZER; + + slurm_mutex_lock(&sig_mutex); + + if (signo == SIGKILL || signo == SIGINT || signo == SIGTERM) { + slurm_mutex_lock(&job->state_mutex); + job->signaled = true; + slurm_mutex_unlock(&job->state_mutex); + } + + debug2("forward signal %d to job", signo); + + /* common to all tasks */ + msg.job_id = job->jobid; + msg.job_step_id = job->stepid; + msg.signal = (uint32_t) signo; + + req = xmalloc(sizeof(slurm_msg_t) * job->nhosts); + + for (i = 0; i < job->nhosts; i++) { + if (job->host_state[i] != SRUN_HOST_REPLIED) { + debug2("%s has not yet replied\n", job->host[i]); + continue; + } + + if (job_active_tasks_on_host(job, i) == 0) + continue; + + req[i].msg_type = REQUEST_KILL_TASKS; + req[i].data = &msg; + memcpy( &req[i].address, + &job->slurmd_addr[i], sizeof(slurm_addr)); + } + + _p_fwd_signal(req, job); + + debug2("All tasks have been signalled"); + xfree(req); + slurm_mutex_unlock(&sig_mutex); +} + +int +job_active_tasks_on_host(srun_job_t *job, int hostid) +{ + int i; + int retval = 0; + + slurm_mutex_lock(&job->task_mutex); + for (i = 0; i < job->ntask[hostid]; i++) { + uint32_t tid = job->tids[hostid][i]; + if (job->task_state[tid] == SRUN_TASK_RUNNING) + retval++; + } + slurm_mutex_unlock(&job->task_mutex); + return retval; +} + +/* _p_fwd_signal - parallel (multi-threaded) task signaller */ +static void _p_fwd_signal(slurm_msg_t *req, srun_job_t *job) +{ + int i; + task_info_t *tinfo; + thd_t *thd; + + thd = xmalloc(job->nhosts * sizeof (thd_t)); + for (i = 0; i < job->nhosts; i++) { + if (req[i].msg_type == 0) + continue; /* inactive task */ + + slurm_mutex_lock(&active_mutex); + while (active >= opt.max_threads) { + pthread_cond_wait(&active_cond, &active_mutex); + } + active++; + slurm_mutex_unlock(&active_mutex); + + tinfo = (task_info_t *)xmalloc(sizeof(task_info_t)); + tinfo->req_ptr = &req[i]; + tinfo->job_ptr = job; + tinfo->host_inx = i; + + slurm_attr_init(&thd[i].attr); + if (pthread_attr_setdetachstate(&thd[i].attr, + PTHREAD_CREATE_DETACHED)) + error ("pthread_attr_setdetachstate failed"); + if (pthread_create( &thd[i].thread, &thd[i].attr, + _p_signal_task, (void *) tinfo )) { + error ("pthread_create failed"); + _p_signal_task((void *) tinfo); + } + } + + + slurm_mutex_lock(&active_mutex); + while (active > 0) { + pthread_cond_wait(&active_cond, &active_mutex); + } + slurm_mutex_unlock(&active_mutex); + xfree(thd); +} + +/* _p_signal_task - parallelized signal of a specific task */ +static void * _p_signal_task(void *args) +{ + int rc = SLURM_SUCCESS; + task_info_t *info = (task_info_t *)args; + slurm_msg_t *req = info->req_ptr; + srun_job_t *job = info->job_ptr; + char *host = job->host[info->host_inx]; + + debug3("sending signal to host %s", host); + if (slurm_send_recv_rc_msg(req, &rc, 0) < 0) { + error("%s: signal: %m", host); + goto done; + } + + /* + * Report error unless it is "Invalid job id" which + * probably just means the tasks exited in the meanwhile. + */ + if ((rc != 0) && (rc != ESLURM_INVALID_JOB_ID) && (rc != ESRCH)) + error("%s: signal: %s", host, slurm_strerror(rc)); + + done: + slurm_mutex_lock(&active_mutex); + active--; + pthread_cond_signal(&active_cond); + slurm_mutex_unlock(&active_mutex); + xfree(args); + return NULL; +} + + diff --git a/src/common/global_srun.h b/src/common/global_srun.h new file mode 100644 index 0000000000000000000000000000000000000000..3607d68444c237d520fd3909b4e3a76351ab5473 --- /dev/null +++ b/src/common/global_srun.h @@ -0,0 +1,35 @@ +/*****************************************************************************\ + * src/common/global_srun.c - functions needed by more than just srun + ***************************************************************************** + * Copyright (C) 2002 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Mark Grodnona <mgrondona@llnl.gov>. + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ + +#ifndef _GLOBAL_SRUN_H +#define _GLOBAL_SRUN_H + +#include "src/srun/srun_job.h" + +void fwd_signal(srun_job_t *job, int signal); +int job_active_tasks_on_host(srun_job_t *job, int hostid); + +#endif /* !_GLOBAL_SRUN_H */ diff --git a/src/common/mpi.c b/src/common/mpi.c new file mode 100644 index 0000000000000000000000000000000000000000..178bc735ceb21aecf12c53e894aeb901545908fc --- /dev/null +++ b/src/common/mpi.c @@ -0,0 +1,272 @@ +/*****************************************************************************\ + * src/common/mpi.c - Generic mpi selector for slurm + ***************************************************************************** + * Copyright (C) 2002 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Mark Grondona <grondo1@llnl.gov>. + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ + +#if HAVE_CONFIG_H +# include "config.h" +#endif + +#include "src/common/macros.h" +#include "src/common/plugin.h" +#include "src/common/plugrack.h" +#include "src/common/env.h" +#include "src/common/mpi.h" +#include "src/common/xmalloc.h" +#include "src/common/xstring.h" + +#define MPI_DEFAULT "mpich-gm" + +/* + * WARNING: Do not change the order of these fields or add additional + * fields at the beginning of the structure. If you do, job completion + * logging plugins will stop working. If you need to add fields, add them + * at the end of the structure. + */ +typedef struct slurm_mpi_ops { + int (*init) (slurmd_job_t *job, int rank); + int (*create_thread) (srun_job_t *job); + int (*single_task) (void); + int (*exit) (void); +} slurm_mpi_ops_t; + +struct slurm_mpi_context { + char * mpi_type; + plugrack_t plugin_list; + plugin_handle_t cur_plugin; + int mpi_errno; + slurm_mpi_ops_t ops; +}; + +static slurm_mpi_context_t g_context = NULL; +static pthread_mutex_t context_lock = PTHREAD_MUTEX_INITIALIZER; + +static slurm_mpi_context_t +_slurm_mpi_context_create(const char *mpi_type) +{ + slurm_mpi_context_t c; + + if ( mpi_type == NULL ) { + debug3( "_slurm_mpi_context_create: no mpi type" ); + return NULL; + } + + c = xmalloc(sizeof(struct slurm_mpi_context)); + + c->mpi_errno = SLURM_SUCCESS; + + /* Copy the job completion authentication type. */ + c->mpi_type = xstrdup(mpi_type); + if (c->mpi_type == NULL ) { + debug3( "can't make local copy of mpi type" ); + xfree(c); + return NULL; + } + + /* Plugin rack is demand-loaded on first reference. */ + c->plugin_list = NULL; + c->cur_plugin = PLUGIN_INVALID_HANDLE; + + return c; +} + +static int +_slurm_mpi_context_destroy( slurm_mpi_context_t c ) +{ + /* + * Must check return code here because plugins might still + * be loaded and active. + */ + if ( c->plugin_list ) { + if ( plugrack_destroy( c->plugin_list ) != SLURM_SUCCESS ) { + return SLURM_ERROR; + } + } + + xfree(c->mpi_type); + xfree(c); + + return SLURM_SUCCESS; +} + +/* + * Resolve the operations from the plugin. + */ +static slurm_mpi_ops_t * +_slurm_mpi_get_ops( slurm_mpi_context_t c ) +{ + /* + * These strings must be kept in the same order as the fields + * declared for slurm_mpi_ops_t. + */ + static const char *syms[] = { + "mpi_p_init", + "mpi_p_thr_create", + "mpi_p_single_task", + "mpi_p_exit" + }; + int n_syms = sizeof( syms ) / sizeof( char * ); + char *plugin_dir = NULL; + + /* Get the plugin list, if needed. */ + if ( c->plugin_list == NULL ) { + c->plugin_list = plugrack_create(); + if ( c->plugin_list == NULL ) { + error("Unable to create a plugin manager"); + return NULL; + } + + plugrack_set_major_type(c->plugin_list, "mpi"); + plugrack_set_paranoia(c->plugin_list, + PLUGRACK_PARANOIA_NONE, + 0); + plugin_dir = slurm_get_plugin_dir(); + plugrack_read_dir(c->plugin_list, plugin_dir); + xfree(plugin_dir); + } + + if (strcasecmp (c->mpi_type, "mpi/list") == 0) { + plugrack_print_all_plugin(c->plugin_list); + exit(0); + } else { + /* Find the correct plugin. */ + c->cur_plugin = plugrack_use_by_type(c->plugin_list, + c->mpi_type); + if ( c->cur_plugin == PLUGIN_INVALID_HANDLE ) { + error("can't find a valid plugin for type %s", + c->mpi_type); + return NULL; + } + } + + /* Dereference the API. */ + if ( plugin_get_syms( c->cur_plugin, + n_syms, + syms, + (void **) &c->ops ) < n_syms ) { + error( "incomplete mpi plugin detected" ); + return NULL; + } + + return &c->ops; +} + +int _mpi_init (char *mpi_type) +{ + int retval = SLURM_SUCCESS; + char *full_type = NULL; + slurm_mutex_lock( &context_lock ); + + if ( g_context ) + goto done; + + if (mpi_type == NULL) + mpi_type = MPI_DEFAULT; + + setenvf (NULL, "SLURM_MPI_TYPE", "%s", mpi_type); + + full_type = xmalloc(sizeof(char) * (strlen(mpi_type)+5)); + sprintf(full_type,"mpi/%s\0",mpi_type); + + g_context = _slurm_mpi_context_create(full_type); + xfree(full_type); + if ( g_context == NULL ) { + error( "cannot create a context for %s", mpi_type); + retval = SLURM_ERROR; + goto done; + } + + if ( _slurm_mpi_get_ops( g_context ) == NULL ) { + error( "cannot resolve plugin operations for %s", mpi_type); + _slurm_mpi_context_destroy( g_context ); + g_context = NULL; + retval = SLURM_ERROR; + } + + +done: + slurm_mutex_unlock( &context_lock ); + return retval; +} + +int srun_mpi_init (char *mpi_type) +{ + debug("mpi type = %s", mpi_type); + + if(_mpi_init(mpi_type) == SLURM_ERROR) + return SLURM_ERROR; + + return SLURM_SUCCESS; +} + + +int slurmd_mpi_init (slurmd_job_t *job, int rank) +{ + char *mpi_type = getenvp (job->env, "SLURM_MPI_TYPE"); + + debug("mpi type = %s", mpi_type); + + if(_mpi_init(mpi_type) == SLURM_ERROR) + return SLURM_ERROR; + + unsetenvp (job->env, "SLURM_MPI_TYPE"); + return (*(g_context->ops.init))(job, rank); +} + +int mpi_fini (void) +{ + int rc; + + if (!g_context) + return SLURM_SUCCESS; + + rc = _slurm_mpi_context_destroy(g_context); + return rc; +} + +int slurm_mpi_thr_create(srun_job_t *job) +{ + if (_mpi_init(NULL) < 0) + return SLURM_ERROR; + + return (*(g_context->ops.create_thread))(job); +} + +int slurm_mpi_single_task_per_node (void) +{ + if (_mpi_init(NULL) < 0) + return SLURM_ERROR; + + return (*(g_context->ops.single_task))(); +} + +int slurm_mpi_exit (void) +{ + if (_mpi_init(NULL) < 0) + return SLURM_ERROR; + + return (*(g_context->ops.exit))(); +} + + diff --git a/src/common/mpi.h b/src/common/mpi.h new file mode 100644 index 0000000000000000000000000000000000000000..cce49e86f0e3a35df618db16bc7fce1b750e5276 --- /dev/null +++ b/src/common/mpi.h @@ -0,0 +1,48 @@ +/*****************************************************************************\ + * src/common/mpi.h - Generic mpi selector for slurm + ***************************************************************************** + * Copyright (C) 2002 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Mark Grondona <grondo1@llnl.gov>. + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ + +#ifndef _SRUN_MPI_H +#define _SRUN_MPI_H + +#if HAVE_CONFIG_H +# include "config.h" +#endif + +#include "src/srun/opt.h" +#include "src/srun/srun_job.h" +#include "src/slurmd/slurmd_job.h" + +typedef struct slurm_mpi_context *slurm_mpi_context_t; + +int srun_mpi_init (char *mpi_type); +int slurmd_mpi_init (slurmd_job_t *job, int rank); +int mpi_fini (void); +int slurm_mpi_thr_create(srun_job_t *job); +int slurm_mpi_single_task_per_node (void); +int slurm_mpi_exit (void); + + +#endif /* !_SRUN_MPI_H */ diff --git a/src/common/net.c b/src/common/net.c new file mode 100644 index 0000000000000000000000000000000000000000..32e8b1f1703c0ef8e6e68bdc1646326fd695a4c6 --- /dev/null +++ b/src/common/net.c @@ -0,0 +1,147 @@ +/*****************************************************************************\ + * net.c - basic network communications for user application I/O + ***************************************************************************** + * Copyright (C) 2002 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Mark Grondona <grondona1@llnl.gov>, Kevin Tew <tew1@llnl.gov>, + * et. al. + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ + + +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <sys/select.h> +#include <sys/time.h> +#include <fcntl.h> +#include <unistd.h> +#include <stdio.h> +#include <string.h> +#include <errno.h> + +#include "src/common/log.h" +#include "src/common/net.h" + +#ifndef NET_DEFAULT_BACKLOG +# define NET_DEFAULT_BACKLOG 1024 +#endif + +static int _sock_bind_wild(int sockfd) +{ + socklen_t len; + struct sockaddr_in sin; + + memset(&sin, 0, sizeof(sin)); + sin.sin_family = AF_INET; + sin.sin_addr.s_addr = htonl(INADDR_ANY); + sin.sin_port = htons(0); /* bind ephemeral port */ + + if (bind(sockfd, (struct sockaddr *) &sin, sizeof(sin)) < 0) + return (-1); + len = sizeof(sin); + if (getsockname(sockfd, (struct sockaddr *) &sin, &len) < 0) + return (-1); + return (sin.sin_port); +} + + + +int net_stream_listen(int *fd, int *port) +{ + int rc, val; + + if ((*fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) + return -1; + + val = 1; + rc = setsockopt(*fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(int)); + if (rc > 0) + goto cleanup; + + *port = _sock_bind_wild(*fd); + if (*port < 0) + goto cleanup; +#undef SOMAXCONN +#define SOMAXCONN 1024 + rc = listen(*fd, NET_DEFAULT_BACKLOG); + if (rc < 0) + goto cleanup; + + return 1; + + cleanup: + close(*fd); + return -1; +} + + +int accept_stream(int fd) +{ + int sd; + + while ((sd = accept(fd, NULL, NULL)) < 0) { + if (errno == EINTR) + continue; + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) + return -1; + if (errno == ECONNABORTED) + return -1; + error("Unable to accept new connection"); + } + + return sd; +} + + +int readn(int fd, void *buf, size_t nbytes) +{ + int n = 0; + char *pbuf = (char *)buf; + size_t nleft = nbytes; + + while (nleft > 0) { + n = read(fd, (void *)pbuf, nleft); + if (n > 0) { + pbuf+=n; + nleft-=n; + } else if (n == 0) /* EOF */ + break; + else if (errno == EINTR) + continue; + else { + debug("read error: %m"); + break; + } + } + return(n); +} + +int net_set_low_water(int sock, size_t size) +{ + if (setsockopt(sock, SOL_SOCKET, SO_RCVLOWAT, + (const void *) &size, sizeof(size)) < 0) { + error("Unable to set low water socket option: %m"); + return -1; + } + + return 0; +} diff --git a/src/common/net.h b/src/common/net.h new file mode 100644 index 0000000000000000000000000000000000000000..9f023b3868de4b3170473f43e2b996cbcd914aca --- /dev/null +++ b/src/common/net.h @@ -0,0 +1,20 @@ + +#ifndef _NET_H +#define _NET_H + +/* open a stream socket on an ephemereal port and put it into + * the listen state. fd and port are filled in with the new + * socket's file descriptor and port #. + */ +int net_stream_listen(int *fd, int *port); + +/* accept the incoming connection on the stream socket fd + */ +int net_accept_stream(int fd); + +/* set low water mark on socket + */ +int net_set_low_water(int sock, size_t size); + + +#endif /* !_NET_H */