diff --git a/NEWS b/NEWS
index 6c2545aeaf53de88885e167dfbca4a56ffbe9b78..7fe692999f0cab099db7616b2e5de8da9fae71b2 100644
--- a/NEWS
+++ b/NEWS
@@ -16,6 +16,7 @@ documents those changes that are of interest to users and admins.
     Former users of SchedType=sched/gang should set SchedType=sched/backfill,
     PreemptType=preempt/partition_prio and PreemptMode=gang,suspend. See
     web and slurm.conf man page for other options.
+ -- In select/linear, optimize job placement across partitions and add
+    support for preemption of jobs in lower priority partitions.
  -- If the --partition option is used with the sinfo or squeue command then
     print information about even hidden partitions.
  -- Replaced misc cpu allocation members in job_info_t with select_job_res_t
diff --git a/src/common/read_config.c b/src/common/read_config.c
index 6848fc47f61eac6d72872f0ea17714320ff80995..048fb5c97a4f230b87cdaa878db57fb0324469a2 100644
--- a/src/common/read_config.c
+++ b/src/common/read_config.c
@@ -2177,6 +2177,12 @@ _validate_and_set_defaults(slurm_ctl_conf_t *conf, s_p_hashtbl_t *hashtbl)
 			fatal("PreemptType and PreemptMode values "
 			      "incompatible");
 		}
+	} else if (strcmp(conf->preempt_type, "preempt/partition_prio") == 0) {
+		int preempt_mode = conf->preempt_mode & (~PREEMPT_MODE_GANG);
+		if (preempt_mode == PREEMPT_MODE_OFF) {
+			fatal("PreemptType and PreemptMode values "
+			      "incompatible");
+		}
 	} else if (strcmp(conf->preempt_type, "preempt/none") == 0) {
 		int preempt_mode = conf->preempt_mode & (~PREEMPT_MODE_GANG);
 		if (preempt_mode != PREEMPT_MODE_OFF) {
diff --git a/src/plugins/preempt/qos/preempt_qos.c b/src/plugins/preempt/qos/preempt_qos.c
index 13dc138a8faffdfbabdd49394c5c022f4166838b..5f0da069c7c2f4075caf9a112ccd9463670f9816 100644
--- a/src/plugins/preempt/qos/preempt_qos.c
+++ b/src/plugins/preempt/qos/preempt_qos.c
@@ -105,7 +105,8 @@ extern struct job_record **find_preemptable_jobs(struct job_record *job_ptr)
 		if (!IS_JOB_RUNNING(job_p) && !IS_JOB_SUSPENDED(job_p))
 			continue;
 /* FIXME: Change to some QOS comparison */
-		if (job_p->account && job_ptr->account &&
+		if ((job_p->account == NULL)	|| 
+		    (job_ptr->account == NULL)	||
 		    (job_p->account[0] >= job_ptr->account[0]))
 			continue;
 		if ((job_p->node_bitmap == NULL) ||
diff --git a/src/plugins/select/linear/select_linear.c b/src/plugins/select/linear/select_linear.c
index 65e064e102a1e5cd3f0a6ac83741a4193d29755d..1be29bf279ece6a253a00031996ab3bda81c56d2 100644
--- a/src/plugins/select/linear/select_linear.c
+++ b/src/plugins/select/linear/select_linear.c
@@ -68,6 +68,7 @@
 #include "src/common/xmalloc.h"
 
 #include "src/slurmctld/slurmctld.h"
+#include "src/slurmctld/preempt.h"
 #include "src/slurmctld/proc_req.h"
 #include "src/plugins/select/linear/select_linear.h"
 
@@ -88,6 +89,7 @@ int node_record_count;
 time_t last_node_update;
 struct switch_record *switch_record_table; 
 int switch_record_cnt;
+bool preempt_within_partition(void);
 
 struct select_nodeinfo {
 	uint16_t magic;		/* magic number */
@@ -124,6 +126,12 @@ static bool _rem_run_job(struct part_cr_record *part_cr_ptr, uint32_t job_id);
 static int _rm_job_from_nodes(struct node_cr_record *node_cr_ptr,
 			      struct job_record *job_ptr, char *pre_err, 
 			      bool remove_all);
+static int _run_now(struct job_record *job_ptr, bitstr_t *bitmap,
+		    uint32_t min_nodes, uint32_t max_nodes, 
+		    int max_share, uint32_t req_nodes);
+static int _test_only(struct job_record *job_ptr, bitstr_t *bitmap,
+			  uint32_t min_nodes, uint32_t max_nodes, 
+			  uint32_t req_nodes);
 static int _will_run_test(struct job_record *job_ptr, bitstr_t *bitmap,
 			  uint32_t min_nodes, uint32_t max_nodes, 
 			  int max_share, uint32_t req_nodes);
@@ -173,7 +181,7 @@ static bool job_preemption_tested  = false;
 
 static struct node_cr_record *node_cr_ptr = NULL;
 static pthread_mutex_t cr_mutex = PTHREAD_MUTEX_INITIALIZER;
-static List step_cr_list = NULL;
+static List preempt_job_list = NULL;
 
 #ifdef HAVE_XCPU
 #define XCPU_POLL_TIME 120
@@ -373,8 +381,10 @@ static uint16_t _get_avail_cpus(struct job_record *job_ptr, int index)
 	uint16_t avail_cpus;
 	uint16_t cpus, sockets, cores, threads;
 	uint16_t cpus_per_task = 1;
-	uint16_t ntasks_per_node = 0, ntasks_per_socket = 0, ntasks_per_core = 0;
-	uint16_t max_sockets = 0xffff, max_cores = 0xffff, max_threads = 0xffff;
+	uint16_t ntasks_per_node = 0, ntasks_per_socket = 0;
+	uint16_t ntasks_per_core = 0;
+	uint16_t max_sockets = 0xffff, max_cores = 0xffff;
+	uint16_t max_threads = 0xffff;
 	multi_core_data_t *mc_ptr = NULL;
 	int min_sockets = 0, min_cores = 0;
 
@@ -541,22 +551,11 @@ static int _job_count_bitmap(struct node_cr_record *node_cr_ptr,
 			     int run_job_cnt, int tot_job_cnt)
 {
 	int i, count = 0, total_jobs, total_run_jobs;
-	int lower_prio_jobs, same_prio_jobs, higher_prio_jobs;
 	struct part_cr_record *part_cr_ptr;
 	uint32_t job_memory_cpu = 0, job_memory_node = 0;
 	uint32_t alloc_mem = 0, job_mem = 0, avail_mem = 0;
-	bool exclusive;
 
 	xassert(node_cr_ptr);
-
-	/* Jobs submitted to a partition with 
-	 * Shared=FORCE:1 may share resources with jobs in other partitions
-	 * Shared=NO  may not share resources with jobs in other partitions */
-	if (run_job_cnt || (job_ptr->part_ptr->max_share & SHARED_FORCE))
-		exclusive = false;
-	else
-		exclusive = true;
-
 	if (job_ptr->details->job_min_memory  && (cr_type == CR_MEMORY)) {
 		if (job_ptr->details->job_min_memory & MEM_PER_CPU) {
 			job_memory_cpu = job_ptr->details->job_min_memory &
@@ -593,105 +592,20 @@ static int _job_count_bitmap(struct node_cr_record *node_cr_ptr,
 			}
 		}
 
-		if ((run_job_cnt != NO_SHARE_LIMIT) &&
-		    (!_job_preemption_enabled()) &&
-		    (node_cr_ptr[i].exclusive_jobid != 0)) {
+		if (node_cr_ptr[i].exclusive_jobid != 0) {
 			/* already reserved by some exclusive job */
 			bit_clear(jobmap, i);
 			continue;
 		}
 
-		if (_job_preemption_enabled()) {
-			/* clear this node if any higher-priority
-			 * partitions have existing allocations */
-			lower_prio_jobs = 0;
-			same_prio_jobs = 0;
-			higher_prio_jobs = 0;
-			part_cr_ptr = node_cr_ptr[i].parts;
-			for ( ;part_cr_ptr; part_cr_ptr = part_cr_ptr->next) {
-				if (part_cr_ptr->part_ptr->priority <
-				    job_ptr->part_ptr->priority) {
-					lower_prio_jobs += part_cr_ptr->
-							   tot_job_cnt;
-				} else if (part_cr_ptr->part_ptr->priority ==
-					   job_ptr->part_ptr->priority) {
-					same_prio_jobs += part_cr_ptr->
-							  tot_job_cnt;
-				} else {
-					higher_prio_jobs += part_cr_ptr->
-							    tot_job_cnt;
-				}
-			}
-			if ((run_job_cnt != NO_SHARE_LIMIT) &&
-			    (higher_prio_jobs > 0)) {
-				bit_clear(jobmap, i);
-				continue;
-			}
-			/* We're not currently tracking memory allocation 
-			 * by partition, so we avoid nodes where the total 
-			 * allocated memory would exceed that available
-			 * and there are *any* jobs left on the node after
-			 * this one is started. */
-			if (((alloc_mem + job_mem) > avail_mem)		&&
-			    ((!_job_preemption_killing())		||
-			     ((same_prio_jobs + higher_prio_jobs) > 0))) {
-				bit_clear(jobmap, i);
-				continue;
-			}
-			/* if not sharing, then check with other partitions
-			 * of equal priority. Otherwise, load-balance within
-			 * the local partition */
-			total_jobs = 0;
-			total_run_jobs = 0;
-			part_cr_ptr = node_cr_ptr[i].parts;
-			for ( ; part_cr_ptr; part_cr_ptr = part_cr_ptr->next) {
-				if (part_cr_ptr->part_ptr->priority !=
-				    job_ptr->part_ptr->priority)
-					continue;
-				if (!job_ptr->details->shared) {
-					total_run_jobs +=
-						      part_cr_ptr->run_job_cnt;
-					total_jobs += part_cr_ptr->tot_job_cnt;
-					continue;
-				}
-				if (part_cr_ptr->part_ptr == 
-				    job_ptr->part_ptr) {
-					total_run_jobs +=
-						      part_cr_ptr->run_job_cnt;
-					total_jobs += part_cr_ptr->tot_job_cnt;
-					break;
-				}
-			}
-			if ((total_run_jobs <= run_job_cnt) &&
-			    (total_jobs     <= tot_job_cnt)) {
-				bit_set(jobmap, i);
-				count++;
-			} else {
-				bit_clear(jobmap, i);
-			}
-			continue;
-		}
-
 		total_jobs = 0;
 		total_run_jobs = 0;
 		part_cr_ptr = node_cr_ptr[i].parts;
 		while (part_cr_ptr) {
-			if (exclusive) {     /* count jobs in all partitions */
-				total_run_jobs += part_cr_ptr->run_job_cnt;
-				total_jobs     += part_cr_ptr->tot_job_cnt;
-			} else if (part_cr_ptr->part_ptr == job_ptr->part_ptr) {
-				total_run_jobs += part_cr_ptr->run_job_cnt;
-				total_jobs     += part_cr_ptr->tot_job_cnt; 
-				break;
-			}
+			total_run_jobs += part_cr_ptr->run_job_cnt;
+			total_jobs     += part_cr_ptr->tot_job_cnt;
 			part_cr_ptr = part_cr_ptr->next;
 		}
-		if ((run_job_cnt != 0) && (part_cr_ptr == NULL)) {
-			error("_job_count_bitmap: could not find "
-				"partition %s for node %s",
-				job_ptr->part_ptr->name,
-				node_record_table_ptr[i].name);
-		}
 		if ((total_run_jobs <= run_job_cnt) &&
 		    (total_jobs     <= tot_job_cnt)) {
 			bit_set(jobmap, i);
@@ -808,9 +722,9 @@ static int _job_test(struct job_record *job_ptr, bitstr_t *bitmap,
 
 			avail_cpus = _get_avail_cpus(job_ptr, index);
 
-			if (job_ptr->details->req_node_bitmap
-			&&  bit_test(job_ptr->details->req_node_bitmap, index)
-			&&  (max_nodes > 0)) {
+			if (job_ptr->details->req_node_bitmap	&&
+			    (max_nodes > 0)			&&
+			    bit_test(job_ptr->details->req_node_bitmap,index)){
 				if (consec_req[consec_index] == -1) {
 					/* first required node in set */
 					consec_req[consec_index] = index;
@@ -937,8 +851,8 @@ static int _job_test(struct job_record *job_ptr, bitstr_t *bitmap,
 			 * then down from the required nodes */
 			for (i = best_fit_req;
 			     i <= consec_end[best_fit_location]; i++) {
-				if ((max_nodes <= 0)
-				||  ((rem_nodes <= 0) && (rem_cpus <= 0)))
+				if ((max_nodes <= 0) ||
+				    ((rem_nodes <= 0) && (rem_cpus <= 0)))
 					break;
 				if (bit_test(bitmap, i))
 					continue;
@@ -952,8 +866,8 @@ static int _job_test(struct job_record *job_ptr, bitstr_t *bitmap,
 			}
 			for (i = (best_fit_req - 1);
 			     i >= consec_start[best_fit_location]; i--) {
-				if ((max_nodes <= 0)
-				||  ((rem_nodes <= 0) && (rem_cpus <= 0)))
+				if ((max_nodes <= 0) ||
+				    ((rem_nodes <= 0) && (rem_cpus <= 0)))
 					break;
 				if (bit_test(bitmap, i)) 
 					continue;
@@ -968,8 +882,8 @@ static int _job_test(struct job_record *job_ptr, bitstr_t *bitmap,
 		} else {
 			for (i = consec_start[best_fit_location];
 			     i <= consec_end[best_fit_location]; i++) {
-				if ((max_nodes <= 0)
-				||  ((rem_nodes <= 0) && (rem_cpus <= 0)))
+				if ((max_nodes <= 0) ||
+				    ((rem_nodes <= 0) && (rem_cpus <= 0)))
 					break;
 				if (bit_test(bitmap, i))
 					continue;
@@ -991,8 +905,8 @@ static int _job_test(struct job_record *job_ptr, bitstr_t *bitmap,
 		consec_nodes[best_fit_location] = 0;
 	}
 
-	if (error_code && (rem_cpus <= 0)
-	&&  _enough_nodes(0, rem_nodes, min_nodes, req_nodes)) {
+	if (error_code && (rem_cpus <= 0) &&
+	    _enough_nodes(0, rem_nodes, min_nodes, req_nodes)) {
 		error_code = SLURM_SUCCESS;
 	}
 	if (error_code == SLURM_SUCCESS) {
@@ -1369,6 +1283,7 @@ static int _rm_job_from_nodes(struct node_cr_record *node_cr_ptr,
 		}
 		if (node_cr_ptr[i].exclusive_jobid == job_ptr->job_id)
 			node_cr_ptr[i].exclusive_jobid = 0;
+
 		part_cr_ptr = node_cr_ptr[i].parts;
 		while (part_cr_ptr) {
 			if (part_cr_ptr->part_ptr != job_ptr->part_ptr) {
@@ -1475,8 +1390,7 @@ static int _add_job_to_nodes(struct node_cr_record *node_cr_ptr,
 					node_record_table_ptr[i].cpus;
 		}
 		if (exclusive) {
-			if (node_cr_ptr[i].exclusive_jobid &&
-			    !_job_preemption_killing()) {
+			if (node_cr_ptr[i].exclusive_jobid) {
 				error("select/linear: conflicting exclusive "
 				      "jobs %u and %u on %s",
 				      job_ptr->job_id, 
@@ -1675,8 +1589,7 @@ static void _init_node_cr(void)
 			if (!bit_test(select_ptr->node_bitmap, i))
 				continue;
 			if (exclusive) {
-				if (node_cr_ptr[i].exclusive_jobid &&
-				    !_job_preemption_killing()) {
+				if (node_cr_ptr[i].exclusive_jobid) {
 					error("select/linear: conflicting "
 				 	      "exclusive jobs %u and %u on %s",
 				 	      job_ptr->job_id, 
@@ -1726,16 +1639,149 @@ static void _init_node_cr(void)
 	_dump_node_cr(node_cr_ptr);
 }
 
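+/* Return true if the given job is found in the NULL-terminated array of
+ * preemptable jobs identified by the preempt plugin */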
+static bool _is_preemptable(struct job_record *job_ptr, 
+			    struct job_record **preempt_job_ptr)
+{
+	int i;
+
+	if (!preempt_job_ptr)
+		return false;
+
+	for (i=0; preempt_job_ptr[i]; i++) {
+		if (preempt_job_ptr[i]->job_id == job_ptr->job_id)
+			return true;
+	}
+	return false;
+}
+
+/* Determine if a job can ever run */
+static int _test_only(struct job_record *job_ptr, bitstr_t *bitmap,
+			  uint32_t min_nodes, uint32_t max_nodes, 
+			  uint32_t req_nodes)
+{
+	int rc;
+	uint32_t save_mem;
+
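+	/* Ignore the memory requirement here; this test only determines
+	 * whether the job could ever be allocated the requested nodes */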
+	save_mem = job_ptr->details->job_min_memory;
+	job_ptr->details->job_min_memory = 0;
+	rc = _job_test(job_ptr, bitmap, min_nodes, max_nodes, req_nodes);
+	job_ptr->details->job_min_memory = save_mem;
+
+	return rc;
+}
+
+/* Allocate resources for a job now, if possible */
+static int _run_now(struct job_record *job_ptr, bitstr_t *bitmap,
+		    uint32_t min_nodes, uint32_t max_nodes, 
+		    int max_share, uint32_t req_nodes)
+{
+	bitstr_t *orig_map = bit_copy(bitmap);
+	int max_run_job, j, sus_jobs, rc = EINVAL, prev_cnt = -1;
+	struct job_record **preempt_job_ptr = NULL, *tmp_job_ptr;
+	ListIterator job_iterator;
+	struct node_cr_record *exp_node_cr;
+
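+	/* Try to place the job at progressively higher levels of node
+	 * sharing (more running and suspended jobs per node) until the
+	 * requested node count can be satisfied */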
+	for (max_run_job=0; max_run_job<max_share; max_run_job++) {
+		bool last_iteration = (max_run_job == (max_share - 1));
+		for (sus_jobs=0; ((sus_jobs<5) && (rc != SLURM_SUCCESS)); 
+		     sus_jobs+=4) {
+			if (last_iteration)
+				sus_jobs = NO_SHARE_LIMIT;
+			j = _job_count_bitmap(node_cr_ptr, job_ptr, 
+					      orig_map, bitmap, 
+					      max_run_job, 
+					      max_run_job + sus_jobs);
+#if 0
+{			char *node_list = bitmap2node_name(bitmap);
+			info("_run_now: job %u iter:%d cnt:%d nodes:%s",
+			     job_ptr->job_id, max_run_job, j, node_list);
+			xfree(node_list);
+}
+#endif
+			if ((j == prev_cnt) || (j < min_nodes))
+				continue;
+			prev_cnt = j;
+			if (max_run_job > 0) {
+				/* We need to share. Try to find
+				 * suitable job to share nodes with */
+				rc = _find_job_mate(job_ptr, bitmap,
+						    min_nodes,
+						    max_nodes, req_nodes);
+				if (rc == SLURM_SUCCESS)
+					break;
+			}
+			rc = _job_test(job_ptr, bitmap, min_nodes, max_nodes, 
+				       req_nodes);
+			if (rc == SLURM_SUCCESS)
+				break;
+			continue;
+		}
+	}
+
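+	/* If the job still can not start, test whether it could run after
+	 * removing the jobs which the preempt plugin identifies as
+	 * preemptable */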
+	if ((rc != SLURM_SUCCESS) &&
+	    (preempt_job_ptr = slurm_find_preemptable_jobs(job_ptr)) &&
+	    (exp_node_cr = _dup_node_cr(node_cr_ptr))) {
+		/* Remove all preemptable jobs from simulated environment */
+		job_iterator = list_iterator_create(job_list);
+		while ((tmp_job_ptr = (struct job_record *) 
+				list_next(job_iterator))) {
+			if (!IS_JOB_RUNNING(tmp_job_ptr) && 
+			    !IS_JOB_SUSPENDED(tmp_job_ptr))
+				continue;
+			if (_is_preemptable(tmp_job_ptr, preempt_job_ptr)) {
+				/* Remove preemptable job now */
+				_rm_job_from_nodes(exp_node_cr, tmp_job_ptr,
+						   "_run_now",
+						   _job_preemption_killing());
+				j = _job_count_bitmap(exp_node_cr, job_ptr,
+						      orig_map, bitmap,
+						      (max_share - 1),
+						      NO_SHARE_LIMIT);
+				if (j < min_nodes)
+					continue;
+				rc = _job_test(job_ptr, bitmap, min_nodes,
+					       max_nodes, req_nodes);
+				if (rc == SLURM_SUCCESS)
+					break;
+			}
+		}
+		list_iterator_destroy(job_iterator);
+
+		if ((rc == SLURM_SUCCESS) && _job_preemption_killing()) {
+			/* Queue preemption of jobs whose nodes are
+			 * actually needed by this job */
+			for (j=0; preempt_job_ptr[j]; j++) {
+				uint32_t *job_id;
+				if (bit_overlap(bitmap, 
+						preempt_job_ptr[j]->node_bitmap) == 0)
+					continue;
+
+				job_id = xmalloc(sizeof(uint32_t));
+				job_id[0] = preempt_job_ptr[j]->job_id;
+				list_append(preempt_job_list, job_id);
+			}
+			rc = EINVAL;	/* Can't schedule until after preemptions */
+		}
+		_free_node_cr(exp_node_cr);
+	}
+	if (rc == SLURM_SUCCESS)
+		_build_select_struct(job_ptr, bitmap);
+	xfree(preempt_job_ptr);
+	bit_free(orig_map);
+
+	return rc;
+}
+
 /* Determine where and when the job at job_ptr can begin execution by updating 
  * a scratch node_cr_record structure to reflect each job terminating at the 
  * end of its time limit and use this to show where and when the job at job_ptr
- * will begin execution. Used by Moab for backfill scheduling. */
+ * will begin execution. Used by SLURM's sched/backfill plugin and Moab. */
 static int _will_run_test(struct job_record *job_ptr, bitstr_t *bitmap,
 			  uint32_t min_nodes, uint32_t max_nodes, 
 			  int max_share, uint32_t req_nodes)
 {
 	struct node_cr_record *exp_node_cr;
 	struct job_record *tmp_job_ptr, **tmp_job_pptr;
+	struct job_record **preempt_job_ptr = NULL;
 	List cr_job_list;
 	ListIterator job_iterator;
 	bitstr_t *orig_map;
@@ -1766,48 +1812,74 @@ static int _will_run_test(struct job_record *job_ptr, bitstr_t *bitmap,
 		return SLURM_ERROR;
 	}
 
-	/* Build list of running jobs */
+	/* Build list of running and suspended jobs */
 	cr_job_list = list_create(_cr_job_list_del);
 	if (!cr_job_list)
 		fatal("list_create: memory allocation failure");
+	preempt_job_ptr = slurm_find_preemptable_jobs(job_ptr);
 	job_iterator = list_iterator_create(job_list);
 	while ((tmp_job_ptr = (struct job_record *) list_next(job_iterator))) {
-		if (!IS_JOB_RUNNING(tmp_job_ptr))
+		if (!IS_JOB_RUNNING(tmp_job_ptr) && 
+		    !IS_JOB_SUSPENDED(tmp_job_ptr))
 			continue;
 		if (tmp_job_ptr->end_time == 0) {
 			error("Job %u has zero end_time", tmp_job_ptr->job_id);
 			continue;
 		}
-		tmp_job_pptr = xmalloc(sizeof(struct job_record *));
-		*tmp_job_pptr = tmp_job_ptr;
-		list_append(cr_job_list, tmp_job_pptr);
+		if (_is_preemptable(tmp_job_ptr, preempt_job_ptr)) {
+			/* Remove preemptable job now */
+			_rm_job_from_nodes(exp_node_cr, tmp_job_ptr,
+					   "_will_run_test", 
+					   _job_preemption_killing());
+		} else {
+			tmp_job_pptr = xmalloc(sizeof(struct job_record *));
+			*tmp_job_pptr = tmp_job_ptr;
+			list_append(cr_job_list, tmp_job_pptr);
+		}
 	}
 	list_iterator_destroy(job_iterator);
-	list_sort(cr_job_list, _cr_job_list_sort);
 
-	/* Remove the running jobs one at a time from exp_node_cr and try
-	 * scheduling the pending job after each one */
-	job_iterator = list_iterator_create(cr_job_list);
-	while ((tmp_job_pptr = (struct job_record **) 
-			       list_next(job_iterator))) {
-		tmp_job_ptr = *tmp_job_pptr;
-		_rm_job_from_nodes(exp_node_cr, tmp_job_ptr,
-				   "_will_run_test", true);
+	/* Test with all preemptable jobs gone */
+	if (preempt_job_ptr) {
 		i = _job_count_bitmap(exp_node_cr, job_ptr, orig_map, bitmap, 
 				      max_run_jobs, NO_SHARE_LIMIT);
-		if (i < min_nodes)
-			continue;
-		rc = _job_test(job_ptr, bitmap, min_nodes, max_nodes, 
-			       req_nodes);
-		if (rc != SLURM_SUCCESS)
-			continue;
-		if (tmp_job_ptr->end_time <= now)
-			job_ptr->start_time = now + 1;
-		else
-			job_ptr->start_time = tmp_job_ptr->end_time;
-		break;
+		if (i >= min_nodes) {
+			rc = _job_test(job_ptr, bitmap, min_nodes, max_nodes, 
+				       req_nodes);
+			if (rc == SLURM_SUCCESS)
+				job_ptr->start_time = now + 1;
+		}
+		xfree(preempt_job_ptr);
 	}
-	list_iterator_destroy(job_iterator);
+
+	/* Remove the running jobs one at a time from exp_node_cr and try
+	 * scheduling the pending job after each one */
+	if (rc != SLURM_SUCCESS) {
+		list_sort(cr_job_list, _cr_job_list_sort);
+		job_iterator = list_iterator_create(cr_job_list);
+		while ((tmp_job_pptr = (struct job_record **) 
+				       list_next(job_iterator))) {
+			tmp_job_ptr = *tmp_job_pptr;
+			_rm_job_from_nodes(exp_node_cr, tmp_job_ptr,
+					   "_will_run_test", true);
+			i = _job_count_bitmap(exp_node_cr, job_ptr, orig_map, 
+					      bitmap, max_run_jobs, 
+					      NO_SHARE_LIMIT);
+			if (i < min_nodes)
+				continue;
+			rc = _job_test(job_ptr, bitmap, min_nodes, max_nodes, 
+				       req_nodes);
+			if (rc != SLURM_SUCCESS)
+				continue;
+			if (tmp_job_ptr->end_time <= now)
+				job_ptr->start_time = now + 1;
+			else
+				job_ptr->start_time = tmp_job_ptr->end_time;
+			break;
+		}
+		list_iterator_destroy(job_iterator);
+	}
+
 	list_destroy(cr_job_list);
 	_free_node_cr(exp_node_cr);
 	bit_free(orig_map);
@@ -1826,6 +1898,11 @@ static int  _cr_job_list_sort(void *x, void *y)
 	return (int) difftime(job1_pptr[0]->end_time, job2_pptr[0]->end_time);
 }
 
+static void _preempt_list_del(void *x)
+{
+	xfree(x);
+}
+
 /*
  * init() is called when the plugin is loaded, before any other functions
  * are called.  Put global initialization here.
@@ -1842,6 +1919,10 @@ extern int init ( void )
 #endif
 	cr_type = (select_type_plugin_info_t)
 			slurmctld_conf.select_type_param;
+	slurm_mutex_lock(&cr_mutex);
+	if (!preempt_job_list)
+		preempt_job_list = list_create(_preempt_list_del);
+	slurm_mutex_unlock(&cr_mutex);
 	return rc;
 }
 
@@ -1854,9 +1935,9 @@ extern int fini ( void )
 	slurm_mutex_lock(&cr_mutex);
 	_free_node_cr(node_cr_ptr);
 	node_cr_ptr = NULL;
-	if (step_cr_list)
-		list_destroy(step_cr_list);
-	step_cr_list = NULL;
+	if (preempt_job_list)
+		list_destroy(preempt_job_list);
+	preempt_job_list = NULL;
 	slurm_mutex_unlock(&cr_mutex);
 	return rc;
 }
@@ -1899,9 +1980,6 @@ extern int select_p_node_init(struct node_record *node_ptr, int node_cnt)
 	slurm_mutex_lock(&cr_mutex);
 	_free_node_cr(node_cr_ptr);
 	node_cr_ptr = NULL;
-	if (step_cr_list)
-		list_destroy(step_cr_list);
-	step_cr_list = NULL;
 	slurm_mutex_unlock(&cr_mutex);
 
 	select_node_ptr = node_ptr;
@@ -1947,10 +2025,8 @@ extern int select_p_job_test(struct job_record *job_ptr, bitstr_t *bitmap,
 			     uint32_t min_nodes, uint32_t max_nodes, 
 			     uint32_t req_nodes, int mode)
 {
-	bitstr_t *orig_map;
-	int max_run_job, j, sus_jobs, rc = EINVAL, prev_cnt = -1;
-	int min_share = 0, max_share = 0;
-	uint32_t save_mem = 0;
+	int max_share = 0, rc = EINVAL;
+	uint32_t *job_id;
 
 	xassert(bitmap);
 	if (job_ptr->details == NULL)
@@ -1975,7 +2051,7 @@ extern int select_p_job_test(struct job_record *job_ptr, bitstr_t *bitmap,
 	if (mode != SELECT_MODE_TEST_ONLY) {
 		if (job_ptr->details->shared) {
 			max_share = job_ptr->part_ptr->max_share & 
-					~SHARED_FORCE;
+				    ~SHARED_FORCE;
 		} else	/* ((shared == 0) || (shared == (uint16_t) NO_VAL)) */
 			max_share = 1;
 	}
@@ -1983,56 +2059,27 @@ extern int select_p_job_test(struct job_record *job_ptr, bitstr_t *bitmap,
 	if (mode == SELECT_MODE_WILL_RUN) {
 		rc = _will_run_test(job_ptr, bitmap, min_nodes, max_nodes,
 				    max_share, req_nodes);
-		slurm_mutex_unlock(&cr_mutex);
-		return rc;
 	} else if (mode == SELECT_MODE_TEST_ONLY) {
-		min_share = NO_SHARE_LIMIT;
-		max_share = min_share + 1;
-		save_mem = job_ptr->details->job_min_memory;
-		job_ptr->details->job_min_memory = 0;
-	}
+		rc = _test_only(job_ptr, bitmap, min_nodes, max_nodes,
+				req_nodes);
+	} else if (mode == SELECT_MODE_RUN_NOW) {
+		rc = _run_now(job_ptr, bitmap, min_nodes, max_nodes,
+			      max_share, req_nodes);
+	} else
+		fatal("select_p_job_test: Mode %d is invalid", mode);
 
-	debug3("select/linear: job_test: job %u max_share %d avail nodes %u",
-		job_ptr->job_id, max_share, bit_set_count(bitmap));
-	orig_map = bit_copy(bitmap);
-	for (max_run_job=min_share; max_run_job<max_share; max_run_job++) {
-		bool last_iteration = (max_run_job == (max_share -1));
-		for (sus_jobs=0; ((sus_jobs<5) && (rc != SLURM_SUCCESS)); 
-		     sus_jobs++) {
-			if (last_iteration)
-				sus_jobs = NO_SHARE_LIMIT;
-			j = _job_count_bitmap(node_cr_ptr, job_ptr, 
-					      orig_map, bitmap, 
-					      max_run_job, 
-					      max_run_job + sus_jobs);
-			debug3("select/linear: job_test: found %d nodes for "
-			       "job %u", j, job_ptr->job_id);
-			if ((j == prev_cnt) || (j < min_nodes))
-				continue;
-			prev_cnt = j;
-			if ((mode == SELECT_MODE_RUN_NOW)
-			    && (max_run_job > 0)) {
-				/* We need to share. Try to find 
-				 * suitable job to share nodes with */
-				rc = _find_job_mate(job_ptr, bitmap, 
-						    min_nodes, 
-						    max_nodes, req_nodes);
-				if (rc == SLURM_SUCCESS)
-					break;
-			}
-			rc = _job_test(job_ptr, bitmap, min_nodes, max_nodes, 
-				       req_nodes);
-			if (rc == SLURM_SUCCESS)
-				break;
-			continue;
-		}
+	/* Preempt any needed jobs. The preempting job will start later. */
+	while (preempt_job_list &&
+	       (job_id = list_pop(preempt_job_list))) {
+		slurm_mutex_unlock(&cr_mutex);
+		/* job preemption must happen outside of cr_mutex so that 
+		 * the resource deallocation can take place */
+		job_preempt_remove(job_id[0]);
+		xfree(job_id);
+		slurm_mutex_lock(&cr_mutex);
 	}
-	bit_free(orig_map);
 	slurm_mutex_unlock(&cr_mutex);
-	if ((rc == SLURM_SUCCESS) && (mode == SELECT_MODE_RUN_NOW))
-		_build_select_struct(job_ptr, bitmap);
-	if (save_mem)
-		job_ptr->details->job_min_memory = save_mem;
+
 	return rc;
 }
 
@@ -2210,7 +2257,7 @@ extern int select_p_select_nodeinfo_set_all(time_t last_query_time)
 	static time_t last_set_all = 0;
 
 	/* only set this once when the last_node_update is newer than
-	   the last time we set things up. */
+	 * the last time we set things up. */
 	if(last_set_all && (last_node_update < last_set_all)) {
 		debug2("Node select info for set all hasn't "
 		       "changed since %d", 
@@ -2295,12 +2342,14 @@ extern int select_p_select_jobinfo_set(select_jobinfo_t *jobinfo,
 }
 
 extern int select_p_select_jobinfo_get (select_jobinfo_t *jobinfo,
-				 enum select_jobdata_type data_type, void *data)
+					enum select_jobdata_type data_type, 
+					void *data)
 {
 	return SLURM_SUCCESS;
 }
 
-extern select_jobinfo_t *select_p_select_jobinfo_copy(select_jobinfo_t *jobinfo)
+extern select_jobinfo_t *select_p_select_jobinfo_copy(
+				select_jobinfo_t *jobinfo)
 {
 	return NULL;
 }
@@ -2377,9 +2426,6 @@ extern int select_p_reconfigure(void)
 	job_preemption_tested  = false;
 	_free_node_cr(node_cr_ptr);
 	node_cr_ptr = NULL;
-	if (step_cr_list)
-		list_destroy(step_cr_list);
-	step_cr_list = NULL;
 	_init_node_cr();
 	slurm_mutex_unlock(&cr_mutex);
 
diff --git a/src/slurmctld/job_mgr.c b/src/slurmctld/job_mgr.c
index 28038e43c49dd04c0681f822207852fcf46e3b0d..195da28dfb341bd4473527423c05107bcb096914 100644
--- a/src/slurmctld/job_mgr.c
+++ b/src/slurmctld/job_mgr.c
@@ -7634,42 +7634,40 @@ _read_job_ckpt_file(char *ckpt_file, int *size_ptr)
  * Do not use this function for job suspend/resume. This is handled by the
  * gang module.
  */
-extern void job_preempt_remove(struct job_record *job_ptr)
+extern void job_preempt_remove(uint32_t job_id)
 {
 	int rc = SLURM_SUCCESS;
 	uint16_t preempt_mode = slurm_get_preempt_mode();
 	checkpoint_msg_t ckpt_msg;
 
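+	/* Gang scheduling (suspend/resume) is handled by the gang module,
+	 * so mask off that flag and act on the remaining preempt mode */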
+	preempt_mode &= (~PREEMPT_MODE_GANG);
 	if (preempt_mode == PREEMPT_MODE_REQUEUE) {
-		rc = job_requeue(0, job_ptr->job_id, -1);
+		rc = job_requeue(0, job_id, -1);
 		if (rc == SLURM_SUCCESS) {
-			info("preempted job %u has been requeued", 
-			     job_ptr->job_id);
+			info("preempted job %u has been requeued", job_id);
 		}
 	} else if (preempt_mode == PREEMPT_MODE_CANCEL) {
-		(void) job_signal(job_ptr->job_id, SIGKILL, 0, 0);
+		(void) job_signal(job_id, SIGKILL, 0, 0);
 	} else if (preempt_mode == PREEMPT_MODE_CHECKPOINT) {
 		memset(&ckpt_msg, 0, sizeof(checkpoint_msg_t));
 		ckpt_msg.op        = CHECK_VACATE;
-		ckpt_msg.job_id    = job_ptr->job_id;
+		ckpt_msg.job_id    = job_id;
 		rc = job_checkpoint(&ckpt_msg, 0, -1);
 		if (rc == SLURM_SUCCESS) {
-			info("preempted job %u has been checkpointed", 
-			     job_ptr->job_id);
+			info("preempted job %u has been checkpointed", job_id);
 		}
 	} else {
-		fatal("Invalid preempt_mode: %u", preempt_mode);
+		error("Invalid preempt_mode: %u", preempt_mode);
 		return;
 	}
 
 	if (rc != SLURM_SUCCESS) {
-		rc = job_signal(job_ptr->job_id, SIGKILL, 0, 0);
-		if (rc == SLURM_SUCCESS) {
-			info("preempted job %u had to be killed", 
-			     job_ptr->job_id);
-		} else {
+		rc = job_signal(job_id, SIGKILL, 0, 0);
+		if (rc == SLURM_SUCCESS)
+			info("preempted job %u had to be killed", job_id);
+		else {
 			info("preempted job %u kill failure %s", 
-			     job_ptr->job_id, slurm_strerror(rc));
+			     job_id, slurm_strerror(rc));
 		}
 	}
 }
diff --git a/src/slurmctld/job_scheduler.c b/src/slurmctld/job_scheduler.c
index 45dd95792c9d0dd40267e3d86396628ffd5fcd24..5de42c1624ca6e46211e5003fcfd4f22fb6545e7 100644
--- a/src/slurmctld/job_scheduler.c
+++ b/src/slurmctld/job_scheduler.c
@@ -298,8 +298,8 @@ extern int schedule(void)
 		if (strcmp(sched_type, "sched/backfill") == 0)
 			backfill_sched = true;
 		/* Disable avoiding of fragmentation with sched/wiki */
-		if ((strcmp(sched_type, "sched/wiki") == 0)
-		||  (strcmp(sched_type, "sched/wiki2") == 0))
+		if ((strcmp(sched_type, "sched/wiki") == 0) ||
+		    (strcmp(sched_type, "sched/wiki2") == 0))
 			wiki_sched = true;
 		xfree(sched_type);
 		sched_test = true;
diff --git a/src/slurmctld/node_scheduler.c b/src/slurmctld/node_scheduler.c
index ea2d9a2aee7873225d2e961e582e63861b742ce1..9e59e03fc22e6b7e964baf376d36326ed323b2b9 100644
--- a/src/slurmctld/node_scheduler.c
+++ b/src/slurmctld/node_scheduler.c
@@ -70,6 +70,7 @@
 #include "src/slurmctld/job_scheduler.h"
 #include "src/slurmctld/licenses.h"
 #include "src/slurmctld/node_scheduler.h"
+#include "src/slurmctld/preempt.h"
 #include "src/slurmctld/proc_req.h"
 #include "src/slurmctld/reservation.h"
 #include "src/slurmctld/sched_plugin.h"
@@ -594,7 +595,7 @@ _pick_best_nodes(struct node_set *node_set_ptr, int node_set_size,
 	bool runable_avail = false;	/* Job can run with available nodes */
 	bool tried_sched = false;	/* Tried to schedule with avail nodes */
 	static uint32_t cr_enabled = NO_VAL;
-	bool sched_gang = false;
+	bool preempt_flag = false;
 	select_type_plugin_info_t cr_type = SELECT_TYPE_INFO_NONE; 
 	int shared = 0, select_mode;
 
@@ -626,7 +627,7 @@ _pick_best_nodes(struct node_set *node_set_ptr, int node_set_size,
 	/* If job preemption is enabled, then do NOT limit the set of available
 	 * nodes by their current 'sharable' or 'idle' setting */
 	if (slurm_get_preempt_mode() != PREEMPT_MODE_OFF)
-		sched_gang = true;		
+		preempt_flag = true;		
 
 	if (cr_enabled) {
 		/* Determine which nodes might be used by this job based upon
@@ -679,7 +680,7 @@ _pick_best_nodes(struct node_set *node_set_ptr, int node_set_size,
 			}
 		}
 
-		if (!sched_gang) {
+		if (!preempt_flag) {
 			if (shared) {
 				if (!bit_super_set(job_ptr->details->
 						   req_node_bitmap, 
@@ -743,7 +744,7 @@ _pick_best_nodes(struct node_set *node_set_ptr, int node_set_size,
 					partially_idle_node_bitmap);
 			}
 
-			if (!sched_gang) {
+			if (!preempt_flag) {
 				if (shared) {
 					bit_and(node_set_ptr[i].my_bitmap,
 						share_node_bitmap);
@@ -764,10 +765,10 @@ _pick_best_nodes(struct node_set *node_set_ptr, int node_set_size,
 			avail_nodes = bit_set_count(avail_bitmap);
 			tried_sched = false;	/* need to test these nodes */
 
-			if (shared && ((i+1) < node_set_size)	&& 
+			if ((shared || preempt_flag)	&& 
+			    ((i+1) < node_set_size)	&& 
 			    (node_set_ptr[i].weight == 
-			     node_set_ptr[i+1].weight)		&&
-			    ((i+1) < node_set_size)) {
+			     node_set_ptr[i+1].weight)) {
 				/* Keep accumulating so we can pick the
 				 * most lightly loaded nodes */
 				continue;
@@ -1468,7 +1469,7 @@ static int _build_node_list(struct job_record *job_ptr,
 
 		if (has_xor) {
 			tmp_feature = _valid_features(job_ptr->details, 
-						config_ptr);
+						      config_ptr);
 			if (tmp_feature == NULL) {
 				FREE_NULL_BITMAP(node_set_ptr[node_set_inx].
 						 my_bitmap);
diff --git a/src/slurmctld/slurmctld.h b/src/slurmctld/slurmctld.h
index ff8c6002185f8663c56a19a9a64c93d06c314696..0f35806c96f4974730843be114290e2ca14ea9b3 100644
--- a/src/slurmctld/slurmctld.h
+++ b/src/slurmctld/slurmctld.h
@@ -993,7 +993,7 @@ extern int job_node_ready(uint32_t job_id, int *ready);
  * Do not use this function for job suspend/resume. This is handled by the
  * gang module.
  */
-extern void job_preempt_remove(struct job_record *job_ptr);
+extern void job_preempt_remove(uint32_t job_id);
 
 /*
  * job_restart - Restart a batch job from checkpointed state