Skip to content
Snippets Groups Projects
Commit a9cdb939 authored by Danny Auble's avatar Danny Auble
Browse files

more files added for mpi.

parent bb6b3761
No related branches found
No related tags found
No related merge requests found
/*****************************************************************************\
* src/common/global_srun.c - functions needed by more than just srun
*****************************************************************************
* Copyright (C) 2002 The Regents of the University of California.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Mark Grondona <mgrondona@llnl.gov>, and
* Moe Jette <jette@llnl.gov>
* UCRL-CODE-2002-040.
*
* This file is part of SLURM, a resource management program.
* For details, see <http://www.llnl.gov/linux/slurm/>.
*
* SLURM is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with SLURM; if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
\*****************************************************************************/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#if HAVE_PTHREAD
#include <pthread.h>
#endif
#include <signal.h>
#include <string.h>
#include <slurm/slurm_errno.h>
#include "src/common/log.h"
#include "src/common/macros.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_protocol_defs.h"
#include "src/common/xmalloc.h"
#include "src/common/xsignal.h"
#include "src/common/global_srun.h"
/*
* Static list of signals to block in srun:
*/
static int srun_sigarray[] = {
SIGINT, SIGQUIT, SIGTSTP, SIGCONT, SIGTERM,
SIGALRM, SIGUSR1, SIGUSR2, SIGPIPE, 0
};
/* number of active threads */
static pthread_mutex_t active_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t active_cond = PTHREAD_COND_INITIALIZER;
static int active = 0;
typedef enum {DSH_NEW, DSH_ACTIVE, DSH_DONE, DSH_FAILED} state_t;
typedef struct thd {
pthread_t thread; /* thread ID */
pthread_attr_t attr; /* thread attributes */
state_t state; /* thread state */
} thd_t;
typedef struct task_info {
slurm_msg_t *req_ptr;
srun_job_t *job_ptr;
int host_inx;
} task_info_t;
/*
* Static prototypes
*/
static void _p_fwd_signal(slurm_msg_t *, srun_job_t *);
static void * _p_signal_task(void *);
void
fwd_signal(srun_job_t *job, int signo)
{
int i;
slurm_msg_t *req;
kill_tasks_msg_t msg;
static pthread_mutex_t sig_mutex = PTHREAD_MUTEX_INITIALIZER;
slurm_mutex_lock(&sig_mutex);
if (signo == SIGKILL || signo == SIGINT || signo == SIGTERM) {
slurm_mutex_lock(&job->state_mutex);
job->signaled = true;
slurm_mutex_unlock(&job->state_mutex);
}
debug2("forward signal %d to job", signo);
/* common to all tasks */
msg.job_id = job->jobid;
msg.job_step_id = job->stepid;
msg.signal = (uint32_t) signo;
req = xmalloc(sizeof(slurm_msg_t) * job->nhosts);
for (i = 0; i < job->nhosts; i++) {
if (job->host_state[i] != SRUN_HOST_REPLIED) {
debug2("%s has not yet replied\n", job->host[i]);
continue;
}
if (job_active_tasks_on_host(job, i) == 0)
continue;
req[i].msg_type = REQUEST_KILL_TASKS;
req[i].data = &msg;
memcpy( &req[i].address,
&job->slurmd_addr[i], sizeof(slurm_addr));
}
_p_fwd_signal(req, job);
debug2("All tasks have been signalled");
xfree(req);
slurm_mutex_unlock(&sig_mutex);
}
int
job_active_tasks_on_host(srun_job_t *job, int hostid)
{
int i;
int retval = 0;
slurm_mutex_lock(&job->task_mutex);
for (i = 0; i < job->ntask[hostid]; i++) {
uint32_t tid = job->tids[hostid][i];
if (job->task_state[tid] == SRUN_TASK_RUNNING)
retval++;
}
slurm_mutex_unlock(&job->task_mutex);
return retval;
}
/* _p_fwd_signal - parallel (multi-threaded) task signaller */
static void _p_fwd_signal(slurm_msg_t *req, srun_job_t *job)
{
int i;
task_info_t *tinfo;
thd_t *thd;
thd = xmalloc(job->nhosts * sizeof (thd_t));
for (i = 0; i < job->nhosts; i++) {
if (req[i].msg_type == 0)
continue; /* inactive task */
slurm_mutex_lock(&active_mutex);
while (active >= opt.max_threads) {
pthread_cond_wait(&active_cond, &active_mutex);
}
active++;
slurm_mutex_unlock(&active_mutex);
tinfo = (task_info_t *)xmalloc(sizeof(task_info_t));
tinfo->req_ptr = &req[i];
tinfo->job_ptr = job;
tinfo->host_inx = i;
slurm_attr_init(&thd[i].attr);
if (pthread_attr_setdetachstate(&thd[i].attr,
PTHREAD_CREATE_DETACHED))
error ("pthread_attr_setdetachstate failed");
if (pthread_create( &thd[i].thread, &thd[i].attr,
_p_signal_task, (void *) tinfo )) {
error ("pthread_create failed");
_p_signal_task((void *) tinfo);
}
}
slurm_mutex_lock(&active_mutex);
while (active > 0) {
pthread_cond_wait(&active_cond, &active_mutex);
}
slurm_mutex_unlock(&active_mutex);
xfree(thd);
}
/* _p_signal_task - parallelized signal of a specific task */
static void * _p_signal_task(void *args)
{
int rc = SLURM_SUCCESS;
task_info_t *info = (task_info_t *)args;
slurm_msg_t *req = info->req_ptr;
srun_job_t *job = info->job_ptr;
char *host = job->host[info->host_inx];
debug3("sending signal to host %s", host);
if (slurm_send_recv_rc_msg(req, &rc, 0) < 0) {
error("%s: signal: %m", host);
goto done;
}
/*
* Report error unless it is "Invalid job id" which
* probably just means the tasks exited in the meanwhile.
*/
if ((rc != 0) && (rc != ESLURM_INVALID_JOB_ID) && (rc != ESRCH))
error("%s: signal: %s", host, slurm_strerror(rc));
done:
slurm_mutex_lock(&active_mutex);
active--;
pthread_cond_signal(&active_cond);
slurm_mutex_unlock(&active_mutex);
xfree(args);
return NULL;
}
/*****************************************************************************\
* src/common/global_srun.c - functions needed by more than just srun
*****************************************************************************
* Copyright (C) 2002 The Regents of the University of California.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Mark Grodnona <mgrondona@llnl.gov>.
* UCRL-CODE-2002-040.
*
* This file is part of SLURM, a resource management program.
* For details, see <http://www.llnl.gov/linux/slurm/>.
*
* SLURM is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with SLURM; if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
\*****************************************************************************/
#ifndef _GLOBAL_SRUN_H
#define _GLOBAL_SRUN_H
#include "src/srun/srun_job.h"
void fwd_signal(srun_job_t *job, int signal);
int job_active_tasks_on_host(srun_job_t *job, int hostid);
#endif /* !_GLOBAL_SRUN_H */
/*****************************************************************************\
* src/common/mpi.c - Generic mpi selector for slurm
*****************************************************************************
* Copyright (C) 2002 The Regents of the University of California.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Mark Grondona <grondo1@llnl.gov>.
* UCRL-CODE-2002-040.
*
* This file is part of SLURM, a resource management program.
* For details, see <http://www.llnl.gov/linux/slurm/>.
*
* SLURM is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with SLURM; if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
\*****************************************************************************/
#if HAVE_CONFIG_H
# include "config.h"
#endif
#include "src/common/macros.h"
#include "src/common/plugin.h"
#include "src/common/plugrack.h"
#include "src/common/env.h"
#include "src/common/mpi.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#define MPI_DEFAULT "mpich-gm"
/*
* WARNING: Do not change the order of these fields or add additional
* fields at the beginning of the structure. If you do, job completion
* logging plugins will stop working. If you need to add fields, add them
* at the end of the structure.
*/
typedef struct slurm_mpi_ops {
int (*init) (slurmd_job_t *job, int rank);
int (*create_thread) (srun_job_t *job);
int (*single_task) (void);
int (*exit) (void);
} slurm_mpi_ops_t;
struct slurm_mpi_context {
char * mpi_type;
plugrack_t plugin_list;
plugin_handle_t cur_plugin;
int mpi_errno;
slurm_mpi_ops_t ops;
};
static slurm_mpi_context_t g_context = NULL;
static pthread_mutex_t context_lock = PTHREAD_MUTEX_INITIALIZER;
static slurm_mpi_context_t
_slurm_mpi_context_create(const char *mpi_type)
{
slurm_mpi_context_t c;
if ( mpi_type == NULL ) {
debug3( "_slurm_mpi_context_create: no mpi type" );
return NULL;
}
c = xmalloc(sizeof(struct slurm_mpi_context));
c->mpi_errno = SLURM_SUCCESS;
/* Copy the job completion authentication type. */
c->mpi_type = xstrdup(mpi_type);
if (c->mpi_type == NULL ) {
debug3( "can't make local copy of mpi type" );
xfree(c);
return NULL;
}
/* Plugin rack is demand-loaded on first reference. */
c->plugin_list = NULL;
c->cur_plugin = PLUGIN_INVALID_HANDLE;
return c;
}
static int
_slurm_mpi_context_destroy( slurm_mpi_context_t c )
{
/*
* Must check return code here because plugins might still
* be loaded and active.
*/
if ( c->plugin_list ) {
if ( plugrack_destroy( c->plugin_list ) != SLURM_SUCCESS ) {
return SLURM_ERROR;
}
}
xfree(c->mpi_type);
xfree(c);
return SLURM_SUCCESS;
}
/*
* Resolve the operations from the plugin.
*/
static slurm_mpi_ops_t *
_slurm_mpi_get_ops( slurm_mpi_context_t c )
{
/*
* These strings must be kept in the same order as the fields
* declared for slurm_mpi_ops_t.
*/
static const char *syms[] = {
"mpi_p_init",
"mpi_p_thr_create",
"mpi_p_single_task",
"mpi_p_exit"
};
int n_syms = sizeof( syms ) / sizeof( char * );
char *plugin_dir = NULL;
/* Get the plugin list, if needed. */
if ( c->plugin_list == NULL ) {
c->plugin_list = plugrack_create();
if ( c->plugin_list == NULL ) {
error("Unable to create a plugin manager");
return NULL;
}
plugrack_set_major_type(c->plugin_list, "mpi");
plugrack_set_paranoia(c->plugin_list,
PLUGRACK_PARANOIA_NONE,
0);
plugin_dir = slurm_get_plugin_dir();
plugrack_read_dir(c->plugin_list, plugin_dir);
xfree(plugin_dir);
}
if (strcasecmp (c->mpi_type, "mpi/list") == 0) {
plugrack_print_all_plugin(c->plugin_list);
exit(0);
} else {
/* Find the correct plugin. */
c->cur_plugin = plugrack_use_by_type(c->plugin_list,
c->mpi_type);
if ( c->cur_plugin == PLUGIN_INVALID_HANDLE ) {
error("can't find a valid plugin for type %s",
c->mpi_type);
return NULL;
}
}
/* Dereference the API. */
if ( plugin_get_syms( c->cur_plugin,
n_syms,
syms,
(void **) &c->ops ) < n_syms ) {
error( "incomplete mpi plugin detected" );
return NULL;
}
return &c->ops;
}
int _mpi_init (char *mpi_type)
{
int retval = SLURM_SUCCESS;
char *full_type = NULL;
slurm_mutex_lock( &context_lock );
if ( g_context )
goto done;
if (mpi_type == NULL)
mpi_type = MPI_DEFAULT;
setenvf (NULL, "SLURM_MPI_TYPE", "%s", mpi_type);
full_type = xmalloc(sizeof(char) * (strlen(mpi_type)+5));
sprintf(full_type,"mpi/%s\0",mpi_type);
g_context = _slurm_mpi_context_create(full_type);
xfree(full_type);
if ( g_context == NULL ) {
error( "cannot create a context for %s", mpi_type);
retval = SLURM_ERROR;
goto done;
}
if ( _slurm_mpi_get_ops( g_context ) == NULL ) {
error( "cannot resolve plugin operations for %s", mpi_type);
_slurm_mpi_context_destroy( g_context );
g_context = NULL;
retval = SLURM_ERROR;
}
done:
slurm_mutex_unlock( &context_lock );
return retval;
}
int srun_mpi_init (char *mpi_type)
{
debug("mpi type = %s", mpi_type);
if(_mpi_init(mpi_type) == SLURM_ERROR)
return SLURM_ERROR;
return SLURM_SUCCESS;
}
int slurmd_mpi_init (slurmd_job_t *job, int rank)
{
char *mpi_type = getenvp (job->env, "SLURM_MPI_TYPE");
debug("mpi type = %s", mpi_type);
if(_mpi_init(mpi_type) == SLURM_ERROR)
return SLURM_ERROR;
unsetenvp (job->env, "SLURM_MPI_TYPE");
return (*(g_context->ops.init))(job, rank);
}
int mpi_fini (void)
{
int rc;
if (!g_context)
return SLURM_SUCCESS;
rc = _slurm_mpi_context_destroy(g_context);
return rc;
}
int slurm_mpi_thr_create(srun_job_t *job)
{
if (_mpi_init(NULL) < 0)
return SLURM_ERROR;
return (*(g_context->ops.create_thread))(job);
}
int slurm_mpi_single_task_per_node (void)
{
if (_mpi_init(NULL) < 0)
return SLURM_ERROR;
return (*(g_context->ops.single_task))();
}
int slurm_mpi_exit (void)
{
if (_mpi_init(NULL) < 0)
return SLURM_ERROR;
return (*(g_context->ops.exit))();
}
/*****************************************************************************\
* src/common/mpi.h - Generic mpi selector for slurm
*****************************************************************************
* Copyright (C) 2002 The Regents of the University of California.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Mark Grondona <grondo1@llnl.gov>.
* UCRL-CODE-2002-040.
*
* This file is part of SLURM, a resource management program.
* For details, see <http://www.llnl.gov/linux/slurm/>.
*
* SLURM is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with SLURM; if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
\*****************************************************************************/
#ifndef _SRUN_MPI_H
#define _SRUN_MPI_H
#if HAVE_CONFIG_H
# include "config.h"
#endif
#include "src/srun/opt.h"
#include "src/srun/srun_job.h"
#include "src/slurmd/slurmd_job.h"
typedef struct slurm_mpi_context *slurm_mpi_context_t;
int srun_mpi_init (char *mpi_type);
int slurmd_mpi_init (slurmd_job_t *job, int rank);
int mpi_fini (void);
int slurm_mpi_thr_create(srun_job_t *job);
int slurm_mpi_single_task_per_node (void);
int slurm_mpi_exit (void);
#endif /* !_SRUN_MPI_H */
/*****************************************************************************\
* net.c - basic network communications for user application I/O
*****************************************************************************
* Copyright (C) 2002 The Regents of the University of California.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Mark Grondona <grondona1@llnl.gov>, Kevin Tew <tew1@llnl.gov>,
* et. al.
* UCRL-CODE-2002-040.
*
* This file is part of SLURM, a resource management program.
* For details, see <http://www.llnl.gov/linux/slurm/>.
*
* SLURM is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with SLURM; if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
\*****************************************************************************/
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <sys/time.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include "src/common/log.h"
#include "src/common/net.h"
#ifndef NET_DEFAULT_BACKLOG
# define NET_DEFAULT_BACKLOG 1024
#endif
static int _sock_bind_wild(int sockfd)
{
socklen_t len;
struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = htonl(INADDR_ANY);
sin.sin_port = htons(0); /* bind ephemeral port */
if (bind(sockfd, (struct sockaddr *) &sin, sizeof(sin)) < 0)
return (-1);
len = sizeof(sin);
if (getsockname(sockfd, (struct sockaddr *) &sin, &len) < 0)
return (-1);
return (sin.sin_port);
}
int net_stream_listen(int *fd, int *port)
{
int rc, val;
if ((*fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
return -1;
val = 1;
rc = setsockopt(*fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(int));
if (rc > 0)
goto cleanup;
*port = _sock_bind_wild(*fd);
if (*port < 0)
goto cleanup;
#undef SOMAXCONN
#define SOMAXCONN 1024
rc = listen(*fd, NET_DEFAULT_BACKLOG);
if (rc < 0)
goto cleanup;
return 1;
cleanup:
close(*fd);
return -1;
}
int accept_stream(int fd)
{
int sd;
while ((sd = accept(fd, NULL, NULL)) < 0) {
if (errno == EINTR)
continue;
if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
return -1;
if (errno == ECONNABORTED)
return -1;
error("Unable to accept new connection");
}
return sd;
}
int readn(int fd, void *buf, size_t nbytes)
{
int n = 0;
char *pbuf = (char *)buf;
size_t nleft = nbytes;
while (nleft > 0) {
n = read(fd, (void *)pbuf, nleft);
if (n > 0) {
pbuf+=n;
nleft-=n;
} else if (n == 0) /* EOF */
break;
else if (errno == EINTR)
continue;
else {
debug("read error: %m");
break;
}
}
return(n);
}
int net_set_low_water(int sock, size_t size)
{
if (setsockopt(sock, SOL_SOCKET, SO_RCVLOWAT,
(const void *) &size, sizeof(size)) < 0) {
error("Unable to set low water socket option: %m");
return -1;
}
return 0;
}
#ifndef _NET_H
#define _NET_H
/* open a stream socket on an ephemereal port and put it into
* the listen state. fd and port are filled in with the new
* socket's file descriptor and port #.
*/
int net_stream_listen(int *fd, int *port);
/* accept the incoming connection on the stream socket fd
*/
int net_accept_stream(int fd);
/* set low water mark on socket
*/
int net_set_low_water(int sock, size_t size);
#endif /* !_NET_H */
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment