From ba092c09376ccd3b560277faccadc11edfcb7bd6 Mon Sep 17 00:00:00 2001
From: Moe Jette <jette1@llnl.gov>
Date: Mon, 16 Sep 2002 16:48:54 +0000
Subject: [PATCH] Define slurm_job_step_create API. Add task distribution
 option to the job_step_create RPC. Add uid validation to RPC processing
 when built with HAVE_AUTHD

---
 src/common/slurm_errno.c         |   2 +-
 src/common/slurm_protocol_defs.c |   2 +
 src/common/slurm_protocol_defs.h |   6 +
 src/common/slurm_protocol_pack.c |   2 +
 src/common/slurm_protocol_util.c |   1 +
 src/slurmctld/controller.c       | 210 +++++++++++++++++++++++--------
 src/slurmctld/job_mgr.c          |  35 +++++-
 src/slurmctld/slurmctld.h        |   8 +-
 src/slurmctld/step_mgr.c         |  14 ++-
 9 files changed, 214 insertions(+), 66 deletions(-)

diff --git a/src/common/slurm_errno.c b/src/common/slurm_errno.c
index d59f7ddab3f..0ec5a0e61c7 100644
--- a/src/common/slurm_errno.c
+++ b/src/common/slurm_errno.c
@@ -69,7 +69,7 @@ static slurm_errtab_t slurm_errtab[] = {
 	{ ESLURM_ERROR_ON_DESC_TO_RECORD_COPY,	"Unable to create job record, try again" },
 	{ ESLURM_JOB_MISSING_SIZE_SPECIFICATION,"Job size specification needs to be provided" },
 	{ ESLURM_JOB_SCRIPT_MISSING,	 	"Job script not specified" },
-	{ ESLURM_USER_ID_MISSING , 		"User id missing" },
+	{ ESLURM_USER_ID_MISSING , 		"User id is missing or invalid" },
 	{ ESLURM_JOB_NAME_TOO_LONG,		"Job name too long" },
 	{ ESLURM_DUPLICATE_JOB_ID , 		"Duplicate job id" },
 	{ ESLURM_PATHNAME_TOO_LONG , 		"Pathname of a file or directory too long" },
diff --git a/src/common/slurm_protocol_defs.c b/src/common/slurm_protocol_defs.c
index 3820e482307..a8ce6d5cc0f 100644
--- a/src/common/slurm_protocol_defs.c
+++ b/src/common/slurm_protocol_defs.c
@@ -33,6 +33,8 @@
 #  include <stdlib.h>
 #endif
 
+#include <stdio.h>
+
 #include <src/common/slurm_protocol_defs.h>
 #include <src/common/xmalloc.h>
 
diff --git a/src/common/slurm_protocol_defs.h b/src/common/slurm_protocol_defs.h
index 42dd60fe9c6..8ae160929b2 100644
--- a/src/common/slurm_protocol_defs.h
+++ b/src/common/slurm_protocol_defs.h
@@ -78,6 +78,11 @@ enum node_states {
 };
 #define NODE_STATE_NO_RESPOND (0x8000)
 
+enum task_dist_states {
+	SLURM_DIST_CYCLIC,	/* distribute tasks one per node, round robin */
+	SLURM_DIST_BLOCK	/* distribute tasks filling node by node */
+};
+
 /* last entry must be JOB_END, keep in sync with job_state_string    	*/
 enum job_states {
 	JOB_PENDING,		/* queued waiting for initiation */
@@ -234,6 +239,7 @@ typedef struct job_step_specs {
 	uint32_t node_count;
 	uint32_t cpu_count;
 	uint16_t relative;
+	uint16_t task_dist;	/* see task_dist_states for values */
 	char *node_list;
 } job_step_specs_t;
 
diff --git a/src/common/slurm_protocol_pack.c b/src/common/slurm_protocol_pack.c
index 52435b99632..ffe26b08a86 100644
--- a/src/common/slurm_protocol_pack.c
+++ b/src/common/slurm_protocol_pack.c
@@ -707,6 +707,7 @@ void pack_job_step_create_request_msg ( job_step_create_request_msg_t* msg , voi
 	pack32 ( msg -> node_count, ( void ** ) buffer , length ) ;
 	pack32 ( msg -> cpu_count, ( void ** ) buffer , length ) ;
 	pack16 ( msg -> relative, ( void ** ) buffer , length ) ;
+	pack16 ( msg -> task_dist, ( void ** ) buffer , length ) ;
 	packstr ( msg -> node_list, ( void ** ) buffer , length ) ;
 }
 
@@ -727,6 +728,7 @@ int unpack_job_step_create_request_msg ( job_step_create_request_msg_t** msg , v
 	unpack32 ( &( tmp_ptr -> node_count), ( void ** ) buffer , length ) ;
 	unpack32 ( &( tmp_ptr -> cpu_count), ( void ** ) buffer , length ) ;
 	unpack16 ( &( tmp_ptr -> relative), ( void ** ) buffer , length ) ;
+	unpack16 ( &( tmp_ptr -> task_dist), ( void ** ) buffer , length ) ;
 	unpackstr_xmalloc ( &( tmp_ptr -> node_list ), &uint16_tmp,  ( void ** ) buffer , length ) ;
 
 	*msg = tmp_ptr;
diff --git a/src/common/slurm_protocol_util.c b/src/common/slurm_protocol_util.c
index 9700039c49b..f92454c2849 100644
--- a/src/common/slurm_protocol_util.c
+++ b/src/common/slurm_protocol_util.c
@@ -1,3 +1,4 @@
+#include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <assert.h>
diff --git a/src/slurmctld/controller.c b/src/slurmctld/controller.c
index 7f239e9ce55..3f930956fbd 100644
--- a/src/slurmctld/controller.c
+++ b/src/slurmctld/controller.c
@@ -49,8 +49,10 @@
 #include <src/common/xstring.h>
 #include <src/slurmctld/locks.h>
 #include <src/slurmctld/slurmctld.h>
-
 #include <src/common/credential_utils.h>
+#ifdef	HAVE_AUTHD
+#include <src/common/authentication.h>
+#endif
 
 #define BUF_SIZE 1024
 #define DEFAULT_DAEMONIZE 0
@@ -761,19 +763,23 @@ void
 slurm_rpc_job_step_cancel ( slurm_msg_t * msg )
 {
 	/* init */
-	int error_code;
+	int error_code = 0;
 	clock_t start_time;
 	job_step_id_msg_t * job_step_id_msg = ( job_step_id_msg_t * ) msg-> data ;
 	/* Locks: Write job, write node */
 	slurmctld_lock_t job_write_lock = { NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };
-
+	int uid = 0;
+
 	start_time = clock ();
 	debug ("Processing RPC: REQUEST_CANCEL_JOB_STEP");
+#ifdef	HAVE_AUTHD
+	uid = slurm_auth_uid (msg->cred);
+#endif
 	lock_slurmctld (job_write_lock);
 
 	/* do RPC call */
 	if (job_step_id_msg->job_step_id == NO_VAL) {
-		error_code = job_cancel ( job_step_id_msg->job_id );
+		error_code = job_cancel ( job_step_id_msg->job_id, uid );
 		unlock_slurmctld (job_write_lock);
 
 		/* return result */
@@ -797,7 +803,8 @@ slurm_rpc_job_step_cancel ( slurm_msg_t * msg )
 	}
 	else {
 		error_code = job_step_cancel (  job_step_id_msg->job_id , 
-						job_step_id_msg->job_step_id);
+						job_step_id_msg->job_step_id ,
+						uid );
 		unlock_slurmctld (job_write_lock);
 
 		/* return result */
@@ -831,15 +838,19 @@ slurm_rpc_job_step_complete ( slurm_msg_t * msg )
 	job_step_id_msg_t * job_step_id_msg = ( job_step_id_msg_t * ) msg-> data ;
 	/* Locks: Write job, write node */
 	slurmctld_lock_t job_write_lock = { NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };
+	int uid = 0;
 
 	/* init */
 	start_time = clock ();
 	debug ("Processing RPC: REQUEST_COMPLETE_JOB_STEP");
+#ifdef	HAVE_AUTHD
+	uid = slurm_auth_uid (msg->cred);
+#endif
 	lock_slurmctld (job_write_lock);
 
 	/* do RPC call */
 	if (job_step_id_msg->job_step_id == NO_VAL) {
-		error_code = job_complete ( job_step_id_msg->job_id );
+		error_code = job_complete ( job_step_id_msg->job_id, uid );
 		unlock_slurmctld (job_write_lock);
 
 		/* return result */
@@ -860,7 +871,7 @@ slurm_rpc_job_step_complete ( slurm_msg_t * msg )
 	}
 	else {
 		error_code = job_step_complete (  job_step_id_msg->job_id, 
-						job_step_id_msg->job_step_id);
+						job_step_id_msg->job_step_id, uid);
 		unlock_slurmctld (job_write_lock);
 
 		/* return result */
@@ -1059,7 +1070,7 @@ void
 slurm_rpc_submit_batch_job ( slurm_msg_t * msg )
 {
 	/* init */
-	int error_code;
+	int error_code = 0;
 	clock_t start_time;
 	uint32_t job_id ;
 	slurm_msg_t response_msg ;
@@ -1067,17 +1078,30 @@ slurm_rpc_submit_batch_job ( slurm_msg_t * msg )
 	job_desc_msg_t * job_desc_msg = ( job_desc_msg_t * ) msg-> data ;
 	/* Locks: Write job, read node, read partition */
 	slurmctld_lock_t job_write_lock = { NO_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };
+#ifdef	HAVE_AUTHD
+	int uid;
+#endif
 
 	start_time = clock ();
 	debug ("Processing RPC: REQUEST_SUBMIT_BATCH_JOB");
 
 	/* do RPC call */
 	dump_job_desc(job_desc_msg);
-	lock_slurmctld (job_write_lock);
-	error_code = job_allocate (job_desc_msg, &job_id, (char **) NULL, 
-		(uint16_t *) NULL, (uint32_t **) NULL, (uint32_t **) NULL,
-		false, false, false);
-	unlock_slurmctld (job_write_lock);
+#ifdef	HAVE_AUTHD
+	uid = slurm_auth_uid (msg->cred);
+	if ((uid != job_desc_msg->user_id) &&
+	    (uid != 0)) {
+		error_code = ESLURM_USER_ID_MISSING;
+		error ("Bogus SUBMIT_JOB from uid %d", uid);
+	}
+#endif
+	if (error_code == 0) {
+		lock_slurmctld (job_write_lock);
+		error_code = job_allocate (job_desc_msg, &job_id, (char **) NULL, 
+			(uint16_t *) NULL, (uint32_t **) NULL, (uint32_t **) NULL,
+			false, false, false);
+		unlock_slurmctld (job_write_lock);
+	}
 
 	/* return result */
 	if (error_code)
@@ -1105,7 +1129,7 @@ void
 slurm_rpc_allocate_resources ( slurm_msg_t * msg , uint8_t immediate )
 {
 	/* init */
-	int error_code;
+	int error_code = 0;
 	slurm_msg_t response_msg ;
 	clock_t start_time;
 	job_desc_msg_t * job_desc_msg = ( job_desc_msg_t * ) msg-> data ;
@@ -1116,6 +1140,9 @@ slurm_rpc_allocate_resources ( slurm_msg_t * msg , uint8_t immediate )
 	resource_allocation_response_msg_t alloc_msg ;
 	/* Locks: Write job, write node, read partition */
 	slurmctld_lock_t job_write_lock = { NO_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK };
+#ifdef	HAVE_AUTHD
+	int uid;
+#endif
 
 	start_time = clock ();
 	if (immediate)
@@ -1125,11 +1152,21 @@ slurm_rpc_allocate_resources ( slurm_msg_t * msg , uint8_t immediate )
 
 	/* do RPC call */
 	dump_job_desc (job_desc_msg);
-	lock_slurmctld (job_write_lock);
-	error_code = job_allocate(job_desc_msg, &job_id, 
+#ifdef	HAVE_AUTHD
+	uid = slurm_auth_uid (msg->cred);
+	if ((uid != job_desc_msg->user_id) &&
+	    (uid != 0)) {
+		error_code = ESLURM_USER_ID_MISSING;
+		error ("Bogus RESOURCE_ALLOCATE from uid %d", uid);
+	}
+#endif
+	if (error_code == 0) {
+		lock_slurmctld (job_write_lock);
+		error_code = job_allocate (job_desc_msg, &job_id, 
 			&node_list_ptr, &num_cpu_groups, &cpus_per_node, &cpu_count_reps, 
 			immediate , false, true );
-	unlock_slurmctld (job_write_lock);
+		unlock_slurmctld (job_write_lock);
+	}
 
 	/* return result */
 	if (error_code)
@@ -1166,7 +1203,7 @@ void
 slurm_rpc_allocate_and_run ( slurm_msg_t * msg )
 {
         /* init */
-        int error_code;
+        int error_code = 0;
         slurm_msg_t response_msg ;
         clock_t start_time;
         job_desc_msg_t * job_desc_msg = ( job_desc_msg_t * ) msg-> data ;
@@ -1179,16 +1216,28 @@ slurm_rpc_allocate_and_run ( slurm_msg_t * msg )
 	job_step_create_request_msg_t req_step_msg;
 	/* Locks: Write job, write node, read partition */
 	slurmctld_lock_t job_write_lock = { NO_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK };
-
+#ifdef	HAVE_AUTHD
+	int uid;
+#endif
         start_time = clock ();
 	debug ("Processing RPC: REQUEST_ALLOCATE_AND_RUN_JOB_STEP");
 
         /* do RPC call */
         dump_job_desc (job_desc_msg);
-	lock_slurmctld (job_write_lock);
-        error_code = job_allocate(job_desc_msg, &job_id,
+#ifdef	HAVE_AUTHD
+	uid = slurm_auth_uid (msg->cred);
+	if ((uid != job_desc_msg->user_id) &&
+	    (uid != 0)) {
+		error_code = ESLURM_USER_ID_MISSING;
+		error ("Bogus ALLOCATE_AND_RUN RPC from uid %d", uid);
+	}
+#endif
+	lock_slurmctld (job_write_lock);
+	if (error_code == 0) {
+        	error_code = job_allocate(job_desc_msg, &job_id,
                         &node_list_ptr, &num_cpu_groups, &cpus_per_node, &cpu_count_reps,
                         true , false, true );
+	}
 
         /* return result */
         if (error_code) {
@@ -1240,7 +1289,7 @@ slurm_rpc_allocate_and_run ( slurm_msg_t * msg )
 void slurm_rpc_job_will_run ( slurm_msg_t * msg )
 {
 	/* init */
-	int error_code;
+	int error_code = 0;
 	clock_t start_time;
 	uint16_t num_cpu_groups = 0;
 	uint32_t * cpus_per_node = NULL, * cpu_count_reps = NULL;
@@ -1249,21 +1298,34 @@ void slurm_rpc_job_will_run ( slurm_msg_t * msg )
 	char * node_list_ptr = NULL;
 	/* Locks: Write job, read node, read partition */
 	slurmctld_lock_t job_write_lock = { NO_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };
+#ifdef	HAVE_AUTHD
+	int uid;
+#endif
 
 	start_time = clock ();
 	debug ("Processing RPC: REQUEST_JOB_WILL_RUN");
 
 	/* do RPC call */
 	dump_job_desc(job_desc_msg);
-	lock_slurmctld (job_write_lock);
-	error_code = job_allocate(job_desc_msg, &job_id, 
+#ifdef	HAVE_AUTHD
+	uid = slurm_auth_uid (msg->cred);
+	if ((uid != job_desc_msg->user_id) &&
+	    (uid != 0)) {
+		error_code = ESLURM_USER_ID_MISSING;
+		error ("Bogus JOB_WILL_RUN RPC from uid %d", uid);
+	}
+#endif
+
+	if (error_code == 0) {
+		lock_slurmctld (job_write_lock);
+		error_code = job_allocate(job_desc_msg, &job_id, 
 			&node_list_ptr, &num_cpu_groups, &cpus_per_node, &cpu_count_reps, 
 			false , true, true );
-	unlock_slurmctld (job_write_lock);
+		unlock_slurmctld (job_write_lock);
+	}
 
 	/* return result */
-	if (error_code)
-	{
+	if (error_code) {
 		info ("slurm_rpc_job_will_run error %d, time=%ld",
 				error_code, (long) (clock () - start_time));
 		slurm_send_rc_msg ( msg , error_code );
@@ -1281,27 +1343,38 @@ void
 slurm_rpc_reconfigure_controller ( slurm_msg_t * msg )
 {
 	/* init */
-	int error_code;
+	int error_code = 0;
 	clock_t start_time;
 	/* Locks: Write configuration, write job, write node, write partition */
 	slurmctld_lock_t config_write_lock = { WRITE_LOCK, WRITE_LOCK, WRITE_LOCK, WRITE_LOCK };
+#ifdef HAVE_AUTHD
+	int uid;
+#endif
 
 	start_time = clock ();
 	debug ("Processing RPC: REQUEST_RECONFIGURE");
-/* must be user root */
+#ifdef	HAVE_AUTHD
+	uid = slurm_auth_uid (msg->cred);
+	if (uid != 0) {
+		error ("Bogus RECONFIGURE RPC from uid %d", uid);
+		error_code = ESLURM_USER_ID_MISSING;
+	}
+#endif
 
 	/* do RPC call */
-	lock_slurmctld (config_write_lock);
-	error_code = read_slurm_conf (0);
-	if (error_code == 0)
-		reset_job_bitmaps ();
-
-	if (daemonize) {
-		if (chdir (slurmctld_conf.state_save_location))
-			fatal ("chdir to %s error %m", slurmctld_conf.state_save_location);
+	if (error_code == 0) {
+		lock_slurmctld (config_write_lock);
+		error_code = read_slurm_conf (0);
+		if (error_code == 0)
+			reset_job_bitmaps ();
+
+		if (daemonize) {
+			if (chdir (slurmctld_conf.state_save_location))
+				fatal ("chdir to %s error %m", slurmctld_conf.state_save_location);
+		}
+		unlock_slurmctld (config_write_lock);
 	}
-	unlock_slurmctld (config_write_lock);
-
+
 	/* return result */
 	if (error_code)
 	{
@@ -1324,13 +1397,25 @@ slurm_rpc_reconfigure_controller ( slurm_msg_t * msg )
 void 
 slurm_rpc_shutdown_controller ( slurm_msg_t * msg )
 {
+	int error_code = 0;
 	shutdown_msg_t * shutdown_msg = (shutdown_msg_t *) msg->data;
-/* must be user root */
+#ifdef	HAVE_AUTHD
+	int uid;
+#endif
 
 	/* do RPC call */
 	debug ("Performing RPC: REQUEST_SHUTDOWN");
+#ifdef	HAVE_AUTHD
+	uid = slurm_auth_uid (msg->cred);
+	if (uid != 0) {
+		error ("Bogus SHUTDOWN RPC from uid %d", uid);
+		error_code = ESLURM_USER_ID_MISSING;
+	}
+#endif
 
-	if (shutdown_msg->core)
+	if (error_code)
+		;
+	else if (shutdown_msg->core)
 		debug3 ("performing immeditate shutdown without state save");
 	else if (shutdown_time)
 		debug3 ("slurm_rpc_shutdown_controller RPC issued after shutdown in progress");
@@ -1345,8 +1430,8 @@ slurm_rpc_shutdown_controller ( slurm_msg_t * msg )
 		slurmctld_shutdown ();
 	}
 
-	slurm_send_rc_msg ( msg , SLURM_SUCCESS );
-	if (shutdown_msg->core)
+	slurm_send_rc_msg ( msg , error_code );
+	if ((error_code == 0) && (shutdown_msg->core))
 		fatal ("Aborting per RPC request");
 }
 
@@ -1354,7 +1439,13 @@ slurm_rpc_shutdown_controller ( slurm_msg_t * msg )
 void 
 slurm_rpc_shutdown_controller_immediate ( slurm_msg_t * msg )
 {
-/* must be user root */
+#ifdef	HAVE_AUTHD
+	int uid;
+
+	uid = slurm_auth_uid (msg->cred);
+	if (uid != 0)
+		error ("Bogus SHUTDOWN_IMMEDIATE RPC from uid %d", uid);
+#endif
 
 	/* do RPC call */
 	debug ("Performing RPC: REQUEST_SHUTDOWN_IMMEDIATE");
@@ -1422,7 +1513,10 @@ void
 slurm_rpc_node_registration ( slurm_msg_t * msg )
 {
 	/* init */
-	int error_code;
+	int error_code = 0;
+#ifdef	HAVE_AUTHD
+	int uid;
+#endif
 	clock_t start_time;
 	slurm_node_registration_status_msg_t * node_reg_stat_msg = 
 			( slurm_node_registration_status_msg_t * ) msg-> data ;
@@ -1431,15 +1525,23 @@
 
 	start_time = clock ();
 	debug ("Processing RPC: MESSAGE_NODE_REGISTRATION_STATUS");
-	lock_slurmctld (node_write_lock);
-
-	/* do RPC call */
-	error_code = validate_node_specs (
-		node_reg_stat_msg -> node_name ,
-		node_reg_stat_msg -> cpus ,
-		node_reg_stat_msg -> real_memory_size ,
-		node_reg_stat_msg -> temporary_disk_space ) ;
-	unlock_slurmctld (node_write_lock);
+#ifdef	HAVE_AUTHD
+	uid = slurm_auth_uid (msg->cred);
+	if (uid != 0) {
+		error_code = ESLURM_USER_ID_MISSING;
+		error ("Bogus NODE_REGISTER RPC from uid %d", uid);
+	}
+#endif
+	if (error_code == 0) {
+		/* do RPC call */
+		lock_slurmctld (node_write_lock);
+		error_code = validate_node_specs (
+			node_reg_stat_msg -> node_name ,
+			node_reg_stat_msg -> cpus ,
+			node_reg_stat_msg -> real_memory_size ,
+			node_reg_stat_msg -> temporary_disk_space ) ;
+		unlock_slurmctld (node_write_lock);
+	}
 
 	/* return result */
 	if (error_code)
diff --git a/src/slurmctld/job_mgr.c b/src/slurmctld/job_mgr.c
index 3e8f716a4e1..d8dad21e774 100644
--- a/src/slurmctld/job_mgr.c
+++ b/src/slurmctld/job_mgr.c
@@ -1019,12 +1019,13 @@ job_allocate (job_desc_msg_t  *job_specs, uint32_t *new_job_id, char **node_list
 /* 
  * job_cancel - cancel the specified job
  * input: job_id - id of the job to be cancelled
+ *	uid - uid of requesting user
  * output: returns 0 on success, otherwise ESLURM error code 
  * global: job_list - pointer global job list
  *	last_job_update - time of last job table update
  */
 int
-job_cancel (uint32_t job_id) 
+job_cancel (uint32_t job_id, int uid) 
 {
 	struct job_record *job_ptr;
 
@@ -1039,6 +1040,11 @@ job_cancel (uint32_t job_id)
 	    (job_ptr->job_state == JOB_TIMEOUT))
 		return ESLURM_ALREADY_DONE;
 
+	if ((job_ptr->user_id != uid) && (uid != 0)) {
+		error ("Bogus JOB_CANCEL RPC from uid %d", uid);
+		return ESLURM_USER_ID_MISSING;
+	}
+
 	if (job_ptr->job_state == JOB_PENDING) {
 		last_job_update = time (NULL);
 		job_ptr->job_state = JOB_FAILED;
@@ -1068,12 +1074,13 @@ job_cancel (uint32_t job_id)
 /* 
  * job_complete - note the normal termination the specified job
  * input: job_id - id of the job which completed
+ *	uid - user id of user issuing the RPC
  * output: returns 0 on success, otherwise ESLURM error code 
  * global: job_list - pointer global job list
  *	last_job_update - time of last job table update
  */
 int
-job_complete (uint32_t job_id) 
+job_complete (uint32_t job_id, int uid) 
 {
 	struct job_record *job_ptr;
 
@@ -1088,6 +1095,11 @@ job_complete (uint32_t job_id)
 	    (job_ptr->job_state == JOB_TIMEOUT))
 		return ESLURM_ALREADY_DONE;
 
+	if ((job_ptr->user_id != uid) && (uid != 0)) {
+		error ("Bogus JOB_COMPLETE RPC from uid %d", uid);
+		return ESLURM_USER_ID_MISSING;
+	}
+
 	if ((job_ptr->job_state == JOB_STAGE_IN) || 
 	    (job_ptr->job_state == JOB_RUNNING) ||
 	    (job_ptr->job_state == JOB_STAGE_OUT)) {
@@ -1102,6 +1114,7 @@ job_complete (uint32_t job_id)
 	job_ptr->job_state = JOB_COMPLETE;
 	job_ptr->end_time = time(NULL);
 	delete_job_details(job_ptr);
+	delete_all_step_records(job_ptr);
 	return SLURM_SUCCESS;
 }
 
@@ -1504,12 +1517,13 @@ copy_job_desc_to_job_record ( job_desc_msg_t * job_desc ,
 /* 
  * job_step_cancel - cancel the specified job step
  * input: job_id, step_id - id of the job to be cancelled
+ *	uid - user id of user issuing the RPC
  * output: returns 0 on success, otherwise ESLURM error code 
  * global: job_list - pointer global job list
  *	last_job_update - time of last job table update
  */
 int
-job_step_cancel (uint32_t job_id, uint32_t step_id) 
+job_step_cancel (uint32_t job_id, uint32_t step_id, int uid) 
 {
 	struct job_record *job_ptr;
 	int error_code;
@@ -1526,6 +1540,11 @@ job_step_cancel (uint32_t job_id, uint32_t step_id)
 	    (job_ptr->job_state == JOB_TIMEOUT))
 		return ESLURM_ALREADY_DONE;
 
+	if ((job_ptr->user_id != uid) && (uid != 0)) {
+		error ("Bogus JOB_CANCEL RPC from uid %d", uid);
+		return ESLURM_USER_ID_MISSING;
+	}
+
 	if ((job_ptr->job_state == JOB_STAGE_IN) || 
 	    (job_ptr->job_state == JOB_RUNNING) ||
 	    (job_ptr->job_state == JOB_STAGE_OUT)) {
@@ -1548,12 +1567,13 @@ job_step_cancel (uint32_t job_id, uint32_t step_id)
 /* 
  * job_step_complete - note normal completion the specified job step
  * input: job_id, step_id - id of the job to be completed
+ *	uid - user id of user issuing RPC
  * output: returns 0 on success, otherwise ESLURM error code 
  * global: job_list - pointer global job list
  *	last_job_update - time of last job table update
  */
 int
-job_step_complete (uint32_t job_id, uint32_t step_id) 
+job_step_complete (uint32_t job_id, uint32_t step_id, int uid) 
 {
 	struct job_record *job_ptr;
 	int error_code;
@@ -1569,6 +1589,11 @@ job_step_complete (uint32_t job_id, uint32_t step_id)
 	    (job_ptr->job_state == JOB_TIMEOUT))
 		return ESLURM_ALREADY_DONE;
 
+	if ((job_ptr->user_id != uid) && (uid != 0)) {
+		error ("Bogus JOB_COMPLETE RPC from uid %d", uid);
+		return ESLURM_USER_ID_MISSING;
+	}
+
 	last_job_update = time (NULL);
 	error_code = delete_step_record (job_ptr, step_id);
 	if (error_code == ENOENT) {
@@ -1930,7 +1955,7 @@ purge_old_job (void)
 {
 	int i;
 
-	i = list_delete_all (job_list, &list_find_job_old, NULL);
+	i = list_delete_all (job_list, &list_find_job_old, "");
 	if (i) {
 		info ("purge_old_job: purged %d old job records", i);
 		last_job_update = time (NULL);
diff --git a/src/slurmctld/slurmctld.h b/src/slurmctld/slurmctld.h
index 5d20e4e759c..5d6f679a475 100644
--- a/src/slurmctld/slurmctld.h
+++ b/src/slurmctld/slurmctld.h
@@ -338,16 +338,16 @@ extern int job_allocate (job_desc_msg_t  *job_specs, uint32_t *new_job_id, char
 	int immediate, int will_run, int allocate);
 
 /* job_cancel - cancel the specified job */
-extern int job_cancel (uint32_t job_id);
+extern int job_cancel (uint32_t job_id, int uid);
 
 /* job_step_cancel - cancel the specified job step */
-extern int job_step_cancel (uint32_t job_id, uint32_t job_step_id);
+extern int job_step_cancel (uint32_t job_id, uint32_t job_step_id, int uid);
 
 /* job_complete - note the completion the specified job */
-extern int job_complete (uint32_t job_id);
+extern int job_complete (uint32_t job_id, int uid);
 
 /* job_step_complete - note the completion the specified job step*/
-extern int job_step_complete (uint32_t job_id, uint32_t job_step_id);
+extern int job_step_complete (uint32_t job_id, uint32_t job_step_id, int uid);
 
 /* job_create - create a job table record for the supplied specifications */
 extern int job_create (job_desc_msg_t * job_specs, uint32_t *new_job_id, int allocate, 
diff --git a/src/slurmctld/step_mgr.c b/src/slurmctld/step_mgr.c
index c4e1a71c9ad..36bbb780f9c 100644
--- a/src/slurmctld/step_mgr.c
+++ b/src/slurmctld/step_mgr.c
@@ -142,8 +142,8 @@ dump_step_desc(step_specs *step_spec)
 
 	debug3("StepDesc: user_id=%u job_id=%u node_count=%u, cpu_count=%u\n", 
 		step_spec->user_id, step_spec->job_id, step_spec->node_count, step_spec->cpu_count);
-	debug3("   relative=%u node_list=%s\n", 
-		step_spec->relative, step_spec->node_list);
+	debug3("   relative=%u task_dist=%u node_list=%s\n", 
+		step_spec->relative, step_spec->task_dist, step_spec->node_list);
 }
 
 
@@ -315,6 +315,15 @@ step_create ( step_specs *step_specs, struct step_record** new_step_record  )
 	    (job_ptr->job_state == JOB_STAGE_OUT))
 		return ESLURM_ALREADY_DONE;
 
+#ifdef HAVE_LIBELAN3
+	if (step_specs->task_dist == SLURM_DIST_CYCLIC)
+		step_specs->task_dist = ELAN_CAP_TYPE_CYCLIC;
+	else if (step_specs->task_dist == SLURM_DIST_BLOCK)
+		step_specs->task_dist = ELAN_CAP_TYPE_BLOCK;
+	else
+		return ESLURM_BAD_DIST;
+#endif
+
 	nodeset = pick_step_nodes (job_ptr, step_specs );
 
 	if (nodeset == NULL)
@@ -327,6 +336,7 @@ step_create ( step_specs *step_specs, struct step_record** new_step_record  )
 	/* set the step_record values */
 	step_ptr->step_id = (job_ptr->next_step_id)++;
 	step_ptr->node_bitmap = nodeset;
+	step_ptr->cyclic_alloc = step_specs->task_dist;
 
 #ifdef HAVE_LIBELAN3
 	if (qsw_alloc_jobinfo (&step_ptr->qsw_job) < 0)
-- 
GitLab