diff --git a/NEWS b/NEWS
index d7c7d8f4844427071ef7e53fd158031179ec8d11..9d0ffc18caf092413a4a2baf4c2b35ff57a569fb 100644
--- a/NEWS
+++ b/NEWS
@@ -247,6 +247,8 @@ documents those changes that are of interest to users and administrators.
     'assoc_limit_stop'.
  -- Allow a lower version client command to talk to a higher version controller
     using the multi-cluster options (e.g. squeue -M<cluster>).
+ -- Fix slurmctld/agent race condition: prevent job launch while
+    PrologSlurmctld is running or a node boot is in progress.
 
 * Changes in Slurm 16.05.8
 ==========================
diff --git a/src/plugins/sched/backfill/backfill.c b/src/plugins/sched/backfill/backfill.c
index 6259517226355e23d080e6ceefb57bce7265db91..f1842065f228c3ea812e9bc09a36c66ec49b2e41 100644
--- a/src/plugins/sched/backfill/backfill.c
+++ b/src/plugins/sched/backfill/backfill.c
@@ -1934,8 +1934,7 @@ static int _start_job(struct job_record *job_ptr, bitstr_t *resv_bitmap)
 		power_g_job_start(job_ptr);
 		if (job_ptr->batch_flag == 0)
 			srun_allocate(job_ptr->job_id);
-		else if ((job_ptr->details == NULL) ||
-			 (job_ptr->details->prolog_running == 0))
+		else if (!IS_JOB_CONFIGURING(job_ptr))
 			launch_job(job_ptr);
 		slurmctld_diag_stats.backfilled_jobs++;
 		slurmctld_diag_stats.last_backfilled_jobs++;
diff --git a/src/slurmctld/agent.c b/src/slurmctld/agent.c
index 6356996f1e0cdba2690d0de67236c911ba13e585..c985b8481e9a8cc050bf4eb5e57f5052d440f829 100644
--- a/src/slurmctld/agent.c
+++ b/src/slurmctld/agent.c
@@ -171,7 +171,12 @@ typedef struct mail_info {
 	char *message;
 } mail_info_t;
 
-static void _sig_handler(int dummy);
+typedef struct retry_args {
+	bool mail_too;			/* Send pending email too */
+	int min_wait;			/* Time to wait between retries */
+} retry_args_t;
+
+static void *_agent_retry(void *arg);
 static int  _batch_launch_defer(queued_request_t *queued_req_ptr);
 static inline int _comm_err(char *node_name, slurm_msg_type_t msg_type);
 static void _list_delete_retry(void *retry_entry);
@@ -182,8 +187,9 @@ static void _notify_slurmctld_nodes(agent_info_t *agent_ptr,
 		int no_resp_cnt, int retry_cnt);
 static void _purge_agent_args(agent_arg_t *agent_arg_ptr);
 static void _queue_agent_retry(agent_info_t * agent_info_ptr, int count);
-static int _setup_requeue(agent_arg_t *agent_arg_ptr, thd_t *thread_ptr,
-			  int *count, int *spot);
+static int  _setup_requeue(agent_arg_t *agent_arg_ptr, thd_t *thread_ptr,
+			   int *count, int *spot);
+static void _sig_handler(int dummy);
 static void _spawn_retry_agent(agent_arg_t * agent_arg_ptr);
 static void *_thread_per_group_rpc(void *args);
 static int   _valid_agent_arg(agent_arg_t *agent_arg_ptr);
@@ -1249,17 +1255,41 @@ static void _list_delete_retry(void *retry_entry)
 }
 
 /*
- * agent_retry - Agent for retrying pending RPCs. One pending request is
+ * agent_retry - Spawn agent for retrying pending RPCs. One pending request is
  *	issued if it has been pending for at least min_wait seconds
  * IN min_wait - Minimum wait time between re-issue of a pending RPC
  * IN mail_too - Send pending email too, note this is performed using a
  *	fork/waitpid, so it can take longer than just creating a pthread
  *	to send RPCs
- * RET count of queued requests remaining
  */
-extern int agent_retry (int min_wait, bool mail_too)
+extern void agent_retry(int min_wait, bool mail_too)
+{
+	pthread_attr_t thread_attr;
+	pthread_t thread_id = (pthread_t) 0;
+	retry_args_t *retry_args_ptr;
+
+	retry_args_ptr = xmalloc(sizeof(struct retry_args));
+	retry_args_ptr->mail_too = mail_too;
+	retry_args_ptr->min_wait = min_wait;
+
+	slurm_attr_init(&thread_attr);
+	if (pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED))
+		error("pthread_attr_setdetachstate error %m");
+	if (pthread_create(&thread_id, &thread_attr, _agent_retry,
+			   (void *) retry_args_ptr)) {
+		error("pthread_create error %m");
+		xfree(retry_args_ptr);
+	}
+	slurm_attr_destroy(&thread_attr);
+}
+
+/* Do the work requested by agent_retry (retry pending RPCs).
+ * This is a separate thread so the job records can be locked */
+static void *_agent_retry(void *arg)
 {
-	int list_size = 0, rc;
+	retry_args_t *retry_args_ptr = (retry_args_t *) arg;
+	bool mail_too;
+	int min_wait, rc;
 	time_t now = time(NULL);
 	queued_request_t *queued_req_ptr = NULL;
 	agent_arg_t *agent_arg_ptr = NULL;
@@ -1267,17 +1297,26 @@ extern int agent_retry (int min_wait, bool mail_too)
 	pthread_t thread_mail = 0;
 	pthread_attr_t attr_mail;
 	mail_info_t *mi = NULL;
+	/* Write lock on jobs */
+	slurmctld_lock_t job_write_lock =
+		{ NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK };
 
+	mail_too = retry_args_ptr->mail_too;
+	min_wait = retry_args_ptr->min_wait;
+	xfree(arg);
+
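+	/* Lock job records so _batch_launch_defer() can safely read and
+	 * update job state while scanning the retry list */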
+	lock_slurmctld(job_write_lock);
 	slurm_mutex_lock(&retry_mutex);
 	if (retry_list) {
 		static time_t last_msg_time = (time_t) 0;
-		uint32_t msg_type[5] = {0, 0, 0, 0, 0}, i = 0;
+		uint32_t msg_type[5] = {0, 0, 0, 0, 0};
+		int i = 0, list_size;
 		list_size = list_count(retry_list);
 		if ((list_size > 100) &&
 		    (difftime(now, last_msg_time) > 300)) {
 			/* Note sizable backlog of work */
 			info("slurmctld: agent retry_list size is %d",
-				list_size);
+			     list_size);
 			retry_iter = list_iterator_create(retry_list);
 			while ((queued_req_ptr = (queued_request_t *)
 					list_next(retry_iter))) {
@@ -1299,13 +1338,13 @@ extern int agent_retry (int min_wait, bool mail_too)
 		/* too much work already */
 		slurm_mutex_unlock(&agent_cnt_mutex);
 		slurm_mutex_unlock(&retry_mutex);
-		return list_size;
+		unlock_slurmctld(job_write_lock);
+		return NULL;
 	}
 	slurm_mutex_unlock(&agent_cnt_mutex);
 
 	if (retry_list) {
 		/* first try to find a new (never tried) record */
-
 		retry_iter = list_iterator_create(retry_list);
 		while ((queued_req_ptr = (queued_request_t *)
 				list_next(retry_iter))) {
@@ -1315,14 +1354,12 @@ extern int agent_retry (int min_wait, bool mail_too)
 						  agent_arg_ptr);
 				xfree(queued_req_ptr);
 				list_remove(retry_iter);
-				list_size--;
 				continue;
 			}
 			if (rc > 0)
 				continue;
  			if (queued_req_ptr->last_attempt == 0) {
 				list_remove(retry_iter);
-				list_size--;
 				break;
 			}
 		}
@@ -1344,7 +1381,6 @@ extern int agent_retry (int min_wait, bool mail_too)
 						  agent_arg_ptr);
 				xfree(queued_req_ptr);
 				list_remove(retry_iter);
-				list_size--;
 				continue;
 			}
 			if (rc > 0)
@@ -1352,13 +1388,13 @@ extern int agent_retry (int min_wait, bool mail_too)
 			age = difftime(now, queued_req_ptr->last_attempt);
 			if (age > min_wait) {
 				list_remove(retry_iter);
-				list_size--;
 				break;
 			}
 		}
 		list_iterator_destroy(retry_iter);
 	}
 	slurm_mutex_unlock(&retry_mutex);
+	unlock_slurmctld(job_write_lock);
 
 	if (queued_req_ptr) {
 		agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
@@ -1394,7 +1430,7 @@ extern int agent_retry (int min_wait, bool mail_too)
 		slurm_mutex_unlock(&agent_cnt_mutex);
 	}
 
-	return list_size;
+	return NULL;
 }
 
 /*
@@ -1822,7 +1858,7 @@ static int _batch_launch_defer(queued_request_t *queued_req_ptr)
 	agent_arg_t *agent_arg_ptr;
 	batch_job_launch_msg_t *launch_msg_ptr;
 	time_t now = time(NULL);
-	struct job_record  *job_ptr;
+	struct job_record *job_ptr;
 	int nodes_ready = 0, tmp = 0;
 
 	agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
@@ -1844,6 +1880,9 @@ static int _batch_launch_defer(queued_request_t *queued_req_ptr)
 		return -1;	/* job cancelled while waiting */
 	}
 
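+	/* Defer the launch while PrologSlurmctld is still running for this job */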
+	if (job_ptr->details && job_ptr->details->prolog_running)
+		return 1;
+
 	if (job_ptr->wait_all_nodes) {
 		(void) job_node_ready(launch_msg_ptr->job_id, &tmp);
 		if (tmp == (READY_JOB_STATE | READY_NODE_STATE)) {
@@ -1852,9 +1891,6 @@ static int _batch_launch_defer(queued_request_t *queued_req_ptr)
 			    !xstrcmp(launch_msg_ptr->alias_list, "TBD")) {
 				/* Update launch RPC with correct node
 				 * aliases */
-				struct job_record *job_ptr;
-				job_ptr = find_job_record(launch_msg_ptr->
-							  job_id);
 				xfree(launch_msg_ptr->alias_list);
 				launch_msg_ptr->alias_list = xstrdup(job_ptr->
 								     alias_list);
@@ -1886,7 +1922,8 @@ static int _batch_launch_defer(queued_request_t *queued_req_ptr)
 	}
 
 	if (nodes_ready) {
-		job_config_fini(job_ptr);
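+		/* All nodes are ready; clear the job's CONFIGURING state if still set */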
+		if (IS_JOB_CONFIGURING(job_ptr))
+			job_config_fini(job_ptr);
 		queued_req_ptr->last_attempt = (time_t) 0;
 		return 0;
 	}
diff --git a/src/slurmctld/agent.h b/src/slurmctld/agent.h
index dd831c223467d1ba6e02f9aedfedee7753e6a85e..35a6c5ff5be3a07140ccf4f41830d4c4feb1c01f 100644
--- a/src/slurmctld/agent.h
+++ b/src/slurmctld/agent.h
@@ -85,11 +85,10 @@ extern void agent_queue_request(agent_arg_t *agent_arg_ptr);
  *	issued if it has been pending for at least min_wait seconds
  * IN min_wait - Minimum wait time between re-issue of a pending RPC
  * IN mail_too - Send pending email too, note this is performed using a
- *		fork/waitpid, so it can take longer than just creating
- *		a pthread to send RPCs
- * RET count of queued requests remaining
+ *	fork/waitpid, so it can take longer than just creating a pthread
+ *	to send RPCs
  */
-extern int agent_retry (int min_wait, bool mail_too);
+extern void agent_retry(int min_wait, bool mail_too);
 
 /* agent_purge - purge all pending RPC requests */
 extern void agent_purge (void);
diff --git a/src/slurmctld/job_scheduler.c b/src/slurmctld/job_scheduler.c
index 5076cdcb4a68677e4be0d7ee34aace5a9ba19b81..ae639e653bdf118c058bb425f6be2d0dc3763416 100644
--- a/src/slurmctld/job_scheduler.c
+++ b/src/slurmctld/job_scheduler.c
@@ -913,8 +913,7 @@ next_part:		part_ptr = (struct part_record *)
 			info("sched: Allocate JobId=%u Partition=%s NodeList=%s #CPUs=%u",
 			     job_ptr->job_id, job_ptr->part_ptr->name,
 			     job_ptr->nodes, job_ptr->total_cpus);
-			if ((job_ptr->details->prolog_running == 0) &&
-			    ((job_ptr->bit_flags & NODE_REBOOT) == 0)) {
+			if (!IS_JOB_CONFIGURING(job_ptr)) {
 				launch_msg = build_launch_job_msg(job_ptr,
 							msg->protocol_version);
 			}
@@ -1918,10 +1917,8 @@ next_task:
 #endif
 			if (job_ptr->batch_flag == 0)
 				srun_allocate(job_ptr->job_id);
-			else if ((job_ptr->details->prolog_running == 0) &&
-			         ((job_ptr->bit_flags & NODE_REBOOT) == 0)) {
+			else if (!IS_JOB_CONFIGURING(job_ptr))
 				launch_job(job_ptr);
-			}
 			rebuild_job_part_list(job_ptr);
 			job_cnt++;
 			if (is_job_array_head &&
diff --git a/src/slurmctld/proc_req.c b/src/slurmctld/proc_req.c
index 591d50b36582f12df3498dd9c9638aa2a68b4ec5..a9170cda36a61d90c133d947b1309017124e8f18 100644
--- a/src/slurmctld/proc_req.c
+++ b/src/slurmctld/proc_req.c
@@ -3593,8 +3593,7 @@ static void _slurm_rpc_submit_batch_job(slurm_msg_t * msg)
 				reject_job = true;
 				goto unlock;
 			}
-			if (job_ptr->details &&
-			    job_ptr->details->prolog_running) {
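+			/* Job still being configured; tell the client to retry later */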
+			if (IS_JOB_CONFIGURING(job_ptr)) {
 				error_code = EAGAIN;
 				reject_job = true;
 				goto unlock;
diff --git a/src/slurmctld/step_mgr.c b/src/slurmctld/step_mgr.c
index 9ac7f4f5cb16409c1bcbc6abe12b89ef5f4a3cd4..b91592175f2ab42d696a49eb4440bc37f777fcc4 100644
--- a/src/slurmctld/step_mgr.c
+++ b/src/slurmctld/step_mgr.c
@@ -1029,11 +1029,12 @@ _pick_step_nodes (struct job_record  *job_ptr,
 				return NULL;
 			}
 		}
-		if (job_ptr->details
-		    && job_ptr->details->prolog_running == 0) {
+		if (IS_JOB_CONFIGURING(job_ptr)) {
 			info("%s: Configuration for job %u is complete",
 			      __func__, job_ptr->job_id);
 			job_config_fini(job_ptr);
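+			/* Re-validate the job's memory configuration if its nodes were rebooted */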
+			if (job_ptr->bit_flags & NODE_REBOOT)
+				job_validate_mem(job_ptr);
 		}
 	}