diff --git a/Makefile.am b/Makefile.am index 42a9d9e9ed64036ba8d2afe4492f353273b5558e..4169ee91a2ec8ff4ffd5e011fb56eb262393efca 100644 --- a/Makefile.am +++ b/Makefile.am @@ -21,7 +21,7 @@ EXTRA_DIST = \ NEWS \ ChangeLog \ META - + pkginclude_HEADERS = \ slurm/bnr.h \ slurm/slurm.h \ diff --git a/configure.ac b/configure.ac index ecd80d7495fc7306ba9b3ca3c9e5911368e7ca55..c4e30957ff5f95a4a937cadd33175f2f34f16e88 100644 --- a/configure.ac +++ b/configure.ac @@ -227,6 +227,10 @@ AC_CONFIG_FILES([Makefile src/plugins/switch/elan/Makefile src/plugins/switch/none/Makefile src/plugins/switch/federation/Makefile + src/plugins/mpi/Makefile + src/plugins/mpi/mpichgm/Makefile + src/plugins/mpi/mvapich/Makefile + src/plugins/mpi/lam/Makefile doc/Makefile doc/man/Makefile testsuite/Makefile diff --git a/doc/man/man1/srun.1 b/doc/man/man1/srun.1 index da3eff89ead4dccb47274512db97f9dae3297bcb..c9204943e5acf2c642e2b6e1d3533a4c3fc65aab 100644 --- a/doc/man/man1/srun.1 +++ b/doc/man/man1/srun.1 @@ -181,12 +181,18 @@ Identify the type of MPI to be used. May result in unique initiation procedures. .RS .TP +.B list +Lists avaliable mpi types to choose from. +.TP +.B mpich-gm +For use with Myrinet. +.TP +.B mvapich +For use with Infiniband. +.TP .B lam Initiates one 'lamd' process per node and establishes necessary environment variables for LAM/MPI. -.TP -.B qsw -Establishes necessary environment variables for Quadrics MPI. .RE .TP \fB\-\-jobid\fR=\fIid\fR diff --git a/src/common/Makefile.am b/src/common/Makefile.am index 73f4fa2e21d0ed024dcaf9f150573e51c02baf4d..50d280b1a2d03588d21f380b937da8ad203a5eae 100644 --- a/src/common/Makefile.am +++ b/src/common/Makefile.am @@ -23,13 +23,17 @@ libcommon_la_SOURCES = \ xassert.c xassert.h \ xstring.c xstring.h \ xsignal.c xsignal.h \ + global_srun.c \ + global_srun.h \ strlcpy.c strlcpy.h \ list.c list.h \ + net.c net.h \ fd.c fd.h \ log.c log.h \ cbuf.c cbuf.h \ safeopen.c safeopen.h \ bitstring.c bitstring.h \ + mpi.c mpi.h \ pack.c pack.h \ parse_spec.c parse_spec.h \ plugin.c plugin.h \ diff --git a/src/common/plugin.c b/src/common/plugin.c index 274f4d9867cf88d313a24d61779100c7556ea43f..a7a43dbb6bf8ee1cf4bae629c29c5c9fd08e0853 100644 --- a/src/common/plugin.c +++ b/src/common/plugin.c @@ -65,17 +65,17 @@ plugin_peek( const char *fq_path, debug3( "plugin_peek: dlopen(%s): %s", fq_path, _dlerror() ); return SLURM_ERROR; } - if ( ( type = dlsym( plug, PLUGIN_TYPE ) ) != NULL ) { if ( plugin_type != NULL ) { strncpy( plugin_type, type, type_len ); } - } else { + } else { dlclose( plug ); /* could be vestigial library, don't treat as an error */ verbose( "%s: not a SLURM plugin", fq_path ); return SLURM_ERROR; } + if ( ( version = (uint32_t *) dlsym( plug, PLUGIN_VERSION ) ) != NULL ) { if ( plugin_version != NULL ) { *plugin_version = *version; @@ -107,7 +107,7 @@ plugin_load_from_file( const char *fq_path ) */ plug = dlopen( fq_path, RTLD_NOW ); if ( plug == NULL ) { - debug( "plugin_load_from_file: dlopen(%s): %s", + error( "plugin_load_from_file: dlopen(%s): %s", fq_path, _dlerror() ); return PLUGIN_INVALID_HANDLE; @@ -128,7 +128,7 @@ plugin_load_from_file( const char *fq_path ) */ if ( ( init = dlsym( plug, "init" ) ) != NULL ) { if ( (*init)() != 0 ) { - debug( "plugin_load_from_file(%s): init() returned SLURM_ERROR", + error( "plugin_load_from_file(%s): init() returned SLURM_ERROR", fq_path ); (void) dlclose( plug ); return PLUGIN_INVALID_HANDLE; diff --git a/src/common/plugrack.c b/src/common/plugrack.c index 9caaead1e315bcc617f5d08ccad9ecfcae7972ad..16593f4d68bdbede5957214e38311d058bcc9878 100644 --- a/src/common/plugrack.c +++ b/src/common/plugrack.c @@ -222,7 +222,7 @@ accept_paranoia( plugrack_t rack, const char *fq_path ) if ( ! rack->paranoia ) return 1; /* Make a local copy of the path name so we can write into it. */ - local = malloc( strlen( fq_path ) + 1 ); + local = xmalloc( strlen( fq_path ) + 1 ); strcpy( local, fq_path ); if ( ! accept_path_paranoia( rack, @@ -231,7 +231,7 @@ accept_paranoia( plugrack_t rack, const char *fq_path ) PLUGRACK_PARANOIA_FILE_OWN, rack->paranoia & PLUGRACK_PARANOIA_FILE_WRITABLE ) ) { - free( local ); + xfree( local ); return 0; } @@ -253,7 +253,7 @@ accept_paranoia( plugrack_t rack, const char *fq_path ) PLUGRACK_PARANOIA_DIR_OWN, rack->paranoia & PLUGRACK_PARANOIA_DIR_WRITABLE ); - free( local ); + xfree( local ); return rc; } @@ -385,7 +385,7 @@ plugrack_add_plugin_path( plugrack_t rack, const char *fq_path ) { plugrack_entry_t *e; - + if ( ! rack ) return SLURM_ERROR; if ( ! fq_path ) return SLURM_ERROR; @@ -395,8 +395,7 @@ plugrack_add_plugin_path( plugrack_t rack, e->fq_path = xstrdup( fq_path ); e->plug = PLUGIN_INVALID_HANDLE; e->refcount = 0; - - list_append( rack->entries, e ); + list_append( rack->entries, e ); return SLURM_SUCCESS; } @@ -447,7 +446,7 @@ plugrack_read_dir( plugrack_t rack, const char *dir ) if ( ( ! rack ) || (! dir ) ) return SLURM_ERROR; - dir_array = malloc( strlen( dir ) + 1 ); + dir_array = xmalloc( strlen( dir ) + 1 ); xassert( dir_array ); strcpy( dir_array, dir ); head = dir_array; @@ -466,7 +465,7 @@ plugrack_read_dir( plugrack_t rack, const char *dir ) head = dir_array + i + 1; } } - free( dir_array ); + xfree( dir_array ); return rc; } @@ -488,7 +487,7 @@ _plugrack_read_single_dir( plugrack_t rack, char *dir ) if (max_path_len <= 0) max_path_len = 256; } - fq_path = malloc( strlen( dir ) + max_path_len + 1 ); + fq_path = xmalloc( strlen( dir ) + max_path_len + 1 ); xassert( fq_path ); /* @@ -500,7 +499,7 @@ _plugrack_read_single_dir( plugrack_t rack, char *dir ) tail = &fq_path[ strlen( dir ) ]; *tail = '/'; ++tail; - + /* Check whether we should be paranoid about this directory. */ if ( ! accept_path_paranoia( rack, dir, @@ -508,7 +507,7 @@ _plugrack_read_single_dir( plugrack_t rack, char *dir ) PLUGRACK_PARANOIA_DIR_OWN, rack->paranoia & PLUGRACK_PARANOIA_DIR_WRITABLE ) ) { - free( fq_path ); + xfree( fq_path ); return SLURM_ERROR; } @@ -516,7 +515,7 @@ _plugrack_read_single_dir( plugrack_t rack, char *dir ) dirp = opendir( dir ); if ( dirp == NULL ) { error( "cannot open plugin directory %s", dir ); - free( fq_path ); + xfree( fq_path ); return SLURM_ERROR; } @@ -531,7 +530,7 @@ _plugrack_read_single_dir( plugrack_t rack, char *dir ) * should work. */ strcpy( tail, e->d_name ); - + /* Check only regular files. */ if ( strncmp(e->d_name, ".", 1) == 0) continue; if ( stat( fq_path, &st ) < 0 ) continue; @@ -547,7 +546,7 @@ _plugrack_read_single_dir( plugrack_t rack, char *dir ) if ((rack->major_type) && (!_match_major(e->d_name, rack->major_type))) continue; - + /* See if we should be paranoid about this file. */ if (!accept_path_paranoia( rack, fq_path, @@ -559,7 +558,7 @@ _plugrack_read_single_dir( plugrack_t rack, char *dir ) "reasons", fq_path ); continue; } - + /* Test the type. */ if ( plugin_peek( fq_path, plugin_type, @@ -567,20 +566,21 @@ _plugrack_read_single_dir( plugrack_t rack, char *dir ) NULL ) == SLURM_ERROR ) { continue; } + if ( rack->major_type && ( strncmp( rack->major_type, plugin_type, strlen( rack->major_type ) ) != 0 ) ) { continue; } - + /* Add it to the list. */ (void) plugrack_add_plugin_path( rack, plugin_type, fq_path ); } closedir( dirp ); - free( fq_path ); + xfree( fq_path ); return SLURM_SUCCESS; } @@ -689,12 +689,11 @@ plugrack_use_by_type( plugrack_t rack, it = list_iterator_create( rack->entries ); while ( ( e = list_next( it ) ) != NULL ) { - if ( strcmp( full_type, e->full_type ) != 0 ) continue; - + if ( strcmp( full_type, e->full_type ) != 0 ) continue; + /* See if plugin is loaded. */ - if ( e->plug == PLUGIN_INVALID_HANDLE ) { - e->plug = plugin_load_from_file( e->fq_path ); - } + if ( e->plug == PLUGIN_INVALID_HANDLE ) + e->plug = plugin_load_from_file( e->fq_path ); /* If load was successful, increment the reference count. */ if ( e->plug == PLUGIN_INVALID_HANDLE ) @@ -705,7 +704,7 @@ plugrack_use_by_type( plugrack_t rack, * as an error return value. */ list_iterator_destroy( it ); - return e->plug; + return e->plug; } /* Couldn't find a suitable plugin. */ @@ -739,3 +738,16 @@ plugrack_finished_with_plugin( plugrack_t rack, plugin_handle_t plug ) list_iterator_destroy( it ); return SLURM_ERROR; } + +int +plugrack_print_all_plugin(plugrack_t rack) +{ + ListIterator itr; + plugrack_entry_t *e = NULL; + itr = list_iterator_create(rack->entries); + info("MPI types are..."); + while ((e = list_next(itr)) != NULL ) { + info("%s",e->full_type); + } + return SLURM_SUCCESS; +} diff --git a/src/common/plugrack.h b/src/common/plugrack.h index c5e729ef644fee025aeb0b643156514f7848be50..d2b6ca5ee6f686b6ecde319d9f8da1ce067a4956 100644 --- a/src/common/plugrack.h +++ b/src/common/plugrack.h @@ -170,5 +170,12 @@ plugin_handle_t plugrack_use_by_type( plugrack_t rack, */ int plugrack_finished_with_plugin( plugrack_t rack, plugin_handle_t plug ); +/* + * print all plugins in rack + * + * Returns a SLURM errno. + */ +int plugrack_print_all_plugin( plugrack_t rack); + #endif /*__PLUGRACK_H__*/ diff --git a/src/common/slurm_jobacct.c b/src/common/slurm_jobacct.c index f1634bd98da2e3f96a29e83fd516d783a464be90..60f36509200d0668d1ea9f73acaf346cccb450b7 100644 --- a/src/common/slurm_jobacct.c +++ b/src/common/slurm_jobacct.c @@ -48,7 +48,7 @@ #include "src/common/xassert.h" #include "src/common/xstring.h" #include "src/slurmctld/slurmctld.h" -#include "src/slurmd/job.h" +#include "src/slurmd/slurmd_job.h" /* * WARNING: Do not change the order of these fields or add additional diff --git a/src/common/slurm_jobacct.h b/src/common/slurm_jobacct.h index 6f01c8df4b8d27fac79293e09822b709aceb1ca2..98dc89c71d4c196f6e0f1c0106e8eea0af1c28a9 100644 --- a/src/common/slurm_jobacct.h +++ b/src/common/slurm_jobacct.h @@ -51,7 +51,7 @@ #include <unistd.h> #include "src/slurmctld/slurmctld.h" -#include "src/slurmd/job.h" +#include "src/slurmd/slurmd_job.h" typedef struct slurm_jobacct_context * slurm_jobacct_context_t; diff --git a/src/common/switch.c b/src/common/switch.c index 1d8a99cdc815cf0b44fd4dbddd7faecaef281115..0355917cd86867e32f340eff300110dc600b6a51 100644 --- a/src/common/switch.c +++ b/src/common/switch.c @@ -107,7 +107,7 @@ static pthread_mutex_t context_lock = PTHREAD_MUTEX_INITIALIZER; static slurm_switch_context_t -_slurm_switch_context_create( const char *switch_type) +_slurm_switch_context_create(const char *switch_type) { slurm_switch_context_t c; diff --git a/src/common/unsetenv.c b/src/common/unsetenv.c index 6018243df56cfbf8542a896e464702b60ad16aac..01028f71eb93719cd5cc5af4ae15436e98be2cb0 100644 --- a/src/common/unsetenv.c +++ b/src/common/unsetenv.c @@ -32,7 +32,7 @@ extern int unsetenv (const char *name) int len, rc; char *tmp; - if (getenv(name) == NULL) /* Nothing to clear */ + if (!getenv(name)) /* Nothing to clear */ return 0; len = strlen(name); diff --git a/src/plugins/Makefile.am b/src/plugins/Makefile.am index 8d2032362468bfc9b79417e16b9e8c33d7bcd80d..e4419065dd296f8cf3ee96147cbdf36b197a5f56 100644 --- a/src/plugins/Makefile.am +++ b/src/plugins/Makefile.am @@ -1,3 +1,3 @@ # $Id$ -SUBDIRS = auth checkpoint jobacct jobcomp proctrack sched select switch +SUBDIRS = auth checkpoint jobacct jobcomp proctrack sched select switch mpi diff --git a/src/plugins/mpi/Makefile.am b/src/plugins/mpi/Makefile.am new file mode 100644 index 0000000000000000000000000000000000000000..54833b9ec9924d5ae09ea60385a403c02fd05267 --- /dev/null +++ b/src/plugins/mpi/Makefile.am @@ -0,0 +1,4 @@ +# $Id: Makefile.am,v 1.6 2005/03/08 14:25:17 jking Exp $ +# Makefile for mpi plugins + +SUBDIRS = mpichgm mvapich lam diff --git a/src/plugins/mpi/lam/Makefile.am b/src/plugins/mpi/lam/Makefile.am new file mode 100644 index 0000000000000000000000000000000000000000..c83de353365f15bc1a92cba7ed1a9bd45823e401 --- /dev/null +++ b/src/plugins/mpi/lam/Makefile.am @@ -0,0 +1,17 @@ +# $Id: Makefile.am,v 1.4 2004/07/26 23:43:53 jette Exp $ +# Makefile for mpi/lam plugin + +AUTOMAKE_OPTIONS = foreign + +PLUGIN_FLAGS = -module -avoid-version --export-dynamic + +INCLUDES = -I$(top_srcdir) -I$(top_srcdir)/src/common + +pkglib_LTLIBRARIES = mpi_lam.la + +# Null switch plugin. +mpi_lam_la_SOURCES = mpi_lam.c +mpi_lam_la_LDFLAGS = $(SO_LDFLAGS) $(PLUGIN_FLAGS) +mpi_lam_la_LIBADD = \ + $(top_builddir)/src/common/libcommon.la -lpthread \ + $(top_builddir)/src/api/libslurm.la diff --git a/src/srun/gmpi.h b/src/plugins/mpi/lam/lam.h similarity index 71% rename from src/srun/gmpi.h rename to src/plugins/mpi/lam/lam.h index 6bd754022763c00eb8df62beda25b243c7bca07a..13238125401b02df0d6cf861d2e4a6ce48721759 100644 --- a/src/srun/gmpi.h +++ b/src/plugins/mpi/lam/lam.h @@ -1,9 +1,10 @@ /*****************************************************************************\ - * gmpi.h - srun support for MPICH-GM (GMPI) + ** lam.h - Library routines for initiating jobs on with lam type mpi + ** $Id: mpi_gmpi.c,v 1.7 2005/06/07 18:25:32 morrone Exp $ ***************************************************************************** * Copyright (C) 2004 The Regents of the University of California. * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). - * Written by Takao Hatazaki <takao.hatazaki@hp.com> + * Written by Danny Auble <da@llnl.gov> * UCRL-CODE-2002-040. * * This file is part of SLURM, a resource management program. @@ -23,25 +24,12 @@ * with SLURM; if not, write to the Free Software Foundation, Inc., * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. \*****************************************************************************/ +#if HAVE_CONFIG_H +# include "config.h" +#endif -#ifndef _HAVE_GMPI_H -#define _HAVE_GMPI_H - -#include "src/srun/job.h" - -typedef struct { - int defined; - unsigned int port_board_id; - unsigned int unique_high_id; - unsigned int unique_low_id; - unsigned int numanode; - unsigned int remote_pid; - unsigned int remote_port; -} gm_slave_t; - -#define GMPI_RECV_BUF_LEN 65536 - -extern int gmpi_thr_create(job_t *job, char **port); - -#endif /* _HAVE_GMPI_H */ +#include "src/srun/srun_job.h" +#include "src/slurmd/slurmd_job.h" +#include "src/common/env.h" +//extern int lam_thr_create(srun_job_t *job); diff --git a/src/plugins/mpi/lam/mpi_lam.c b/src/plugins/mpi/lam/mpi_lam.c new file mode 100644 index 0000000000000000000000000000000000000000..493c82a4cb675c786cbd7a3e892a41c37cbedea1 --- /dev/null +++ b/src/plugins/mpi/lam/mpi_lam.c @@ -0,0 +1,89 @@ +/*****************************************************************************\ + ** mpi_lam.c - Library routines for initiating jobs on with lam type mpi + ** $Id: mpi_gmpi.c,v 1.7 2005/06/07 18:25:32 morrone Exp $ + ***************************************************************************** + * Copyright (C) 2004 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Danny Auble <da@llnl.gov> + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ + +#if HAVE_CONFIG_H +# include "config.h" +#endif + +#include <fcntl.h> +#include <signal.h> +#include <sys/types.h> + +#include <slurm/slurm_errno.h> +#include "src/common/slurm_xlator.h" +#include "src/plugins/mpi/lam/lam.h" + +/* + * These variables are required by the generic plugin interface. If they + * are not found in the plugin, the plugin loader will ignore it. + * + * plugin_name - a string giving a human-readable description of the + * plugin. There is no maximum length, but the symbol must refer to + * a valid string. + * + * plugin_type - a string suggesting the type of the plugin or its + * applicability to a particular form of data or method of data handling. + * If the low-level plugin API is used, the contents of this string are + * unimportant and may be anything. SLURM uses the higher-level plugin + * interface which requires this string to be of the form + * + * <application>/<method> + * + * where <application> is a description of the intended application of + * the plugin (e.g., "switch" for SLURM switch) and <method> is a description + * of how this plugin satisfies that application. SLURM will only load + * a switch plugin if the plugin_type string has a prefix of "switch/". + * + * plugin_version - an unsigned 32-bit integer giving the version number + * of the plugin. If major and minor revisions are desired, the major + * version number may be multiplied by a suitable magnitude constant such + * as 100 or 1000. Various SLURM versions will likely require a certain + * minimum versions for their plugins as this API matures. + */ +const char plugin_name[] = "mpi LAM plugin"; +const char plugin_type[] = "mpi/lam"; +const uint32_t plugin_version = 100; + +int mpi_p_init(slurmd_job_t *job) +{ + return 0; +} + +int mpi_p_thr_create(srun_job_t *job) +{ + return 0; +} + +int mpi_p_single_task() +{ + return true; +} + +int mpi_p_exit() +{ + return 0; +} diff --git a/src/plugins/mpi/mpichgm/Makefile.am b/src/plugins/mpi/mpichgm/Makefile.am new file mode 100644 index 0000000000000000000000000000000000000000..abc64f133069814b1ee1312470bb4d6371dbce2d --- /dev/null +++ b/src/plugins/mpi/mpichgm/Makefile.am @@ -0,0 +1,17 @@ +# $Id: Makefile.am,v 1.4 2004/07/26 23:43:53 jette Exp $ +# Makefile for mpi/gmpi plugin + +AUTOMAKE_OPTIONS = foreign + +PLUGIN_FLAGS = -module -avoid-version --export-dynamic + +INCLUDES = -I$(top_srcdir) -I$(top_srcdir)/src/common + +pkglib_LTLIBRARIES = mpi_mpichgm.la + +# Null switch plugin. +mpi_mpichgm_la_SOURCES = mpi_mpichgm.c mpichgm.c +mpi_mpichgm_la_LDFLAGS = $(SO_LDFLAGS) $(PLUGIN_FLAGS) +mpi_mpichgm_la_LIBADD = \ + $(top_builddir)/src/common/libcommon.la -lpthread \ + $(top_builddir)/src/api/libslurm.la diff --git a/src/plugins/mpi/mpichgm/mpi_mpichgm.c b/src/plugins/mpi/mpichgm/mpi_mpichgm.c new file mode 100644 index 0000000000000000000000000000000000000000..a4f0eb5909d656a06fa0353fdda69a54dec15042 --- /dev/null +++ b/src/plugins/mpi/mpichgm/mpi_mpichgm.c @@ -0,0 +1,101 @@ +/*****************************************************************************\ + ** mpi_gmpi.c - Library routines for initiating jobs on with gmpi type mpi + ** $Id: mpi_gmpi.c,v 1.7 2005/06/07 18:25:32 morrone Exp $ + ***************************************************************************** + * Copyright (C) 2004 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Danny Auble <da@llnl.gov> + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ + +#if HAVE_CONFIG_H +# include "config.h" +#endif + +#include <fcntl.h> +#include <signal.h> +#include <sys/types.h> + +#include <slurm/slurm_errno.h> +#include "src/common/slurm_xlator.h" +#include "src/plugins/mpi/mpichgm/mpichgm.h" + +/* + * These variables are required by the generic plugin interface. If they + * are not found in the plugin, the plugin loader will ignore it. + * + * plugin_name - a string giving a human-readable description of the + * plugin. There is no maximum length, but the symbol must refer to + * a valid string. + * + * plugin_type - a string suggesting the type of the plugin or its + * applicability to a particular form of data or method of data handling. + * If the low-level plugin API is used, the contents of this string are + * unimportant and may be anything. SLURM uses the higher-level plugin + * interface which requires this string to be of the form + * + * <application>/<method> + * + * where <application> is a description of the intended application of + * the plugin (e.g., "switch" for SLURM switch) and <method> is a description + * of how this plugin satisfies that application. SLURM will only load + * a switch plugin if the plugin_type string has a prefix of "switch/". + * + * plugin_version - an unsigned 32-bit integer giving the version number + * of the plugin. If major and minor revisions are desired, the major + * version number may be multiplied by a suitable magnitude constant such + * as 100 or 1000. Various SLURM versions will likely require a certain + * minimum versions for their plugins as this API matures. + */ +const char plugin_name[] = "mpi MPICH-GM plugin"; +const char plugin_type[] = "mpi/mpich-gm"; +const uint32_t plugin_version = 100; + +int mpi_p_init(slurmd_job_t *job, int rank) +{ + char addrbuf[1024]; + char *p; + char *addr = getenvp (job->env, "SLURM_LAUNCH_NODE_IPADDR"); + + slurm_print_slurm_addr (job->envtp->self, addrbuf, sizeof(addrbuf)); + + if ((p = strchr (addrbuf, ':')) != NULL) + *p = '\0'; + + setenvf (&job->env, "GMPI_MASTER", "%s", addr); + setenvf (&job->env, "GMPI_SLAVE", "%s", addrbuf); + + return (0); +} + +int mpi_p_thr_create(srun_job_t *job) +{ + return gmpi_thr_create(job); +} + +int mpi_p_single_task() +{ + return false; +} + +int mpi_p_exit() +{ + return 0; +} diff --git a/src/srun/gmpi.c b/src/plugins/mpi/mpichgm/mpichgm.c similarity index 80% rename from src/srun/gmpi.c rename to src/plugins/mpi/mpichgm/mpichgm.c index 0c774d3bb61046e34e45dec94fb7022020a4fb07..2b0e1cba6958aa54d9c2525226d8ca22fa3ba2ae 100644 --- a/src/srun/gmpi.c +++ b/src/plugins/mpi/mpichgm/mpichgm.c @@ -40,28 +40,31 @@ #include "src/common/xmalloc.h" #include "src/common/xstring.h" +#include "src/common/net.h" -#include "src/srun/allocate.h" -//#include "src/srun/env.h" -#include "src/srun/io.h" -#include "src/srun/job.h" -#include "src/srun/gmpi.h" -#include "src/srun/launch.h" -#include "src/srun/msg.h" -#include "src/srun/net.h" -#include "src/srun/opt.h" -#include "src/srun/signals.h" -#include "src/srun/sigstr.h" -#include "src/srun/reattach.h" -#include "src/srun/attach.h" +#include "src/plugins/mpi/mpichgm/mpichgm.h" +typedef struct { + int defined; + unsigned int port_board_id; + unsigned int unique_high_id; + unsigned int unique_low_id; + unsigned int numanode; + unsigned int remote_pid; + unsigned int remote_port; +} gm_slave_t; -static int _gmpi_parse_init_recv_msg(job_t *job, char *rbuf, - gm_slave_t *slave_data); +#define GMPI_RECV_BUF_LEN 65536 -static int _gmpi_parse_init_recv_msg(job_t *job, char *rbuf, - gm_slave_t *slave_data) +static int _gmpi_parse_init_recv_msg(srun_job_t *job, char *rbuf, + gm_slave_t *slave_data); + +static int gmpi_fd = -1; +static int gmpi_port = -1; + +static int _gmpi_parse_init_recv_msg(srun_job_t *job, char *rbuf, + gm_slave_t *slave_data) { unsigned int magic, id, port_board_id, unique_high_id, unique_low_id, numanode, remote_pid, remote_port; @@ -105,7 +108,7 @@ static int _gmpi_parse_init_recv_msg(job_t *job, char *rbuf, } -static int _gmpi_establish_map(job_t *job) +static int _gmpi_establish_map(srun_job_t *job) { struct sockaddr_in addr; socklen_t addrlen; @@ -114,12 +117,12 @@ static int _gmpi_establish_map(job_t *job) char *p, *rbuf = NULL, *gmap = NULL, *lmap = NULL, *map = NULL; char tmp[128]; gm_slave_t *slave_data = NULL, *dp; - + /* * Collect info from slaves. * Will never finish unless slaves are GMPI processes. */ - accfd = job->gmpi_fd; + accfd = gmpi_fd; addrlen = sizeof(addr); nprocs = opt.nprocs; slave_data = (gm_slave_t *)xmalloc(sizeof(*slave_data)*nprocs); @@ -127,6 +130,7 @@ static int _gmpi_establish_map(job_t *job) slave_data[i].defined = 0; i = 0; rbuf = (char *)xmalloc(GMPI_RECV_BUF_LEN); + while (i < nprocs) { newfd = accept(accfd, (struct sockaddr *)&addr, &addrlen); if (newfd == -1) { @@ -221,7 +225,7 @@ static int _gmpi_establish_map(job_t *job) } -static void _gmpi_wait_abort(job_t *job) +static void _gmpi_wait_abort(srun_job_t *job) { struct sockaddr_in addr; socklen_t addrlen; @@ -232,7 +236,7 @@ static void _gmpi_wait_abort(job_t *job) rbuf = (char *)xmalloc(GMPI_RECV_BUF_LEN); addrlen = sizeof(addr); while (1) { - newfd = accept(job->gmpi_fd, (struct sockaddr *)&addr, + newfd = accept(gmpi_fd, (struct sockaddr *)&addr, &addrlen); if (newfd == -1) { fatal("GMPI master failed to accept (abort-wait)"); @@ -260,8 +264,8 @@ static void _gmpi_wait_abort(job_t *job) fwd_signal(job, SIGKILL); #if 0 xfree(rbuf); - close(job->gmpi_fd); - job->gmpi_fd = -1; + close(jgmpi_fd); + gmpi_fd = -1; return; #endif } @@ -270,9 +274,9 @@ static void _gmpi_wait_abort(job_t *job) static void *_gmpi_thr(void *arg) { - job_t *job; + srun_job_t *job; - job = (job_t *) arg; + job = (srun_job_t *) arg; debug3("GMPI master thread pid=%lu", (unsigned long) getpid()); _gmpi_establish_map(job); @@ -284,47 +288,42 @@ static void *_gmpi_thr(void *arg) } -extern int gmpi_thr_create(job_t *job, char **port) +extern int gmpi_thr_create(srun_job_t *job) { - int fd; - struct sockaddr_in addr; - char name[128]; - socklen_t namelen; + int port; pthread_attr_t attr; + pthread_t gtid; /* - * Prepare for accepting GMPI processes. + * It is possible for one to modify the mpirun command in + * MPICH-GM distribution so that it calls srun, instead of + * rsh, for remote process invocations. In that case, we + * should not override envs nor open the master port. */ - if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { - return -1; - } - bzero(&addr, sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = htonl(INADDR_ANY); - addr.sin_port = htons(0); - if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { + if (getenv("GMPI_PORT")) + return (0); + + if (net_stream_listen (&gmpi_fd, &port) < 0) { + error ("Unable to create GMPI listen port: %m"); return -1; } - if (listen(fd, 5) == -1) - return -1; - - /* - * Get the port name to communicate. - */ - namelen = sizeof(addr); - getsockname(fd, (struct sockaddr *)&addr, &namelen); - sprintf(name, "%u", ntohs(addr.sin_port)); - *port = xstrdup(name); /* * Accept in a separate thread. */ slurm_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - job->gmpi_fd = fd; - if (pthread_create(&job->gtid, &attr, &_gmpi_thr, (void *)job)) + if (pthread_create(>id, &attr, &_gmpi_thr, (void *)job)) return -1; - debug("Started GMPI master thread (%lu)", (unsigned long) job->gtid); + + setenvf (NULL, "GMPI_PORT", "%u", ntohs (port)); + setenvf (NULL, "GMPI_MAGIC", "%u", job->jobid); + setenvf (NULL, "GMPI_NP", "%d", opt.nprocs); + setenvf (NULL, "GMPI_SHMEM", "1"); + /* FIXME for multi-board config. */ + setenvf (NULL, "GMPI_BOARD", "-1"); + + debug("Started GMPI master thread (%lu)", (unsigned long) gtid); return 0; } diff --git a/src/plugins/mpi/mpichgm/mpichgm.h b/src/plugins/mpi/mpichgm/mpichgm.h new file mode 100644 index 0000000000000000000000000000000000000000..ad6b318c6caef996995773112c1df7aa64d25733 --- /dev/null +++ b/src/plugins/mpi/mpichgm/mpichgm.h @@ -0,0 +1,35 @@ +/*****************************************************************************\ + ** gmpi.h - Library routines for initiating jobs on with gmpi type mpi + ** $Id: mpi_gmpi.c,v 1.7 2005/06/07 18:25:32 morrone Exp $ + ***************************************************************************** + * Copyright (C) 2004 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Danny Auble <da@llnl.gov> + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ +#if HAVE_CONFIG_H +# include "config.h" +#endif + +#include "src/srun/srun_job.h" +#include "src/slurmd/slurmd_job.h" +#include "src/common/env.h" + +extern int gmpi_thr_create(srun_job_t *job); diff --git a/src/plugins/mpi/mvapich/Makefile.am b/src/plugins/mpi/mvapich/Makefile.am new file mode 100644 index 0000000000000000000000000000000000000000..ca57696a78de5cce4d475c1dc8ac93962ef8a640 --- /dev/null +++ b/src/plugins/mpi/mvapich/Makefile.am @@ -0,0 +1,17 @@ +# $Id: Makefile.am,v 1.4 2004/07/26 23:43:53 jette Exp $ +# Makefile for mpi/mvapich plugin + +AUTOMAKE_OPTIONS = foreign + +PLUGIN_FLAGS = -module -avoid-version --export-dynamic + +INCLUDES = -I$(top_srcdir) -I$(top_srcdir)/src/common + +pkglib_LTLIBRARIES = mpi_mvapich.la + +# Null switch plugin. +mpi_mvapich_la_SOURCES = mpi_mvapich.c mvapich.c +mpi_mvapich_la_LDFLAGS = $(SO_LDFLAGS) $(PLUGIN_FLAGS) +mpi_mvapich_la_LIBADD = \ + $(top_builddir)/src/common/libcommon.la -lpthread \ + $(top_builddir)/src/api/libslurm.la diff --git a/src/plugins/mpi/mvapich/mpi_mvapich.c b/src/plugins/mpi/mvapich/mpi_mvapich.c new file mode 100644 index 0000000000000000000000000000000000000000..bb0741f8e3a7b7c5faf90b31747e23f712f6a922 --- /dev/null +++ b/src/plugins/mpi/mvapich/mpi_mvapich.c @@ -0,0 +1,107 @@ +/*****************************************************************************\ + ** mpi_mvapich.c - Library routines for initiating jobs on with mvapich + ** type mpi. + ** $Id: mpi_gmpi.c,v 1.7 2005/06/07 18:25:32 morrone Exp $ + ***************************************************************************** + * Copyright (C) 2004 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Danny Auble <da@llnl.gov> + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ + +#if HAVE_CONFIG_H +# include "config.h" +#endif + +#include <fcntl.h> +#include <signal.h> +#include <sys/types.h> + +#include <slurm/slurm_errno.h> +#include "src/common/slurm_xlator.h" +#include "src/plugins/mpi/mvapich/mvapich.h" +/* + * These variables are required by the generic plugin interface. If they + * are not found in the plugin, the plugin loader will ignore it. + * + * plugin_name - a string giving a human-readable description of the + * plugin. There is no maximum length, but the symbol must refer to + * a valid string. + * + * plugin_type - a string suggesting the type of the plugin or its + * applicability to a particular form of data or method of data handling. + * If the low-level plugin API is used, the contents of this string are + * unimportant and may be anything. SLURM uses the higher-level plugin + * interface which requires this string to be of the form + * + * <application>/<method> + * + * where <application> is a description of the intended application of + * the plugin (e.g., "switch" for SLURM switch) and <method> is a description + * of how this plugin satisfies that application. SLURM will only load + * a switch plugin if the plugin_type string has a prefix of "switch/". + * + * plugin_version - an unsigned 32-bit integer giving the version number + * of the plugin. If major and minor revisions are desired, the major + * version number may be multiplied by a suitable magnitude constant such + * as 100 or 1000. Various SLURM versions will likely require a certain + * minimum versions for their plugins as this API matures. + */ +const char plugin_name[] = "mpi MVAPICH plugin"; +const char plugin_type[] = "mpi/mvapich"; +const uint32_t plugin_version = 100; + +int mpi_p_init (slurmd_job_t *job, int rank) +{ + int i; + char *processes = NULL; + char *addr = getenvp (job->env, "SLURM_LAUNCH_NODE_IPADDR"); + + setenvf (&job->env, "MPIRUN_HOST", "%s", addr); + setenvf (&job->env, "MPIRUN_RANK", "%d", rank); + setenvf (&job->env, "MPIRUN_MPD", "0"); + + info ("init for mpi rank %d\n", rank); + /* + * Fake MPIRUN_PROCESSES env var -- we don't need this for + * SLURM at this time. (what a waste) + */ + for (i = 0; i < job->nprocs; i++) + xstrcat (processes, "x:"); + + setenvf (&job->env, "MPIRUN_PROCESSES", "%s", processes); + + return (0); +} + +int mpi_p_thr_create(srun_job_t *job) +{ + return mvapich_thr_create(job); +} + +int mpi_p_single_task() +{ + return false; +} + +int mpi_p_exit() +{ + return 0; +} diff --git a/src/plugins/mpi/mvapich/mvapich.c b/src/plugins/mpi/mvapich/mvapich.c new file mode 100644 index 0000000000000000000000000000000000000000..29f511981e08e313034a8408eef3f660921ab332 --- /dev/null +++ b/src/plugins/mpi/mvapich/mvapich.c @@ -0,0 +1,360 @@ +/*****************************************************************************\ + * mvapich.c - srun support for MPICH-IB (MVAPICH 0.9.4 and 0.9.5) + ***************************************************************************** + * Copyright (C) 2004 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#ifdef WITH_PTHREADS +# include <pthread.h> +#endif + +#include <signal.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <strings.h> + +#include "src/common/xmalloc.h" +#include "src/common/xstring.h" +#include "src/common/net.h" + +#include "src/plugins/mpi/mvapich/mvapich.h" + + + +/* + * Arguments passed to mvapich support thread. + */ +struct mvapich_args { + srun_job_t *job; /* SRUN job information */ + int fd; /* fd on which to accept new connections */ +}; + +/* + * Information read from each MVAPICH process + */ +struct mvapich_info +{ + int fd; /* fd for socket connection to MPI task */ + int version; /* Version of mvapich startup protocol */ + int rank; /* This process' MPI rank */ + int pid; /* This rank's local pid (V3 only) */ + int addrlen; /* Length of addr array in bytes */ + + int *addr; /* This process' address array, which for + * process rank N in an M process job + * looks like: + * + * qp0,qp1,..,lid,qpN+1,..,qpM-1, hostid + * + * Where position N is this rank's lid, + * and the hostid is tacked onto the end + * of the array + */ +}; + +/* Globals for the mvapich thread. + */ +static struct mvapich_info **mvarray = NULL; +static int mvapich_fd = -1; +static int nprocs = -1; +static int protocol_version = -1; + +static void mvapich_info_destroy (struct mvapich_info *mvi); + +#define E_RET(msg, args...) \ + do { \ + error (msg, ## args); \ + mvapich_info_destroy (mvi); \ + return (NULL); \ + } while (0); + +/* + * Create an mvapich_info object by reading information from + * file descriptor `fd' + */ +static struct mvapich_info * mvapich_info_create (int fd) +{ + int n; + unsigned char host[4]; + struct mvapich_info *mvi = xmalloc (sizeof (*mvi)); + + mvi->fd = fd; + mvi->addr = NULL; + + if (fd_read_n (fd, &mvi->version, sizeof (int)) < 0) + E_RET ("mvapich: Unable to read version from task: %m"); + + if (protocol_version == -1) + protocol_version = mvi->version; + else if (protocol_version != mvi->version) + E_RET ("mvapich: version %d != %d", mvi->version, protocol_version); + + if (fd_read_n (fd, &mvi->rank, sizeof (int)) < 0) + E_RET ("mvapich: Unable to read rank id: %m", mvi->rank); + + if (mvi->version != 2 && mvi->version != 3) + E_RET ("Unsupported version %d from rank %d", mvi->version, mvi->rank); + + if (fd_read_n (fd, &mvi->addrlen, sizeof (int)) < 0) + E_RET ("mvapich: Unable to read addrlen for rank %d: %m", mvi->rank); + + mvi->addr = xmalloc (mvi->addrlen); + + if (fd_read_n (fd, mvi->addr, mvi->addrlen) < 0) + E_RET ("mvapich: Unable to read addr info for rank %d: %m", mvi->rank); + + if (mvi->version == 3) { + int pidlen; + if (fd_read_n (fd, &pidlen, sizeof (int)) < 0) + E_RET ("mvapich: Unable to read pidlen for rank %d: %m", mvi->rank); + + if (pidlen != sizeof (mvi->pid)) + E_RET ("mvapich: Confused. Rank %d pidlen of %d not what I expected", + mvi->rank, pidlen); + + if (fd_read_n (fd, &mvi->pid, pidlen) < 0) + E_RET ("mvapich: Unable to read pid for rank %d: %m", mvi->rank); + } + + + return (mvi); +} + +static void mvapich_info_destroy (struct mvapich_info *mvi) +{ + xfree (mvi->addr); + xfree (mvi); + return; +} + + +/* + * Broadcast addr information to all connected mvapich processes. + * The format of the information sent back to each process is: + * + * for rank N in M process job: + * + * lid info : lid0,lid1,...lidM-1 + * qp info : qp0, qp1, ..., -1, qpN+1, ...,qpM-1 + * hostids : hostid0,hostid1,...,hostidM-1 + * + * total of 3*nprocs ints. + * + */ +static void mvapich_bcast (void) +{ + struct mvapich_info *m; + int out_addrs_len = 3 * nprocs * sizeof (int); + int *out_addrs = xmalloc (out_addrs_len); + int i = 0; + int j = 0; + + for (i = 0; i < nprocs; i++) { + m = mvarray[i]; + /* + * lids are found in addrs[rank] for each process + */ + out_addrs[i] = m->addr[m->rank]; + + /* + * hostids are the last entry in addrs + */ + out_addrs[2 * nprocs + i] = m->addr[(m->addrlen/sizeof (int)) - 1]; + } + + for (i = 0; i < nprocs; i++) { + m = mvarray[i]; + + /* + * qp array is tailored to each process. + */ + for (j = 0; j < nprocs; j++) + out_addrs[nprocs + j] = (i == j) ? -1 : mvarray[j]->addr[i]; + + fd_write_n (m->fd, out_addrs, out_addrs_len); + + /* + * Protocol version 3 requires pid list to be sent next + */ + if (protocol_version == 3) { + for (j = 0; j < nprocs; j++) + fd_write_n (m->fd, &mvarray[j]->pid, sizeof (int)); + } + + } + + xfree (out_addrs); + return; +} + +static void mvapich_barrier (void) +{ + int i; + struct mvapich_info *m; + /* + * Simple barrier to wait for qp's to come up. + * Once all processes have written their rank over the socket, + * simply write their rank right back to them. + */ + + debug ("mvapich: starting barrier"); + + for (i = 0; i < nprocs; i++) { + int j; + m = mvarray[i]; + fd_read_n (m->fd, &j, sizeof (j)); + } + + debug ("mvapich: completed barrier for all tasks"); + + for (i = 0; i < nprocs; i++) { + m = mvarray[i]; + fd_write_n (m->fd, &i, sizeof (i)); + close (m->fd); + m->fd = -1; + } + + return; +} + +static void mvapich_wait_for_abort(srun_job_t *job) +{ + int rlen; + char rbuf[1024]; + + /* + * Wait for abort notification from any process. + * For mvapich 0.9.4, it appears that an MPI_Abort is registered + * simply by connecting to this socket and immediately closing + * the connection. In other versions, the process may write + * its rank. + */ + while (1) { + slurm_addr addr; + int newfd = slurm_accept_msg_conn (mvapich_fd, &addr); + + if (newfd == -1) { + fatal("MPI master failed to accept (abort-wait)"); + } + + fd_set_blocking (newfd); + + if ((rlen = fd_read_n (newfd, rbuf, sizeof (rbuf))) < 0) { + error("MPI recv (abort-wait) returned %d", rlen); + close(newfd); + continue; + } + close(newfd); + if (protocol_version == 3) { + int rank = (int) (*rbuf); + info ("mvapich: Received ABORT message from MPI Rank %d", rank); + } else + info ("mvapich: Received ABORT message from an MPI process."); + fwd_signal(job, SIGKILL); + } + + return; /* but not reached */ +} + + + +static void *mvapich_thr(void *arg) +{ + srun_job_t *job = arg; + int i = 0; + + mvarray = xmalloc (nprocs * sizeof (*mvarray)); + + debug ("mvapich-0.9.[45]: thread started: %ld", pthread_self ()); + + while (i < nprocs) { + struct mvapich_info *mvi = NULL; + slurm_addr addr; + int newfd = slurm_accept_msg_conn (mvapich_fd, &addr); + + if (newfd < 0) { + fatal ("Failed to accept connection from mvapich task: %m"); + continue; + } + + if ((mvi = mvapich_info_create (newfd)) == NULL) { + error ("mvapich: MPI task failed to check in"); + return NULL; + } + + if (mvarray[mvi->rank] != NULL) { + job_fatal (job, "mvapich: MPI task checked in more than once"); + return NULL; + } + + debug ("mvapich: rank %d checked in", mvi->rank); + mvarray[mvi->rank] = mvi; + i++; + } + + mvapich_bcast (); + + mvapich_barrier (); + + mvapich_wait_for_abort (job); + + return (void *)0; +} + +extern int mvapich_thr_create(srun_job_t *job) +{ + int port; + char name[128]; + pthread_attr_t attr; + pthread_t tid; + + nprocs = opt.nprocs; + + if (net_stream_listen(&mvapich_fd, &port) < 0) + error ("Unable to create ib listen port: %m"); + + /* + * Accept in a separate thread. + */ + slurm_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + if (pthread_create(&tid, &attr, &mvapich_thr, (void *)job)) + return -1; + + /* + * Set some environment variables in current env so they'll get + * passed to all remote tasks + */ + setenvf (NULL, "MPIRUN_PORT", "%d", ntohs (port)); + setenvf (NULL, "MPIRUN_NPROCS", "%d", nprocs); + setenvf (NULL, "MPIRUN_ID", "%d", job->jobid); + + verbose ("mvapich-0.9.[45] master listening on port %d", ntohs (port)); + + return 0; +} diff --git a/src/plugins/mpi/mvapich/mvapich.h b/src/plugins/mpi/mvapich/mvapich.h new file mode 100644 index 0000000000000000000000000000000000000000..647e999506a230f8ec391e3ada4603b57bc7fcfc --- /dev/null +++ b/src/plugins/mpi/mvapich/mvapich.h @@ -0,0 +1,36 @@ +/*****************************************************************************\ + ** mpi_mvapich.c - Library routines for initiating jobs on with mvapich + ** type mpi. + ** $Id: mpi_gmpi.c,v 1.7 2005/06/07 18:25:32 morrone Exp $ + ***************************************************************************** + * Copyright (C) 2004 The Regents of the University of California. + * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). + * Written by Danny Auble <da@llnl.gov> + * UCRL-CODE-2002-040. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://www.llnl.gov/linux/slurm/>. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +\*****************************************************************************/ +#if HAVE_CONFIG_H +# include "config.h" +#endif + +#include "src/srun/srun_job.h" +#include "src/slurmd/slurmd_job.h" +#include "src/common/env.h" + +extern int mvapich_thr_create(srun_job_t *job); diff --git a/src/slurmd/Makefile.am b/src/slurmd/Makefile.am index 0275e3a01c041fb3d61cf3478cb4993bee19acc9..9476e69bf4f9385ccafa2b6750cdaf19112756fc 100644 --- a/src/slurmd/Makefile.am +++ b/src/slurmd/Makefile.am @@ -22,7 +22,7 @@ slurmd_SOURCES = \ smgr.c smgr.h \ get_mach_stat.c get_mach_stat.h \ read_proc.c \ - job.c job.h \ + slurmd_job.c slurmd_job.h \ io.c io.h \ semaphore.c semaphore.h \ shm.c shm.h \ @@ -30,7 +30,7 @@ slurmd_SOURCES = \ ulimits.c ulimits.h \ kill_tree.c kill_tree.h \ proctrack.c proctrack.h \ - setproctitle.c setproctitle.h + setproctitle.c setproctitle.h slurmd_LDFLAGS = -export-dynamic $(CMD_LDFLAGS) $(FEDERATION_LDFLAGS) diff --git a/src/slurmd/fname.c b/src/slurmd/fname.c index 05a244c05ecd6200b9420ee76ec4e1fdc5fbc219..a701e951a14081b122a0bea3585b762240c1ad28 100644 --- a/src/slurmd/fname.c +++ b/src/slurmd/fname.c @@ -33,7 +33,6 @@ #include <fcntl.h> #include <unistd.h> -#include "src/slurmd/job.h" #include "src/slurmd/fname.h" #include "src/slurmd/slurmd.h" diff --git a/src/slurmd/fname.h b/src/slurmd/fname.h index ecc33f1b6e45bd10ede147c06922d981ddd386b6..e81c7ac93e59779cf15b7497865bfd81ad6329e6 100644 --- a/src/slurmd/fname.h +++ b/src/slurmd/fname.h @@ -27,7 +27,7 @@ #ifndef _SLURMD_FNAME_H #define _SLURMD_FNAME_H -#include "src/slurmd/job.h" +#include "src/slurmd/slurmd_job.h" char *fname_create(slurmd_job_t *job, const char *fmt, int taskid); int fname_trunc_all(slurmd_job_t *job, const char *fmt); diff --git a/src/slurmd/io.c b/src/slurmd/io.c index 20d51434e16dee050a08d6e0b69c9f391ab2fca0..1dedc7a3ab6a5b52029f14f3bafaf75aefcc5a4a 100644 --- a/src/slurmd/io.c +++ b/src/slurmd/io.c @@ -58,7 +58,6 @@ #include "src/common/xmalloc.h" #include "src/common/xsignal.h" -#include "src/slurmd/job.h" #include "src/slurmd/shm.h" #include "src/slurmd/io.h" #include "src/slurmd/fname.h" @@ -108,7 +107,7 @@ struct io_info { uint32_t id; /* global task id */ io_obj_t *obj; /* pointer back to eio object */ slurmd_job_t *job; /* pointer back to job data */ - task_info_t *task; /* pointer back to task data */ + slurmd_task_info_t *task; /* pointer back to task data */ cbuf_t buf; /* IO buffer */ List readers; /* list of current readers */ List writers; /* list of current writers */ @@ -133,7 +132,7 @@ struct io_info { static void _fatal_cleanup(void *); static int find_obj(void *obj, void *key); /* static int find_fd(void *obj, void *key); */ -static int _io_init_pipes(task_info_t *t); +static int _io_init_pipes(slurmd_task_info_t *t); static int _io_prepare_tasks(slurmd_job_t *); static void * _io_thr(void *); static int _io_write_header(struct io_info *, srun_info_t *); @@ -142,14 +141,14 @@ static void _io_client_attach(io_obj_t *, io_obj_t *, io_obj_t *, static void _io_connect_objs(io_obj_t *, io_obj_t *); static int _shutdown_task_obj(struct io_info *t); static bool _isa_client(struct io_info *io); -static int _open_output_file(slurmd_job_t *job, task_info_t *t, +static int _open_output_file(slurmd_job_t *job, slurmd_task_info_t *t, char *fname, slurmd_io_type_t type); -static int _open_stdin_file(slurmd_job_t *job, task_info_t *t, +static int _open_stdin_file(slurmd_job_t *job, slurmd_task_info_t *t, srun_info_t *srun); static struct io_obj * _io_obj_create(int fd, void *arg); static struct io_info * _io_info_create(uint32_t id); -static struct io_obj * _io_obj(slurmd_job_t *, task_info_t *, int, int); +static struct io_obj * _io_obj(slurmd_job_t *, slurmd_task_info_t *, int, int); static void * _io_thr(void *arg); static void _clear_error_state(struct io_info *io); @@ -274,7 +273,7 @@ _xclose(int fd) * */ static void -_io_finalize(task_info_t *t) +_io_finalize(slurmd_task_info_t *t) { struct io_info *in = t->in->arg; @@ -355,7 +354,7 @@ static void _handle_unprocessed_output(slurmd_job_t *job) { int i; - task_info_t *t; + slurmd_task_info_t *t; struct io_info *io; List readers; size_t n = 0; @@ -408,7 +407,7 @@ static int _io_prepare_tasks(slurmd_job_t *job) { int i; - task_info_t *t; + slurmd_task_info_t *t; io_obj_t *obj; for (i = 0; i < job->ntasks; i++) { @@ -475,7 +474,7 @@ _local_filename (char *fname, int taskid) } static int -_io_add_connecting(slurmd_job_t *job, task_info_t *t, srun_info_t *srun, +_io_add_connecting(slurmd_job_t *job, slurmd_task_info_t *t, srun_info_t *srun, slurmd_io_type_t type) { io_obj_t *obj = NULL; @@ -519,7 +518,7 @@ _io_add_connecting(slurmd_job_t *job, task_info_t *t, srun_info_t *srun, * otherwise create a connecting client back to srun process. */ static int -_io_prepare_one(slurmd_job_t *j, task_info_t *t, srun_info_t *s) +_io_prepare_one(slurmd_job_t *j, slurmd_task_info_t *t, srun_info_t *s) { int retval = SLURM_SUCCESS; char *fname = NULL; @@ -626,7 +625,7 @@ _open_task_file(char *filename, int flags) } static int -_open_output_file(slurmd_job_t *job, task_info_t *t, char *fmt, +_open_output_file(slurmd_job_t *job, slurmd_task_info_t *t, char *fmt, slurmd_io_type_t type) { int fd = -1; @@ -662,7 +661,7 @@ _open_output_file(slurmd_job_t *job, task_info_t *t, char *fmt, } static int -_open_stdin_file(slurmd_job_t *job, task_info_t *t, srun_info_t *srun) +_open_stdin_file(slurmd_job_t *job, slurmd_task_info_t *t, srun_info_t *srun) { int fd = -1; io_obj_t *obj = NULL; @@ -951,7 +950,7 @@ _ops_copy(struct io_operations *ops) io_obj_t * -_io_obj(slurmd_job_t *job, task_info_t *t, int fd, int type) +_io_obj(slurmd_job_t *job, slurmd_task_info_t *t, int fd, int type) { struct io_info *io = _io_info_create(t->gtid); struct io_obj *obj = _io_obj_create(fd, (void *)io); @@ -1118,7 +1117,7 @@ _io_write_header(struct io_info *client, srun_info_t *srun) } static int -_io_init_pipes(task_info_t *t) +_io_init_pipes(slurmd_task_info_t *t) { if ( (pipe(t->pin) < 0) || (pipe(t->pout) < 0) @@ -1143,7 +1142,7 @@ _io_init_pipes(task_info_t *t) * close write end of stdin, and read end of stdout/err */ int -io_prepare_child(task_info_t *t) +io_prepare_child(slurmd_task_info_t *t) { if (dup2(t->pin[0], STDIN_FILENO ) < 0) { error("dup2(stdin): %m"); @@ -1309,7 +1308,7 @@ _write(io_obj_t *obj, List objs) static void _do_attach(struct io_info *io) { - task_info_t *t; + slurmd_task_info_t *t; struct io_operations *opsptr; xassert(io != NULL); diff --git a/src/slurmd/io.h b/src/slurmd/io.h index 8442601f6d16bfc8d08ac87ee9a9ea926be9546a..9b3f09ff213a316896f0730f91b35f5445d2a538 100644 --- a/src/slurmd/io.h +++ b/src/slurmd/io.h @@ -28,7 +28,7 @@ #ifndef _IO_H #define _IO_H -#include "src/slurmd/job.h" +#include "src/slurmd/slurmd_job.h" #include "src/common/eio.h" /* @@ -52,7 +52,7 @@ int io_new_clients(slurmd_job_t *job); void io_obj_destroy(io_obj_t *obj); int io_init_pipes(slurmd_job_t *job); -int io_prepare_child(task_info_t *t); +int io_prepare_child(slurmd_task_info_t *t); void io_close_all(slurmd_job_t *job); diff --git a/src/slurmd/mgr.c b/src/slurmd/mgr.c index 53a8c56e5bdd3ffc7d35fa0ef401290f4057bbd2..8afe87feb2de5a3c90d7acdad7c1f8d3ebab46f3 100644 --- a/src/slurmd/mgr.c +++ b/src/slurmd/mgr.c @@ -721,7 +721,7 @@ _handle_task_exit(slurmd_job_t *job) * read at most ntask task exit codes from session manager */ for (i = 0; i < job->ntasks; i++) { - task_info_t *t; + slurmd_task_info_t *t; if ((len = read(job->fdpair[0], &e, sizeof(e))) < 0) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) @@ -765,7 +765,7 @@ _send_pending_exit_msgs(slurmd_job_t *job) * single message. */ for (i = 0; i < job->ntasks; i++) { - task_info_t *t = job->task[i]; + slurmd_task_info_t *t = job->task[i]; if (!t->exited || t->esent) continue; @@ -862,7 +862,7 @@ _set_unexited_task_status(slurmd_job_t *job, int status) { int i; for (i = 0; i < job->ntasks; i++) { - task_info_t *t = job->task[i]; + slurmd_task_info_t *t = job->task[i]; if (t->exited) continue; diff --git a/src/slurmd/mgr.h b/src/slurmd/mgr.h index 9bdda9c5c4db8637f3d301907b3dedee9de19812..3ddc9d14f59dd614ffdb7eacecded41175833f81 100644 --- a/src/slurmd/mgr.h +++ b/src/slurmd/mgr.h @@ -32,7 +32,7 @@ #include "src/common/slurm_protocol_defs.h" -#include "src/slurmd/job.h" +#include "src/slurmd/slurmd_job.h" /* Spawn a task / job step on this node */ diff --git a/src/slurmd/req.c b/src/slurmd/req.c index 9cab3e5f80e670b1e38158ae1aa3d0e6f3527638..2bdf8694b74988f895cc0f42ffb915537d128cda 100644 --- a/src/slurmd/req.c +++ b/src/slurmd/req.c @@ -913,7 +913,7 @@ _rpc_reattach_tasks(slurm_msg_t *msg, slurm_addr *cli) char host[MAXHOSTNAMELEN]; int i; job_step_t *step; - job_state_t *state; + slurmd_job_state_t *state; task_t *t; uid_t req_uid; gid_t req_gid; diff --git a/src/slurmd/shm.c b/src/slurmd/shm.c index 514f7aa601d02d5ddf601b8b1f68af34002509d4..4acffa5b7e3338312a69c022aa44ca6b95989959 100644 --- a/src/slurmd/shm.c +++ b/src/slurmd/shm.c @@ -557,7 +557,8 @@ shm_update_step_cont_id(uint32_t jobid, uint32_t stepid, uint32_t cont_id) } int -shm_update_step_state(uint32_t jobid, uint32_t stepid, job_state_t state) +shm_update_step_state(uint32_t jobid, uint32_t stepid, + slurmd_job_state_t state) { int i, retval = SLURM_SUCCESS; _shm_lock(); @@ -571,11 +572,11 @@ shm_update_step_state(uint32_t jobid, uint32_t stepid, job_state_t state) return retval; } -job_state_t * +slurmd_job_state_t * shm_lock_step_state(uint32_t jobid, uint32_t stepid) { int i; - job_state_t *state = NULL; + slurmd_job_state_t *state = NULL; _shm_lock(); if ((i = _shm_find_step(jobid, stepid)) >= 0) state = &slurmd_shm->step[i].state; diff --git a/src/slurmd/shm.h b/src/slurmd/shm.h index 78e0d4e220609f8d6570c735465af09430430e7f..cc77b8d12c161185f3c6a84f80c0f821db00e342 100644 --- a/src/slurmd/shm.h +++ b/src/slurmd/shm.h @@ -51,7 +51,7 @@ #include "src/common/slurm_protocol_api.h" #include "src/common/list.h" -#include "src/slurmd/job.h" +#include "src/slurmd/slurmd_job.h" typedef struct task task_t; typedef struct job_step job_step_t; @@ -88,7 +88,7 @@ struct job_step { srun_key_t key; /* last key from srun client */ - job_state_t state; /* Job step status */ + slurmd_job_state_t state; /* Job step status */ time_t timelimit; /* job time limit */ task_t *task_list; /* list of this step's tasks */ }; @@ -208,7 +208,9 @@ int shm_update_step_spid(uint32_t jobid, uint32_t stepid, int spid); /* * update job step state */ -int shm_update_step_state(uint32_t jobid, uint32_t stepid, job_state_t state); +int shm_update_step_state(uint32_t jobid, + uint32_t stepid, + slurmd_job_state_t state); /* @@ -219,7 +221,7 @@ int shm_update_step_state(uint32_t jobid, uint32_t stepid, job_state_t state); * it returns a pointer into the shared memory region instead of a copy * of the data. Callers should remain cognizant of this fact. ) */ -job_state_t *shm_lock_step_state(uint32_t jobid, uint32_t stepid); +slurmd_job_state_t *shm_lock_step_state(uint32_t jobid, uint32_t stepid); /* unlock job step state */ diff --git a/src/slurmd/job.c b/src/slurmd/slurmd_job.c similarity index 97% rename from src/slurmd/job.c rename to src/slurmd/slurmd_job.c index ef4dc135ff7fa3028080827bf289ec4a976830b3..898d6adce9c4852714f59a79ae39915e4b2a1b03 100644 --- a/src/slurmd/job.c +++ b/src/slurmd/slurmd_job.c @@ -1,6 +1,6 @@ /*****************************************************************************\ * src/slurmd/job.c - slurmd_job_t routines - * $Id$ + * $Id: job.c,v 1.51 2005/06/28 19:12:48 da Exp $ ***************************************************************************** * Copyright (C) 2002 The Regents of the University of California. * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). @@ -45,7 +45,7 @@ #include "src/common/eio.h" #include "src/common/slurm_protocol_api.h" -#include "src/slurmd/job.h" +#include "src/slurmd/slurmd_job.h" #include "src/slurmd/shm.h" #include "src/slurmd/io.h" #include "src/slurmd/fname.h" @@ -392,7 +392,8 @@ _job_init_task_info(slurmd_job_t *job, uint32_t *gtid) int i; int n = job->ntasks; - job->task = (task_info_t **) xmalloc(n * sizeof(task_info_t *)); + job->task = (slurmd_task_info_t **) + xmalloc(n * sizeof(slurmd_task_info_t *)); for (i = 0; i < n; i++){ job->task[i] = task_info_create(i, gtid[i]); @@ -422,7 +423,7 @@ job_signal_tasks(slurmd_job_t *job, int signal) void job_kill(slurmd_job_t *job, int rc) { - job_state_t *state; + slurmd_job_state_t *state; xassert(job != NULL); @@ -533,10 +534,10 @@ srun_info_destroy(struct srun_info *srun) xfree(srun); } -task_info_t * +slurmd_task_info_t * task_info_create(int taskid, int gtaskid) { - task_info_t *t = (task_info_t *) xmalloc(sizeof(*t)); + slurmd_task_info_t *t = (slurmd_task_info_t *) xmalloc(sizeof(*t)); xassert(taskid >= 0); xassert(gtaskid >= 0); @@ -564,7 +565,7 @@ task_info_create(int taskid, int gtaskid) void -task_info_destroy(task_info_t *t) +task_info_destroy(slurmd_task_info_t *t) { slurm_mutex_lock(&t->mutex); list_destroy(t->srun_list); @@ -607,7 +608,7 @@ job_update_shm(slurmd_job_t *job) } int -job_update_state(slurmd_job_t *job, job_state_t s) +job_update_state(slurmd_job_t *job, slurmd_job_state_t s) { return shm_update_step_state(job->jobid, job->stepid, s); } diff --git a/src/slurmd/job.h b/src/slurmd/slurmd_job.h similarity index 93% rename from src/slurmd/job.h rename to src/slurmd/slurmd_job.h index 9f27d24e277942bf8e1b49b462bef868562e7f08..af3df66b6713fcb3ce98a8cffc048dce40fdd0e0 100644 --- a/src/slurmd/job.h +++ b/src/slurmd/slurmd_job.h @@ -1,6 +1,6 @@ /*****************************************************************************\ * src/slurmd/job.h slurmd_job_t definition - * $Id$ + * $Id: job.h,v 1.29 2005/06/24 18:08:30 da Exp $ ***************************************************************************** * Copyright (C) 2002 The Regents of the University of California. * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). @@ -50,12 +50,22 @@ typedef struct srun_key { unsigned char data[SLURM_IO_KEY_SIZE]; } srun_key_t; +typedef struct srun_info { + srun_key_t *key; /* srun key for IO verification */ + slurm_addr resp_addr; /* response addr for task exit msg */ + slurm_addr ioaddr; /* Address to connect on for I/O */ + char * ofname; /* output file (if any) */ + char * efname; /* error file (if any) */ + char * ifname; /* input file (if any) */ + +} srun_info_t; + typedef enum task_state { SLURMD_TASK_INIT, SLURMD_TASK_STARTING, SLURMD_TASK_RUNNING, SLURMD_TASK_COMPLETE -} task_state_t; +} slurmd_task_state_t; /* local job states */ typedef enum job_state { @@ -65,11 +75,11 @@ typedef enum job_state { SLURMD_JOB_STARTED, SLURMD_JOB_ENDING, SLURMD_JOB_COMPLETE -} job_state_t; +} slurmd_job_state_t; typedef struct task_info { pthread_mutex_t mutex; /* mutex to protect task state */ - task_state_t state; /* task state */ + slurmd_task_state_t state; /* task state */ int id; /* local task id */ uint32_t gtid; /* global task id */ @@ -86,18 +96,7 @@ typedef struct task_info { int estatus; /* this task's exit status */ List srun_list; /* List of srun objs for this task */ -} task_info_t; - - -typedef struct srun_info { - srun_key_t *key; /* srun key for IO verification */ - slurm_addr resp_addr; /* response addr for task exit msg */ - slurm_addr ioaddr; /* Address to connect on for I/O */ - char * ofname; /* output file (if any) */ - char * efname; /* error file (if any) */ - char * ifname; /* input file (if any) */ - -} srun_info_t; +} slurmd_task_info_t; typedef struct slurmd_job { uint32_t jobid; /* Current SLURM job id */ @@ -122,7 +121,7 @@ typedef struct slurmd_job { time_t timelimit; /* time at which job must stop */ struct passwd *pwd; /* saved passwd struct for user job */ - task_info_t **task; /* list of task information pointers */ + slurmd_task_info_t **task; /* list of task information pointers */ eio_t eio; List objs; /* list of IO objects */ List sruns; /* List of sruns */ @@ -154,13 +153,13 @@ struct srun_info * srun_info_create(slurm_cred_t cred, slurm_addr *respaddr, void srun_info_destroy(struct srun_info *srun); -struct task_info * task_info_create(int taskid, int gtaskid); +slurmd_task_info_t * task_info_create(int taskid, int gtaskid); -void task_info_destroy(struct task_info *t); +void task_info_destroy(slurmd_task_info_t *t); int job_update_shm(slurmd_job_t *job); -int job_update_state(slurmd_job_t *job, job_state_t s); +int job_update_state(slurmd_job_t *job, slurmd_job_state_t s); void job_delete_shm(slurmd_job_t *job); diff --git a/src/slurmd/smgr.c b/src/slurmd/smgr.c index 62ad39767a9fcaee2c5a1ca626e2699525663205..d67b7c91d1bdd961a622f4ce5450c621908e1f6a 100644 --- a/src/slurmd/smgr.c +++ b/src/slurmd/smgr.c @@ -247,7 +247,7 @@ _cleanup_file_descriptors(slurmd_job_t *j) { int i; for (i = 0; i < j->ntasks; i++) { - task_info_t *t = j->task[i]; + slurmd_task_info_t *t = j->task[i]; /* * Ignore errors on close() */ @@ -370,7 +370,7 @@ _exec_all_tasks(slurmd_job_t *job) static void _exec_task(slurmd_job_t *job, int i) { - task_info_t *t = NULL; + slurmd_task_info_t *t = NULL; if (xsignal_unblock(smgr_sigarray) < 0) { error("unable to unblock signals"); exit(1); @@ -399,10 +399,12 @@ _exec_task(slurmd_job_t *job, int i) error("Unable to attach to interconnect: %m"); exit(1); } + + slurmd_mpi_init (job, job->task[i]->gtid); _pdebug_stop_current(job); } - + /* * If io_prepare_child() is moved above interconnect_attach() * this causes EBADF from qsw_attach(). Why? diff --git a/src/slurmd/smgr.h b/src/slurmd/smgr.h index f1c1a7a3549c66f74caff0a739377a9e366eadc4..f44549c42230aa1befdc533f76456fa7c6aca64b 100644 --- a/src/slurmd/smgr.h +++ b/src/slurmd/smgr.h @@ -38,7 +38,7 @@ # include <sys/types.h> #endif /* HAVE_SYS_TYPES_H */ -#include "src/slurmd/job.h" +#include "src/slurmd/slurmd_job.h" /* * Task exit code information diff --git a/src/slurmd/ulimits.c b/src/slurmd/ulimits.c index 8cce5606f8bfaa248d53843dcc07608ea8627cf9..b7f5e570fd76192e03e84048e375bffb85971cde 100644 --- a/src/slurmd/ulimits.c +++ b/src/slurmd/ulimits.c @@ -39,7 +39,7 @@ #include "src/common/strlcpy.h" #include "src/common/xmalloc.h" -#include "src/slurmd/job.h" +#include "src/slurmd/slurmd_job.h" struct userlim { char *var; diff --git a/src/slurmd/ulimits.h b/src/slurmd/ulimits.h index a89f5f8b794a2d01d5315611ffed5513d66d43f4..18063866cebb4417e9bdc384b790fced53b2881f 100644 --- a/src/slurmd/ulimits.h +++ b/src/slurmd/ulimits.h @@ -27,7 +27,7 @@ #ifndef _SLURMD_ULIMITS_H #define _SLURMD_ULIMITS_H -#include "src/slurmd/job.h" +#include "src/slurmd/slurmd_job.h" /* * Set user resource limits as defined by SLURM_RLIMIT* environment diff --git a/src/srun/Makefile.am b/src/srun/Makefile.am index 800e8127d0a19c1304eef3e9dde91b559752da95..af334630a0d5b41d51d4478fc0daed4a966b43b5 100644 --- a/src/srun/Makefile.am +++ b/src/srun/Makefile.am @@ -10,10 +10,9 @@ bin_PROGRAMS = srun srun_SOURCES = \ srun.c \ opt.c opt.h \ - job.c job.h \ - gmpi.c gmpi.h \ - net.c net.h \ + srun_job.c srun_job.h \ msg.c msg.h \ + signals.c signals.h \ io.c io.h \ launch.c \ launch.h \ @@ -23,8 +22,6 @@ srun_SOURCES = \ reattach.h \ fname.c \ fname.h \ - signals.c \ - signals.h \ sigstr.c \ sigstr.h \ allocate.c \ diff --git a/src/srun/allocate.c b/src/srun/allocate.c index 13a7b815ae5c296db661a973c659fdb6074aafd5..91fbf5cb74ba3159f9c113712dc7e589f1d83b68 100644 --- a/src/srun/allocate.c +++ b/src/srun/allocate.c @@ -44,7 +44,6 @@ #include "src/srun/allocate.h" #include "src/srun/msg.h" #include "src/srun/opt.h" -//#include "src/srun/env.h" #include "src/srun/attach.h" #define MAX_ALLOC_WAIT 60 /* seconds */ @@ -64,7 +63,7 @@ static void _wait_for_resources(resource_allocation_response_msg_t **resp); static bool _retry(); static void _intr_handler(int signo); -static job_step_create_request_msg_t * _step_req_create(job_t *j); +static job_step_create_request_msg_t * _step_req_create(srun_job_t *j); static void _step_req_destroy(job_step_create_request_msg_t *r); static sig_atomic_t destroy_job = 0; @@ -458,7 +457,7 @@ job_desc_msg_destroy(job_desc_msg_t *j) } static job_step_create_request_msg_t * -_step_req_create(job_t *j) +_step_req_create(srun_job_t *j) { job_step_create_request_msg_t *r = xmalloc(sizeof(*r)); r->job_id = j->jobid; @@ -501,7 +500,7 @@ _step_req_destroy(job_step_create_request_msg_t *r) } int -create_job_step(job_t *job) +create_job_step(srun_job_t *job) { job_step_create_request_msg_t *req = NULL; job_step_create_response_msg_t *resp = NULL; diff --git a/src/srun/allocate.h b/src/srun/allocate.h index e0b8223494f6c5ea633a7a5a489b50e03124e5b8..c5b9afbf65fec2613dfaa624dcee3b369c63c49d 100644 --- a/src/srun/allocate.h +++ b/src/srun/allocate.h @@ -30,7 +30,7 @@ #include <slurm/slurm.h> -#include "src/srun/job.h" +#include "src/srun/srun_job.h" /* * Allocate nodes from the slurm controller -- retrying the attempt @@ -83,7 +83,7 @@ uint32_t jobid_from_env(void); * * Returns -1 if job step creation failure, 0 otherwise */ -int create_job_step(job_t *j); +int create_job_step(srun_job_t *j); #endif /* !_HAVE_ALLOCATE_H */ diff --git a/src/srun/fname.c b/src/srun/fname.c index a332238a21bf36fee2a6d11435569623bd31d72d..df8e0ad85ee81c417600f58720ed4a073a9843bf 100644 --- a/src/srun/fname.c +++ b/src/srun/fname.c @@ -5,9 +5,8 @@ #include <string.h> #include <ctype.h> -#include "src/srun/job.h" #include "src/srun/fname.h" -#include "src/srun/opt.h" +#include "src/srun/srun_job.h" #include "src/common/xmalloc.h" #include "src/common/xstring.h" @@ -24,7 +23,7 @@ * filename type to one of the io types ALL, NONE, PER_TASK, ONE */ io_filename_t * -fname_create(job_t *job, char *format) +fname_create(srun_job_t *job, char *format) { unsigned long int wid = 0; unsigned long int taskid = 0; diff --git a/src/srun/fname.h b/src/srun/fname.h index 32752d35f34b108efc40440171b9605e59833b9a..4679c2c472e8270c44733a26012f1c1876c123e0 100644 --- a/src/srun/fname.h +++ b/src/srun/fname.h @@ -39,9 +39,6 @@ typedef struct io_filename { int taskid; /* taskid for IO if IO_ONE */ } io_filename_t; -/* need to predeclare srun_job to resolve declaration dependencies - */ -typedef struct srun_job * srun_job_t; /* * Create an filename from a (probably user supplied) filename format. @@ -49,7 +46,9 @@ typedef struct srun_job * srun_job_t; * leaving node or task specific format specifiers for the remote * slurmd to handle. */ -io_filename_t * fname_create(srun_job_t job, char *format); +typedef struct srun_job fname_job_t; + +io_filename_t *fname_create(fname_job_t *job, char *format); void fname_destroy(io_filename_t *fname); char * fname_remote_string (io_filename_t *fname); diff --git a/src/srun/io.c b/src/srun/io.c index 6cd2249d5864d6cedd34fe0df26c16ce91268732..72c78b07cd578893a70c8743a4c2edc8f820ddc8 100644 --- a/src/srun/io.c +++ b/src/srun/io.c @@ -49,10 +49,10 @@ #include "src/common/xmalloc.h" #include "src/common/xsignal.h" #include "src/common/io_hdr.h" +#include "src/common/net.h" #include "src/srun/io.h" -#include "src/srun/job.h" -#include "src/srun/net.h" +#include "src/srun/srun_job.h" #include "src/srun/opt.h" static int fmt_width = 0; @@ -67,18 +67,18 @@ typedef struct fd_info { cbuf_t buf; } fd_info_t; -static void _accept_io_stream(job_t *job, int i); -static void _bcast_stdin(int fd, job_t *job); +static void _accept_io_stream(srun_job_t *job, int i); +static void _bcast_stdin(int fd, srun_job_t *job); static int _close_stream(int *fd, FILE *out, int tasknum); static int _do_task_output(int *fd, FILE *out, cbuf_t buf, int tasknum); static int _do_task_output_poll(fd_info_t *info); -static int _do_task_input(job_t *job, int taskid); -static int _do_task_input_poll(job_t *job, fd_info_t *info); -static inline bool _io_thr_done(job_t *job); +static int _do_task_input(srun_job_t *job, int taskid); +static int _do_task_input_poll(srun_job_t *job, fd_info_t *info); +static inline bool _io_thr_done(srun_job_t *job); static int _handle_pollerr(fd_info_t *info); static ssize_t _readx(int fd, char *buf, size_t maxbytes); -static int _read_io_header(int fd, job_t *job, char *host); -static void _terminate_node_io(int node_inx, job_t *job); +static int _read_io_header(int fd, srun_job_t *job, char *host); +static void _terminate_node_io(int node_inx, srun_job_t *job); #define _poll_set_rd(_pfd, _fd) do { \ (_pfd).fd = _fd; \ (_pfd).events = POLLIN; \ @@ -108,7 +108,7 @@ _do_task_output_poll(fd_info_t *info) } static int -_do_task_input_poll(job_t *job, fd_info_t *info) +_do_task_input_poll(srun_job_t *job, fd_info_t *info) { return _do_task_input(job, info->taskid); } @@ -138,7 +138,7 @@ _handle_pollerr(fd_info_t *info) } static void -_set_iofds_nonblocking(job_t *job) +_set_iofds_nonblocking(srun_job_t *job) { int i; for (i = 0; i < job->niofds; i++) @@ -153,7 +153,7 @@ _set_iofds_nonblocking(job_t *job) } static void -_update_task_io_state(job_t *job, int taskid) +_update_task_io_state(srun_job_t *job, int taskid) { slurm_mutex_lock(&job->task_mutex); if (job->task_state[taskid] == SRUN_TASK_IO_WAIT) @@ -208,7 +208,7 @@ _do_output(cbuf_t buf, FILE *out, int tasknum) } static void -_flush_io(job_t *job) +_flush_io(srun_job_t *job) { int i; @@ -246,7 +246,7 @@ _initial_fd_state (io_filename_t *f, int task) } static void -_io_thr_init(job_t *job, struct pollfd *fds) +_io_thr_init(srun_job_t *job, struct pollfd *fds) { int i; sigset_t set; @@ -286,7 +286,7 @@ _fd_info_init(fd_info_t *info, int taskid, int *pfd, FILE *fp, cbuf_t buf) } static int -_stdin_buffer_space (job_t *job) +_stdin_buffer_space (srun_job_t *job) { int i, nfree, len = 0; for (i = 0; i < opt.nprocs; i++) { @@ -299,7 +299,7 @@ _stdin_buffer_space (job_t *job) } static nfds_t -_setup_pollfds(job_t *job, struct pollfd *fds, fd_info_t *map) +_setup_pollfds(srun_job_t *job, struct pollfd *fds, fd_info_t *map) { int eofcnt = 0; int i; @@ -357,8 +357,9 @@ _setup_pollfds(job_t *job, struct pollfd *fds, fd_info_t *map) /* exit if we have received EOF on all streams */ if (eofcnt) { - if ( (eofcnt == opt.nprocs) || - ((opt.mpi_type == MPI_LAM) && (eofcnt == job->nhosts)) ) { + if ((eofcnt == opt.nprocs) + || (slurm_mpi_single_task_per_node() + && (eofcnt == job->nhosts))) { debug("got EOF on all streams"); _flush_io(job); pthread_exit(0); @@ -372,7 +373,7 @@ static void * _io_thr_poll(void *job_arg) { int i, rc; - job_t *job = (job_t *) job_arg; + srun_job_t *job = (srun_job_t *) job_arg; int numfds = (opt.nprocs*2) + job->niofds + 3; nfds_t nfds = 0; struct pollfd fds[numfds]; @@ -458,7 +459,7 @@ _io_thr_poll(void *job_arg) } static inline bool -_io_thr_done(job_t *job) +_io_thr_done(srun_job_t *job) { bool retval; slurm_mutex_lock(&job->state_mutex); @@ -508,7 +509,7 @@ _is_local_file (io_filename_t *fname) int -open_streams(job_t *job) +open_streams(srun_job_t *job) { if (_is_local_file (job->ifname)) job->stdinfd = _stdin_open(job->ifname->name); @@ -556,7 +557,7 @@ _wid(int n) } int -io_thr_create(job_t *job) +io_thr_create(srun_job_t *job) { int i; pthread_attr_t attr; @@ -606,7 +607,7 @@ _is_fd_ready(int fd) static int -_read_io_header(int fd, job_t *job, char *host) +_read_io_header(int fd, srun_job_t *job, char *host) { int size = io_hdr_packed_size(); cbuf_t cb = cbuf_create(size, size); @@ -660,7 +661,7 @@ _read_io_header(int fd, job_t *job, char *host) static void -_accept_io_stream(job_t *job, int i) +_accept_io_stream(srun_job_t *job, int i) { int j; int fd = job->iofd[i]; @@ -768,7 +769,7 @@ _do_task_output(int *fd, FILE *out, cbuf_t buf, int tasknum) } static int -_do_task_input(job_t *job, int taskid) +_do_task_input(srun_job_t *job, int taskid) { int len = 0; cbuf_t buf = job->inbuf[taskid]; @@ -809,7 +810,7 @@ _readx(int fd, char *buf, size_t maxbytes) static void -_write_all(job_t *job, cbuf_t cb, char *buf, size_t len, int taskid) +_write_all(srun_job_t *job, cbuf_t cb, char *buf, size_t len, int taskid) { int n = 0; int dropped = 0; @@ -827,7 +828,7 @@ _write_all(job_t *job, cbuf_t cb, char *buf, size_t len, int taskid) } static void -_close_stdin(job_t *j) +_close_stdin(srun_job_t *j) { close(j->stdinfd); j->stdinfd = IO_DONE; @@ -836,7 +837,7 @@ _close_stdin(job_t *j) } static void -_bcast_stdin(int fd, job_t *job) +_bcast_stdin(int fd, srun_job_t *job) { int i; char buf[4096]; @@ -887,7 +888,7 @@ _bcast_stdin(int fd, job_t *job) * io_thr_wake - Wake the I/O thread if it is blocking in poll(). */ void -io_thr_wake(job_t *job) +io_thr_wake(srun_job_t *job) { char c; @@ -901,7 +902,7 @@ io_thr_wake(job_t *job) * Flag them as done and signal the I/O thread. */ extern int -io_node_fail(char *nodelist, job_t *job) +io_node_fail(char *nodelist, srun_job_t *job) { hostlist_t fail_list = hostlist_create(nodelist); char *node_name; @@ -927,7 +928,7 @@ io_node_fail(char *nodelist, job_t *job) } static void -_terminate_node_io(int node_inx, job_t *job) +_terminate_node_io(int node_inx, srun_job_t *job) { int i; diff --git a/src/srun/io.h b/src/srun/io.h index 1485743461fca0db3c793fa48fbc5b9b68a3a73b..4b41c58db0f86b2d3a53b8a854eddacd19335bcf 100644 --- a/src/srun/io.h +++ b/src/srun/io.h @@ -27,15 +27,15 @@ #ifndef _HAVE_IO_H #define _HAVE_IO_H -#include "src/srun/job.h" +#include "src/srun/srun_job.h" #define WAITING_FOR_IO -1 #define IO_DONE -9 -int io_node_fail(char *nodelist, job_t *job); +int io_node_fail(char *nodelist, srun_job_t *job); void *io_thr(void *arg); -int io_thr_create(job_t *job); -void io_thr_wake(job_t *job); -int open_streams(job_t *job); +int io_thr_create(srun_job_t *job); +void io_thr_wake(srun_job_t *job); +int open_streams(srun_job_t *job); #endif /* !_HAVE_IO_H */ diff --git a/src/srun/launch.c b/src/srun/launch.c index 795177a716bf62e7850a69cbff294a5c550adc24..0fa380d607fc31de679e652ea1d27c2e1cc0c544 100644 --- a/src/srun/launch.c +++ b/src/srun/launch.c @@ -41,10 +41,9 @@ #include "src/common/xmalloc.h" #include "src/common/xsignal.h" -#include "src/srun/job.h" +#include "src/srun/srun_job.h" #include "src/srun/launch.h" #include "src/srun/opt.h" -//#include "src/srun/env.h" extern char **environ; @@ -58,7 +57,7 @@ typedef enum {DSH_NEW, DSH_ACTIVE, DSH_DONE, DSH_FAILED} state_t; typedef struct task_info { slurm_msg_t *req; - job_t *job; + srun_job_t *job; } task_info_t; typedef struct thd { @@ -70,14 +69,14 @@ typedef struct thd { static int _check_pending_threads(thd_t *thd, int count); static void _spawn_launch_thr(thd_t *th); -static int _wait_on_active(thd_t *thd, job_t *job); -static void _p_launch(slurm_msg_t *req_array_ptr, job_t *job); +static int _wait_on_active(thd_t *thd, srun_job_t *job); +static void _p_launch(slurm_msg_t *req_array_ptr, srun_job_t *job); static void * _p_launch_task(void *args); static void _print_launch_msg(launch_tasks_request_msg_t *msg, char * hostname); int -launch_thr_create(job_t *job) +launch_thr_create(srun_job_t *job) { int e; pthread_attr_t attr; @@ -96,7 +95,7 @@ launch(void *arg) { slurm_msg_t *req_array_ptr; launch_tasks_request_msg_t *msg_array_ptr; - job_t *job = (job_t *) arg; + srun_job_t *job = (srun_job_t *) arg; int i, my_envc; char hostname[MAXHOSTNAMELEN]; @@ -139,8 +138,8 @@ launch(void *arg) r->task_flags |= TASK_PARALLEL_DEBUG; /* Node specific message contents */ - if (opt.mpi_type == MPI_LAM) - r->tasks_to_launch = 1; /* just launch one task */ + if (slurm_mpi_single_task_per_node ()) + r->tasks_to_launch = 1; else r->tasks_to_launch = job->ntask[i]; r->global_task_ids = job->tids[i]; @@ -159,7 +158,7 @@ launch(void *arg) xfree(req_array_ptr); if (fail_launch_cnt) { - job_state_t jstate; + srun_job_state_t jstate; slurm_mutex_lock(&job->state_mutex); jstate = job->state; @@ -169,7 +168,7 @@ launch(void *arg) error("%d launch request%s failed", fail_launch_cnt, fail_launch_cnt > 1 ? "s" : ""); job->rc = 124; - job_kill(job); + srun_job_kill(job); } } else { @@ -249,7 +248,7 @@ static void _spawn_launch_thr(thd_t *th) return; } -static int _wait_on_active(thd_t *thd, job_t *job) +static int _wait_on_active(thd_t *thd, srun_job_t *job) { struct timeval now; struct timespec timeout; @@ -270,7 +269,7 @@ static int _wait_on_active(thd_t *thd, job_t *job) } /* _p_launch - parallel (multi-threaded) task launcher */ -static void _p_launch(slurm_msg_t *req, job_t *job) +static void _p_launch(slurm_msg_t *req, srun_job_t *job) { int i; thd_t *thd; @@ -351,7 +350,7 @@ _send_msg_rc(slurm_msg_t *msg) } static void -_update_failed_node(job_t *j, int id) +_update_failed_node(srun_job_t *j, int id) { int i; pthread_mutex_lock(&j->task_mutex); @@ -365,7 +364,7 @@ _update_failed_node(job_t *j, int id) } static void -_update_contacted_node(job_t *j, int id) +_update_contacted_node(srun_job_t *j, int id) { pthread_mutex_lock(&j->task_mutex); if (j->host_state[id] == SRUN_HOST_INIT) @@ -381,7 +380,7 @@ static void * _p_launch_task(void *arg) task_info_t *tp = &(th->task); slurm_msg_t *req = tp->req; launch_tasks_request_msg_t *msg = req->data; - job_t *job = tp->job; + srun_job_t *job = tp->job; int nodeid = msg->srun_node_id; int failure = 0; int retry = 3; /* retry thrice */ diff --git a/src/srun/launch.h b/src/srun/launch.h index 8acf6ed4a2fd7eb239824a6a93cc570b2e18cefe..c7d99b5d466ae2dc36235ab5be50399e2042a605 100644 --- a/src/srun/launch.h +++ b/src/srun/launch.h @@ -39,18 +39,18 @@ #include "src/common/slurm_protocol_api.h" #include "src/srun/opt.h" -#include "src/srun/job.h" +#include "src/srun/srun_job.h" typedef struct launch_thr { pthread_t thread; pthread_attr_t attr; - char *host; /* name of host on which to run */ - int ntasks; /* number of tasks to initiate on host*/ - int *taskid; /* list of global task ids */ - int i; /* temporary index into array */ + char *host; /* name of host on which to run */ + int ntasks; /* number of tasks to initiate on host*/ + int *taskid; /* list of global task ids */ + int i; /* temporary index into array */ } launch_thr_t; -int launch_thr_create(job_t *job); +int launch_thr_create(srun_job_t *job); void * launch(void *arg); #endif /* !_HAVE_LAUNCH_H */ diff --git a/src/srun/msg.c b/src/srun/msg.c index 71bae06f51f0499aea0205226a14029b20d687c0..9d091a16f62b307510032b27b41ea2d78d4a9a8c 100644 --- a/src/srun/msg.c +++ b/src/srun/msg.c @@ -54,11 +54,10 @@ #include "src/common/xassert.h" #include "src/common/xmalloc.h" -#include "src/srun/job.h" +#include "src/srun/srun_job.h" #include "src/srun/opt.h" #include "src/srun/io.h" #include "src/srun/msg.h" -#include "src/srun/signals.h" #include "src/srun/sigstr.h" #include "src/srun/attach.h" @@ -73,21 +72,21 @@ static slurm_fd slurmctld_fd = (slurm_fd) NULL; /* * Static prototypes */ -static void _accept_msg_connection(job_t *job, int fdnum); -static void _confirm_launch_complete(job_t *job); -static void _dump_proctable(job_t *job); -static void _exit_handler(job_t *job, slurm_msg_t *exit_msg); -static void _handle_msg(job_t *job, slurm_msg_t *msg); -static inline bool _job_msg_done(job_t *job); -static void _launch_handler(job_t *job, slurm_msg_t *resp); -static void _do_poll_timeout(job_t *job); -static int _get_next_timeout(job_t *job); -static void _msg_thr_poll(job_t *job); -static void _set_jfds_nonblocking(job_t *job); +static void _accept_msg_connection(srun_job_t *job, int fdnum); +static void _confirm_launch_complete(srun_job_t *job); +static void _dump_proctable(srun_job_t *job); +static void _exit_handler(srun_job_t *job, slurm_msg_t *exit_msg); +static void _handle_msg(srun_job_t *job, slurm_msg_t *msg); +static inline bool _job_msg_done(srun_job_t *job); +static void _launch_handler(srun_job_t *job, slurm_msg_t *resp); +static void _do_poll_timeout(srun_job_t *job); +static int _get_next_timeout(srun_job_t *job); +static void _msg_thr_poll(srun_job_t *job); +static void _set_jfds_nonblocking(srun_job_t *job); static void _print_pid_list(const char *host, int ntasks, uint32_t *pid, char *executable_name); static void _timeout_handler(time_t timeout); -static void _node_fail_handler(char *nodelist, job_t *job); +static void _node_fail_handler(char *nodelist, srun_job_t *job); #define _poll_set_rd(_pfd, _fd) do { \ (_pfd).fd = _fd; \ @@ -109,7 +108,7 @@ static void _node_fail_handler(char *nodelist, job_t *job); * and the number of tasks `ntasks' with pid array `pid' */ static void -_build_proctable(job_t *job, char *host, int nodeid, int ntasks, uint32_t *pid) +_build_proctable(srun_job_t *job, char *host, int nodeid, int ntasks, uint32_t *pid) { int i; static int tasks_recorded = 0; @@ -139,7 +138,7 @@ _build_proctable(job_t *job, char *host, int nodeid, int ntasks, uint32_t *pid) } } -static void _dump_proctable(job_t *job) +static void _dump_proctable(srun_job_t *job) { int node_inx, task_inx, taskid, max_task = 0; MPIR_PROCDESC *tv; @@ -195,7 +194,7 @@ static void _timeout_handler(time_t timeout) * not. The job will continue to execute given the --no-kill option. * Otherwise all of the job's tasks and the job itself are killed.. */ -static void _node_fail_handler(char *nodelist, job_t *job) +static void _node_fail_handler(char *nodelist, srun_job_t *job) { if ( (opt.no_kill) && (io_node_fail(nodelist, job) == SLURM_SUCCESS) ) { @@ -211,13 +210,13 @@ static void _node_fail_handler(char *nodelist, job_t *job) io_thr_wake(job); } -static bool _job_msg_done(job_t *job) +static bool _job_msg_done(srun_job_t *job) { return (job->state >= SRUN_JOB_TERMINATED); } static void -_process_launch_resp(job_t *job, launch_tasks_response_msg_t *msg) +_process_launch_resp(srun_job_t *job, launch_tasks_response_msg_t *msg) { if ((msg->srun_node_id < 0) || (msg->srun_node_id >= job->nhosts)) { error ("Bad launch response from %s", msg->node_name); @@ -235,7 +234,7 @@ _process_launch_resp(job_t *job, launch_tasks_response_msg_t *msg) } static void -update_running_tasks(job_t *job, uint32_t nodeid) +update_running_tasks(srun_job_t *job, uint32_t nodeid) { int i; debug2("updating %d running tasks for node %d", @@ -249,7 +248,7 @@ update_running_tasks(job_t *job, uint32_t nodeid) } static void -update_failed_tasks(job_t *job, uint32_t nodeid) +update_failed_tasks(srun_job_t *job, uint32_t nodeid) { int i; slurm_mutex_lock(&job->task_mutex); @@ -267,7 +266,7 @@ update_failed_tasks(job_t *job, uint32_t nodeid) } static void -_launch_handler(job_t *job, slurm_msg_t *resp) +_launch_handler(srun_job_t *job, slurm_msg_t *resp) { launch_tasks_response_msg_t *msg = resp->data; @@ -304,7 +303,7 @@ _launch_handler(job_t *job, slurm_msg_t *resp) * confirm that all tasks registers a sucessful launch * pthread_exit with job kill on failure */ static void -_confirm_launch_complete(job_t *job) +_confirm_launch_complete(srun_job_t *job) { int i; @@ -327,7 +326,7 @@ _confirm_launch_complete(job_t *job) } static void -_reattach_handler(job_t *job, slurm_msg_t *msg) +_reattach_handler(srun_job_t *job, slurm_msg_t *msg) { int i; reattach_tasks_response_msg_t *resp = msg->data; @@ -389,7 +388,7 @@ _reattach_handler(job_t *job, slurm_msg_t *msg) static void -_print_exit_status(job_t *job, hostlist_t hl, char *host, int status) +_print_exit_status(srun_job_t *job, hostlist_t hl, char *host, int status) { char buf[1024]; char *corestr = ""; @@ -431,7 +430,7 @@ _print_exit_status(job_t *job, hostlist_t hl, char *host, int status) } static void -_die_if_signaled(job_t *job, int status) +_die_if_signaled(srun_job_t *job, int status) { bool signaled = false; @@ -446,7 +445,7 @@ _die_if_signaled(job_t *job, int status) } static void -_exit_handler(job_t *job, slurm_msg_t *exit_msg) +_exit_handler(srun_job_t *job, slurm_msg_t *exit_msg) { task_exit_msg_t *msg = (task_exit_msg_t *) exit_msg->data; hostlist_t hl = hostlist_create(NULL); @@ -484,8 +483,9 @@ _exit_handler(job_t *job, slurm_msg_t *exit_msg) slurm_mutex_unlock(&job->task_mutex); tasks_exited++; - if ( (tasks_exited == opt.nprocs) || - ((opt.mpi_type == MPI_LAM) && (tasks_exited == job->nhosts)) ) { + if ((tasks_exited == opt.nprocs) + || (slurm_mpi_single_task_per_node () + && (tasks_exited == job->nhosts))) { debug2("All tasks exited"); update_job_state(job, SRUN_JOB_TERMINATED); } @@ -513,13 +513,13 @@ _exit_handler(job_t *job, slurm_msg_t *exit_msg) first_time = 0; - job_kill(job); + srun_job_kill(job); } } } static void -_handle_msg(job_t *job, slurm_msg_t *msg) +_handle_msg(srun_job_t *job, slurm_msg_t *msg) { uid_t req_uid = g_slurm_auth_get_uid(msg->cred); uid_t uid = getuid(); @@ -580,7 +580,7 @@ _handle_msg(job_t *job, slurm_msg_t *msg) /* NOTE: One extra FD for incoming slurmctld messages */ static void -_accept_msg_connection(job_t *job, int fdnum) +_accept_msg_connection(srun_job_t *job, int fdnum) { slurm_fd fd = (slurm_fd) NULL; slurm_msg_t *msg = NULL; @@ -632,7 +632,7 @@ _accept_msg_connection(job_t *job, int fdnum) static void -_set_jfds_nonblocking(job_t *job) +_set_jfds_nonblocking(srun_job_t *job) { int i; for (i = 0; i < job->njfds; i++) @@ -644,7 +644,7 @@ _set_jfds_nonblocking(job_t *job) * NOTE: One extra FD for incoming slurmctld messages */ static int -_do_poll(job_t *job, struct pollfd *fds, int timeout) +_do_poll(srun_job_t *job, struct pollfd *fds, int timeout) { nfds_t nfds = (job->njfds + 1); int rc, to; @@ -674,7 +674,7 @@ _do_poll(job_t *job, struct pollfd *fds, int timeout) * Get the next timeout in seconds from now. */ static int -_get_next_timeout(job_t *job) +_get_next_timeout(srun_job_t *job) { int timeout = -1; @@ -699,7 +699,7 @@ _get_next_timeout(job_t *job) * 2. Exit timeout has expired (either print a message or kill job) */ static void -_do_poll_timeout(job_t *job) +_do_poll_timeout(srun_job_t *job) { time_t now = time(NULL); @@ -721,7 +721,7 @@ _do_poll_timeout(job_t *job) /* NOTE: One extra FD for incoming slurmctld messages */ static void -_msg_thr_poll(job_t *job) +_msg_thr_poll(srun_job_t *job) { struct pollfd *fds; int i; @@ -757,7 +757,7 @@ _msg_thr_poll(job_t *job) void * msg_thr(void *arg) { - job_t *job = (job_t *) arg; + srun_job_t *job = (srun_job_t *) arg; debug3("msg thread pid = %lu", (unsigned long) getpid()); @@ -769,7 +769,7 @@ msg_thr(void *arg) } int -msg_thr_create(job_t *job) +msg_thr_create(srun_job_t *job) { int i; pthread_attr_t attr; diff --git a/src/srun/msg.h b/src/srun/msg.h index fcff6d551c04adbc762a91026640069b9f37b315..17cfe40fb87bf90ab1083b91b82bec859b110bdc 100644 --- a/src/srun/msg.h +++ b/src/srun/msg.h @@ -25,13 +25,13 @@ * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. \*****************************************************************************/ -#include "src/srun/job.h" +#include "src/srun/srun_job.h" #ifndef _HAVE_MSG_H #define _HAVE_MSG_H void *msg_thr(void *arg); -int msg_thr_create(job_t *job); +int msg_thr_create(srun_job_t *job); slurm_fd slurmctld_msg_init(void); typedef struct slurmctld_communication_addr { diff --git a/src/srun/net.c b/src/srun/net.c deleted file mode 100644 index f00c9eb5dc4abf9a0bdf7e9ce206d6e7a19d5ca6..0000000000000000000000000000000000000000 --- a/src/srun/net.c +++ /dev/null @@ -1,147 +0,0 @@ -/*****************************************************************************\ - * net.c - basic network communications for user application I/O - ***************************************************************************** - * Copyright (C) 2002 The Regents of the University of California. - * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). - * Written by Mark Grondona <grondona1@llnl.gov>, Kevin Tew <tew1@llnl.gov>, - * et. al. - * UCRL-CODE-2002-040. - * - * This file is part of SLURM, a resource management program. - * For details, see <http://www.llnl.gov/linux/slurm/>. - * - * SLURM is free software; you can redistribute it and/or modify it under - * the terms of the GNU General Public License as published by the Free - * Software Foundation; either version 2 of the License, or (at your option) - * any later version. - * - * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY - * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS - * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more - * details. - * - * You should have received a copy of the GNU General Public License along - * with SLURM; if not, write to the Free Software Foundation, Inc., - * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. -\*****************************************************************************/ - - -#include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include <sys/select.h> -#include <sys/time.h> -#include <fcntl.h> -#include <unistd.h> -#include <stdio.h> -#include <string.h> -#include <errno.h> - -#include "src/common/log.h" -#include "src/srun/net.h" - -#ifndef NET_DEFAULT_BACKLOG -# define NET_DEFAULT_BACKLOG 1024 -#endif - -static int _sock_bind_wild(int sockfd) -{ - socklen_t len; - struct sockaddr_in sin; - - memset(&sin, 0, sizeof(sin)); - sin.sin_family = AF_INET; - sin.sin_addr.s_addr = htonl(INADDR_ANY); - sin.sin_port = htons(0); /* bind ephemeral port */ - - if (bind(sockfd, (struct sockaddr *) &sin, sizeof(sin)) < 0) - return (-1); - len = sizeof(sin); - if (getsockname(sockfd, (struct sockaddr *) &sin, &len) < 0) - return (-1); - return (sin.sin_port); -} - - - -int net_stream_listen(int *fd, int *port) -{ - int rc, val; - - if ((*fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) - return -1; - - val = 1; - rc = setsockopt(*fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(int)); - if (rc > 0) - goto cleanup; - - *port = _sock_bind_wild(*fd); - if (*port < 0) - goto cleanup; -#undef SOMAXCONN -#define SOMAXCONN 1024 - rc = listen(*fd, NET_DEFAULT_BACKLOG); - if (rc < 0) - goto cleanup; - - return 1; - - cleanup: - close(*fd); - return -1; -} - - -int accept_stream(int fd) -{ - int sd; - - while ((sd = accept(fd, NULL, NULL)) < 0) { - if (errno == EINTR) - continue; - if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) - return -1; - if (errno == ECONNABORTED) - return -1; - error("Unable to accept new connection"); - } - - return sd; -} - - -int readn(int fd, void *buf, size_t nbytes) -{ - int n = 0; - char *pbuf = (char *)buf; - size_t nleft = nbytes; - - while (nleft > 0) { - n = read(fd, (void *)pbuf, nleft); - if (n > 0) { - pbuf+=n; - nleft-=n; - } else if (n == 0) /* EOF */ - break; - else if (errno == EINTR) - continue; - else { - debug("read error: %m"); - break; - } - } - return(n); -} - -int net_set_low_water(int sock, size_t size) -{ - if (setsockopt(sock, SOL_SOCKET, SO_RCVLOWAT, - (const void *) &size, sizeof(size)) < 0) { - error("Unable to set low water socket option: %m"); - return -1; - } - - return 0; -} diff --git a/src/srun/net.h b/src/srun/net.h deleted file mode 100644 index 9f023b3868de4b3170473f43e2b996cbcd914aca..0000000000000000000000000000000000000000 --- a/src/srun/net.h +++ /dev/null @@ -1,20 +0,0 @@ - -#ifndef _NET_H -#define _NET_H - -/* open a stream socket on an ephemereal port and put it into - * the listen state. fd and port are filled in with the new - * socket's file descriptor and port #. - */ -int net_stream_listen(int *fd, int *port); - -/* accept the incoming connection on the stream socket fd - */ -int net_accept_stream(int fd); - -/* set low water mark on socket - */ -int net_set_low_water(int sock, size_t size); - - -#endif /* !_NET_H */ diff --git a/src/srun/opt.c b/src/srun/opt.c index da1e3c507a997ef730b06267682996f3a1040f91..e7773b18307b67f7c5b4dacb46a17b1a5af7d7c4 100644 --- a/src/srun/opt.c +++ b/src/srun/opt.c @@ -64,6 +64,7 @@ #include "src/srun/opt.h" #include "src/srun/attach.h" +#include "src/common/mpi.h" /* generic OPT_ definitions -- mainly for use with env vars */ #define OPT_NONE 0x00 @@ -78,6 +79,7 @@ #define OPT_NODE_USE 0x09 #define OPT_NO_ROTATE 0x0a #define OPT_GEOMETRY 0x0b +#define OPT_MPI 0x0c /* generic getopt_long flags, integers and *not* valid characters */ #define LONG_OPT_HELP 0x100 @@ -466,7 +468,6 @@ static void _opt_default() opt.job_name = NULL; opt.jobid = NO_VAL; - opt.mpi_type = MPI_UNKNOWN; opt.dependency = NO_VAL; opt.account = NULL; @@ -583,6 +584,7 @@ env_vars_t env_vars[] = { {"SLURM_TIMELIMIT", OPT_INT, &opt.time_limit, NULL }, {"SLURM_WAIT", OPT_INT, &opt.max_wait, NULL }, {"SLURM_DISABLE_STATUS",OPT_INT, &opt.disable_status,NULL }, + {"SLURM_MPI_TYPE", OPT_MPI, NULL, NULL }, {NULL, 0, NULL, NULL} }; @@ -683,6 +685,14 @@ _process_env_var(env_vars_t *e, const char *val) } break; + case OPT_MPI: + if (srun_mpi_init((char *)val) == SLURM_ERROR) { + fatal("\"%s=%s\" -- invalid MPI type, " + "--mpi=list for acceptable types.", + e->var, val); + } + break; + default: /* do nothing */ break; @@ -1005,8 +1015,11 @@ static void _opt_args(int argc, char **argv) } break; case LONG_OPT_MPI: - if (strncasecmp(optarg, "lam", 3) == 0) - opt.mpi_type = MPI_LAM; + if (srun_mpi_init((char *)optarg) == SLURM_ERROR) { + fatal("\"--mpi=%s\" -- long invalid MPI type, " + "--mpi=list for acceptable types.", + optarg); + } break; case LONG_OPT_NOSHELL: opt.noshell = true; diff --git a/src/srun/opt.h b/src/srun/opt.h index 133d103f7c07abd8c6814ad645036a4c1f81b036..7fcfb87d87174e6a7430fcbee59c67ceeb3decde 100644 --- a/src/srun/opt.h +++ b/src/srun/opt.h @@ -37,10 +37,12 @@ #include "src/common/macros.h" /* true and false */ #include "src/srun/core-format.h" #include "src/common/env.h" +//#include "src/common/mpi.h" #define MAX_THREADS 64 #define MAX_USERNAME 9 + /* global variables relating to user options */ char **remote_argv; int remote_argc; @@ -58,11 +60,6 @@ enum modes { enum modes mode; -enum mpi_t { - MPI_UNKNOWN = 0, - MPI_LAM = 1 -}; - #define format_distribution_t(t) (t == SRUN_DIST_BLOCK) ? "block" : \ (t == SRUN_DIST_CYCLIC) ? "cyclic" : \ "unknown" @@ -76,6 +73,7 @@ enum io_t { #define format_io_t(t) (t == IO_ONE) ? "one" : (t == IO_ALL) ? \ "all" : "per task" +//typedef struct srun_job fname_job_t; typedef struct srun_options { @@ -101,7 +99,7 @@ typedef struct srun_options { distribution; /* --distribution=, -m dist */ char *job_name; /* --job-name=, -J name */ unsigned int jobid; /* --jobid=jobid */ - enum mpi_t mpi_type; /* --mpi=type */ + char *mpi_type; /* --mpi=type */ unsigned int dependency;/* --dependency, -P jobid */ char *account; /* --account, -U acct_name */ diff --git a/src/srun/reattach.c b/src/srun/reattach.c index 982fb2132ae74e1cb55f7c6cc2b0f506aad35e0e..d4add6acfda4638d418819b57027c3fddbdcdbe5 100644 --- a/src/srun/reattach.c +++ b/src/srun/reattach.c @@ -46,12 +46,12 @@ #include "src/common/slurm_protocol_api.h" #include "src/common/read_config.h" -#include "src/srun/job.h" +#include "src/srun/srun_job.h" #include "src/srun/launch.h" #include "src/srun/opt.h" #include "src/srun/io.h" #include "src/srun/msg.h" -#include "src/srun/signals.h" + /* number of active threads */ @@ -66,11 +66,11 @@ typedef struct thd { pthread_attr_t attr; /* thread attributes */ state_t state; /* thread state */ slurm_msg_t *msg; - job_t *job; + srun_job_t *job; } thd_t; -static inline bool _job_all_done(job_t *job); -static void _p_reattach(slurm_msg_t *req, job_t *job); +static inline bool _job_all_done(srun_job_t *job); +static void _p_reattach(slurm_msg_t *req, srun_job_t *job); static void *_p_reattach_task(void *args); typedef struct _srun_step { @@ -288,7 +288,7 @@ _get_attach_info(srun_step_t *s) } static int -_attach_to_job(job_t *job) +_attach_to_job(srun_job_t *job) { int i; reattach_tasks_request_msg_t *req; @@ -329,7 +329,7 @@ _attach_to_job(job_t *job) } static void -_p_reattach(slurm_msg_t *msg, job_t *job) +_p_reattach(slurm_msg_t *msg, srun_job_t *job) { int i; thd_t *thd = xmalloc(job->nhosts * sizeof(thd_t)); @@ -402,7 +402,7 @@ int reattach() { List steplist = _step_list_create(opt.attach); srun_step_t *s = NULL; - job_t *job = NULL; + srun_job_t *job = NULL; if ((steplist == NULL) || (list_count(steplist) == 0)) { info("No job/steps in attach"); @@ -488,7 +488,7 @@ int reattach() } -static bool _job_all_done(job_t *job) +static bool _job_all_done(srun_job_t *job) { return (job->state >= SRUN_JOB_TERMINATED); } diff --git a/src/srun/signals.c b/src/srun/signals.c index 39a2cb1b260c54086ea2974533dd3d78f26b7a67..dc092ac7a339cc8853a35d663e98a42c7f9a71d3 100644 --- a/src/srun/signals.c +++ b/src/srun/signals.c @@ -45,8 +45,7 @@ #include "src/common/xmalloc.h" #include "src/common/xsignal.h" -#include "src/srun/job.h" -#include "src/srun/io.h" +#include "src/srun/srun_job.h" /* * Static list of signals to block in srun: @@ -71,7 +70,7 @@ typedef struct thd { typedef struct task_info { slurm_msg_t *req_ptr; - job_t *job_ptr; + srun_job_t *job_ptr; int host_inx; } task_info_t; @@ -80,14 +79,11 @@ typedef struct task_info { * Static prototypes */ static void _sigterm_handler(int); -static void _handle_intr(job_t *, time_t *, time_t *); +static void _handle_intr(srun_job_t *, time_t *, time_t *); static void * _sig_thr(void *); -static void _p_fwd_signal(slurm_msg_t *, job_t *); -static void * _p_signal_task(void *); - static inline bool -_sig_thr_done(job_t *job) +_sig_thr_done(srun_job_t *job) { bool retval; slurm_mutex_lock(&job->state_mutex); @@ -96,7 +92,7 @@ _sig_thr_done(job_t *job) return retval; } -int +int sig_setup_sigmask(void) { if (xsignal_block(srun_sigarray) < 0) @@ -114,7 +110,7 @@ sig_unblock_signals(void) } int -sig_thr_create(job_t *job) +sig_thr_create(srun_job_t *job) { int e; pthread_attr_t attr; @@ -130,53 +126,6 @@ sig_thr_create(job_t *job) } -void -fwd_signal(job_t *job, int signo) -{ - int i; - slurm_msg_t *req; - kill_tasks_msg_t msg; - static pthread_mutex_t sig_mutex = PTHREAD_MUTEX_INITIALIZER; - - slurm_mutex_lock(&sig_mutex); - - if (signo == SIGKILL || signo == SIGINT || signo == SIGTERM) { - slurm_mutex_lock(&job->state_mutex); - job->signaled = true; - slurm_mutex_unlock(&job->state_mutex); - } - - debug2("forward signal %d to job", signo); - - /* common to all tasks */ - msg.job_id = job->jobid; - msg.job_step_id = job->stepid; - msg.signal = (uint32_t) signo; - - req = xmalloc(sizeof(slurm_msg_t) * job->nhosts); - - for (i = 0; i < job->nhosts; i++) { - if (job->host_state[i] != SRUN_HOST_REPLIED) { - debug2("%s has not yet replied\n", job->host[i]); - continue; - } - - if (job_active_tasks_on_host(job, i) == 0) - continue; - - req[i].msg_type = REQUEST_KILL_TASKS; - req[i].data = &msg; - memcpy( &req[i].address, - &job->slurmd_addr[i], sizeof(slurm_addr)); - } - - _p_fwd_signal(req, job); - - debug2("All tasks have been signalled"); - xfree(req); - slurm_mutex_unlock(&sig_mutex); -} - static void _sigterm_handler(int signum) @@ -184,7 +133,7 @@ _sigterm_handler(int signum) } static void -_handle_intr(job_t *job, time_t *last_intr, time_t *last_intr_sent) +_handle_intr(srun_job_t *job, time_t *last_intr, time_t *last_intr_sent) { if (opt.quit_on_intr) { job_force_termination(job); @@ -220,7 +169,7 @@ _handle_intr(job_t *job, time_t *last_intr, time_t *last_intr_sent) static void * _sig_thr(void *arg) { - job_t *job = (job_t *)arg; + srun_job_t *job = (srun_job_t *)arg; sigset_t set; time_t last_intr = 0; time_t last_intr_sent = 0; @@ -261,79 +210,5 @@ _sig_thr(void *arg) return NULL; } -/* _p_fwd_signal - parallel (multi-threaded) task signaller */ -static void _p_fwd_signal(slurm_msg_t *req, job_t *job) -{ - int i; - task_info_t *tinfo; - thd_t *thd; - - thd = xmalloc(job->nhosts * sizeof (thd_t)); - for (i = 0; i < job->nhosts; i++) { - if (req[i].msg_type == 0) - continue; /* inactive task */ - - slurm_mutex_lock(&active_mutex); - while (active >= opt.max_threads) { - pthread_cond_wait(&active_cond, &active_mutex); - } - active++; - slurm_mutex_unlock(&active_mutex); - - tinfo = (task_info_t *)xmalloc(sizeof(task_info_t)); - tinfo->req_ptr = &req[i]; - tinfo->job_ptr = job; - tinfo->host_inx = i; - - slurm_attr_init(&thd[i].attr); - if (pthread_attr_setdetachstate(&thd[i].attr, - PTHREAD_CREATE_DETACHED)) - error ("pthread_attr_setdetachstate failed"); - if (pthread_create( &thd[i].thread, &thd[i].attr, - _p_signal_task, (void *) tinfo )) { - error ("pthread_create failed"); - _p_signal_task((void *) tinfo); - } - } - - - slurm_mutex_lock(&active_mutex); - while (active > 0) { - pthread_cond_wait(&active_cond, &active_mutex); - } - slurm_mutex_unlock(&active_mutex); - xfree(thd); -} - -/* _p_signal_task - parallelized signal of a specific task */ -static void * _p_signal_task(void *args) -{ - int rc = SLURM_SUCCESS; - task_info_t *info = (task_info_t *)args; - slurm_msg_t *req = info->req_ptr; - job_t *job = info->job_ptr; - char *host = job->host[info->host_inx]; - - debug3("sending signal to host %s", host); - if (slurm_send_recv_rc_msg(req, &rc, 0) < 0) { - error("%s: signal: %m", host); - goto done; - } - - /* - * Report error unless it is "Invalid job id" which - * probably just means the tasks exited in the meanwhile. - */ - if ((rc != 0) && (rc != ESLURM_INVALID_JOB_ID) && (rc != ESRCH)) - error("%s: signal: %s", host, slurm_strerror(rc)); - - done: - slurm_mutex_lock(&active_mutex); - active--; - pthread_cond_signal(&active_cond); - slurm_mutex_unlock(&active_mutex); - xfree(args); - return NULL; -} diff --git a/src/srun/signals.h b/src/srun/signals.h index ee30dcfa49e6fb064ad49a74771d140974315da3..db6a79e528bbd90a8af3edca3708935f1273b193 100644 --- a/src/srun/signals.h +++ b/src/srun/signals.h @@ -27,11 +27,10 @@ #ifndef _SIGNALS_H #define _SIGNALS_H -#include "src/srun/job.h" +typedef struct srun_job signal_job_t; -void sig_setup_sigmask(void); +int sig_setup_sigmask(void); int sig_unblock_signals(void); -int sig_thr_create(job_t *job); -void fwd_signal(job_t *job, int signal); +int sig_thr_create(signal_job_t *job); #endif /* !_SIGNALS_H */ diff --git a/src/srun/srun.c b/src/srun/srun.c index a6eb97ad098bf3fe9e9e338f68ded241e9baf1f7..66239d3d4a69788bc8ac5880443fd586d2b8b08e 100644 --- a/src/srun/srun.c +++ b/src/srun/srun.c @@ -66,16 +66,15 @@ #include "src/common/xmalloc.h" #include "src/common/xsignal.h" #include "src/common/xstring.h" +#include "src/common/net.h" +#include "src/common/mpi.h" #include "src/srun/allocate.h" #include "src/srun/io.h" -#include "src/srun/job.h" -#include "src/srun/gmpi.h" +#include "src/srun/srun_job.h" #include "src/srun/launch.h" #include "src/srun/msg.h" -#include "src/srun/net.h" #include "src/srun/opt.h" -#include "src/srun/signals.h" #include "src/srun/sigstr.h" #include "src/srun/reattach.h" #include "src/srun/attach.h" @@ -98,17 +97,17 @@ static char *_build_script (char *pathname, int file_type); static char *_get_shell (void); static int _is_file_text (char *, char**); static int _run_batch_job (void); -static int _run_job_script(job_t *job, env_t *env); +static int _run_job_script(srun_job_t *job, env_t *env); static int _set_rlimit_env(void); -static char *_task_count_string(job_t *job); -static void _switch_standalone(job_t *job); +static char *_task_count_string(srun_job_t *job); +static void _switch_standalone(srun_job_t *job); static int _become_user (void); static int _print_script_exit_status(const char *argv0, int status); int srun(int ac, char **av) { allocation_resp *resp; - job_t *job; + srun_job_t *job; char *task_cnt, *bgl_part_id = NULL; int exitcode = 0; env_t *env = xmalloc(sizeof(env_t)); @@ -183,7 +182,7 @@ int srun(int ac, char **av) if (msg_thr_create(job) < 0) job_fatal(job, "Unable to create msg thread"); exitcode = _run_job_script(job, env); - job_destroy(job,exitcode); + srun_job_destroy(job,exitcode); debug ("Spawned srun shell terminated"); xfree(env->task_count); @@ -218,7 +217,7 @@ int srun(int ac, char **av) job = job_create_allocation(resp); if (create_job_step(job) < 0) { - job_destroy(job, 0); + srun_job_destroy(job, 0); exit(1); } slurm_free_resource_allocation_response_msg(resp); @@ -252,26 +251,8 @@ int srun(int ac, char **av) xfree(env->task_count); xfree(env); - if (slurm_get_mpich_gm_dir() && getenv("GMPI_PORT") == NULL) { - /* - * It is possible for one to modify the mpirun command in - * MPICH-GM distribution so that it calls srun, instead of - * rsh, for remote process invocations. In that case, we - * should not override envs nor open the master port. - */ - char *port = NULL; - if (gmpi_thr_create(job, &port)) - job_fatal(job, "Unable to create GMPI thread"); - setenvf(NULL, "GMPI_PORT", "%s", port); - xfree(port); - setenvf(NULL, "GMPI_SHMEM", "1"); - setenvf(NULL, "GMPI_MAGIC", "%u", job->jobid); - setenvf(NULL, "GMPI_NP", "%d", opt.nprocs); - setenvf(NULL, - "GMPI_BOARD", - "-1"); /* FIXME for multi-board config.*/ - setenvf(NULL, "SLURM_GMPI", "1"); /* mark for slurmd */ - } + if (slurm_mpi_thr_create(job) < 0) + job_fatal (job, "Failed to initialize MPI"); if (msg_thr_create(job) < 0) job_fatal(job, "Unable to create msg thread"); @@ -299,9 +280,9 @@ int srun(int ac, char **av) */ if (job->state == SRUN_JOB_FAILED) { info("Terminating job"); - job_destroy(job, 0); + srun_job_destroy(job, 0); } else if (job->state == SRUN_JOB_FORCETERM) { - job_destroy(job, 0); + srun_job_destroy(job, 0); exit(1); } @@ -318,8 +299,11 @@ int srun(int ac, char **av) if (pthread_join(job->ioid, NULL) < 0) error ("Waiting on IO: %m"); + if (slurm_mpi_exit () < 0) + ; /* eh, ignore errors here */ + /* Tell slurmctld that job is done */ - job_destroy(job, 0); + srun_job_destroy(job, 0); log_fini(); @@ -331,7 +315,7 @@ int srun(int ac, char **av) } static char * -_task_count_string (job_t *job) +_task_count_string (srun_job_t *job) { int i, last_val, last_cnt; char tmp[16]; @@ -361,7 +345,7 @@ _task_count_string (job_t *job) } static void -_switch_standalone(job_t *job) +_switch_standalone(srun_job_t *job) { int cyclic = (opt.distribution == SRUN_DIST_CYCLIC); @@ -730,7 +714,7 @@ _print_script_exit_status(const char *argv0, int status) } /* allocation option specified, spawn a script and wait for it to exit */ -static int _run_job_script (job_t *job, env_t *env) +static int _run_job_script (srun_job_t *job, env_t *env) { int status, exitcode; pid_t cpid; diff --git a/src/srun/job.c b/src/srun/srun_job.c similarity index 93% rename from src/srun/job.c rename to src/srun/srun_job.c index 3b4f5ad0b41ee7563eba1c650d9856ca04d6434c..11ab98b7f3a500d915638dffccae72d097b6df5a 100644 --- a/src/srun/job.c +++ b/src/srun/srun_job.c @@ -1,6 +1,6 @@ /****************************************************************************\ * job.c - job data structure creation functions - * $Id$ + * $Id: job.c,v 1.77 2005/06/15 16:39:19 da Exp $ ***************************************************************************** * Copyright (C) 2002 The Regents of the University of California. * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). @@ -45,10 +45,9 @@ #include "src/common/xmalloc.h" #include "src/common/xstring.h" -#include "src/srun/job.h" +#include "src/srun/srun_job.h" #include "src/srun/opt.h" #include "src/srun/fname.h" -#include "src/srun/signals.h" #include "src/srun/attach.h" #include "src/srun/io.h" @@ -61,11 +60,11 @@ typedef struct allocation_info { uint32_t jobid; uint32_t stepid; char *nodelist; - uint32_t nnodes; + int nnodes; slurm_addr *addrs; - uint16_t num_cpu_groups; - uint32_t *cpus_per_node; - uint32_t *cpu_count_reps; + int num_cpu_groups; + int *cpus_per_node; + int *cpu_count_reps; select_jobinfo_t select_jobinfo; } allocation_info_t; @@ -74,13 +73,13 @@ typedef struct allocation_info { /* * Prototypes: */ -static void _dist_block(job_t *job); -static void _dist_cyclic(job_t *job); +static void _dist_block(srun_job_t *job); +static void _dist_cyclic(srun_job_t *job); static inline int _estimate_nports(int nclients, int cli_per_port); static int _compute_task_count(allocation_info_t *info); static void _set_nprocs(allocation_info_t *info); -static job_t * _job_create_internal(allocation_info_t *info); -static void _job_fake_cred(job_t *job); +static srun_job_t * _job_create_internal(allocation_info_t *info); +static void _job_fake_cred(srun_job_t *job); static int _job_resp_add_nodes(bitstr_t *req_bitmap, bitstr_t *exc_bitmap, int node_cnt); static int _job_resp_bitmap(hostlist_t resp_node_hl, char *nodelist, @@ -91,8 +90,8 @@ static int _job_resp_cpus(uint32_t *cpus_per_node, uint32_t *cpu_count_reps, int node); static void _job_resp_hack(resource_allocation_response_msg_t *resp, bitstr_t *req_bitmap); -static char * _task_state_name(task_state_t state_inx); -static char * _host_state_name(host_state_t state_inx); +static char * _task_state_name(srun_task_state_t state_inx); +static char * _host_state_name(srun_host_state_t state_inx); static char * _normalize_hostlist(const char *hostlist); @@ -100,7 +99,7 @@ static char * _normalize_hostlist(const char *hostlist); * distribution to figure out how many tasks go on each node and * then make those assignments in a block fashion */ static void -_dist_block(job_t *job) +_dist_block(srun_job_t *job) { int i, j, taskid = 0; bool over_subscribe = false; @@ -143,7 +142,7 @@ _dist_block(job_t *job) * 12 13 14 15 etc. */ static void -_dist_cyclic(job_t *job) +_dist_cyclic(srun_job_t *job) { int i, j, taskid = 0; bool over_subscribe = false; @@ -167,10 +166,10 @@ _dist_cyclic(job_t *job) /* * Create an srun job structure from a resource allocation response msg */ -job_t * +srun_job_t * job_create_allocation(resource_allocation_response_msg_t *resp) { - job_t *job; + srun_job_t *job; allocation_info_t *i = xmalloc(sizeof(*i)); i->nodelist = _normalize_hostlist(resp->node_list); @@ -196,12 +195,12 @@ job_create_allocation(resource_allocation_response_msg_t *resp) * Create an srun job structure w/out an allocation response msg. * (i.e. use the command line options) */ -job_t * +srun_job_t * job_create_noalloc(void) { - job_t *job = NULL; + srun_job_t *job = NULL; allocation_info_t *ai = xmalloc(sizeof(*ai)); - uint32_t cpn = 1; + int cpn = 1; int i = 0; hostlist_t hl = hostlist_create(opt.nodelist); @@ -251,7 +250,7 @@ job_create_noalloc(void) void -update_job_state(job_t *job, job_state_t state) +update_job_state(srun_job_t *job, srun_job_state_t state) { pthread_mutex_lock(&job->state_mutex); if (job->state < state) { @@ -261,10 +260,10 @@ update_job_state(job_t *job, job_state_t state) pthread_mutex_unlock(&job->state_mutex); } -job_state_t -job_state(job_t *job) +srun_job_state_t +job_state(srun_job_t *job) { - job_state_t state; + srun_job_state_t state; slurm_mutex_lock(&job->state_mutex); state = job->state; slurm_mutex_unlock(&job->state_mutex); @@ -273,7 +272,7 @@ job_state(job_t *job) void -job_force_termination(job_t *job) +job_force_termination(srun_job_t *job) { if (mode == MODE_ATTACH) { info ("forcing detach"); @@ -288,7 +287,7 @@ job_force_termination(job_t *job) int -job_rc(job_t *job) +job_rc(srun_job_t *job) { int i; int rc = 0; @@ -317,18 +316,18 @@ job_rc(job_t *job) } -void job_fatal(job_t *job, const char *msg) +void job_fatal(srun_job_t *job, const char *msg) { if (msg) error(msg); - job_destroy(job, errno); + srun_job_destroy(job, errno); exit(1); } void -job_destroy(job_t *job, int error) +srun_job_destroy(srun_job_t *job, int error) { if (job->removed) return; @@ -353,7 +352,7 @@ job_destroy(job_t *job, int error) void -job_kill(job_t *job) +srun_job_kill(srun_job_t *job) { if (!opt.no_alloc) { if (slurm_kill_job_step(job->jobid, job->stepid, SIGKILL) < 0) @@ -361,26 +360,9 @@ job_kill(job_t *job) } update_job_state(job, SRUN_JOB_FAILED); } - - -int -job_active_tasks_on_host(job_t *job, int hostid) -{ - int i; - int retval = 0; - - slurm_mutex_lock(&job->task_mutex); - for (i = 0; i < job->ntask[hostid]; i++) { - uint32_t tid = job->tids[hostid][i]; - if (job->task_state[tid] == SRUN_TASK_RUNNING) - retval++; - } - slurm_mutex_unlock(&job->task_mutex); - return retval; -} void -report_job_status(job_t *job) +report_job_status(srun_job_t *job) { int i; @@ -393,7 +375,7 @@ report_job_status(job_t *job) #define NTASK_STATES 6 void -report_task_status(job_t *job) +report_task_status(srun_job_t *job) { int i; char buf[1024]; @@ -455,14 +437,14 @@ _set_nprocs(allocation_info_t *info) } -static job_t * +static srun_job_t * _job_create_internal(allocation_info_t *info) { int i; int cpu_cnt = 0; int cpu_inx = 0; hostlist_t hl; - job_t *job; + srun_job_t *job; /* Reset nprocs if necessary */ @@ -541,10 +523,10 @@ _job_create_internal(allocation_info_t *info) /* nhost host states */ - job->host_state = xmalloc(job->nhosts * sizeof(host_state_t)); + job->host_state = xmalloc(job->nhosts * sizeof(srun_host_state_t)); /* ntask task states and statii*/ - job->task_state = xmalloc(opt.nprocs * sizeof(task_state_t)); + job->task_state = xmalloc(opt.nprocs * sizeof(srun_task_state_t)); job->tstatus = xmalloc(opt.nprocs * sizeof(int)); for (i = 0; i < opt.nprocs; i++) { @@ -604,7 +586,7 @@ _job_create_internal(allocation_info_t *info) } void -job_update_io_fnames(job_t *job) +job_update_io_fnames(srun_job_t *job) { job->ifname = fname_create(job, opt.ifname); job->ofname = fname_create(job, opt.ofname); @@ -612,7 +594,7 @@ job_update_io_fnames(job_t *job) } static void -_job_fake_cred(job_t *job) +_job_fake_cred(srun_job_t *job) { slurm_cred_arg_t arg; arg.jobid = job->jobid; @@ -627,7 +609,7 @@ _job_fake_cred(job_t *job) static char * -_task_state_name(task_state_t state_inx) +_task_state_name(srun_task_state_t state_inx) { switch (state_inx) { case SRUN_TASK_INIT: @@ -648,7 +630,7 @@ _task_state_name(task_state_t state_inx) } static char * -_host_state_name(host_state_t state_inx) +_host_state_name(srun_host_state_t state_inx) { switch (state_inx) { case SRUN_HOST_INIT: diff --git a/src/srun/job.h b/src/srun/srun_job.h similarity index 85% rename from src/srun/job.h rename to src/srun/srun_job.h index 793d2191ae9b312f45025c63825ab69a31b157ac..e00510f924e8be074b93cf5b482a14d3cf62bf70 100644 --- a/src/srun/job.h +++ b/src/srun/srun_job.h @@ -1,6 +1,6 @@ /*****************************************************************************\ * src/srun/job.h - specification of an srun "job" - * $Id$ + * $Id: job.h,v 1.42 2005/06/15 16:39:19 da Exp $ ***************************************************************************** * Copyright (C) 2002 The Regents of the University of California. * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). @@ -39,6 +39,9 @@ #include "src/common/macros.h" #include "src/common/node_select.h" #include "src/common/slurm_protocol_defs.h" +//#include "src/common/global_srun.h" + +#include "src/srun/signals.h" #include "src/srun/fname.h" typedef enum { @@ -53,14 +56,14 @@ typedef enum { SRUN_JOB_DETACHED, /* Detached IO from job (Not used now) */ SRUN_JOB_FAILED, /* Job failed for some reason */ SRUN_JOB_FORCETERM, /* Forced termination of IO thread */ -} job_state_t; +} srun_job_state_t; typedef enum { SRUN_HOST_INIT = 0, SRUN_HOST_CONTACTED, SRUN_HOST_UNREACHABLE, SRUN_HOST_REPLIED -} host_state_t; +} srun_host_state_t; typedef enum { SRUN_TASK_INIT = 0, @@ -69,7 +72,7 @@ typedef enum { SRUN_TASK_IO_WAIT, SRUN_TASK_EXITED, SRUN_TASK_ABNORMAL_EXIT -} task_state_t; +} srun_task_state_t; typedef struct srun_job { @@ -78,7 +81,7 @@ typedef struct srun_job { bool old_job; /* run job step under previous allocation */ bool removed; /* job has been removed from SLURM */ - job_state_t state; /* job state */ + srun_job_state_t state; /* job state */ pthread_mutex_t state_mutex; pthread_cond_t state_cond; @@ -96,9 +99,6 @@ typedef struct srun_job { slurm_addr *slurmd_addr;/* slurm_addr vector to slurmd's */ - pthread_t gtid; /* GMPI master thread */ - int gmpi_fd; /* fd for accept(2) */ - pthread_t sigid; /* signals thread tid */ pthread_t jtid; /* job control thread id */ @@ -127,10 +127,10 @@ typedef struct srun_job { time_t ltimeout; /* Time by which all tasks must be running */ time_t etimeout; /* exit timeout (see opt.max_wait */ - host_state_t *host_state; /* nhost host states */ + srun_host_state_t *host_state; /* nhost host states */ int *tstatus; /* ntask exit statii */ - task_state_t *task_state; /* ntask task states */ + srun_task_state_t *task_state; /* ntask task states */ pthread_mutex_t task_mutex; switch_jobinfo_t switch_job; @@ -145,55 +145,50 @@ typedef struct srun_job { bool *stdin_eof; /* true if task i processed stdin eof */ select_jobinfo_t select_jobinfo; -} job_t; +} srun_job_t; -void update_job_state(job_t *job, job_state_t newstate); -void job_force_termination(job_t *job); +void update_job_state(srun_job_t *job, srun_job_state_t newstate); +void job_force_termination(srun_job_t *job); -job_state_t job_state(job_t *job); +srun_job_state_t job_state(srun_job_t *job); -job_t * job_create_noalloc(void); -job_t * job_create_allocation(resource_allocation_response_msg_t *resp); +srun_job_t * job_create_noalloc(void); +srun_job_t * job_create_allocation(resource_allocation_response_msg_t *resp); /* * Update job filenames and modes for stderr, stdout, and stdin. */ -void job_update_io_fnames(job_t *j); +void job_update_io_fnames(srun_job_t *j); /* * Issue a fatal error message and terminate running job */ -void job_fatal(job_t *job, const char *msg); +void job_fatal(srun_job_t *job, const char *msg); /* * Deallocates job and or job step via slurm API */ -void job_destroy(job_t *job, int error); +void srun_job_destroy(srun_job_t *job, int error); /* * Send SIGKILL to running job via slurm controller */ -void job_kill(job_t *job); - -/* - * returns number of active tasks on host with id = hostid. - */ -int job_active_tasks_on_host(job_t *job, int hostid); +void srun_job_kill(srun_job_t *job); /* * report current task status */ -void report_task_status(job_t *job); +void report_task_status(srun_job_t *job); /* * report current node status */ -void report_job_status(job_t *job); +void report_job_status(srun_job_t *job); /* * Returns job return code (for srun exit status) */ -int job_rc(job_t *job); +int job_rc(srun_job_t *job); /* * To run a job step on existing allocation, modify the