diff --git a/NEWS b/NEWS
index ab2bc392e943c2c85484b338d1f52ab4e075dce0..cd5b005dfd91683bbba1b1a682bd947347dbb62e 100644
--- a/NEWS
+++ b/NEWS
@@ -18,6 +18,9 @@ documents those changes that are of interest to users and admins.
  -- Fix bug in allocating GRES that are associated with specific CPUs. In some
     cases the code allocated first available GRES to job instead of allocating
     GRES accessible to the specific CPUs allocated to the job.
+ -- spank: Add callbacks in slurmd: slurm_spank_slurmd_{init,exit}
+     and job epilog/prolog: slurm_spank_job_{prolog,epilog}
+ -- spank: Add spank_option_getopt() function to api
 
 * Changes in SLURM 2.4.0.pre4
 =============================
diff --git a/doc/man/man8/spank.8 b/doc/man/man8/spank.8
index 233edffc78c1ee3c6f906a5b993a92e33f856d6e..fc39301fb43453985be86cc478fd5746e3c3c778 100644
--- a/doc/man/man8/spank.8
+++ b/doc/man/man8/spank.8
@@ -20,7 +20,7 @@ behavior of SLURM job launch.
 .LP
 
 .SH "SPANK PLUGINS"
-\fBSPANK\fR plugins are loaded in up to three separate contexts during a
+\fBSPANK\fR plugins are loaded in up to five separate contexts during a
 \fBSLURM\fR job. Briefly, the three contexts are:
 .TP 8
 \fBlocal\fB
@@ -28,16 +28,36 @@ In \fBlocal\fR context, the plugin is loaded by \fBsrun\fR. (i.e. the "local"
 part of a parallel job).
 .TP
 \fBremote\fR
-In \fBremote\fR context, the plugin is loaded by \fBslurmd\fR. (i.e. the "remote"
+In \fBremote\fR context, the plugin is loaded by \fBslurmstepd\fR. (i.e. the "remote"
 part of a parallel job).
 .TP
 \fBallocator\fR
 In \fBallocator\fR context, the plugin is loaded in one of the job allocation
 utilities \fBsbatch\fR or \fBsalloc\fR.
 .LP
+.TP
+\fBslurmd\fR In \fBslurmd\fR context, the plugin is loaded in the
+\fBslurmd\fR daemon itself. \fBNote\fR: Plugins loaded in slurmd context
+persist for the entire time slurmd is running, so if configuration is
+changed or plugins are updated, slurmd must be restarted for the changes
+to take effect.
+.LP
+.TP
+\fBjob_script\fR
+In the \fBjob_script\fR context, plugins are loaded in the context of the
+job prolog or epilog. \fBNote\fR: Plugins are loaded in \fBjob_script\fR
+context on each run on the job prolog or epilog, in a separate address
+space from plugins in \fBslurmd\fR context. This means there is no
+state shared between this context and other contexts, or even between
+one call to \fBslurm_spank_job_prolog\fR or \fBslurm_spank_job_epilog\fR
+and subsequent calls.
+.LP
 In local context, only the \fBinit\fR, \fBexit\fR, \fBinit_post_opt\fR, and
 \fBuser_local_init\fR functions are called. In allocator context, only the
 \fBinit\fR, \fBexit\fR, and \fBinit_post_opt\fR functions are called.
+Similarly, in slurmd context, only the \fBslurmd_init\fR and \fBslurmd_exit\fR
+callbacks are active, and in the job_script context, only the \fBjob_prolog\fR
+and \fRjob_epilog\fR callbacks are used.
 Plugins may query the context in which they are running with the
 \fBspank_context\fR and \fBspank_remote\fR functions defined in
 \fB<slurm/spank.h>\fR.
@@ -48,7 +68,13 @@ launch. A plugin may define the following functions:
 \fBslurm_spank_init\fR
 Called just after plugins are loaded. In remote context, this is just
 after job step is initialized. This function is called before any plugin
-option processing.
+option processing. This function is not called in slurmd context.
+.TP
+\fBslurm_spank_slurmd_init\fR
+Called in slurmd just after the daemon is started.
+.TP
+\fBslurm_spank_job_prolog\fR
+Called at the same time as the job prolog.
 .TP
 \fBslurm_spank_init_post_opt\fR
 Called at the same point as \fBslurm_spank_init\fR, but after all
@@ -88,6 +114,12 @@ Called for each task as its exit status is collected by SLURM.
 \fBslurm_spank_exit\fR
 Called once just before \fBslurmstepd\fR exits in remote context.
 In local context, called before \fBsrun\fR exits.
+.TP
+\fBslurm_spank_job_epilog\fR
+Called at the same time as the job epilog.
+.TP
+\fBslurm_spank_slurmd_exit\fR
+Called in slurmd when the daemon is shut down.
 .LP
 All of these functions have the same prototype, for example:
 .nf
@@ -311,6 +343,22 @@ after \fBSPANK\fR plugins are loaded and before the \fBspank_init\fR
 handler is called. This allows plugins to modify behavior of all plugin
 functionality based on the value of user\-provided options.
 (See EXAMPLES below for a plugin that registers an option with \fBSLURM\fR).
+.LP
+As an alternative to use of an option callback and global variable,
+plugins can use the \fBspank_option_getopt\fR option to check for
+supplied options after option processing. This function has the prototype:
+.nf
+
+   spank_err_t spank_option_getopt(spank_t sp,
+       struct spank_option *opt, char **optargp);
+
+.nf
+This function returns \fBESPANK_SUCCESS\fR if the option defined in the
+struct spank_option \fIopt\fR has been used by the user. If \fIoptargp\fR
+is non-NULL then it is set to any option argument passed (if the option
+takes an argument). The use of this method is \fIrequired\fR to process
+options in \fBjob_script\fR context (\fBslurm_spank_job_prolog\R and
+\fBslurm_spank_job_epilog\fR).
 
 .SH "CONFIGURATION"
 .LP
diff --git a/slurm/spank.h b/slurm/spank.h
index 8881ffc0dda12fa006627dbeae3ecdc788484e45..5ddf73cc26af7ab53faedf624c9a34e858a2ecd7 100644
--- a/slurm/spank.h
+++ b/slurm/spank.h
@@ -56,36 +56,45 @@ typedef struct spank_handle * spank_t;
  */
 typedef int (spank_f) (spank_t spank, int ac, char *argv[]);
 
-/*  SPANK plugin operations. SPANK plugin should have at least one of 
+/*  SPANK plugin operations. SPANK plugin should have at least one of
  *   these functions defined non-NULL.
  *
  *  Plug-in callbacks are completed at the following points in slurmd:
  *
- *   slurmd -> slurmstepd
- *               `-> init ()
- *                -> process spank options
- *                -> init_post_opt ()
- *               + drop privileges (initgroups(), seteuid(), chdir()) 
- *               `-> user_init ()  
- *               + for each task
- *               |       + fork ()
- *               |       |
- *               |       + reclaim privileges
- *               |       `-> task_init_privileged ()
- *               |       |
- *               |       + become_user ()
- *               |       `-> task_init ()
- *               |       |
- *               |       + execve ()
- *               |
- *               + reclaim privileges
- *               + for each task 
- *               |     `-> task_post_fork ()
- *               |
- *               + for each task
- *               |       + wait ()
- *               |          `-> task_exit ()
- *               `-> exit ()
+ *   slurmd
+ *        `-> slurmd_init()
+ *        |
+ *        `-> job_prolog()
+ *        |
+ *        | `-> slurmstepd
+ *        |      `-> init ()
+ *        |       -> process spank options
+ *        |       -> init_post_opt ()
+ *        |      + drop privileges (initgroups(), seteuid(), chdir())
+ *        |      `-> user_init ()
+ *        |      + for each task
+ *        |      |       + fork ()
+ *        |      |       |
+ *        |      |       + reclaim privileges
+ *        |      |       `-> task_init_privileged ()
+ *        |      |       |
+ *        |      |       + become_user ()
+ *        |      |       `-> task_init ()
+ *        |      |       |
+ *        |      |       + execve ()
+ *        |      |
+ *        |      + reclaim privileges
+ *        |      + for each task
+ *        |      |     `-> task_post_fork ()
+ *        |      |
+ *        |      + for each task
+ *        |      |       + wait ()
+ *        |      |          `-> task_exit ()
+ *        |      `-> exit ()
+ *        |
+ *        `---> job_epilog()
+ *        |
+ *        `-> slurmd_exit()
  *
  *   In srun only the init(), init_post_opt() and local_user_init(), and exit()
  *    callbacks are used.
@@ -93,9 +102,14 @@ typedef int (spank_f) (spank_t spank, int ac, char *argv[]);
  *   In sbatch/salloc only the init(), init_post_opt(), and exit() callbacks
  *    are used.
  *
+ *   In slurmd proper, only the slurmd_init(), slurmd_exit(), and
+ *    job_prolog/epilog callbacks are used.
+ *
  */
 
 extern spank_f slurm_spank_init;
+extern spank_f slurm_spank_slurmd_init;
+extern spank_f slurm_spank_job_prolog;
 extern spank_f slurm_spank_init_post_opt;
 extern spank_f slurm_spank_local_user_init;
 extern spank_f slurm_spank_user_init;
@@ -103,6 +117,8 @@ extern spank_f slurm_spank_task_init_privileged;
 extern spank_f slurm_spank_task_init;
 extern spank_f slurm_spank_task_post_fork;
 extern spank_f slurm_spank_task_exit;
+extern spank_f slurm_spank_job_epilog;
+extern spank_f slurm_spank_slurmd_exit;
 extern spank_f slurm_spank_exit;
 
 
@@ -183,10 +199,15 @@ typedef enum spank_err spank_err_t;
 enum spank_context {
     S_CTX_ERROR,             /* Error obtaining current context              */
     S_CTX_LOCAL,             /* Local context (srun)                         */
-    S_CTX_REMOTE,            /* Remote context (slurmd)                      */
-    S_CTX_ALLOCATOR          /* Allocator context (sbatch/salloc)            */
+    S_CTX_REMOTE,            /* Remote context (slurmstepd)                  */
+    S_CTX_ALLOCATOR,         /* Allocator context (sbatch/salloc)            */
+    S_CTX_SLURMD,            /* slurmd context                               */
+    S_CTX_JOB_SCRIPT         /* prolog/epilog context                        */
 };
 
+#define HAVE_S_CTX_SLURMD 1     /* slurmd context supported                  */
+#define HAVE_S_CTX_JOB_SCRIPT 1 /* job script (prolog/epilog) supported      */
+
 typedef enum spank_context spank_context_t;
 
 /*
@@ -198,7 +219,7 @@ typedef enum spank_context spank_context_t;
  *   the plugin to distinguish between plugin-local options, `optarg'
  *   is an argument passed by the user (if applicable), and `remote'
  *   specifies whether this call is being made locally (e.g. in srun)
- *   or remotely (e.g. in slurmd).
+ *   or remotely (e.g. in slurmstepd/slurmd).
  */
 typedef int (*spank_opt_cb_f) (int val, const char *optarg, int remote);
 
@@ -254,7 +275,7 @@ int spank_symbol_supported (const char *symbol);
  *  Determine whether plugin is loaded in "remote" context
  * 
  *  Returns:
- *  = 1   remote context, i.e. plugin is loaded in slurmd.
+ *  = 1   remote context, i.e. plugin is loaded in /slurmstepd.
  *  = 0   not remote context
  *  < 0   spank handle was not valid.
  */
@@ -284,6 +305,22 @@ spank_context_t spank_context (void);
  */
 spank_err_t spank_option_register (spank_t spank, struct spank_option *opt);
 
+/*
+ *  Check whether spank plugin option [opt] has been activated.
+ *   If the option takes an argument, then the option argument
+ *   (if found) will be returned in *optarg.
+ *
+ *  Returns
+ *   ESPANK_SUCCESS if the option was used by user. In this case
+ *    *optarg will contain the option argument if opt->has_arg != 0.
+ *   ESPANK_ERROR if the option wasn't used.
+ *   ESPANK_BAD_ARG if an invalid argument was passed to the function,
+ *    such as NULL opt, NULL opt->name, or NULL optarg when opt->has_arg != 0.
+ *   ESPANK_NOT_AVAIL if called from improper context.
+ */
+spank_err_t spank_option_getopt (spank_t spank, struct spank_option *opt,
+	char **optarg);
+
 
 /*  Get the value for the current job or task item specified, 
  *   storing the result in the subsequent pointer argument(s).
@@ -296,7 +333,7 @@ spank_err_t spank_option_register (spank_t spank, struct spank_option *opt);
  *   item is requested from outside a task context, ESPANK_BAD_ARG
  *   if invalid args are passed to spank_get_item or spank_get_item
  *   is called from an invalid context, and ESPANK_NOT_REMOTE 
- *   if not called from slurmd context or spank_user_local_init.
+ *   if not called from slurmstepd context or spank_user_local_init.
  */
 spank_err_t spank_get_item (spank_t spank, spank_item_t item, ...);
 
@@ -319,7 +356,7 @@ spank_err_t spank_getenv (spank_t spank, const char *var, char *buf, int len);
  *  Returns ESPANK_SUCCESS on success, o/w spank_err_t on failure:
  *     ESPANK_ENV_EXISTS  = var exists in job env and overwrite == 0.
  *     ESPANK_BAD_ARG     = spank handle invalid or var/val are NULL.
- *     ESPANK_NOT_REMOTE  = not called from slurmd.
+ *     ESPANK_NOT_REMOTE  = not called from slurmstepd.
  */
 spank_err_t spank_setenv (spank_t spank, const char *var, const char *val, 
         int overwrite);
@@ -330,7 +367,7 @@ spank_err_t spank_setenv (spank_t spank, const char *var, const char *val,
  *
  *  Returns ESPANK_SUCCESS on success, o/w spank_err_t on failure:
  *    ESPANK_BAD_ARG   = spank handle invalid or var is NULL.
- *    ESPANK_NOT_REMOTE = not called from slurmd.
+ *    ESPANK_NOT_REMOTE = not called from slurmstepd.
  */
 spank_err_t spank_unsetenv (spank_t spank, const char *var);
 
diff --git a/src/common/plugstack.c b/src/common/plugstack.c
index e9589897996292a403fb774239bd020ac237baba..cf9a57166db1e2c6b41d8193e76670c5082593fc 100644
--- a/src/common/plugstack.c
+++ b/src/common/plugstack.c
@@ -70,6 +70,8 @@
 
 struct spank_plugin_operations {
 	spank_f *init;
+	spank_f *slurmd_init;
+	spank_f *job_prolog;
 	spank_f *init_post_opt;
 	spank_f *local_user_init;
 	spank_f *user_init;
@@ -77,12 +79,16 @@ struct spank_plugin_operations {
 	spank_f *user_task_init;
 	spank_f *task_post_fork;
 	spank_f *task_exit;
+	spank_f *job_epilog;
+	spank_f *slurmd_exit;
 	spank_f *exit;
 };
 
-const int n_spank_syms = 9;
+const int n_spank_syms = 13;
 const char *spank_syms[] = {
 	"slurm_spank_init",
+	"slurm_spank_slurmd_init",
+	"slurm_spank_job_prolog",
 	"slurm_spank_init_post_opt",
 	"slurm_spank_local_user_init",
 	"slurm_spank_user_init",
@@ -90,6 +96,8 @@ const char *spank_syms[] = {
 	"slurm_spank_task_init",
 	"slurm_spank_task_post_fork",
 	"slurm_spank_task_exit",
+	"slurm_spank_job_epilog",
+	"slurm_spank_slurmd_exit",
 	"slurm_spank_exit"
 };
 
@@ -102,6 +110,7 @@ struct spank_plugin {
 	char **argv;
 	struct spank_plugin_operations ops;
 	struct spank_option *opts;
+	struct spank_stack *stack;
 };
 
 /*
@@ -120,24 +129,15 @@ struct spank_plugin_opt {
 };
 
 /*
- *  Initial value for global optvals for SPANK plugin options
- */
-static int spank_optval = 0xfff;
-
-/*
- *  Cache of options provided by spank plugins
- */
-static List option_cache = NULL;
-
-
-/*
- *  SPANK plugin context (local, remote, allocator)
+ *  SPANK plugin context type (local, remote, allocator)
  */
 enum spank_context_type {
 	S_TYPE_NONE,
 	S_TYPE_LOCAL,           /* LOCAL == srun              */
-	S_TYPE_REMOTE,          /* REMOTE == slurmd           */
-	S_TYPE_ALLOCATOR        /* ALLOCATOR == sbatch/salloc */
+	S_TYPE_REMOTE,          /* REMOTE == slurmstepd       */
+	S_TYPE_ALLOCATOR,       /* ALLOCATOR == sbatch/salloc */
+	S_TYPE_SLURMD,          /* SLURMD == slurmd           */
+	S_TYPE_JOB_SCRIPT,      /* JOB_SCRIPT == prolog/epilog*/
 };
 
 /*
@@ -145,6 +145,8 @@ enum spank_context_type {
  */
 typedef enum step_fn {
 	SPANK_INIT = 0,
+	SPANK_SLURMD_INIT,
+	SPANK_JOB_PROLOG,
 	SPANK_INIT_POST_OPT,
 	LOCAL_USER_INIT,
 	STEP_USER_INIT,
@@ -152,9 +154,19 @@ typedef enum step_fn {
 	STEP_USER_TASK_INIT,
 	STEP_TASK_POST_FORK,
 	STEP_TASK_EXIT,
+	SPANK_JOB_EPILOG,
+	SPANK_SLURMD_EXIT,
 	SPANK_EXIT
 } step_fn_t;
 
+/*
+ *  Job information in prolog/epilog context:
+ */
+struct job_script_info {
+	uint32_t  jobid;
+	uid_t     uid;
+};
+
 struct spank_handle {
 #   define SPANK_MAGIC 0x00a5a500
 	int                  magic;  /* Magic identifier to ensure validity. */
@@ -162,25 +174,95 @@ struct spank_handle {
 	step_fn_t            phase;  /* Which spank fn are we called from?   */
 	void               * job;    /* Reference to current srun|slurmd job */
 	slurmd_task_info_t * task;   /* Reference to current task (if valid) */
+	struct spank_stack  *stack;  /* Reference to the current plugin stack*/
 };
 
 /*
- *  SPANK plugins stack
+ *  SPANK stack. The stack of loaded plugins and associated state.
  */
-static List spank_stack = NULL;
-static enum spank_context_type spank_ctx = S_TYPE_NONE;
-
-static pthread_mutex_t spank_mutex = PTHREAD_MUTEX_INITIALIZER;
+struct spank_stack {
+	enum spank_context_type type;/*  Type of context for this stack      */
+	List plugin_list;	     /*  Stack of spank plugins              */
+	List option_cache;           /*  Cache of plugin options in this ctx */
+	int  spank_optval;           /*  optvalue for next plugin option     */
+	const char * plugin_path;    /*  default path to search for plugins  */
+};
 
 /*
- *  Default plugin dir
+ *  The global spank plugin stack:
  */
-static const char * default_spank_path = NULL;
+static struct spank_stack *global_spank_stack = NULL;
 
 /*
  *  Forward declarations
  */
 static int _spank_plugin_options_cache(struct spank_plugin *p);
+static int _spank_stack_load (struct spank_stack *stack, const char *file);
+static void _spank_plugin_destroy (struct spank_plugin *);
+static void _spank_plugin_opt_destroy (struct spank_plugin_opt *);
+static int spank_stack_get_remote_options(struct spank_stack *, job_options_t);
+static int spank_stack_get_remote_options_env (struct spank_stack *, char **);
+static int spank_stack_set_remote_options_env (struct spank_stack * stack);
+static int dyn_spank_set_job_env (const char *var, const char *val, int ovwt);
+
+static void spank_stack_destroy (struct spank_stack *stack)
+{
+	if (stack->plugin_list)
+		list_destroy (stack->plugin_list);
+	if (stack->option_cache)
+		list_destroy (stack->option_cache);
+	xfree (stack->plugin_path);
+	xfree (stack);
+}
+
+static struct spank_stack *
+spank_stack_create (const char *file, enum spank_context_type type)
+{
+	slurm_ctl_conf_t *conf;
+	struct spank_stack *stack = xmalloc (sizeof (*stack));
+
+	conf = slurm_conf_lock();
+	stack->plugin_path = xstrdup (conf->plugindir);
+	slurm_conf_unlock();
+
+	stack->type = type;
+	stack->spank_optval = 0xfff;
+	stack->plugin_list =
+		list_create ((ListDelF) _spank_plugin_destroy);
+	stack->option_cache =
+		list_create ((ListDelF) _spank_plugin_opt_destroy);
+
+	if (_spank_stack_load (stack, file) < 0) {
+		spank_stack_destroy (stack);
+		return (NULL);
+	}
+
+	return (stack);
+}
+
+static List get_global_option_cache (void)
+{
+	if (global_spank_stack)
+		return (global_spank_stack->option_cache);
+	else
+		return (NULL);
+}
+
+
+static int plugin_in_list (List l, struct spank_plugin *sp)
+{
+	int rc = 0;
+	struct spank_plugin *p;
+	ListIterator i = list_iterator_create (l);
+	while ((p = list_next (i))) {
+		if (p->fq_path == sp->fq_path) {
+			rc = 1;
+			break;
+		}
+	}
+	list_iterator_destroy (i);
+	return (rc);
+}
 
 static void _argv_append(char ***argv, int ac, const char *newarg)
 {
@@ -256,7 +338,8 @@ _plugin_stack_parse_line(char *line, char **plugin, int *acp, char ***argv,
 	return (0);
 }
 
-static struct spank_plugin *_spank_plugin_create(char *path, int ac,
+static struct spank_plugin *_spank_plugin_create(struct spank_stack *stack,
+						 char *path, int ac,
 						 char **av, bool required)
 {
 	struct spank_plugin *plugin;
@@ -283,11 +366,12 @@ static struct spank_plugin *_spank_plugin_create(char *path, int ac,
 	plugin->ac = ac;
 	plugin->argv = av;
 	plugin->ops = ops;
+	plugin->stack = stack;
 
 	/*
 	 *  Do not load static plugin options table in allocator context.
 	 */
-	if (spank_ctx != S_TYPE_ALLOCATOR)
+	if (stack->type != S_TYPE_ALLOCATOR)
 		plugin->opts = plugin_get_sym(p, "spank_options");
 
 	return (plugin);
@@ -353,10 +437,40 @@ _spank_plugin_find (const char *path, const char *file)
 	return (NULL);
 }
 
-static int _spank_conf_include (const char *, int, const char *, List *);
+static int _spank_conf_include (struct spank_stack *,
+	const char *, int, const char *);
 
 static int
-_spank_stack_process_line(const char *file, int line, char *buf, List *stackp)
+spank_stack_plugin_valid_for_context (struct spank_stack *stack,
+	struct spank_plugin *p)
+{
+	switch (stack->type) {
+	case S_TYPE_JOB_SCRIPT:
+		if (p->ops.job_prolog || p->ops.job_epilog)
+			return (1);
+		break;
+	case S_TYPE_SLURMD:
+		if (p->ops.slurmd_init || p->ops.slurmd_exit)
+			return (1);
+		break;
+	case S_TYPE_LOCAL:
+	case S_TYPE_ALLOCATOR:
+	case S_TYPE_REMOTE:
+		/*
+		 *  For backwards compatibility: All plugins were
+		 *   always loaded in these contexts, so continue
+		 *   to do so
+		 */
+		return (1);
+	default:
+		return (0);
+	}
+	return (0);
+}
+
+static int
+_spank_stack_process_line(struct spank_stack *stack,
+	const char *file, int line, char *buf)
 {
 	char **argv;
 	int ac;
@@ -372,7 +486,7 @@ _spank_stack_process_line(const char *file, int line, char *buf, List *stackp)
 	}
 
        if (type == CF_INCLUDE) {
-               int rc = _spank_conf_include (file, line, path, stackp);
+               int rc = _spank_conf_include (stack, file, line, path);
                xfree (path);
                return (rc);
        }
@@ -383,14 +497,14 @@ _spank_stack_process_line(const char *file, int line, char *buf, List *stackp)
 	if (path[0] != '/') {
 		char *f;
 
-		if ((f = _spank_plugin_find (default_spank_path, path))) {
+		if ((f = _spank_plugin_find (stack->plugin_path, path))) {
 			xfree (path);
 			path = f;
 		}
 	}
 
 	required = (type == CF_REQUIRED);
-	if (!(p = _spank_plugin_create(path, ac, argv, required))) {
+	if (!(p = _spank_plugin_create(stack, path, ac, argv, required))) {
 		if (required)
 			error ("spank: %s:%d:"
 			       " Failed to load plugin %s. Aborting.",
@@ -401,57 +515,64 @@ _spank_stack_process_line(const char *file, int line, char *buf, List *stackp)
 				 file, line, path);
 		return (required ? -1 : 0);
 	}
-	if (*stackp == NULL)
-		*stackp = list_create((ListDelF) _spank_plugin_destroy);
+
+	if (plugin_in_list (stack->plugin_list, p)) {
+		error ("spank: %s: cowardly refusing to load a second time",
+			p->fq_path);
+		_spank_plugin_destroy (p);
+		return (0);
+	}
+
+	if (!spank_stack_plugin_valid_for_context (stack, p)) {
+		verbose ("spank: %s: no callbacks in this context", p->fq_path);
+		_spank_plugin_destroy (p);
+		return (0);
+	}
 
 	verbose ("spank: %s:%d: Loaded plugin %s",
 			file, line, xbasename (p->fq_path));
 
-	list_append (*stackp, p);
+	list_append (stack->plugin_list, p);
 	_spank_plugin_options_cache(p);
 
 	return (0);
 }
 
 
-static int _spank_stack_create(const char *path, List * listp)
+static int _spank_stack_load(struct spank_stack *stack, const char *path)
 {
+	int rc = 0;
 	int line;
 	char buf[4096];
 	FILE *fp;
 
 	verbose("spank: opening plugin stack %s", path);
 
+	/*
+	 *  Try to open plugstack.conf. A missing config file is not an
+	 *   error, but is equivalent to an empty file.
+	 */
 	if (!(fp = safeopen(path, "r", SAFEOPEN_NOCREATE))) {
 		if (errno == ENOENT)
-			debug("spank: Failed to open %s: %m", path);
-		else
-			error("spank: Failed to open %s: %m", path);
-		return -1;
+			return (0);
+		error("spank: Failed to open %s: %m", path);
+		return (-1);
 	}
 
 	line = 1;
 	while (fgets(buf, sizeof(buf), fp)) {
-		if (_spank_stack_process_line(path, line, buf, listp) < 0)
-			goto fail_immediately;
+		rc = _spank_stack_process_line(stack, path, line, buf);
+		if (rc < 0)
+			break;
 		line++;
 	}
 
 	fclose(fp);
-	return (0);
-
-      fail_immediately:
-	if (*listp != NULL) {
-		list_destroy(*listp);
-		*listp = NULL;
-	}
-	fclose(fp);
-	return (-1);
+	return (rc);
 }
 
-static int
-_spank_conf_include (const char *file, int lineno, const char *pattern,
-		List *stackp)
+static int _spank_conf_include (struct spank_stack *stack,
+		const char *file, int lineno, const char *pattern)
 {
 	int rc = 0;
 	glob_t gl;
@@ -480,7 +601,7 @@ _spank_conf_include (const char *file, int lineno, const char *pattern,
 	switch (rc) {
 	  case 0:
 	  	for (i = 0; i < gl.gl_pathc; i++) {
-			rc = _spank_stack_create (gl.gl_pathv[i], stackp);
+			rc = _spank_stack_load (stack, gl.gl_pathv[i]);
 			if (rc < 0)
 				break;
 		}
@@ -504,18 +625,19 @@ _spank_conf_include (const char *file, int lineno, const char *pattern,
 }
 
 static int
-_spank_handle_init(struct spank_handle *spank, void * arg,
-		   int taskid, step_fn_t fn)
+_spank_handle_init(struct spank_handle *spank, struct spank_stack *stack,
+		void * arg, int taskid, step_fn_t fn)
 {
 	memset(spank, 0, sizeof(*spank));
 	spank->magic = SPANK_MAGIC;
 	spank->plugin = NULL;
 
 	spank->phase = fn;
+	spank->stack = stack;
 
 	if (arg != NULL) {
 		spank->job = arg;
-		if (spank_ctx == S_TYPE_REMOTE && taskid >= 0) {
+		if (stack->type == S_TYPE_REMOTE && taskid >= 0) {
 			spank->task = ((slurmd_job_t *) arg)->task[taskid];
 		}
 	}
@@ -527,6 +649,10 @@ static const char *_step_fn_name(step_fn_t type)
 	switch (type) {
 	case SPANK_INIT:
 		return ("init");
+	case SPANK_SLURMD_INIT:
+		return ("slurmd_init");
+	case SPANK_JOB_PROLOG:
+		return ("job_prolog");
 	case SPANK_INIT_POST_OPT:
 		return ("init_post_opt");
 	case LOCAL_USER_INIT:
@@ -541,6 +667,10 @@ static const char *_step_fn_name(step_fn_t type)
 		return ("task_post_fork");
 	case STEP_TASK_EXIT:
 		return ("task_exit");
+	case SPANK_JOB_EPILOG:
+		return ("job_epilog");
+	case SPANK_SLURMD_EXIT:
+		return ("slurmd_exit");
 	case SPANK_EXIT:
 		return ("exit");
 	}
@@ -549,7 +679,44 @@ static const char *_step_fn_name(step_fn_t type)
 	return ("unknown");
 }
 
-static int _do_call_stack(step_fn_t type, void * job, int taskid)
+static spank_f *spank_plugin_get_fn (struct spank_plugin *sp, step_fn_t type)
+{
+	switch (type) {
+	case SPANK_INIT:
+		return (sp->ops.init);
+	case SPANK_SLURMD_INIT:
+		return (sp->ops.slurmd_init);
+	case SPANK_JOB_PROLOG:
+		return (sp->ops.job_prolog);
+	case SPANK_INIT_POST_OPT:
+		return (sp->ops.init_post_opt);
+	case LOCAL_USER_INIT:
+		return (sp->ops.local_user_init);
+	case STEP_USER_INIT:
+		return (sp->ops.user_init);
+	case STEP_TASK_INIT_PRIV:
+		return (sp->ops.task_init_privileged);
+	case STEP_USER_TASK_INIT:
+		return (sp->ops.user_task_init);
+	case STEP_TASK_POST_FORK:
+		return (sp->ops.task_post_fork);
+	case STEP_TASK_EXIT:
+		return (sp->ops.task_exit);
+	case SPANK_JOB_EPILOG:
+		return (sp->ops.job_epilog);
+	case SPANK_SLURMD_EXIT:
+		return (sp->ops.slurmd_exit);
+	case SPANK_EXIT:
+		return (sp->ops.exit);
+	default:
+		error ("Unhandled spank function type=%d\n", type);
+		return (NULL);
+	}
+	return (NULL);
+}
+
+static int _do_call_stack(struct spank_stack *stack,
+	step_fn_t type, void * job, int taskid)
 {
 	int rc = 0;
 	ListIterator i;
@@ -557,97 +724,29 @@ static int _do_call_stack(step_fn_t type, void * job, int taskid)
 	struct spank_handle spank[1];
 	const char *fn_name;
 
-	if (!spank_stack)
-		return (0);
+	if (!stack)
+		return (-1);
 
-	if (_spank_handle_init(spank, job, taskid, type) < 0) {
+	if (_spank_handle_init(spank, stack, job, taskid, type) < 0) {
 		error("spank: Failed to initialize handle for plugins");
 		return (-1);
 	}
 
 	fn_name = _step_fn_name(type);
 
-	i = list_iterator_create(spank_stack);
+	i = list_iterator_create(stack->plugin_list);
 	while ((sp = list_next(i))) {
 		const char *name = xbasename(sp->fq_path);
+		spank_f *spank_fn;
 
 		spank->plugin = sp;
 
-		switch (type) {
-		case SPANK_INIT:
-			if (sp->ops.init) {
-				rc = (*sp->ops.init) (spank, sp->ac,
-						      sp->argv);
-				debug2("spank: %s: %s = %d", name,
-				       fn_name, rc);
-			}
-			break;
-		case SPANK_INIT_POST_OPT:
-			if (sp->ops.init_post_opt) {
-				rc = (*sp->ops.init_post_opt) (spank, sp->ac,
-						      sp->argv);
-				debug2("spank: %s: %s = %d", name,
-				       fn_name, rc);
-			}
-			break;
-		case LOCAL_USER_INIT:
-			if (sp->ops.local_user_init) {
-				rc = (*sp->ops.local_user_init) (spank, sp->ac,
-			 				         sp->argv);
-				debug2("spank: %s: %s = %d", name, fn_name, rc);
-			}
-			break;
-		case STEP_USER_INIT:
-			if (sp->ops.user_init) {
-				rc = (*sp->ops.user_init) (spank, sp->ac,
-							   sp->argv);
-				debug2("spank: %s: %s = %d", name,
-				       fn_name, rc);
-			}
-			break;
-		case STEP_TASK_INIT_PRIV:
-			if (sp->ops.task_init_privileged) {
-				rc = (*sp->ops.task_init_privileged)
-					(spank, sp->ac, sp->argv);
-				debug2("spank: %s: %s = %d", name,
-				       fn_name, rc);
-			}
-			break;
-		case STEP_USER_TASK_INIT:
-			if (sp->ops.user_task_init) {
-				rc = (*sp->ops.user_task_init) (spank,
-								sp->ac,
-								sp->argv);
-				debug2("spank: %s: %s = %d", name,
-				       fn_name, rc);
-			}
-			break;
-		case STEP_TASK_POST_FORK:
-			if (sp->ops.task_post_fork) {
-				rc = (*sp->ops.task_post_fork) (spank,
-								sp->ac,
-								sp->argv);
-				debug2("spank: %s: %s = %d", name,
-				       fn_name, rc);
-			}
-			break;
-		case STEP_TASK_EXIT:
-			if (sp->ops.task_exit) {
-				rc = (*sp->ops.task_exit) (spank, sp->ac,
-							   sp->argv);
-				debug2("spank: %s: %s = %d", name, fn_name,
-				       rc);
-			}
-			break;
-		case SPANK_EXIT:
-			if (sp->ops.exit) {
-				rc = (*sp->ops.exit) (spank, sp->ac,
-						      sp->argv);
-				debug2("spank: %s: %s = %d", name,
-				       fn_name, rc);
-			}
-			break;
-		}
+		spank_fn = spank_plugin_get_fn (sp, type);
+		if (!spank_fn)
+			continue;
+
+		rc = (*spank_fn) (spank, sp->ac, sp->argv);
+		debug2("spank: %s: %s = %d", name, fn_name, rc);
 
 		if ((rc < 0) && sp->required) {
 			error("spank: required plugin %s: "
@@ -662,52 +761,32 @@ static int _do_call_stack(step_fn_t type, void * job, int taskid)
 	return (rc);
 }
 
-int _spank_init(enum spank_context_type context, slurmd_job_t * job)
+struct spank_stack *spank_stack_init(enum spank_context_type context)
 {
 	slurm_ctl_conf_t *conf = slurm_conf_lock();
 	const char *path = conf->plugstack;
-	default_spank_path = conf->plugindir;
 	slurm_conf_unlock();
 
-	spank_ctx = context;
-
-	/*
-	 *  A nonexistent spank config is not an error, but
-	 *   abort on any other access failures
-	 */
-	if (access (path, R_OK) < 0) {
-		if (errno == ENOENT)
-			return (0);
-		error ("spank: Unable to open config file `%s': %m", path);
-		return (-1);
-	}
+	return spank_stack_create (path, context);
+}
 
-	if (_spank_stack_create(path, &spank_stack) < 0) {
-		error("spank: failed to create plugin stack");
-		return (-1);
-	}
+int _spank_init(enum spank_context_type context, slurmd_job_t * job)
+{
+	struct spank_stack *stack;
 
-	if (_do_call_stack(SPANK_INIT, job, -1) < 0)
+	if (!(stack = spank_stack_init (context)))
 		return (-1);
+	global_spank_stack = stack;
 
-	/*
-	 *  Nothing more to do unless we are in remote context:
-	 */
-	if (spank_ctx != S_TYPE_REMOTE)
-		return (0);
-
-	/*
-	 *  Remote-specific code:
-	 */
-	if (!job) {
-		error("spank: spank_init called without job reference!");
-		return (-1);
-	}
+	return (_do_call_stack(stack, SPANK_INIT, job, -1));
+}
 
+static int spank_stack_post_opt (struct spank_stack * stack, slurmd_job_t *job)
+{
 	/*
 	 *  Get any remote options from job launch message:
 	 */
-	if (spank_get_remote_options(job->options) < 0) {
+	if (spank_stack_get_remote_options(stack, job->options) < 0) {
 		error("spank: Unable to get remote options");
 		return (-1);
 	}
@@ -715,22 +794,39 @@ int _spank_init(enum spank_context_type context, slurmd_job_t * job)
 	/*
 	 *  Get any remote option passed thru environment
 	 */
-	if (spank_get_remote_options_env(job->env) < 0) {
+	if (spank_stack_get_remote_options_env(stack, job->env) < 0) {
 		error("spank: Unable to get remote options from environment");
 		return (-1);
 	}
 
+	/*
+	 * Now clear any remaining options passed through environment
+	 */
+	spank_clear_remote_options_env (job->env);
+
 	/*
 	 *  Now that all options have been processed, we can
 	 *   call the post_opt handlers here in remote context.
 	 */
-	return (_do_call_stack(SPANK_INIT_POST_OPT, job, -1) < 0);
+	return (_do_call_stack(stack, SPANK_INIT_POST_OPT, job, -1) < 0);
+
+}
+
+static int spank_init_remote (slurmd_job_t *job)
+{
+	if (_spank_init (S_TYPE_REMOTE, job) < 0)
+		return (-1);
+
+	/*
+	 * _spank_init initializes global_spank_stack
+	 */
+	return (spank_stack_post_opt (global_spank_stack, job));
 }
 
 int spank_init (slurmd_job_t * job)
 {
 	if (job)
-		return _spank_init (S_TYPE_REMOTE, job);
+		return spank_init_remote (job);
 	else
 		return _spank_init (S_TYPE_LOCAL, NULL);
 }
@@ -740,70 +836,112 @@ int spank_init_allocator (void)
 	return _spank_init (S_TYPE_ALLOCATOR, NULL);
 }
 
+int spank_slurmd_init (void)
+{
+	return _spank_init (S_TYPE_SLURMD, NULL);
+}
+
 int spank_init_post_opt (void)
 {
+	struct spank_stack *stack = global_spank_stack;
+
 	/*
-	 *  In allocator context, set remote options in env here.
+	 *  Set remote options in our environment and the
+	 *   spank_job_env so that we can always pull them out
+	 *   on the remote side and/or job prolog epilog.
 	 */
-	if (spank_ctx == S_TYPE_ALLOCATOR)
-		spank_set_remote_options_env();
+	spank_stack_set_remote_options_env (stack);
 
-	return (_do_call_stack(SPANK_INIT_POST_OPT, NULL, -1));
+	return (_do_call_stack(stack, SPANK_INIT_POST_OPT, NULL, -1));
 }
 
 int spank_user(slurmd_job_t * job)
 {
-	return (_do_call_stack(STEP_USER_INIT, job, -1));
+	return (_do_call_stack(global_spank_stack, STEP_USER_INIT, job, -1));
 }
 
 int spank_local_user(struct spank_launcher_job_info *job)
 {
-	return (_do_call_stack(LOCAL_USER_INIT, job, -1));
+	return (_do_call_stack(global_spank_stack, LOCAL_USER_INIT, job, -1));
 }
 
 int spank_task_privileged(slurmd_job_t *job, int taskid)
 {
-	return (_do_call_stack(STEP_TASK_INIT_PRIV, job, taskid));
+	return (_do_call_stack(global_spank_stack, STEP_TASK_INIT_PRIV, job, taskid));
 }
 
 int spank_user_task(slurmd_job_t * job, int taskid)
 {
-	return (_do_call_stack(STEP_USER_TASK_INIT, job, taskid));
+	return (_do_call_stack(global_spank_stack, STEP_USER_TASK_INIT, job, taskid));
 }
 
 int spank_task_post_fork(slurmd_job_t * job, int taskid)
 {
-	return (_do_call_stack(STEP_TASK_POST_FORK, job, taskid));
+	return (_do_call_stack(global_spank_stack, STEP_TASK_POST_FORK, job, taskid));
 }
 
 int spank_task_exit(slurmd_job_t * job, int taskid)
 {
-	return (_do_call_stack(STEP_TASK_EXIT, job, taskid));
+	return (_do_call_stack(global_spank_stack, STEP_TASK_EXIT, job, taskid));
+}
+
+int spank_slurmd_exit (void)
+{
+	int rc;
+	rc =  _do_call_stack (global_spank_stack, SPANK_SLURMD_EXIT, NULL, 0);
+	spank_stack_destroy (global_spank_stack);
+	global_spank_stack = NULL;
+	return (rc);
 }
 
 int spank_fini(slurmd_job_t * job)
 {
-	int rc = _do_call_stack(SPANK_EXIT, job, -1);
+	int rc = _do_call_stack(global_spank_stack, SPANK_EXIT, job, -1);
 
-	if (option_cache)
-		list_destroy(option_cache);
-	if (spank_stack)
-		list_destroy(spank_stack);
+	spank_stack_destroy (global_spank_stack);
+	global_spank_stack = NULL;
 
 	return (rc);
 }
 
+/*
+ *  Run job_epilog or job_prolog callbacks in a private spank context.
+ */
+static int spank_job_script (step_fn_t fn, uint32_t jobid, uid_t uid)
+{
+	int rc = 0;
+	struct spank_stack *stack;
+	struct job_script_info jobinfo = { jobid, uid };
+
+	stack = spank_stack_init (S_TYPE_JOB_SCRIPT);
+	if (!stack)
+		return (-1);
+	global_spank_stack = stack;
+
+	rc = _do_call_stack (stack, fn, &jobinfo, -1);
+
+	spank_stack_destroy (stack);
+	global_spank_stack = NULL;
+	return (rc);
+}
+
+int spank_job_prolog (uint32_t jobid, uid_t uid)
+{
+	return spank_job_script (SPANK_JOB_PROLOG, jobid, uid);
+}
+
+int spank_job_epilog (uint32_t jobid, uid_t uid)
+{
+	return spank_job_script (SPANK_JOB_EPILOG, jobid, uid);
+}
+
 /*
  *  SPANK options functions
  */
 
-static int _spank_next_option_val(void)
+static int _spank_next_option_val(struct spank_stack *stack)
 {
-	int optval;
-	slurm_mutex_lock(&spank_mutex);
-	optval = spank_optval++;
-	slurm_mutex_unlock(&spank_mutex);
-	return (optval);
+	return (stack->spank_optval++);
 }
 
 static struct spank_option * _spank_option_copy(struct spank_option *opt)
@@ -842,7 +980,7 @@ static struct spank_plugin_opt *_spank_plugin_opt_create(struct
 	struct spank_plugin_opt *spopt = xmalloc(sizeof(*spopt));
 	spopt->opt = _spank_option_copy (opt);
 	spopt->plugin = p;
-	spopt->optval = _spank_next_option_val();
+	spopt->optval = _spank_next_option_val(p->stack);
 	spopt->found = 0;
 	spopt->optarg = NULL;
 
@@ -873,11 +1011,15 @@ _spank_option_register(struct spank_plugin *p, struct spank_option *opt)
 {
 	int disabled = 0;
 	struct spank_plugin_opt *spopt;
+	struct spank_stack *stack;
+	List option_cache;
 
-	if (!option_cache) {
-		option_cache =
-		    list_create((ListDelF) _spank_plugin_opt_destroy);
+	stack = p->stack;
+	if (stack == NULL) {
+		error ("spank: %s: can't determine plugin context", p->name);
+		return (ESPANK_BAD_ARG);
 	}
+	option_cache = stack->option_cache;
 
 	spopt = list_find_first(option_cache,
 			(ListFindF) _opt_by_name, opt->name);
@@ -967,6 +1109,10 @@ struct option *spank_option_table_create(const struct option *orig)
 	struct option *opts = NULL;
 	ListIterator i = NULL;
 
+	List option_cache = get_global_option_cache();
+	if (option_cache == NULL)
+		return (NULL);
+
 	opts = optz_create();
 
 	/*
@@ -1000,6 +1146,7 @@ int spank_process_option(int optval, const char *arg)
 {
 	struct spank_plugin_opt *opt;
 	int rc = 0;
+	List option_cache = get_global_option_cache();
 
 	if (option_cache == NULL || (list_count(option_cache) == 0))
 		return (-1);
@@ -1161,6 +1308,7 @@ int spank_print_options(FILE * fp, int left_pad, int width)
 {
 	struct spank_plugin_opt *p;
 	ListIterator i;
+	List option_cache = get_global_option_cache();
 
 	if ((option_cache == NULL) || (list_count(option_cache) == 0))
 		return (0);
@@ -1232,13 +1380,21 @@ static int _option_setenv (struct spank_plugin_opt *option)
 	if (setenv (var, option->optarg, 1) < 0)
 	    error ("failed to set %s=%s in env", var, option->optarg);
 
+	if (dyn_spank_set_job_env (var, option->optarg, 1) < 0)
+	    error ("failed to set %s=%s in env", var, option->optarg);
+
 	return (0);
 }
 
-int spank_set_remote_options_env(void)
+static int spank_stack_set_remote_options_env (struct spank_stack *stack)
 {
 	struct spank_plugin_opt *p;
 	ListIterator i;
+	List option_cache;
+
+	if (stack == NULL)
+		return (0);
+	option_cache = stack->option_cache;
 
 	if ((option_cache == NULL) || (list_count(option_cache) == 0))
 		return (0);
@@ -1256,6 +1412,11 @@ int spank_set_remote_options(job_options_t opts)
 {
 	struct spank_plugin_opt *p;
 	ListIterator i;
+	List option_cache;
+
+	if (global_spank_stack == NULL)
+		return (0);
+	option_cache = global_spank_stack->option_cache;
 
 	if ((option_cache == NULL) || (list_count(option_cache) == 0))
 		return (0);
@@ -1292,12 +1453,14 @@ static int _opt_find(struct spank_plugin_opt *p,
 	return (1);
 }
 
-static struct spank_plugin_opt *_find_remote_option_by_name(const char *str)
+static struct spank_plugin_opt *
+spank_stack_find_option_by_name(struct spank_stack *stack, const char *str)
 {
 	struct spank_plugin_opt *opt = NULL;
 	struct opt_find_args args;
 	char buf[256];
 	char *name;
+	List option_cache = stack->option_cache;
 
 	if (strlcpy(buf, str, sizeof(buf)) >= sizeof(buf)) {
 		error("plugin option \"%s\" too big. Ignoring.", str);
@@ -1332,12 +1495,89 @@ static struct spank_plugin_opt *_find_remote_option_by_name(const char *str)
 	return (opt);
 }
 
+spank_err_t
+spank_option_getopt (spank_t sp, struct spank_option *opt, char **argp)
+{
+	const char *val;
+	char var[1024];
+	List option_cache;
+	struct spank_plugin_opt *spopt;
+
+	if (argp)
+		*argp = NULL;
+
+	if (!sp->plugin) {
+		error ("spank_option_getopt: Not called from a plugin!?");
+		return (ESPANK_NOT_AVAIL);
+	}
+
+	if (sp->phase == SPANK_INIT)
+		return (ESPANK_NOT_AVAIL);
+
+	if (!opt || !opt->name)
+		return (ESPANK_BAD_ARG);
+
+	if (opt->has_arg && !argp)
+		return (ESPANK_BAD_ARG);
+
+	/*
+	 *   First check the cache:
+	 */
+	option_cache = sp->stack->option_cache;
+	spopt = list_find_first (option_cache,
+	                         (ListFindF) _opt_by_name,
+	                         opt->name);
+	if (spopt) {
+		if (opt->has_arg && argp)
+			*argp = spopt->optarg;
+		return (ESPANK_SUCCESS);
+	}
+
+	/*
+	 *  Otherwise, check current environment:
+	 *
+	 *  We need to check for variables that start with either
+	 *   the default spank option env prefix, or the default
+	 *   prefix + an *extra* prefix of SPANK_, in case we're
+	 *   running in prolog/epilog, where SLURM prepends SPANK_
+	 *   to all spank job environment variables.
+	 */
+	spopt = _spank_plugin_opt_create (sp->plugin, opt, 0);
+	memcpy (var, "SPANK_", 6);
+	if ((val = getenv (_opt_env_name(spopt, var+6, sizeof (var) - 6))) ||
+	    (val = getenv (var))) {
+		spopt->optarg = xstrdup (val);
+		spopt->found = 1;
+		if (opt->has_arg && argp)
+			*argp = spopt->optarg;
+	}
+
+	/*
+	 *  Cache the result
+	 */
+	list_append (option_cache, spopt);
+
+	if (!spopt->found)
+		return (ESPANK_ERROR);
+
+	return (ESPANK_SUCCESS);
+}
+
+
 int spank_get_remote_options_env (char **env)
+{
+	return spank_stack_get_remote_options_env (global_spank_stack, env);
+}
+
+
+static int
+spank_stack_get_remote_options_env (struct spank_stack *stack, char **env)
 {
 	char var [1024];
 	const char *arg;
 	struct spank_plugin_opt *option;
 	ListIterator i;
+	List option_cache = stack->option_cache;
 
 	if (!option_cache)
 		return (0);
@@ -1367,6 +1607,12 @@ int spank_get_remote_options_env (char **env)
 }
 
 int spank_get_remote_options(job_options_t opts)
+{
+	return spank_stack_get_remote_options (global_spank_stack, opts);
+}
+
+static int
+spank_stack_get_remote_options(struct spank_stack *stack, job_options_t opts)
 {
 	const struct job_option_info *j;
 
@@ -1378,7 +1624,7 @@ int spank_get_remote_options(job_options_t opts)
 		if (j->type != OPT_TYPE_SPANK)
 			continue;
 
-		if (!(opt = _find_remote_option_by_name(j->option)))
+		if (!(opt = spank_stack_find_option_by_name(stack, j->option)))
 			continue;
 
 		p = opt->opt;
@@ -1392,6 +1638,36 @@ int spank_get_remote_options(job_options_t opts)
 	return (0);
 }
 
+/*
+ *  Clear any environment variables for spank options.
+ *   spank option env vars  have a prefix of SPANK_OPTION_ENV_PREFIX,
+ *   or SPANK_ + SPANK_OPTION_ENV_PREFIX
+ */
+int spank_clear_remote_options_env (char **env)
+{
+	char **ep;
+	int len = strlen (SPANK_OPTION_ENV_PREFIX);
+
+	for (ep = env; *ep; ep++) {
+		char *p = *ep;
+		if (strncmp (*ep, "SPANK_", 6) == 0)
+			p = *ep+6;
+		if (strncmp (p, SPANK_OPTION_ENV_PREFIX, len) == 0) {
+			char *end = strchr (p+len, '=');
+			if (end) {
+				char name[1024];
+				memcpy (name, *ep, end - *ep);
+				name [end - *ep] = '\0';
+				info ("unsetenv (%s)\n", name);
+				unsetenvp (env, name);
+			}
+		}
+	}
+	return (0);
+}
+
+
+
 static int tasks_execd (spank_t spank)
 {
 	return ( (spank->phase == STEP_TASK_POST_FORK)
@@ -1450,7 +1726,7 @@ static int _valid_in_allocator_context (spank_item_t item)
 	}
 }
 
-static spank_err_t _check_spank_item_validity (spank_item_t item, void *job)
+static spank_err_t _check_spank_item_validity (spank_t spank, spank_item_t item)
 {
 	/*
 	 *  Valid in all contexts:
@@ -1465,15 +1741,24 @@ static spank_err_t _check_spank_item_validity (spank_item_t item, void *job)
 		  break; /* fallthru */
 	}
 
-	if (spank_ctx == S_TYPE_LOCAL) {
+	/*
+	 *  No spank_item_t is available in slurmd context at this time.
+	 */
+	if (spank->stack->type == S_TYPE_SLURMD)
+		return ESPANK_NOT_AVAIL;
+	else if (spank->stack->type == S_TYPE_JOB_SCRIPT) {
+		if (item != S_JOB_UID && item != S_JOB_ID)
+			return ESPANK_NOT_AVAIL;
+	}
+	else if (spank->stack->type == S_TYPE_LOCAL) {
 		if (!_valid_in_local_context (item))
 			return ESPANK_NOT_REMOTE;
-		else if (job == NULL)
+		else if (spank->job == NULL)
 			return ESPANK_NOT_AVAIL;
 	}
-	else if (spank_ctx == S_TYPE_ALLOCATOR) {
+	else if (spank->stack->type == S_TYPE_ALLOCATOR) {
 		if (_valid_in_allocator_context (item)) {
-			if (job)
+			if (spank->job)
 				return ESPANK_SUCCESS;
 			else
 				return ESPANK_NOT_AVAIL;
@@ -1543,7 +1828,7 @@ int spank_remote(spank_t spank)
 {
 	if ((spank == NULL) || (spank->magic != SPANK_MAGIC))
 		return (-1);
-	if (spank_ctx == S_TYPE_REMOTE)
+	if (spank->stack->type == S_TYPE_REMOTE)
 		return (1);
 	else
 		return (0);
@@ -1551,13 +1836,19 @@ int spank_remote(spank_t spank)
 
 spank_context_t spank_context (void)
 {
-	switch (spank_ctx) {
+	if (global_spank_stack == NULL)
+		return S_CTX_ERROR;
+	switch (global_spank_stack->type) {
 	  case S_TYPE_REMOTE:
 		  return S_CTX_REMOTE;
 	  case S_TYPE_LOCAL:
 		  return S_CTX_LOCAL;
 	  case S_TYPE_ALLOCATOR:
 		  return S_CTX_ALLOCATOR;
+	  case S_TYPE_SLURMD:
+		  return S_CTX_SLURMD;
+	  case S_TYPE_JOB_SCRIPT:
+		  return S_CTX_JOB_SCRIPT;
 	  default:
 		  return S_CTX_ERROR;
 	}
@@ -1582,6 +1873,7 @@ spank_err_t spank_get_item(spank_t spank, spank_item_t item, ...)
 	slurmd_task_info_t *task;
 	slurmd_job_t  *slurmd_job = NULL;
 	struct spank_launcher_job_info *launcher_job = NULL;
+	struct job_script_info *s_job_info = NULL;
 	va_list vargs;
 	spank_err_t rc = ESPANK_SUCCESS;
 
@@ -1591,31 +1883,35 @@ spank_err_t spank_get_item(spank_t spank, spank_item_t item, ...)
 	/*
 	 *  Check for validity of the given item in the current context
 	 */
-	rc = _check_spank_item_validity (item, spank->job);
+	rc = _check_spank_item_validity (spank, item);
 	if (rc != ESPANK_SUCCESS)
 		return (rc);
 
-	if (spank_ctx == S_TYPE_LOCAL)
+	if (spank->stack->type == S_TYPE_LOCAL)
 		launcher_job = spank->job;
-	else if (spank_ctx == S_TYPE_REMOTE)
+	else if (spank->stack->type == S_TYPE_REMOTE)
 		slurmd_job = spank->job;
+	else if (spank->stack->type == S_TYPE_JOB_SCRIPT)
+		s_job_info = spank->job;
 
 	va_start(vargs, item);
 	switch (item) {
 	case S_JOB_UID:
 		p2uid = va_arg(vargs, uid_t *);
-		if (spank_ctx == S_TYPE_LOCAL)
+		if (spank->stack->type == S_TYPE_LOCAL)
 			*p2uid = launcher_job->uid;
-		else if (spank_ctx == S_TYPE_REMOTE)
+		else if (spank->stack->type == S_TYPE_REMOTE)
 			*p2uid = slurmd_job->uid;
+		else if (spank->stack->type == S_TYPE_JOB_SCRIPT)
+			*p2uid = s_job_info->uid;
 		else
 			*p2uid = getuid();
 		break;
 	case S_JOB_GID:
 		p2gid = va_arg(vargs, gid_t *);
-		if (spank_ctx == S_TYPE_LOCAL)
+		if (spank->stack->type == S_TYPE_LOCAL)
 			*p2gid = launcher_job->gid;
-		else if (spank_ctx == S_TYPE_REMOTE)
+		else if (spank->stack->type == S_TYPE_REMOTE)
 			*p2gid = slurmd_job->gid;
 		else
 			*p2gid = getgid();
@@ -1628,21 +1924,23 @@ spank_err_t spank_get_item(spank_t spank, spank_item_t item, ...)
 		break;
 	case S_JOB_ID:
 		p2uint32 = va_arg(vargs, uint32_t *);
-		if (spank_ctx == S_TYPE_LOCAL)
+		if (spank->stack->type == S_TYPE_LOCAL)
 			*p2uint32 = launcher_job->jobid;
-		else
+		else if (spank->stack->type == S_TYPE_REMOTE)
 			*p2uint32 = slurmd_job->jobid;
+		else if (spank->stack->type == S_TYPE_JOB_SCRIPT)
+			*p2uint32 = s_job_info->jobid;
 		break;
 	case S_JOB_STEPID:
 		p2uint32 = va_arg(vargs, uint32_t *);
-		if (spank_ctx == S_TYPE_LOCAL)
+		if (spank->stack->type == S_TYPE_LOCAL)
 			*p2uint32 = launcher_job->stepid;
 		else
 			*p2uint32 = slurmd_job->stepid;
 		break;
 	case S_JOB_NNODES:
 		p2uint32 = va_arg(vargs, uint32_t *);
-		if (spank_ctx == S_TYPE_LOCAL) {
+		if (spank->stack->type == S_TYPE_LOCAL) {
 			if (launcher_job->step_layout)
 				*p2uint32 = launcher_job->step_layout->
 					    node_cnt;
@@ -1663,7 +1961,7 @@ spank_err_t spank_get_item(spank_t spank, spank_item_t item, ...)
 		break;
 	case S_JOB_TOTAL_TASK_COUNT:
 		p2uint32 = va_arg(vargs, uint32_t *);
-		if (spank_ctx == S_TYPE_LOCAL) {
+		if (spank->stack->type == S_TYPE_LOCAL) {
 			if (launcher_job->step_layout)
 				*p2uint32 = launcher_job->step_layout->
 					    task_cnt;
@@ -1685,7 +1983,7 @@ spank_err_t spank_get_item(spank_t spank, spank_item_t item, ...)
 	case S_JOB_ARGV:
 		p2int = va_arg(vargs, int *);
 		p2argv = va_arg(vargs, char ***);
-		if (spank_ctx == S_TYPE_LOCAL) {
+		if (spank->stack->type == S_TYPE_LOCAL) {
 			*p2int = launcher_job->argc;
 			*p2argv = launcher_job->argv;
 		} else {
@@ -1811,19 +2109,26 @@ spank_err_t spank_get_item(spank_t spank, spank_item_t item, ...)
 	return (rc);
 }
 
-spank_err_t spank_getenv(spank_t spank, const char *var, char *buf,
-			 int len)
+spank_err_t spank_env_access_check (spank_t spank)
 {
-	char *val;
-
 	if ((spank == NULL) || (spank->magic != SPANK_MAGIC))
 		return (ESPANK_BAD_ARG);
-
-	if (spank_ctx != S_TYPE_REMOTE)
+	if (spank->stack->type != S_TYPE_REMOTE)
 		return (ESPANK_NOT_REMOTE);
-
 	if (spank->job == NULL)
 		return (ESPANK_BAD_ARG);
+	return (ESPANK_SUCCESS);
+
+}
+
+spank_err_t spank_getenv(spank_t spank, const char *var, char *buf,
+			 int len)
+{
+	char *val;
+	spank_err_t err = spank_env_access_check (spank);
+
+	if (err != ESPANK_SUCCESS)
+		return (err);
 
 	if (len < 0)
 		return (ESPANK_BAD_ARG);
@@ -1841,15 +2146,10 @@ spank_err_t spank_setenv(spank_t spank, const char *var, const char *val,
 			 int overwrite)
 {
 	slurmd_job_t * job;
+	spank_err_t err = spank_env_access_check (spank);
 
-	if ((spank == NULL) || (spank->magic != SPANK_MAGIC))
-		return (ESPANK_BAD_ARG);
-
-	if (spank_ctx != S_TYPE_REMOTE)
-		return (ESPANK_NOT_REMOTE);
-
-	if (spank->job == NULL)
-		return (ESPANK_BAD_ARG);
+	if (err != ESPANK_SUCCESS)
+		return (err);
 
 	if ((var == NULL) || (val == NULL))
 		return (ESPANK_BAD_ARG);
@@ -1867,14 +2167,10 @@ spank_err_t spank_setenv(spank_t spank, const char *var, const char *val,
 
 spank_err_t spank_unsetenv (spank_t spank, const char *var)
 {
-	if ((spank == NULL) || (spank->magic != SPANK_MAGIC))
-		return (ESPANK_BAD_ARG);
+	spank_err_t err = spank_env_access_check (spank);
 
-	if (spank_ctx != S_TYPE_REMOTE)
-		return (ESPANK_NOT_REMOTE);
-
-	if (spank->job == NULL)
-		return (ESPANK_BAD_ARG);
+	if (err != ESPANK_SUCCESS)
+		return (err);
 
 	if (var == NULL)
 		return (ESPANK_BAD_ARG);
@@ -1924,20 +2220,33 @@ extern int dyn_spank_unset_job_env (const char *n)
 	return ((*fn) (n));
 }
 
+static spank_err_t spank_job_control_access_check (spank_t spank)
+{
+	if ((spank == NULL) || (spank->magic != SPANK_MAGIC))
+		return (ESPANK_BAD_ARG);
+
+	if (spank_remote (spank))
+		return (ESPANK_NOT_LOCAL);
+
+	if (spank->stack->type == S_TYPE_SLURMD)
+		return (ESPANK_NOT_AVAIL);
+
+	return (ESPANK_SUCCESS);
+}
+
 
 spank_err_t spank_job_control_getenv (spank_t spank, const char *var,
 			char *buf, int len)
 {
 	const char *val;
-	if ((spank == NULL) || (spank->magic != SPANK_MAGIC))
-		return (ESPANK_BAD_ARG);
+	spank_err_t err;
+
+	if ((err = spank_job_control_access_check (spank)))
+		return (err);
 
 	if ((var == NULL) || (buf == NULL) || (len <= 0))
 		return (ESPANK_BAD_ARG);
 
-	if (spank_remote (spank))
-		return (ESPANK_NOT_LOCAL);
-
 	val = dyn_spank_get_job_env (var);
 	if (val == NULL)
 		return (ESPANK_ENV_NOEXIST);
@@ -1951,15 +2260,14 @@ spank_err_t spank_job_control_getenv (spank_t spank, const char *var,
 spank_err_t spank_job_control_setenv (spank_t spank, const char *var,
 			const char *val, int overwrite)
 {
-	if ((spank == NULL) || (spank->magic != SPANK_MAGIC))
-		return (ESPANK_BAD_ARG);
+	spank_err_t err;
+
+	if ((err = spank_job_control_access_check (spank)))
+		return (err);
 
 	if ((var == NULL) || (val == NULL))
 		return (ESPANK_BAD_ARG);
 
-	if (spank_remote (spank))
-		return (ESPANK_NOT_LOCAL);
-
 	if (dyn_spank_set_job_env (var, val, overwrite) < 0)
 		return (ESPANK_BAD_ARG);
 
@@ -1968,15 +2276,14 @@ spank_err_t spank_job_control_setenv (spank_t spank, const char *var,
 
 spank_err_t spank_job_control_unsetenv (spank_t spank, const char *var)
 {
-	if ((spank == NULL) || (spank->magic != SPANK_MAGIC))
-		return (ESPANK_BAD_ARG);
+	spank_err_t err;
+
+	if ((err = spank_job_control_access_check (spank)))
+		return (err);
 
 	if (var == NULL)
 		return (ESPANK_BAD_ARG);
 
-	if (spank_remote (spank))
-		return (ESPANK_NOT_LOCAL);
-
 	if (dyn_spank_unset_job_env (var) < 0)
 		return (ESPANK_BAD_ARG);
 
diff --git a/src/common/plugstack.h b/src/common/plugstack.h
index c9af7a1ef5d40dc957830f54d127a42926b1ec9f..2e480ab3f4209a6a5386b83a349d85e3e254a459 100644
--- a/src/common/plugstack.h
+++ b/src/common/plugstack.h
@@ -67,6 +67,10 @@ struct spank_launcher_job_info {
 
 int spank_init (slurmd_job_t *job);
 
+int spank_slurmd_init (void);
+
+int spank_job_prolog (uint32_t jobid, uid_t uid);
+
 int spank_init_allocator (void);
 
 int spank_init_post_opt (void);
@@ -83,6 +87,10 @@ int spank_task_post_fork (slurmd_job_t *job, int taskid);
 
 int spank_task_exit (slurmd_job_t *job, int taskid);
 
+int spank_job_epilog (uint32_t jobid, uid_t uid);
+
+int spank_slurmd_exit (void);
+
 int spank_fini (slurmd_job_t *job);
 
 /*
@@ -132,12 +140,6 @@ int spank_print_options (FILE *fp, int width, int left_pad);
  */
 int spank_set_remote_options (job_options_t options);
 
-/*  Set all registered remote options (i.e. those passed to
- *   spank_process_option) in the current environment for later
- *   retreival by spank_get_remote_options_env().
- */
-int spank_set_remote_options_env (void);
-
 /*  Register any remote spank options that exist in `options'
  *    to their respective spank plugins. This function ends up invoking
  *    all plugin option callbacks, and will fail (return < 0) if
@@ -158,4 +160,7 @@ int spank_get_remote_options (job_options_t options);
  */
 int spank_get_remote_options_env (char **env);
 
+/*  Clear any spank remote options encoded in environment.
+ */
+int spank_clear_remote_options_env (char **env);
 #endif /* !_PLUGSTACK_H */
diff --git a/src/slurmd/common/run_script.c b/src/slurmd/common/run_script.c
index 9f91b38442d1cf14bcffe34cf1ca825690c93c7a..5ec73052eb36201ed9714b23e246e405ba74a5e1 100644
--- a/src/slurmd/common/run_script.c
+++ b/src/slurmd/common/run_script.c
@@ -57,6 +57,46 @@
 
 #include "src/slurmd/common/run_script.h"
 
+/*
+ *  Same as waitpid(2) but kill process group for pid after timeout secs.
+ *   Returns 0 for valid status in pstatus, -1 on failure of waitpid(2).
+ */
+int waitpid_timeout (const char *name, pid_t pid, int *pstatus, int timeout)
+{
+	int timeout_ms = 1000 * timeout; /* timeout in ms                   */
+	int max_delay =  1000;           /* max delay between waitpid calls */
+	int delay = 10;                  /* initial delay                   */
+	int rc;
+	int options = WNOHANG;
+
+	if (timeout <= 0)
+		options = 0;
+
+	while ((rc = waitpid (pid, pstatus, options)) <= 0) {
+		if (rc < 0) {
+			if (errno == EINTR)
+				continue;
+			error("waidpid: %m");
+			return (-1);
+		}
+		else if (timeout_ms <= 0) {
+			info ("%s%stimeout after %ds: killing pgid %d",
+			      name != NULL ? name : "",
+			      name != NULL ? ": " : "",
+			      timeout, pid);
+			killpg(pid, SIGKILL);
+			options = 0;
+		}
+		else {
+			poll(NULL, 0, delay);
+			timeout_ms -= delay;
+			delay = MIN (timeout_ms, MIN(max_delay, delay*2));
+		}
+	}
+
+	killpg(pid, SIGKILL);  /* kill children too */
+	return (0);
+}
 
 /*
  * Run a prolog or epilog script (does NOT drop privileges)
@@ -72,7 +112,7 @@ static int
 run_one_script(const char *name, const char *path, uint32_t jobid,
 	   int max_wait, char **env)
 {
-	int status, rc, opt;
+	int status;
 	pid_t cpid;
 
 	xassert(env);
@@ -110,31 +150,9 @@ run_one_script(const char *name, const char *path, uint32_t jobid,
 		exit(127);
 	}
 
-	if (max_wait < 0)
-		opt = 0;
-	else
-		opt = WNOHANG;
-
-	while (1) {
-		rc = waitpid(cpid, &status, opt);
-		if (rc < 0) {
-			if (errno == EINTR)
-				continue;
-			error("waidpid: %m");
-			return 0;
-		} else if (rc == 0) {
-			sleep(1);
-			if ((--max_wait) == 0) {
-				killpg(cpid, SIGKILL);
-				opt = 0;
-			}
-		} else  {
-			killpg(cpid, SIGKILL);	/* kill children too */
-			return status;
-		}
-	}
-
-	/* NOTREACHED */
+	if (waitpid_timeout(name, cpid, &status, max_wait) < 0)
+		return (-1);
+	return status;
 }
 
 static void _xfree_f (void *x)
diff --git a/src/slurmd/common/run_script.h b/src/slurmd/common/run_script.h
index 82463410e8611083df3ca4d18dd8b0f1c06ba8f3..635487082eb153601b78cb68f01bbadcaf8ff427 100644
--- a/src/slurmd/common/run_script.h
+++ b/src/slurmd/common/run_script.h
@@ -43,6 +43,17 @@
 #include <sys/types.h>
 #include <inttypes.h>
 
+/*
+ *  Same as waitpid(2) but kill process group for pid after timeout secs.
+ *   name    IN: name or class of program we're waiting on (for log messages)
+ *   pid     IN: child on which to call waitpid(2)
+ *   pstatus IN: pointer to integer status
+ *   timeout IN: timeout in seconds
+ *
+ *  Returns 0 for valid status in pstatus, -1 on failure of waitpid(2).
+ */
+int waitpid_timeout (const char *name, pid_t pid, int *pstatus, int timeout);
+
 /*
  * Run a prolog or epilog script (does NOT drop privileges)
  * name IN: class of program (prolog, epilog, etc.),
diff --git a/src/slurmd/slurmd/req.c b/src/slurmd/slurmd/req.c
index cc8d5fe1a9621aa4278fd6bd99a89d0617746c82..8816f6b57a22f17f6901979ad75b68f64e1fd957 100644
--- a/src/slurmd/slurmd/req.c
+++ b/src/slurmd/slurmd/req.c
@@ -79,6 +79,7 @@
 #include "src/common/util-net.h"
 #include "src/common/xstring.h"
 #include "src/common/xmalloc.h"
+#include "src/common/plugstack.h"
 
 #include "src/slurmd/slurmd/slurmd.h"
 #include "src/slurmd/slurmd/reverse_tree_math.h"
@@ -382,6 +383,19 @@ slurmd_req(slurm_msg_t *msg)
 	}
 	return;
 }
+static int _send_slurmd_conf_lite (int fd, slurmd_conf_t *cf)
+{
+	int len;
+	Buf buffer = init_buf(0);
+	pack_slurmd_conf_lite(cf, buffer);
+	len = get_buf_offset(buffer);
+	safe_write(fd, &len, sizeof(int));
+	safe_write(fd, get_buf_data(buffer), len);
+	free_buf(buffer);
+	return (0);
+ rwfail:
+	return (-1);
+}
 
 static int
 _send_slurmstepd_init(int fd, slurmd_step_type_t type, void *req,
@@ -476,12 +490,8 @@ _send_slurmstepd_init(int fd, slurmd_step_type_t type, void *req,
 	safe_write(fd, &parent_addr, sizeof(slurm_addr_t));
 
 	/* send conf over to slurmstepd */
-	buffer = init_buf(0);
-	pack_slurmd_conf_lite(conf, buffer);
-	len = get_buf_offset(buffer);
-	safe_write(fd, &len, sizeof(int));
-	safe_write(fd, get_buf_data(buffer), len);
-	free_buf(buffer);
+	if (_send_slurmd_conf_lite(fd, conf) < 0)
+		goto rwfail;
 
 	/* send cli address over to slurmstepd */
 	buffer = init_buf(0);
@@ -652,16 +662,8 @@ _forkexec_slurmstepd(slurmd_step_type_t type, void *req,
 			error("close read to_slurmd in parent: %m");
 		return rc;
 	} else {
-		char slurm_stepd_path[MAXPATHLEN];
-		char *const argv[2] = { slurm_stepd_path, NULL};
+		char *const argv[2] = { (char *)conf->stepd_loc, NULL};
 		int failed = 0;
-		if (conf->stepd_loc) {
-			snprintf(slurm_stepd_path, sizeof(slurm_stepd_path),
-				 "%s", conf->stepd_loc);
-		} else {
-			snprintf(slurm_stepd_path, sizeof(slurm_stepd_path),
-				 "%s/sbin/slurmstepd", SLURM_PREFIX);
-		}
 		/* inform slurmstepd about our config */
 		setenv("SLURM_CONF", conf->conffile, 1);
 
@@ -3820,6 +3822,89 @@ _destroy_env(char **env)
 	return;
 }
 
+static int
+run_spank_job_script (const char *mode, char **env)
+{
+	pid_t cpid;
+	int status = 0;
+	int pfds[2];
+
+	if (pipe (pfds) < 0) {
+		error ("run_spank_job_script: pipe: %m");
+		return (-1);
+	}
+
+	fd_set_close_on_exec (pfds[1]);
+
+	if ((cpid = fork ()) < 0) {
+		error ("executing spank %s: %m", mode);
+		return (-1);
+	}
+	if (cpid == 0) {
+		/* Run slurmstepd spank [prolog|epilog] */
+		char *argv[4] = {
+			(char *) conf->stepd_loc,
+			"spank",
+			(char *) mode,
+			NULL };
+
+		/* Set the correct slurm.conf location */
+		setenvf (&env, "SLURM_CONF", conf->conffile);
+
+		if (dup2 (pfds[0], STDIN_FILENO) < 0)
+			fatal ("dup2: %m");
+#ifdef SETPGRP_TWO_ARGS
+                setpgrp(0, 0);
+#else
+                setpgrp();
+#endif
+		info ("Calling %s %s %s\n", argv[0], argv[1], argv[2]);
+		execve (argv[0], argv, env);
+		error ("execve: %m");
+		exit (127);
+	}
+
+	close (pfds[0]);
+
+	if (_send_slurmd_conf_lite (pfds[1], conf) < 0)
+		error ("Failed to send slurmd conf to slurmstepd\n");
+	close (pfds[1]);
+
+	/*
+	 *  Wait for up to 120s for all spank plugins to complete:
+	 */
+	if (waitpid_timeout (mode, cpid, &status, 120) < 0) {
+		error ("spank/%s timed out after 120s", mode);
+		return (-1);
+	}
+
+	if (status)
+		error ("spank/%s returned status 0x%04x", mode, status);
+
+	/*
+	 *  No longer need SPANK option env vars in environment
+	 */
+	spank_clear_remote_options_env (env);
+
+	return (status);
+}
+
+static int _run_job_script(const char *name, const char *path,
+		uint32_t jobid, int timeout, char **env)
+{
+	int status, rc;
+	/*
+	 *  Always run both spank prolog/epilog and real prolog/epilog script,
+	 *   even if spank plugins fail. (May want to alter this in the future)
+	 *   If both "script" mechanisms fail, prefer to return the "real"
+	 *   prolog/epilog status.
+	 */
+	status = run_spank_job_script(name, env);
+	if ((rc = run_script(name, path, jobid, timeout, env)))
+		status = rc;
+	return (status);
+}
+
 #ifdef HAVE_BG
 /* a slow prolog is expected on bluegene systems */
 static int
@@ -3836,7 +3921,7 @@ _run_prolog(uint32_t jobid, uid_t uid, char *resv_id,
 	slurm_mutex_unlock(&conf->config_mutex);
 	_add_job_running_prolog(jobid);
 
-	rc = run_script("prolog", my_prolog, jobid, -1, my_env);
+	rc = _run_job_script("prolog", my_prolog, jobid, -1, my_env);
 	_remove_job_running_prolog(jobid);
 	xfree(my_prolog);
 	_destroy_env(my_env);
@@ -3911,7 +3996,7 @@ _run_prolog(uint32_t jobid, uid_t uid, char *resv_id,
 	timer_struct.timer_cond  = &timer_cond;
 	timer_struct.timer_mutex = &timer_mutex;
 	pthread_create(&timer_id, &timer_attr, &_prolog_timer, &timer_struct);
-	rc = run_script("prolog", my_prolog, jobid, -1, my_env);
+	rc = _run_job_script("prolog", my_prolog, jobid, -1, my_env);
 	slurm_mutex_lock(&timer_mutex);
 	prolog_fini = true;
 	pthread_cond_broadcast(&timer_cond);
@@ -3950,7 +4035,7 @@ _run_epilog(uint32_t jobid, uid_t uid, char *resv_id,
 	slurm_mutex_unlock(&conf->config_mutex);
 
 	_wait_for_job_running_prolog(jobid);
-	error_code = run_script("epilog", my_epilog, jobid, -1, my_env);
+	error_code = _run_job_script("epilog", my_epilog, jobid, -1, my_env);
 	xfree(my_epilog);
 	_destroy_env(my_env);
 
diff --git a/src/slurmd/slurmd/slurmd.c b/src/slurmd/slurmd/slurmd.c
index eb387bf540b4173d49a2082a1d8f2eae7872a8cb..f25e7175465f83b8d7b277c6921360f39eab2630 100644
--- a/src/slurmd/slurmd/slurmd.c
+++ b/src/slurmd/slurmd/slurmd.c
@@ -92,6 +92,7 @@
 #include "src/common/xmalloc.h"
 #include "src/common/xstring.h"
 #include "src/common/xsignal.h"
+#include "src/common/plugstack.h"
 
 #include "src/slurmd/slurmd/slurmd.h"
 #include "src/slurmd/slurmd/req.h"
@@ -1307,6 +1308,14 @@ _process_cmdline(int ac, char **av)
 			break;
 		}
 	}
+
+	/*
+	 *  If slurmstepd path wasn't overridden by command line, set
+	 *   it to the default here:
+	 */
+	if (!conf->stepd_loc)
+		conf->stepd_loc =
+			xstrdup_printf("%s/sbin/slurmstepd", SLURM_PREFIX);
 }
 
 
@@ -1345,7 +1354,6 @@ _slurmd_init(void)
 	struct rlimit rlim;
 	slurm_ctl_conf_t *cf;
 	struct stat stat_buf;
-	char slurm_stepd_path[MAXPATHLEN];
 	uint32_t cpu_cnt;
 
 	/*
@@ -1398,6 +1406,8 @@ _slurmd_init(void)
 		return SLURM_FAILURE;
 	if (slurm_auth_init(NULL) != SLURM_SUCCESS)
 		return SLURM_FAILURE;
+	if (spank_slurmd_init() < 0)
+		return SLURM_FAILURE;
 
 	if (getrlimit(RLIMIT_CPU, &rlim) == 0) {
 		rlim.rlim_cur = rlim.rlim_max;
@@ -1496,21 +1506,10 @@ _slurmd_init(void)
 	fd_set_close_on_exec(devnull);
 
 	/* make sure we have slurmstepd installed */
-	if (conf->stepd_loc) {
-		snprintf(slurm_stepd_path, sizeof(slurm_stepd_path),
-			 "%s", conf->stepd_loc);
-	} else {
-		snprintf(slurm_stepd_path, sizeof(slurm_stepd_path),
-			 "%s/sbin/slurmstepd", SLURM_PREFIX);
-	}
-	if (stat(slurm_stepd_path, &stat_buf)) {
-		fatal("Unable to find slurmstepd file at %s",
-			slurm_stepd_path);
-	}
-	if (!S_ISREG(stat_buf.st_mode)) {
-		fatal("slurmstepd not a file at %s",
-			slurm_stepd_path);
-	}
+	if (stat(conf->stepd_loc, &stat_buf))
+		fatal("Unable to find slurmstepd file at %s", conf->stepd_loc);
+	if (!S_ISREG(stat_buf.st_mode))
+		fatal("slurmstepd not a file at %s", conf->stepd_loc);
 
 	return SLURM_SUCCESS;
 }
@@ -1581,6 +1580,7 @@ _slurmd_fini(void)
 	fini_setproctitle();
 	slurm_select_fini();
 	slurm_jobacct_gather_fini();
+	spank_slurmd_exit();
 	return SLURM_SUCCESS;
 }
 
diff --git a/src/slurmd/slurmd/slurmd.h b/src/slurmd/slurmd/slurmd.h
index 1e07aa4e1be8fbcc86727848f674368b1ba88aaf..d5eb54186efc0cd6ca2374839a5b4a479437d8bd 100644
--- a/src/slurmd/slurmd/slurmd.h
+++ b/src/slurmd/slurmd/slurmd.h
@@ -111,7 +111,7 @@ typedef struct slurmd_config {
 	char         *pubkey;		/* location of job cred public key */
 	char         *epilog;		/* Path to Epilog script	   */
 	char         *prolog;		/* Path to prolog script           */
-	char         *stepd_loc;	/* Non-standard slurmstepd path    */
+	char         *stepd_loc;	/* slurmstepd path                 */
 	char         *task_prolog;	/* per-task prolog script          */
 	char         *task_epilog;	/* per-task epilog script          */
 	int           port;		/* local slurmd port               */
diff --git a/src/slurmd/slurmstepd/slurmstepd.c b/src/slurmd/slurmstepd/slurmstepd.c
index ac4f3c9f1a6f6eab97ce6d16eeb2d7153bf74603..1ffaa34bc3b76f74ffab5038f9e4dab42b7feab6 100644
--- a/src/slurmd/slurmstepd/slurmstepd.c
+++ b/src/slurmd/slurmstepd/slurmstepd.c
@@ -54,6 +54,7 @@
 #include "src/common/switch.h"
 #include "src/common/xmalloc.h"
 #include "src/common/xsignal.h"
+#include "src/common/plugstack.h"
 
 #include "src/slurmd/common/slurmstepd_init.h"
 #include "src/slurmd/common/setproctitle.h"
@@ -76,6 +77,7 @@ static slurmd_job_t *_step_setup(slurm_addr_t *cli, slurm_addr_t *self,
 #ifdef MEMORY_LEAK_DEBUG
 static void _step_cleanup(slurmd_job_t *job, slurm_msg_t *msg, int rc);
 #endif
+static int process_cmdline (int argc, char *argv[]);
 
 int slurmstepd_blocked_signals[] = {
 	SIGPIPE, 0
@@ -96,11 +98,9 @@ main (int argc, char *argv[])
 	gid_t *gids;
 	int rc = 0;
 
-	if ((argc == 2) && (strcmp(argv[1], "getenv") == 0)) {
-		print_rlimits();
-		_dump_user_env();
-		exit(0);
-	}
+	if (process_cmdline (argc, argv) < 0)
+		fatal ("Error in slurmstepd command line");
+
 	xsignal_block(slurmstepd_blocked_signals);
 	conf = xmalloc(sizeof(*conf));
 	conf->argv = &argv;
@@ -178,6 +178,148 @@ ending:
 	return rc;
 }
 
+
+static slurmd_conf_t * read_slurmd_conf_lite (int fd)
+{
+	int rc;
+	int len;
+	Buf buffer;
+	slurmd_conf_t *confl;
+
+	/*  First check to see if we've already initialized the
+	 *   global slurmd_conf_t in 'conf'. Allocate memory if not.
+	 */
+	confl = conf ? conf : xmalloc (sizeof (*confl));
+
+	safe_read(fd, &len, sizeof(int));
+
+	buffer = init_buf(len);
+	safe_read(fd, buffer->head, len);
+
+	rc = unpack_slurmd_conf_lite_no_alloc(confl, buffer);
+	if (rc == SLURM_ERROR)
+		fatal("slurmstepd: problem with unpack of slurmd_conf");
+
+	free_buf(buffer);
+
+	confl->log_opts.stderr_level = confl->debug_level;
+	confl->log_opts.logfile_level = confl->debug_level;
+	confl->log_opts.syslog_level = confl->debug_level;
+	/*
+	 * If daemonizing, turn off stderr logging -- also, if
+	 * logging to a file, turn off syslog.
+	 *
+	 * Otherwise, if remaining in foreground, turn off logging
+	 * to syslog (but keep logfile level)
+	 */
+	if (confl->daemonize) {
+		confl->log_opts.stderr_level = LOG_LEVEL_QUIET;
+		if (confl->logfile)
+			confl->log_opts.syslog_level = LOG_LEVEL_QUIET;
+	} else
+		confl->log_opts.syslog_level  = LOG_LEVEL_QUIET;
+
+	return (confl);
+rwfail:
+	return (NULL);
+}
+
+static int get_jobid_uid_from_env (uint32_t *jobidp, uid_t *uidp)
+{
+	const char *val;
+	char *p;
+
+	if (!(val = getenv ("SLURM_JOBID")))
+		return error ("Unable to get SLURM_JOBID in env!");
+
+	*jobidp = (uint32_t) strtoul (val, &p, 10);
+	if (*p != '\0')
+		return error ("Invalid SLURM_JOBID=%s", val);
+
+	if (!(val = getenv ("SLURM_UID")))
+		return error ("Unable to get SLURM_UID in env!");
+
+	*uidp = (uid_t) strtoul (val, &p, 10);
+	if (*p != '\0')
+		return error ("Invalid SLURM_UID=%s", val);
+
+	return (0);
+}
+
+static int handle_spank_mode (int argc, char *argv[])
+{
+	char prefix[64] = "spank-";
+	const char *mode = argv[2];
+	uid_t uid = (uid_t) -1;
+	uint32_t jobid = (uint32_t) -1;
+	log_options_t lopts = LOG_OPTS_INITIALIZER;
+
+	/*
+	 *  Not necessary to log to syslog
+	 */
+	lopts.syslog_level = LOG_LEVEL_QUIET;
+
+	/*
+	 *  Make our log prefix into spank-prolog: or spank-epilog:
+	 */
+	strcat (prefix, mode);
+	log_init(prefix, lopts, LOG_DAEMON, NULL);
+
+	/*
+	 *  When we are started from slurmd, a lightweight config is
+	 *   sent over the stdin fd. If we are able to read this conf
+	 *   use it to reinitialize the log.
+	 *  It is not a fatal error if we fail to read the conf file.
+	 *   This could happen if slurmstepd is run standalone for
+	 *   testing.
+	 */
+	if ((conf = read_slurmd_conf_lite (STDIN_FILENO)))
+		log_alter (conf->log_opts, 0, conf->logfile);
+	close (STDIN_FILENO);
+
+	if (slurm_conf_init(NULL) != SLURM_SUCCESS)
+		return error ("Failed to read slurm config");
+
+	if (get_jobid_uid_from_env (&jobid, &uid) < 0)
+		return error ("spank environment invalid");
+
+	verbose ("Running spank/%s for jobid [%u] uid [%u]",
+		mode, jobid, uid);
+
+	if (strcmp (mode, "prolog") == 0) {
+		if (spank_job_prolog (jobid, uid) < 0)
+			return (-1);
+	}
+	else if (strcmp (mode, "epilog") == 0) {
+		if (spank_job_epilog (jobid, uid) < 0)
+			return (-1);
+	}
+	else {
+		error ("Invalid mode %s specified!", mode);
+		return (-1);
+	}
+	return (0);
+}
+
+/*
+ *  Process special "modes" of slurmstepd passed as cmdline arguments.
+ */
+static int process_cmdline (int argc, char *argv[])
+{
+	if ((argc == 2) && (strcmp(argv[1], "getenv") == 0)) {
+		print_rlimits();
+		_dump_user_env();
+		exit(0);
+	}
+	if ((argc == 3) && (strcmp(argv[1], "spank") == 0)) {
+		if (handle_spank_mode (argc, argv) < 0)
+			exit (1);
+		exit (0);
+	}
+	return (0);
+}
+
+
 static void
 _send_ok_to_slurmd(int sock)
 {
@@ -243,31 +385,8 @@ _init_from_slurmd(int sock, char **argv,
 	pthread_mutex_unlock(&step_complete.lock);
 
 	/* receive conf from slurmd */
-	safe_read(sock, &len, sizeof(int));
-	incoming_buffer = xmalloc(len);
-	safe_read(sock, incoming_buffer, len);
-	buffer = create_buf(incoming_buffer, len);
-	if (unpack_slurmd_conf_lite_no_alloc(conf, buffer) == SLURM_ERROR)
-		fatal("slurmstepd: problem with unpack of slurmd_conf");
-	free_buf(buffer);
-
-	conf->log_opts.stderr_level = conf->debug_level;
-	conf->log_opts.logfile_level = conf->debug_level;
-	conf->log_opts.syslog_level = conf->debug_level;
-
-	/*
-	 * If daemonizing, turn off stderr logging -- also, if
-	 * logging to a file, turn off syslog.
-	 *
-	 * Otherwise, if remaining in foreground, turn off logging
-	 * to syslog (but keep logfile level)
-	 */
-	if (conf->daemonize) {
-		conf->log_opts.stderr_level = LOG_LEVEL_QUIET;
-		if (conf->logfile)
-			conf->log_opts.syslog_level = LOG_LEVEL_QUIET;
-	} else
-		conf->log_opts.syslog_level  = LOG_LEVEL_QUIET;
+	if ((conf = read_slurmd_conf_lite (sock)) == NULL)
+		fatal("Failed to read conf from slurmd");
 	log_alter(conf->log_opts, 0, conf->logfile);
 
 	debug2("debug level is %d.", conf->debug_level);