diff --git a/NEWS b/NEWS
index 0d58865a571bc3049550cbd62126f32bf3c37547..19bd03ca71f17966889b76cd3c4d33d7d5131649 100644
--- a/NEWS
+++ b/NEWS
@@ -9,6 +9,11 @@ documents those changes that are of interest to users and admins.
  -- Change behavior of "scancel -s KILL <jobid>" to send SIGKILL to all job
     steps rather than cancelling the job. This now matches the behavior of
     all other signals. "scancel <jobid>" still cancels the job and all steps.
+ -- Add support for new job step options --exclusive and --immediate.
+    Job steps can now be queued until resources become available within
+    an existing job allocation, at which point the resources are
+    dedicated to the job step. This is useful for executing simultaneous
+    job steps and provides resource management at the level of both jobs
+    and job steps.
 
 * Changes in SLURM 1.3.0-pre2
 =============================
diff --git a/RELEASE_NOTES b/RELEASE_NOTES
index a8b637f62f279edd48170a72051242c25c011b8b..06283145a60da526541a27f298d9fc3c100707c8 100644
--- a/RELEASE_NOTES
+++ b/RELEASE_NOTES
@@ -16,6 +16,14 @@ COMMAND CHANGES
   sattach - Attach to an existing job step (functions like "srun --attach")
   sbatch  - Submit a batch job script (functions like "srun --batch")
   See the individual man pages for more information. 
+* The slaunch command has been removed. Use the srun command instead.
+* The srun option --exclusive has been added for job steps. It allocates
+  to the step only processors that are not already assigned to other
+  job steps. This can be used to execute multiple job steps
+  simultaneously within a job allocation and have SLURM perform resource
+  management for the job steps much as it does for jobs. If dedicated
+  resources are not immediately available, execution of the job step is
+  deferred unless the --immediate option is also set, in which case the
+  step creation request fails.
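+  For example, a batch script might start several job steps in the
+  background and let SLURM queue each step until processors become
+  available (prog1 through prog4 are placeholders for user programs):
+    #!/bin/bash
+    srun --exclusive -n4 prog1 &
+    srun --exclusive -n3 prog2 &
+    srun --exclusive -n1 prog3 &
+    srun --exclusive -n1 prog4 &
+    wait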
 
 CONFIGURATION FILE CHANGES
 
diff --git a/doc/man/man1/srun.1 b/doc/man/man1/srun.1
index 5f9a68b173b67422de98444b377c976e59072ba4..61a6e315895ad4ee5cf4024f177ca81c9d8b3814 100644
--- a/doc/man/man1/srun.1
+++ b/doc/man/man1/srun.1
@@ -238,8 +238,17 @@ parameter in slurm.conf.
 
 .TP
 \fB\-\-exclusive\fR
-Dedicate whole nodes to the job rather than individual processors 
-even if consumable resources are enabled 
+When used to initiate a job step within an existing resource allocation,
+proceed only when processors can be dedicated to the job step without
+sharing with other job steps. This can be used to initiate many
+job steps simultaneously within an existing job allocation and have
+SLURM perform resource management for the job steps.
+If dedicated processors are not immediately available, the job step
+will be deferred unless the \fB\-\-immediate\fR option is also specified.
+In this mode, use with the \fB\-\-ntasks\fR option and NOT the
+\fB\-\-nodes\fR, \fB\-\-relative\fR, or \fB\-\-distribution\fR=\fIarbitrary\fR
+options (which provide user control over task layout).
+See \fBEXAMPLE\fR below.
+When used to initiate a job, dedicate whole nodes to the job rather
+than individual processors even if consumable resources are enabled
 (e.g. \fBSelectType=select/cons_res\fR).
 
 .TP
@@ -329,7 +338,7 @@ The \fB\-\-label\fR option will prepend lines of output with the remote
 task id.
 
 .TP
 \fB\-m\fR, \fB\-\-distribution\fR=
 (\fIblock\fR|\fIcyclic\fR|\fIarbitrary\fR|\fIplane=<options>\fR)
 Specify an alternate distribution method for remote processes.
 .RS
@@ -1477,6 +1486,22 @@ dedicated to the job.
 
 > srun \-N2 \-B 4\-4:2\-2 a.out
 .fi
+.PP
+This example shows a script in which SLURM is used to provide resource
+management for a job by executing the various job steps as processors
+become available for their dedicated use.
+
+.nf
+
+> cat my.script
+#!/bin/bash
+srun \-\-exclusive \-n4 prog1 &
+srun \-\-exclusive \-n3 prog2 &
+srun \-\-exclusive \-n1 prog3 &
+srun \-\-exclusive \-n1 prog4 &
+wait
+.fi
+
 
 .SH "COPYING"
 Copyright (C) 2006\-2007 The Regents of the University of California.
diff --git a/src/common/slurm_protocol_defs.h b/src/common/slurm_protocol_defs.h
index 2c33c79490379c17f32a0833ef574d5457a6e985..60f7162f6e98587712ac92399f6a3106d8e0d7d2 100644
--- a/src/common/slurm_protocol_defs.h
+++ b/src/common/slurm_protocol_defs.h
@@ -383,6 +383,9 @@ typedef struct job_step_specs {
 				   SLURM_DIST_PLANE */
 	uint16_t port;		/* port to contact initiating srun */
 	uint16_t ckpt_interval;	/* checkpoint creation interval (minutes) */
+	uint16_t exclusive;	/* 1 if CPUs not shared with other steps */
+	uint16_t immediate;	/* 1 if allocate to run or fail immediately,
+				 * 0 if to be queued awaiting resources */
 	char *host;		/* host to contact initiating srun */
 	char *node_list;	/* list of required nodes */
 	char *network;		/* network use spec */
diff --git a/src/common/slurm_protocol_pack.c b/src/common/slurm_protocol_pack.c
index 517f81f542159a491a3ad548baf55c57a08bdce4..daeab6ebe6a10077c90a07fd2a956aea855b6aa6 100644
--- a/src/common/slurm_protocol_pack.c
+++ b/src/common/slurm_protocol_pack.c
@@ -1511,6 +1511,8 @@ _pack_job_step_create_request_msg(job_step_create_request_msg_t
 	pack16(msg->plane_size, buffer);
 	pack16(msg->port, buffer);
 	pack16(msg->ckpt_interval, buffer);
+	pack16(msg->exclusive, buffer);
+	pack16(msg->immediate, buffer);
 
 	packstr(msg->host, buffer);
 	packstr(msg->name, buffer);
@@ -1543,6 +1545,8 @@ _unpack_job_step_create_request_msg(job_step_create_request_msg_t ** msg,
 	safe_unpack16(&(tmp_ptr->plane_size), buffer);
 	safe_unpack16(&(tmp_ptr->port), buffer);
 	safe_unpack16(&(tmp_ptr->ckpt_interval), buffer);
+	safe_unpack16(&(tmp_ptr->exclusive), buffer);
+	safe_unpack16(&(tmp_ptr->immediate), buffer);
 
 	safe_unpackstr_xmalloc(&(tmp_ptr->host), &uint16_tmp, buffer);
 	safe_unpackstr_xmalloc(&(tmp_ptr->name), &uint16_tmp, buffer);
diff --git a/src/slurmctld/job_mgr.c b/src/slurmctld/job_mgr.c
index 7ee952dd21e2f382f6e21143136e7447ad8ee21c..94b9a606c8a6680ce1a536c7fe8d445ee2bf85d6 100644
--- a/src/slurmctld/job_mgr.c
+++ b/src/slurmctld/job_mgr.c
@@ -714,10 +714,6 @@ static int _load_job_state(Buf buffer)
 	job_ptr->mail_user         = mail_user;
 	mail_user = NULL;	/* reused, nothing left to free */
 	job_ptr->select_jobinfo = select_jobinfo;
-
-	build_node_details(job_ptr);	/* set: num_cpu_groups, cpus_per_node, 
-					 *	cpu_count_reps, node_cnt, and
-					 *	node_addr */
 	info("recovered job id %u", job_id);
 
 	safe_unpack16(&step_flag, buffer);
@@ -727,6 +723,9 @@ static int _load_job_state(Buf buffer)
 		safe_unpack16(&step_flag, buffer);
 	}
 
+	build_node_details(job_ptr);	/* set: num_cpu_groups, cpus_per_node,
+					 *  cpu_count_reps, node_cnt,
+					 *  node_addr, alloc_lps, used_lps */
 	return SLURM_SUCCESS;
 
 unpack_error:
@@ -2756,6 +2755,7 @@ static void _list_delete_job(void *job_entry)
 	xfree(job_ptr->mail_user);
 	xfree(job_ptr->network);
 	xfree(job_ptr->alloc_lps);
+	xfree(job_ptr->used_lps);
 	xfree(job_ptr->comment);
 	select_g_free_jobinfo(&job_ptr->select_jobinfo);
 	if (job_ptr->step_list) {
@@ -3200,7 +3200,8 @@ static void _reset_step_bitmaps(struct job_record *job_ptr)
 			      job_ptr->job_id, step_ptr->step_id);
 			delete_step_record (job_ptr, step_ptr->step_id);
 		}
-	}		
+		step_alloc_lps(step_ptr);
+	}
 
 	list_iterator_destroy (step_iterator);
 	return;
diff --git a/src/slurmctld/node_scheduler.c b/src/slurmctld/node_scheduler.c
index 646316289a00097d065f6f28be556cba80e718a1..79b2b04cb3f44f83551266d6a2fd9a525f5d8245 100644
--- a/src/slurmctld/node_scheduler.c
+++ b/src/slurmctld/node_scheduler.c
@@ -1415,6 +1415,7 @@ extern void build_node_details(struct job_record *job_ptr)
 		job_ptr->node_addr = NULL;
 		job_ptr->alloc_lps_cnt = 0;
 		xfree(job_ptr->alloc_lps);
+		xfree(job_ptr->used_lps);
 		return;
 	}
 
@@ -1436,6 +1437,8 @@ extern void build_node_details(struct job_record *job_ptr)
 	job_ptr->alloc_lps_cnt = job_ptr->node_cnt;
 	xrealloc(job_ptr->alloc_lps,
 		(sizeof(uint32_t) * job_ptr->node_cnt));
+	xrealloc(job_ptr->used_lps,
+		(sizeof(uint32_t) * job_ptr->node_cnt));
 
 	while ((this_node_name = hostlist_shift(host_list))) {
 		node_ptr = find_node_record(this_node_name);
@@ -1460,11 +1463,13 @@ extern void build_node_details(struct job_record *job_ptr)
 				&usable_lps);
 			if (error_code == SLURM_SUCCESS) {
 				if (job_ptr->alloc_lps) {
+					job_ptr->used_lps[cr_count] = 0;
 					job_ptr->alloc_lps[cr_count++] =
 								usable_lps;
 				}
 			} else {
-				xfree(job_ptr->alloc_lps); 
+				xfree(job_ptr->alloc_lps);
+				xfree(job_ptr->used_lps); 
 				job_ptr->alloc_lps_cnt = 0;
 				error("Unable to get extra jobinfo "
 				      "from JobId=%u", job_ptr->job_id);
diff --git a/src/slurmctld/proc_req.c b/src/slurmctld/proc_req.c
index 10baa55a5d19d4f0491dd819ab7a00087ae1bd4f..3c418bcb84e1893a58eafd77df72c5f61ffec6bb 100644
--- a/src/slurmctld/proc_req.c
+++ b/src/slurmctld/proc_req.c
@@ -2436,6 +2436,8 @@ int _launch_batch_step(job_desc_msg_t *job_desc_msg, uid_t uid,
 	req_step_msg.network = NULL;
 	req_step_msg.node_list = NULL;
 	req_step_msg.ckpt_interval = 0;
+	req_step_msg.exclusive = 0;
+	req_step_msg.immediate = 0;
 
 	error_code = step_create(&req_step_msg, &step_rec, false, true);
 	xfree(req_step_msg.node_list);	/* may be set by step_create */
diff --git a/src/slurmctld/slurmctld.h b/src/slurmctld/slurmctld.h
index 806441101944812e6d9088a01f0699c33b20acb4..6a7fcc07824092e40b0d807b821aba3a3bce3d3e 100644
--- a/src/slurmctld/slurmctld.h
+++ b/src/slurmctld/slurmctld.h
@@ -381,6 +381,8 @@ struct job_record {
 					 * for the credentials */
         uint32_t *alloc_lps;		/* number of logical processors
 					 * allocated for this job */
+	uint32_t *used_lps;		/* number of logical processors
+					 * already allocated to job steps */
 	uint16_t mail_type;		/* see MAIL_JOB_* in slurm.h */
 	char *mail_user;		/* user to get e-mail notification */
 	uint32_t requid;            	/* requester user ID */
@@ -408,6 +410,7 @@ struct 	step_record {
 	char *host;			/* host for srun communications */
 	uint16_t batch_step;		/* 1 if batch job step, 0 otherwise */
 	uint16_t ckpt_interval;		/* checkpoint interval in minutes */
+	uint16_t exclusive;		/* 1 if CPUs not shared with
+					 * other job steps */
 	time_t ckpt_time;		/* time of last checkpoint */
 	switch_jobinfo_t switch_job;	/* switch context, opaque */
 	check_jobinfo_t check_job;	/* checkpoint context, opaque */
@@ -1238,6 +1241,9 @@ extern int slurmctld_shutdown(void);
 /* Perform periodic job step checkpoints (per user request) */
 extern void step_checkpoint(void);
 
+/* Update a job's record of allocated CPUs when a job step gets scheduled */
+extern void step_alloc_lps(struct step_record *step_ptr);
+
 /*
  * step_create - creates a step_record in step_specs->job_id, sets up the
  *	according to the step_specs.
diff --git a/src/slurmctld/step_mgr.c b/src/slurmctld/step_mgr.c
index 9cb191b89e22c0b32720bfccb62ddfa24fd6cda8..fba82c8886542a805dd1e4f5e3f67a9cbaf70882 100644
--- a/src/slurmctld/step_mgr.c
+++ b/src/slurmctld/step_mgr.c
@@ -67,16 +67,18 @@
 #include "src/slurmctld/slurmctld.h"
 #include "src/slurmctld/srun_comm.h"
 
+#define STEP_DEBUG 0
 #define MAX_RETRIES 10
 
 static void _pack_ctld_job_step_info(struct step_record *step, Buf buffer);
 static bitstr_t * _pick_step_nodes (struct job_record  *job_ptr, 
-				    job_step_create_request_msg_t *step_spec );
+				    job_step_create_request_msg_t *step_spec,
+				    bool batch_step, int *return_code);
 static hostlist_t _step_range_to_hostlist(struct step_record *step_ptr,
 				uint32_t range_first, uint32_t range_last);
 static int _step_hostname_to_inx(struct step_record *step_ptr,
 				char *node_name);
-
+static void _step_dealloc_lps(struct step_record *step_ptr);
 /* 
  * create_step_record - create an empty step_record for the specified job.
  * IN job_ptr - pointer to job table entry to have step record added
@@ -217,6 +219,8 @@ dump_step_desc(job_step_create_request_msg_t *step_spec)
 	debug3("   host=%s port=%u name=%s network=%s checkpoint=%u", 
 		step_spec->host, step_spec->port, step_spec->name,
 		step_spec->network, step_spec->ckpt_interval);
+	debug3("   exclusive=%u immediate=%u",
+		step_spec->exclusive, step_spec->immediate);
 }
 
 
@@ -374,23 +378,24 @@ int job_step_complete(uint32_t job_id, uint32_t step_id, uid_t uid,
 		info("job_step_complete: invalid job id %u", job_id);
 		return ESLURM_INVALID_JOB_ID;
 	}
-	
+
+	if ((job_ptr->user_id != uid) && (uid != 0) && (uid != getuid())) {
+		error("Security violation, JOB_STEP_COMPLETE RPC from uid %d",
+		      uid);
+		return ESLURM_USER_ID_MISSING;
+	}
+
 	step_ptr = find_step_record(job_ptr, step_id);
 	if (step_ptr == NULL) 
 		return ESLURM_INVALID_JOB_ID;
-	else 
-		jobacct_g_step_complete_slurmctld(step_ptr);
-	
+
+	jobacct_g_step_complete_slurmctld(step_ptr);
+	_step_dealloc_lps(step_ptr);
+
 	if ((job_ptr->kill_on_step_done)
 	    &&  (list_count(job_ptr->step_list) <= 1)
 	    &&  (!IS_JOB_FINISHED(job_ptr))) 
 		return job_complete(job_id, uid, requeue, job_return_code);
-	
-	if ((job_ptr->user_id != uid) && (uid != 0) && (uid != getuid())) {
-		error("Security violation, JOB_COMPLETE RPC from uid %d",
-		      uid);
-		return ESLURM_USER_ID_MISSING;
-	}
 
 	last_job_update = time(NULL);
 	error_code = delete_step_record(job_ptr, step_id);
@@ -407,36 +412,76 @@ int job_step_complete(uint32_t job_id, uint32_t step_id, uid_t uid,
  *	we satisfy the super-set of constraints.
  * IN job_ptr - pointer to job to have new step started
  * IN step_spec - job step specification
+ * IN batch_step - if set then step is a batch script
+ * OUT return_code - exit code or SLURM_SUCCESS
  * global: node_record_table_ptr - pointer to global node table
  * NOTE: returns all of a job's nodes if step_spec->node_count == INFINITE
  * NOTE: returned bitmap must be freed by the caller using bit_free()
  */
 static bitstr_t *
 _pick_step_nodes (struct job_record  *job_ptr, 
-		  job_step_create_request_msg_t *step_spec)
+		  job_step_create_request_msg_t *step_spec,
+		  bool batch_step, int *return_code)
 {
 
 	bitstr_t *nodes_avail = NULL, *nodes_idle = NULL;
 	bitstr_t *nodes_picked = NULL, *node_tmp = NULL;
-	int error_code, nodes_picked_cnt = 0, cpus_picked_cnt, i;
-/* 	char *temp; */
+	int error_code, nodes_picked_cnt=0, cpus_picked_cnt, i;
 	ListIterator step_iterator;
 	struct step_record *step_p;
+#if STEP_DEBUG
+	char *temp;
+#endif
 
-	if (job_ptr->node_bitmap == NULL)
+	*return_code = SLURM_SUCCESS;
+	if (job_ptr->node_bitmap == NULL) {
+		*return_code = ESLURM_REQUESTED_NODE_CONFIG_UNAVAILABLE;
 		return NULL;
+	}
 	
 	nodes_avail = bit_copy (job_ptr->node_bitmap);
 	if (nodes_avail == NULL)
 		fatal("bit_copy malloc failure");
 	bit_and (nodes_avail, up_node_bitmap);
 
+	/* In exclusive mode, just satisfy the processor count.
+	 * Do not use nodes that have no unused CPUs */
+	if (step_spec->exclusive) {
+		int i, j=0, avail, tot_cpus = 0;
+		cpus_picked_cnt = 0;
+		for (i=bit_ffs(job_ptr->node_bitmap); i<node_record_count; 
+		     i++) {
+			if (!bit_test(job_ptr->node_bitmap, i))
+				continue;
+			avail = job_ptr->alloc_lps[j] - job_ptr->used_lps[j];
+			tot_cpus += job_ptr->alloc_lps[j];
+			if ((avail <= 0) ||
+			    (cpus_picked_cnt >= step_spec->cpu_count))
+				bit_clear(nodes_avail, i);
+			else
+				cpus_picked_cnt += avail;
+			if (++j >= job_ptr->node_cnt)
+				break;
+		}
+		if (cpus_picked_cnt >= step_spec->cpu_count)
+			return nodes_avail;
+
+		FREE_NULL_BITMAP(nodes_avail);
+		if (tot_cpus >= step_spec->cpu_count)
+			*return_code = ESLURM_NODES_BUSY;
+		else
+			*return_code = ESLURM_REQUESTED_NODE_CONFIG_UNAVAILABLE;
+		return NULL;
+	}
+
 	if ( step_spec->node_count == INFINITE)	/* use all nodes */
 		return nodes_avail;
 
 	if (step_spec->node_list) {
 		bitstr_t *selected_nodes = NULL;
-/* 		info("selected nodelist is %s", step_spec->node_list); */
+#if STEP_DEBUG
+		info("selected nodelist is %s", step_spec->node_list);
+#endif
 		error_code = node_name2bitmap(step_spec->node_list, false, 
 					      &selected_nodes);
 		
@@ -532,26 +577,30 @@ _pick_step_nodes (struct job_record  *job_ptr,
 		while ((step_p = (struct step_record *)
 			list_next(step_iterator))) {
 			bit_or(nodes_idle, step_p->step_node_bitmap);
-			/* temp = bitmap2node_name(step_p->step_node_bitmap); */
-/* 			info("step %d has nodes %s", step_p->step_id, temp); */
-/* 			xfree(temp); */
+#if STEP_DEBUG
+			temp = bitmap2node_name(step_p->step_node_bitmap);
+			info("step %d has nodes %s", step_p->step_id, temp);
+			xfree(temp);
+#endif
 		} 
 		list_iterator_destroy (step_iterator);
 		bit_not(nodes_idle);
 		bit_and(nodes_idle, nodes_avail);
 	}
-/* 	temp = bitmap2node_name(nodes_avail); */
-/* 	info("can pick from %s %d", temp, step_spec->node_count); */
-/* 	xfree(temp); */
-/* 	temp = bitmap2node_name(nodes_idle); */
-/* 	info("can pick from %s", temp); */
-/* 	xfree(temp); */
-	
+#if STEP_DEBUG
+	temp = bitmap2node_name(nodes_avail);
+	info("can pick from %s %d", temp, step_spec->node_count);
+	xfree(temp);
+	temp = bitmap2node_name(nodes_idle);
+	info("can pick from %s", temp);
+	xfree(temp);
+#endif
+
 	/* if user specifies step needs a specific processor count and 
 	 * all nodes have the same processor count, just translate this to
 	 * a node count */
-	if (step_spec->cpu_count && (job_ptr->num_cpu_groups == 1)
-	&&  job_ptr->cpus_per_node[0]) {
+	if (step_spec->cpu_count && (job_ptr->num_cpu_groups == 1) && 
+	    job_ptr->cpus_per_node[0]) {
 		i = (step_spec->cpu_count + (job_ptr->cpus_per_node[0] - 1) ) 
 				/ job_ptr->cpus_per_node[0];
 		step_spec->node_count = (i > step_spec->node_count) ? 
@@ -561,7 +610,9 @@ _pick_step_nodes (struct job_record  *job_ptr,
 
 	if (step_spec->node_count) {
 		nodes_picked_cnt = bit_set_count(nodes_picked);
-/* 		info("got %d %d", step_spec->node_count, nodes_picked_cnt); */
+#if STEP_DEBUG
+		info("got %u %d", step_spec->node_count, nodes_picked_cnt);
+#endif
 		if (nodes_idle 
 		    && (bit_set_count(nodes_idle) >= step_spec->node_count)
 		    && (step_spec->node_count > nodes_picked_cnt)) {
@@ -595,8 +646,8 @@ _pick_step_nodes (struct job_record  *job_ptr,
 	
 	if (step_spec->cpu_count) {
 		cpus_picked_cnt = count_cpus(nodes_picked);
-		/* person is requesting more cpus than we got from the
-		   picked nodes we should return with an error */
+		/* if the user requests more cpus than were allocated
+		 * on the picked nodes, return with an error */
 		if(step_spec->cpu_count > cpus_picked_cnt) {
 			debug2("Have %d nodes with %d cpus which is less "
 			       "than what the user is asking for (%d cpus) "
@@ -605,60 +656,6 @@ _pick_step_nodes (struct job_record  *job_ptr,
 			       step_spec->cpu_count);
 			goto cleanup;
 		}
-		/* Not sure why the rest of this 'if' is here 
-		   since this will only
-		   change the number of requested nodes by added nodes
-		   to the picked bitmap which isn't what we want to do
-		   if the user requests a node count.  If the user
-		   doesn't specify one then the entire allocation is
-		   already set so we should return an error in either
-		   case */
-		
-/* 		if (nodes_idle */
-/* 		    &&  (step_spec->cpu_count > cpus_picked_cnt)) { */
-/* 			int first_bit, last_bit; */
-/* 			first_bit = bit_ffs(nodes_idle); */
-/* 			if(first_bit == -1) */
-/* 				goto no_idle_bits; */
-/* 			last_bit  = bit_fls(nodes_idle); */
-/* 			if(last_bit == -1) */
-/* 				goto no_idle_bits; */
-			
-/* 			for (i = first_bit; i <= last_bit; i++) { */
-/* 				if (bit_test (nodes_idle, i) != 1) */
-/* 					continue; */
-/* 				bit_set (nodes_picked, i); */
-/* 				bit_clear (nodes_avail, i); */
-/* 				/\* bit_clear (nodes_idle, i);	unused *\/ */
-/* 				cpus_picked_cnt += */
-/* 					node_record_table_ptr[i].cpus; */
-/* 				if (cpus_picked_cnt >= step_spec->cpu_count) */
-/* 					break; */
-/* 			} */
-/* 			if (step_spec->cpu_count > cpus_picked_cnt) */
-/* 				goto cleanup; */
-/* 		} */
-/* 	no_idle_bits: */
-/* 		if (step_spec->cpu_count > cpus_picked_cnt) { */
-/* 			int first_bit, last_bit; */
-/* 			first_bit = bit_ffs(nodes_avail); */
-/* 			if(first_bit == -1) */
-/* 				goto cleanup; */
-/* 			last_bit  = bit_fls(nodes_avail); */
-/*  			if(last_bit == -1) */
-/* 				goto cleanup; */
-/* 			for (i = first_bit; i <= last_bit; i++) { */
-/* 				if (bit_test (nodes_avail, i) != 1) */
-/* 					continue; */
-/* 				bit_set (nodes_picked, i); */
-/* 				cpus_picked_cnt +=  */
-/* 					node_record_table_ptr[i].cpus; */
-/* 				if (cpus_picked_cnt >= step_spec->cpu_count) */
-/* 					break; */
-/* 			} */
-/* 			if (step_spec->cpu_count > cpus_picked_cnt) */
-/* 				goto cleanup; */
-/* 		} */
 	}
 	
 	FREE_NULL_BITMAP(nodes_avail);
@@ -669,9 +666,75 @@ cleanup:
 	FREE_NULL_BITMAP(nodes_avail);
 	FREE_NULL_BITMAP(nodes_idle);
 	FREE_NULL_BITMAP(nodes_picked);
+	*return_code = ESLURM_REQUESTED_NODE_CONFIG_UNAVAILABLE;
 	return NULL;
 }
 
+/* Update a job's record of allocated CPUs when a job step gets scheduled */
+extern void step_alloc_lps(struct step_record *step_ptr)
+{
+	struct job_record  *job_ptr = step_ptr->job_ptr;
+	int i_node;
+	int job_node_inx = -1, step_node_inx = -1;
+
+	for (i_node = bit_ffs(job_ptr->node_bitmap);
+	     i_node < node_record_count; i_node++) {
+		if (!bit_test(job_ptr->node_bitmap, i_node))
+			continue;
+		job_node_inx++;
+		if (!bit_test(step_ptr->step_node_bitmap, i_node))
+			continue;
+		step_node_inx++;
+		job_ptr->used_lps[job_node_inx] += 
+			step_ptr->step_layout->tasks[step_node_inx];
+#if STEP_DEBUG
+		info("step alloc of %s procs: %u of %u", 
+			node_record_table_ptr[i_node].name,
+			job_ptr->used_lps[job_node_inx],
+			job_ptr->alloc_lps[job_node_inx]);
+#endif
+		if (step_node_inx == (step_ptr->step_layout->node_cnt - 1))
+			break;
+	}
+}
+
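+/* Update a job's record of allocated CPUs when a job step completes */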
+static void _step_dealloc_lps(struct step_record *step_ptr)
+{
+	struct job_record  *job_ptr = step_ptr->job_ptr;
+	int i_node;
+	int job_node_inx = -1, step_node_inx = -1;
+
+	if (step_ptr->step_layout == NULL)	/* batch step */
+		return;
+
+	for (i_node = bit_ffs(job_ptr->node_bitmap);
+	     i_node < node_record_count; i_node++) {
+		if (!bit_test(job_ptr->node_bitmap, i_node))
+			continue;
+		job_node_inx++;
+		if (!bit_test(step_ptr->step_node_bitmap, i_node))
+			continue;
+		step_node_inx++;
+		if (job_ptr->used_lps[job_node_inx] >=
+		    step_ptr->step_layout->tasks[step_node_inx]) {
+			job_ptr->used_lps[job_node_inx] -= 
+				step_ptr->step_layout->tasks[step_node_inx];
+		} else {
+			error("_step_dealloc_lps: underflow for %u.%u",
+				job_ptr->job_id, step_ptr->step_id);
+			job_ptr->used_lps[job_node_inx] = 0;
+		}
+#if STEP_DEBUG
+		info("step dealloc of %s procs: %u of %u", 
+			node_record_table_ptr[i_node].name,
+			job_ptr->used_lps[job_node_inx],
+			job_ptr->alloc_lps[job_node_inx]);
+#endif
+		if (step_node_inx == (step_ptr->step_layout->node_cnt - 1))
+			break;
+	}
+}
 
 /*
  * step_create - creates a step_record in step_specs->job_id, sets up the
@@ -692,7 +755,7 @@ step_create(job_step_create_request_msg_t *step_specs,
 	struct step_record *step_ptr;
 	struct job_record  *job_ptr;
 	bitstr_t *nodeset;
-	int node_count;
+	int node_count, ret_code;
 	time_t now = time(NULL);
 	char *step_node_list = NULL;
 
@@ -750,9 +813,9 @@ step_create(job_step_create_request_msg_t *step_specs,
 	job_ptr->kill_on_step_done = kill_job_when_step_done;
 
 	job_ptr->time_last_active = now;
-	nodeset = _pick_step_nodes(job_ptr, step_specs);
+	nodeset = _pick_step_nodes(job_ptr, step_specs, batch_step, &ret_code);
 	if (nodeset == NULL)
-		return ESLURM_REQUESTED_NODE_CONFIG_UNAVAILABLE ;
+		return ret_code;
 	node_count = bit_set_count(nodeset);
 
 	if (step_specs->num_tasks == NO_VAL) {
@@ -787,8 +850,10 @@ step_create(job_step_create_request_msg_t *step_specs,
 		xfree(step_specs->node_list);
 		step_specs->node_list = xstrdup(step_node_list);
 	}
-/* 	info("got %s and %s looking for %d nodes", step_node_list, */
-/* 	     step_specs->node_list, step_specs->node_count); */
+#if STEP_DEBUG
+	info("got %s and %s looking for %d nodes", step_node_list,
+	     step_specs->node_list, step_specs->node_count);
+#endif
 	step_ptr->step_node_bitmap = nodeset;
 	
 	switch(step_specs->task_dist) {
@@ -808,6 +873,7 @@ step_create(job_step_create_request_msg_t *step_specs,
 	step_ptr->ckpt_interval = step_specs->ckpt_interval;
 	step_ptr->ckpt_time = now;
 	step_ptr->exit_code = NO_VAL;
+	step_ptr->exclusive = step_specs->exclusive;
 
 	/* step's name and network default to job's values if not 
 	 * specified in the step specification */
@@ -843,6 +909,7 @@ step_create(job_step_create_request_msg_t *step_specs,
 			delete_step_record (job_ptr, step_ptr->step_id);
 			return ESLURM_INTERCONNECT_FAILURE;
 		}
+		step_alloc_lps(step_ptr);
 	}
 	if (checkpoint_alloc_jobinfo (&step_ptr->check_job) < 0)
 		fatal ("step_create: checkpoint_alloc_jobinfo error");
@@ -864,12 +931,8 @@ extern slurm_step_layout_t *step_layout_create(struct step_record *step_ptr,
 	int cpu_inx = -1;
 	int usable_cpus = 0, i;
 	int set_nodes = 0;
-	int inx = 0;
 	int pos = -1;
 	struct job_record *job_ptr = step_ptr->job_ptr;
-
-	/* node_pos is the position in the node in the job */
-	uint32_t node_pos = job_ptr->cpu_count_reps[inx];
 			
 	/* build the cpus-per-node arrays for the subset of nodes
 	   used by this job step */
@@ -879,15 +942,17 @@ extern slurm_step_layout_t *step_layout_create(struct step_record *step_ptr,
 			pos = bit_get_pos_num(job_ptr->node_bitmap, i);
 			if (pos == -1)
 				return NULL;
-			/* need to get the correct num of cpus on the
-			   node */
-			while(pos >= node_pos) {
-				node_pos += 
-					job_ptr->cpu_count_reps[++inx];
-			}
-			debug2("%d got inx of %d cpus = %d pos = %d", 
-			       i, inx, job_ptr->cpus_per_node[inx], pos);
-			usable_cpus = job_ptr->cpus_per_node[inx];
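+			/* An exclusive step may use only those CPUs not
+			 * already allocated to other job steps */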
+			if (step_ptr->exclusive) {
+				usable_cpus = job_ptr->alloc_lps[pos] -
+					      job_ptr->used_lps[pos];
+				if (usable_cpus < 0) {
+					error("step_layout_create exclusive");
+					return NULL;
+				}
+			} else
+				usable_cpus = job_ptr->alloc_lps[pos];
+			debug2("step_layout_create cpus = %d pos = %d",
+			       usable_cpus, pos);
 			
 			if ((cpu_inx == -1) ||
 			    (cpus_per_node[cpu_inx] != usable_cpus)) {
@@ -898,10 +963,11 @@ extern slurm_step_layout_t *step_layout_create(struct step_record *step_ptr,
 			} else
 				cpu_count_reps[cpu_inx]++;
 			set_nodes++;
-			if(set_nodes == node_count)
+			if (set_nodes == node_count)
 				break;
 		}
 	}
+
 	/* layout the tasks on the nodes */
 	return slurm_step_layout_create(step_node_list,
 					cpus_per_node, cpu_count_reps, 
diff --git a/src/srun/allocate.c b/src/srun/allocate.c
index a0d77c3210e74b5e82466640b88a7cb3ca4e677e..55cfd6c21fe078fca5cf50d49030de8409da9741 100644
--- a/src/srun/allocate.c
+++ b/src/srun/allocate.c
@@ -578,6 +578,8 @@ _step_req_create(srun_job_t *j)
 	r->name       = xstrdup(opt.job_name);
 	r->relative   = (uint16_t)opt.relative;
 	r->ckpt_interval = (uint16_t)opt.ckpt_interval;
+	r->exclusive  = (uint16_t)opt.exclusive;
+	r->immediate  = (uint16_t)opt.immediate;
 	r->overcommit = opt.overcommit ? 1 : 0;
 	debug("requesting job %d, user %d, nodes %d including (%s)", 
 	      r->job_id, r->user_id, r->node_count, r->node_list);
@@ -631,7 +633,7 @@ create_job_step(srun_job_t *job)
 {
 	job_step_create_request_msg_t  *req  = NULL;
 	job_step_create_response_msg_t *resp = NULL;
-	int i;
+	int i, rc;
 	
 	if (!(req = _step_req_create(job))) {
 		error ("Unable to allocate step request message");
@@ -640,14 +642,21 @@ create_job_step(srun_job_t *job)
 
 	for (i=0; ;i++) {
 		if ((slurm_job_step_create(req, &resp) == SLURM_SUCCESS)
-		&&  (resp != NULL))
+		&&  (resp != NULL)) {
+			if (i > 0)
+				info("Job step created");
 			break;
-		if (slurm_get_errno() != ESLURM_DISABLED) {
+		}
+		rc = slurm_get_errno();
+		if (opt.immediate ||
+		    ((rc != ESLURM_NODES_BUSY) && (rc != ESLURM_DISABLED))) {
 			error ("Unable to create job step: %m");
 			return -1;
 		}
 		if (i == 0)
 			info("Job step creation temporarily disabled, retrying");
+		else
+			info("Job step creation still disabled, retrying");
 		sleep(MIN((i*10), 60));
 	}
 	
diff --git a/src/srun/opt.c b/src/srun/opt.c
index 4578f1eeb666397eb5fdce1e2c9400c145b93a3e..4df688bff7b45565667d839ac837459db786c529 100644
--- a/src/srun/opt.c
+++ b/src/srun/opt.c
@@ -969,6 +969,7 @@ static void _opt_default()
 	opt.unbuffered = false;
 	opt.overcommit = false;
 	opt.shared = (uint16_t)NO_VAL;
+	opt.exclusive = false;
 	opt.no_kill = false;
 	opt.kill_bad_exit = false;
 
@@ -1190,6 +1191,7 @@ _process_env_var(env_vars_t *e, const char *val)
 		break;
 
 	case OPT_EXCLUSIVE:
+		opt.exclusive = true;
 		opt.shared = 0;
 		break;
 
@@ -1649,6 +1651,7 @@ static void set_options(const int argc, char **argv)
 			opt.contiguous = true;
 			break;
                 case LONG_OPT_EXCLUSIVE:
+			opt.exclusive = true;
                         opt.shared = 0;
                         break;
                 case LONG_OPT_CPU_BIND:
@@ -2590,6 +2593,7 @@ static void _opt_list()
 		info("dependency     : none");
 	else
 		info("dependency     : %u", opt.dependency);
+	info("exclusive      : %s", tf_(opt.exclusive));
 	if (opt.shared != (uint16_t) NO_VAL)
 		info("shared         : %u", opt.shared);
 	str = print_constraints();
@@ -2753,6 +2757,7 @@ static void _help(void)
 "Consumable resources related options:\n" 
 "      --exclusive             allocate nodes in exclusive mode when\n" 
 "                              cpu consumable resource is enabled\n"
+"                              or don't share CPUs for job steps\n"
 "      --job-mem=MB            maximum amount of real memory per node\n"
 "                              required by the job.\n" 
 "                              --mem >= --job-mem if --mem is specified.\n" 
diff --git a/src/srun/opt.h b/src/srun/opt.h
index fa9edbaa510586fcd90ad702c0a582bf2d97d264..bc574bb2c46e2f83f5ebef7d41df0b98a107551b 100644
--- a/src/srun/opt.h
+++ b/src/srun/opt.h
@@ -111,6 +111,7 @@ typedef struct srun_options {
 	char *time_limit_str;	/* --time,   -t (string)	*/
 	int  ckpt_interval;	/* --checkpoint (int minutes)	*/
 	char *ckpt_interval_str;/* --checkpoint (string)	*/
+	bool exclusive;		/* --exclusive			*/
 	char *partition;	/* --partition=n,   -p n   	*/
 	enum task_dist_states
 	        distribution;	/* --distribution=, -m dist	*/
diff --git a/src/srun/srun.c b/src/srun/srun.c
index c67019675a33da77e52cfb67a38083e0938025bb..45aeb5795ad13258d148b2f4b6022cc1b3fe90d2 100644
--- a/src/srun/srun.c
+++ b/src/srun/srun.c
@@ -121,6 +121,7 @@ static int   _slurm_debug_env_val (void);
 static int   _call_spank_local_user (srun_job_t *job);
 static void  _define_symbols(void);
 static void  _pty_restore(void);
+static void  _step_opt_exclusive(void);
 
 int srun(int ac, char **av)
 {
@@ -207,6 +208,8 @@ int srun(int ac, char **av)
 		job_id = resp->job_id;
 		if (opt.alloc_nodelist == NULL)
                        opt.alloc_nodelist = xstrdup(resp->node_list);
+		if (opt.exclusive)
+			_step_opt_exclusive();
 
 		job = job_step_create_allocation(resp);
 		slurm_free_resource_allocation_response_msg(resp);
@@ -239,6 +242,7 @@ int srun(int ac, char **av)
 		job = job_create_allocation(resp);
 		if(!job)
 			exit(1);
+		opt.exclusive = false;	/* not applicable for this step */
 		if (create_job_step(job) < 0) {
 			srun_job_destroy(job, 0);
 			exit(1);
@@ -846,3 +850,23 @@ static void _pty_restore(void)
 	if (tcsetattr(STDOUT_FILENO, TCSANOW, &termdefaults) < 0)
 		fprintf(stderr, "tcsetattr: %s\n", strerror(errno));
 }
+
+/* opt.exclusive is set, disable user task layout controls */
+static void _step_opt_exclusive(void)
+{
+	if (!opt.nprocs_set)
+		fatal("--ntasks must be set with --exclusive");
+	if (opt.relative_set)
+		fatal("--relative disabled, incompatible with --exclusive");
+	if (opt.nodes_set) {
+		/* Likely set via SLURM_NNODES env var from job allocation */
+		verbose("ignoring node count set by --nodes or SLURM_NNODES");
+		verbose("  it is incompatible with --exclusive");
+		opt.min_nodes = 1;
+		opt.max_nodes = 0;
+	}
+	if (opt.exc_nodes)
+		fatal("--exclude is incompatible with --exclusive");
+	if (opt.nodelist)
+		fatal("--nodelist is incompatible with --exclusive");
+}
diff --git a/testsuite/expect/README b/testsuite/expect/README
index bccdcc8a595992a3ce8f5c2772e37ce118a5f933..fb20bc5fd441db6e704428e02ec52a9cd6408cf4 100644
--- a/testsuite/expect/README
+++ b/testsuite/expect/README
@@ -72,7 +72,7 @@ test1.11   Test job name option (--job-name).
 test1.12   Test of --checkpoint option. This does not validate the 
            checkpoint file itself.
 test1.13   Test of immediate allocation option (--immediate option).
-test1.14   REMOVED
+test1.14   Test exclusive resource allocation for a step (--exclusive option).
 test1.15   Test of wait option (--wait option).
 test1.16   Confirm that srun buffering can be disabled (--unbuffered option).
 test1.17   REMOVED
diff --git a/testsuite/expect/test1.14 b/testsuite/expect/test1.14
new file mode 100755
index 0000000000000000000000000000000000000000..c8c108b26864ce6884397dbad9a8fd79988038ff
--- /dev/null
+++ b/testsuite/expect/test1.14
@@ -0,0 +1,211 @@
+#!/usr/bin/expect
+############################################################################
+# Purpose: Test of SLURM functionality
+#          Test exclusive resource allocation for a step (--exclusive option).
+#
+# Output:  "TEST: #.#" followed by "SUCCESS" if test was successful, OR
+#          "FAILURE: ..." otherwise with an explanation of the failure, OR
+#          anything else indicates a failure mode that must be investigated.
+############################################################################
+# Copyright (C) 2007 The Regents of the University of California.
+# Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
+# Written by Morris Jette <jette1@llnl.gov>
+# UCRL-CODE-226842.
+# 
+# This file is part of SLURM, a resource management program.
+# For details, see <http://www.llnl.gov/linux/slurm/>.
+#  
+# SLURM is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation; either version 2 of the License, or (at your option)
+# any later version.
+# 
+# SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
+# details.
+# 
+# You should have received a copy of the GNU General Public License along
+# with SLURM; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.
+############################################################################
+source ./globals
+
+set test_id          "1.14"
+set exit_code        0
+set file_in         "test$test_id.input"
+set file_out        "test$test_id.output"
+set job_id           0
+set sleep_secs       10
+
+print_header $test_id
+
+#
+# Delete left-over input script
+# Build input script file
+# Run one more step than allocated CPUs and make sure it waits
+#
+exec $bin_rm -f $file_in $file_out
+make_bash_script $file_in "
+  inx=0
+  while \[ \$inx -lt \$SLURM_TASKS_PER_NODE \]
+  do
+    $srun --exclusive -n1 sleep $sleep_secs &
+    inx=\$((inx+1))
+  done
+  $srun --exclusive -n1 hostname &
+  wait
+"
+
+#
+# Spawn a job via sbatch
+#
+spawn $sbatch -N1 -t1 --output=$file_out $file_in
+expect {
+	-re "Submitted batch job ($number)" {
+		set job_id $expect_out(1,string)
+		exp_continue
+	}
+	timeout {
+		send_user "\nFAILURE: sbatch not responding\n"
+		set exit_code 1
+		exp_continue
+	}
+	eof {
+		wait
+	}
+}
+if { $job_id == 0 } {
+	send_user "\nFAILURE: failed to submit job\n"
+	exit 1
+}
+
+#
+# Wait for job to complete
+#
+if {[wait_for_job $job_id "DONE"] != 0} {
+	send_user "\nFAILURE: waiting for job to complete\n"
+	cancel_job $job_id
+	set exit_code 1
+}
+
+#
+# Check for desired output
+#
+if {[wait_for_file $file_out] != 0} {
+	send_user "\nFAILURE: Output file $file_out is missing\n"
+	exit 1
+}
+set match1 0
+set match2 0
+spawn $bin_cat $file_out
+expect {
+	-re "Job step creation temporarily disabled, retrying" {
+		incr match1
+		exp_continue
+	}
+	-re "Job step created" {
+		incr match2
+		exp_continue
+	}
+	eof {
+		wait
+	}
+}
+
+if { $match1 != 1 || $match2 != 1 } {
+	send_user "\nFAILURE: Problem with exclusive resource allocation "
+	send_user "for step ($match1, $match2)\n"
+	set exit_code 1
+}
+
+if {$exit_code == 0} {
+	send_user "\nSo far, so good. Trying with --immediate option\n\n"
+} else {
+	exit $exit_code
+}
+
+#
+# Delete left-over input script
+# Build another input script file
+# Run one more step than allocated CPUs with the immediate option and
+# make sure it aborts
+#
+exec $bin_rm -f $file_in $file_out
+make_bash_script $file_in "
+  inx=0
+  while \[ \$inx -lt \$SLURM_TASKS_PER_NODE \]
+  do
+    $srun --exclusive -n1 sleep $sleep_secs &
+    inx=\$((inx+1))
+  done
+  $srun --exclusive -n1 --immediate hostname &
+  wait
+"
+
+#
+# Spawn a job via sbatch
+#
+spawn $sbatch -N1 -t1 --output=$file_out $file_in
+expect {
+	-re "Submitted batch job ($number)" {
+		set job_id $expect_out(1,string)
+		exp_continue
+	}
+	timeout {
+		send_user "\nFAILURE: sbatch not responding\n"
+		set exit_code 1
+		exp_continue
+	}
+	eof {
+		wait
+	}
+}
+if { $job_id == 0 } {
+	send_user "\nFAILURE: failed to submit job\n"
+	exit 1
+}
+
+#
+# Wait for job to complete
+#
+if {[wait_for_job $job_id "DONE"] != 0} {
+	send_user "\nFAILURE: waiting for job to complete\n"
+	cancel_job $job_id
+	set exit_code 1
+}
+
+#
+# Check for desired output
+#
+if {[wait_for_file $file_out] != 0} {
+	send_user "\nFAILURE: Output file $file_out is missing\n"
+	exit 1
+}
+set match1 0
+spawn $bin_cat $file_out
+expect {
+	-re "Job step creation temporarily disabled, retrying" {
+		send_user "\nFAILURE: Problem with --exclusive and --immediate options for step\n"
+		set exit_code 1
+		exp_continue
+	}
+	-re "Unable to create job step" {
+		send_user "This error was expected, no worries\n"
+		incr match1
+		exp_continue
+	}
+	eof {
+		wait
+	}
+}
+
+if { $match1 != 1 } {
+	send_user "\nFAILURE: Problem with --exclusive and --immediate options for step\n"
+	set exit_code 1
+}
+
+if {$exit_code == 0} {
+	exec $bin_rm -f $file_in $file_out
+	send_user "\nSUCCESS\n"
+}
+exit $exit_code