From c71013ab9f518e5993de321a72e64eb45a95f9c2 Mon Sep 17 00:00:00 2001
From: Moe Jette <jette1@llnl.gov>
Date: Wed, 24 Jul 2002 23:55:18 +0000
Subject: [PATCH] Added RPC for slurmctld-generated shutdown. This
 SHUTDOWN_IMMEDIATE RPC is used to distinguish it from the SHUTDOWN RPC
 coming from scontrol. It was added to deal with a race condition. Kevin's
 change in where pthread_create occurs and how the RPC is processed is now the
 default (#ifdef kevins_way removed). Added limit on the thread count before
 going to non-threaded RPCs (see MAX_SERVER_THREAD_COUNT in controller.c).

---
 src/slurmctld/controller.c | 116 ++++++++++++++++++-------------------
 1 file changed, 55 insertions(+), 61 deletions(-)

diff --git a/src/slurmctld/controller.c b/src/slurmctld/controller.c
index 59251162cf5..ea91630fbcf 100644
--- a/src/slurmctld/controller.c
+++ b/src/slurmctld/controller.c
@@ -36,6 +36,7 @@
 #include <string.h>
 #include <syslog.h>
 #include <sys/socket.h>
+#include <sys/types.h>
 #include <netdb.h>
 #include <netinet/in.h>
 #include <unistd.h>
@@ -49,10 +50,13 @@
 #include <src/slurmctld/slurmctld.h>
 
 #define BUF_SIZE 1024
+#define MAX_SERVER_THREAD_COUNT 20
 
 log_options_t log_opts = LOG_OPTS_STDERR_ONLY ;
 slurm_ctl_conf_t slurmctld_conf;
 time_t shutdown_time = (time_t)0;
+int server_thread_count = 0;
+pid_t slurmctld_pid;
 
 int getnodename (char *name, size_t len);
 int msg_from_root (void);
@@ -83,14 +87,10 @@ inline static void slurm_rpc_allocate_resources ( slurm_msg_t * msg , uint8_t im
 static void * service_connection ( void * arg ) ;
 void usage (char *prog_name);
 
-/* kevins_way allows a higher degree of parrallelism, not thread pooling would also be a good security addition to prevent thread explosion*/
-#define kevins_way
-#ifdef kevins_way
 typedef struct connection_arg
 {
 	int newsockfd ;
 } connection_arg_t ;
-#endif
 
 /* main - slurmctld main function, start various threads and process RPCs */
 int 
@@ -98,11 +98,6 @@ main (int argc, char *argv[])
 {
 	int sig ;
 	int error_code;
-	pthread_t thread_id;
-	slurm_fd newsockfd;
-        slurm_fd sockfd;
-	slurm_msg_t * msg = NULL ;
-	slurm_addr cli_addr ;
 	char node_name[MAX_NAME_LEN];
 	pthread_t thread_id_bg, thread_id_rpc;
 	pthread_attr_t thread_attr_bg, thread_attr_rpc;
@@ -113,6 +108,7 @@ main (int argc, char *argv[])
 	 */
 	log_init(argv[0], log_opts, SYSLOG_FACILITY_DAEMON, NULL);
 
+	slurmctld_pid = getpid ( );
 	init_ctld_conf ( &slurmctld_conf );
 	init_locks ( );
 	parse_commandline ( argc, argv, &slurmctld_conf );
@@ -144,6 +140,7 @@ main (int argc, char *argv[])
 		fatal ("pthread_create errno %d", errno);
 
 	/* create attached thread to process RPCs */
+	server_thread_count++;
 	if (pthread_attr_init (&thread_attr_rpc))
 		fatal ("pthread_attr_init errno %d", errno);
 	if (pthread_create ( &thread_id_rpc, &thread_attr_rpc, slurmctld_rpc_mgr, NULL))
@@ -167,7 +164,7 @@ main (int argc, char *argv[])
 			case SIGTERM:	/* kill -15 */
 				info ("Terminate signal (SIGINT or SIGTERM) received\n");
 				shutdown_time = time (NULL);
-				/* send RPC to shutdown */
+				/* send REQUEST_SHUTDOWN_IMMEDIATE RPC */
 				slurm_shutdown ();
 				pthread_join (thread_id_rpc, NULL);
 				/* thread_id_bg waits for all RPCs to complete */
@@ -193,13 +190,13 @@ main (int argc, char *argv[])
 void *
 slurmctld_rpc_mgr ( void * no_data ) 
 {
-	int error_code;
 	slurm_fd newsockfd;
         slurm_fd sockfd;
-	slurm_msg_t * msg = NULL ;
 	slurm_addr cli_addr ;
 	pthread_t thread_id_rpc_req;
 	pthread_attr_t thread_attr_rpc_req;
+	int no_thread;
+	connection_arg_t * conn_arg;
 
 	/* threads to process individual RPC's are detached */
 	if (pthread_attr_init (&thread_attr_rpc_req))
@@ -215,11 +212,9 @@ slurmctld_rpc_mgr ( void * no_data )
 	/*
 	 * Procss incoming RPCs indefinitely
 	 */
-	while (shutdown_time == 0) 
+	while (1) 
 	{
-#ifdef kevins_way
-		connection_arg_t * conn_arg = xmalloc ( sizeof ( connection_arg_t ) ) ;
-#endif
+		conn_arg = xmalloc ( sizeof ( connection_arg_t ) ) ;
 		/* accept needed for stream implementation 
 		 * is a no-op in message implementation that just passes sockfd to newsockfd
 		 */
@@ -228,49 +223,40 @@ slurmctld_rpc_mgr ( void * no_data )
 			error ("slurm_accept_msg_conn error %d", errno) ;
 			continue ;
 		}
-#ifdef kevins_way
 		conn_arg -> newsockfd = newsockfd ;
-		if (pthread_create ( &thread_id_rpc_req, &thread_attr_rpc_req, service_connection, (void *) conn_arg )) {
-			/* Do without threads on failure */
+		server_thread_count++;
+		if (server_thread_count > MAX_SERVER_THREAD_COUNT) {
+			info ("Warning: server_thread_count is %d, over system limit", server_thread_count);
+			no_thread = 1;
+		}
+		else if (shutdown_time)
+			no_thread = 1;
+		else if (pthread_create ( &thread_id_rpc_req, &thread_attr_rpc_req, service_connection, (void *) conn_arg )) {
 			error ("pthread_create errno %d", errno);
-			service_connection ( ( void * ) conn_arg ) ;
-		}		
-		
-#elif
-		/* receive message call that must occur before thread spawn because in message 
-		 * implementation their is no connection and the message is the sign of a new connection */
-		msg = xmalloc ( sizeof ( slurm_msg_t ) ) ;	
-		
-		if ( ( error_code = slurm_receive_msg ( newsockfd , msg ) ) == SLURM_SOCKET_ERROR )
-		{
-			if (shutdown_time)
+			no_thread = 1;
+		}
+		else
+			no_thread = 0;
+
+		if (no_thread) {
+			if ( service_connection ( ( void * ) conn_arg ) ) 
 				break;
-			error ("slurm_receive_msg error %d", errno);
-			slurm_close_accepted_conn ( newsockfd ); /* close the new socket */
-			slurm_free_msg ( msg ) ;
-			continue ;
 		}
 
-		msg -> conn_fd = newsockfd ;	
+	}
 
-		if (pthread_create ( &thread_id_rpc_req, &thread_attr_rpc_req, process_rpc, (void *) msg)) {
-			/* Do without threads on failure */
-			error ("pthread_create errno %d", errno);
-			slurmctld_req ( msg );	/* process the request */
-			/* close should only be called when the socket implementation is being used 
-			 * the following call will be a no-op in a message/mongo implementation */
-			slurm_close_accepted_conn ( newsockfd ); /* close the new socket */
-		}
-#endif
-	}			
+	debug3 ("slurmctld_rpc_mgr shutting down");
+	server_thread_count--;
 	pthread_exit ((void *)0);
 }
 
+/* service_connection - service the RPC, return NULL except in case of REQUEST_SHUTDOWN_IMMEDIATE */
 void * service_connection ( void * arg )
 {
 	int error_code;
 	slurm_fd newsockfd = ( ( connection_arg_t * ) arg ) -> newsockfd ;
 	slurm_msg_t * msg = NULL ;
+	void * return_code;
 	
 	msg = xmalloc ( sizeof ( slurm_msg_t ) ) ;	
 
@@ -279,22 +265,20 @@ void * service_connection ( void * arg )
 		error ("slurm_receive_msg error %d", errno);
 		slurm_free_msg ( msg ) ;
 	}
-	else
-	{
+	else {
+		if (msg->msg_type == REQUEST_SHUTDOWN_IMMEDIATE)
+			return_code = (void *) "fini";
 		msg -> conn_fd = newsockfd ;	
 		slurmctld_req ( msg );	/* process the request */
 	}
 
 	/* close should only be called when the socket implementation is being used 
-	* the following call will be a no-op in a message/mongo implementation */
+	 * the following call will be a no-op in a message/mongo implementation */
 	slurm_close_accepted_conn ( newsockfd ); /* close the new socket */
 
 	xfree ( arg ) ;
-#ifdef kevins_way
-	return NULL ;	
-#else
-	pthread_exit (NULL);
-#endif
+	server_thread_count--;
+	return return_code ;	
 }
 
 /* slurmctld_background - process slurmctld background activities */
@@ -311,7 +295,7 @@ slurmctld_background ( void * no_data )
 	slurmctld_lock_t state_write_lock = { READ_LOCK, WRITE_LOCK, WRITE_LOCK, WRITE_LOCK };
 
 	while (shutdown_time == 0) {
-		sleep (2);
+		sleep (1);
 
 		now = time (NULL);
 
@@ -333,9 +317,12 @@ slurmctld_background ( void * no_data )
 		if (shutdown_time || (now - last_checkpoint_time) > PERIODIC_CHECKPOINT) {
 			if (shutdown_time) {	
 				/* wait for any RPC's to complete */
-				now = time (NULL);
-				if ((now - shutdown_time) < 2)
-					sleep ((int)(now - shutdown_time + 2));
+				if (server_thread_count)
+					sleep (1);
+				if (server_thread_count)
+					sleep (1);
+				if (server_thread_count)
+					info ("warning: shutting down with server_thread_count of %d", server_thread_count);
 			}
 			
 			last_checkpoint_time = now;
@@ -345,6 +332,7 @@ slurmctld_background ( void * no_data )
 		}
 
 	}
+	debug3 ("slurmctld_background shutting down");
 	pthread_exit ((void *)0);
 }
 
@@ -421,6 +409,7 @@ slurmctld_req ( slurm_msg_t * msg )
 			slurm_rpc_reconfigure_controller ( msg ) ;
 			break;
 		case REQUEST_SHUTDOWN:
+		case REQUEST_SHUTDOWN_IMMEDIATE:
 			slurm_rpc_shutdown_controller ( msg ) ;
 			break;
 		case REQUEST_UPDATE_JOB:
@@ -959,10 +948,15 @@ slurm_rpc_shutdown_controller ( slurm_msg_t * msg )
 {
 	/* do RPC call */
 /* must be user root */
-	shutdown_time = time (NULL);
+/*	shutdown_time = time (NULL); leaves master and possibly rpc_mgr around */
+	if (shutdown_time)
+		debug3 ("slurm_rpc_shutdown_controller again");
+	else {
+		kill (slurmctld_pid, SIGTERM);	/* tell master to clean-up */
+		info ("slurm_rpc_shutdown_controller completed successfully");
+	}
 
 	/* return result */
-	info ("slurm_rpc_shutdown_controller completed successfully");
 	slurm_send_rc_msg ( msg , SLURM_SUCCESS );
 }
 
@@ -995,7 +989,7 @@ slurm_rpc_job_step_create( slurm_msg_t* msg )
 	else
 	{
 		/* FIXME Needs to be fixed to really work with a credential */
-		slurm_job_credential_t cred = { 1,1,"test",start_time,0} ;
+		slurm_job_credential_t cred = { 1,1,"test",start_time, "signature"} ;
 		info ("slurm_rpc_job_step_create success time=%ld",
 				(long) (clock () - start_time));
 		
@@ -1072,7 +1066,7 @@ slurm_shutdown ()
 	}
 
 	/* send request message */
-	request_msg . msg_type = REQUEST_SHUTDOWN ;
+	request_msg . msg_type = REQUEST_SHUTDOWN_IMMEDIATE ;
 
 	if ( ( rc = slurm_send_controller_msg ( sockfd , & request_msg ) ) == SLURM_SOCKET_ERROR ) {
 		error ("slurm_send_controller_msg error");
-- 
GitLab