Skip to content
Snippets Groups Projects
Commit 9b0a7d99 authored by Moe Jette's avatar Moe Jette
Browse files

Minor restructuring of code to handle incomming requests when the

MAX_SERVER_THREADS is exceeded. Thread counter, mutex, and cond
logic all moved into new allocate/deallocate server thread functions.
parent 894131e6
No related branches found
No related tags found
No related merge requests found
...@@ -83,8 +83,10 @@ static int daemonize = DEFAULT_DAEMONIZE; ...@@ -83,8 +83,10 @@ static int daemonize = DEFAULT_DAEMONIZE;
static int debug_level = 0; static int debug_level = 0;
static char *debug_logfile = NULL; static char *debug_logfile = NULL;
static int recover = DEFAULT_RECOVER; static int recover = DEFAULT_RECOVER;
static pthread_cond_t server_thread_cond = PTHREAD_COND_INITIALIZER;
static pid_t slurmctld_pid; static pid_t slurmctld_pid;
inline static void _free_server_thread(void);
static void _init_config(void); static void _init_config(void);
static void _init_pidfile(void); static void _init_pidfile(void);
static void _kill_old_slurmctld(void); static void _kill_old_slurmctld(void);
...@@ -98,6 +100,7 @@ static void * _slurmctld_rpc_mgr(void *no_data); ...@@ -98,6 +100,7 @@ static void * _slurmctld_rpc_mgr(void *no_data);
static void * _slurmctld_signal_hand(void *no_data); static void * _slurmctld_signal_hand(void *no_data);
inline static void _update_cred_key(void); inline static void _update_cred_key(void);
inline static void _usage(char *prog_name); inline static void _usage(char *prog_name);
static void _wait_for_server_thread(void);
typedef struct connection_arg { typedef struct connection_arg {
int newsockfd; int newsockfd;
...@@ -407,27 +410,21 @@ static void *_slurmctld_rpc_mgr(void *no_data) ...@@ -407,27 +410,21 @@ static void *_slurmctld_rpc_mgr(void *no_data)
* Process incoming RPCs indefinitely * Process incoming RPCs indefinitely
*/ */
while (1) { while (1) {
conn_arg = xmalloc(sizeof(connection_arg_t)); /*
/* accept needed for stream implementation is a no-op in * accept needed for stream implementation is a no-op in
* message implementation that just passes sockfd to newsockfd * message implementation that just passes sockfd to newsockfd
*/ */
_wait_for_server_thread();
if ((newsockfd = slurm_accept_msg_conn(sockfd, if ((newsockfd = slurm_accept_msg_conn(sockfd,
&cli_addr)) == &cli_addr)) ==
SLURM_SOCKET_ERROR) { SLURM_SOCKET_ERROR) {
_free_server_thread();
error("slurm_accept_msg_conn error %m"); error("slurm_accept_msg_conn error %m");
continue; continue;
} }
conn_arg = xmalloc(sizeof(connection_arg_t));
conn_arg->newsockfd = newsockfd; conn_arg->newsockfd = newsockfd;
slurm_mutex_lock(&slurmctld_config.thread_count_lock); if (slurmctld_config.shutdown_time)
slurmctld_config.server_thread_count++;
slurm_mutex_unlock(&slurmctld_config.thread_count_lock);
if (slurmctld_config.server_thread_count >=
MAX_SERVER_THREADS) {
info(
"Warning: server_thread_count over limit: %d",
slurmctld_config.server_thread_count);
no_thread = 1;
} else if (slurmctld_config.shutdown_time)
no_thread = 1; no_thread = 1;
else if (pthread_create(&thread_id_rpc_req, else if (pthread_create(&thread_id_rpc_req,
&thread_attr_rpc_req, &thread_attr_rpc_req,
...@@ -442,14 +439,11 @@ static void *_slurmctld_rpc_mgr(void *no_data) ...@@ -442,14 +439,11 @@ static void *_slurmctld_rpc_mgr(void *no_data)
if (_service_connection((void *) conn_arg)) if (_service_connection((void *) conn_arg))
break; break;
} }
} }
debug3("_slurmctld_rpc_mgr shutting down"); debug3("_slurmctld_rpc_mgr shutting down");
slurm_mutex_lock(&slurmctld_config.thread_count_lock);
slurmctld_config.server_thread_count--;
slurm_mutex_unlock(&slurmctld_config.thread_count_lock);
(void) slurm_shutdown_msg_engine(sockfd); (void) slurm_shutdown_msg_engine(sockfd);
_free_server_thread();
pthread_exit((void *) 0); pthread_exit((void *) 0);
} }
...@@ -484,10 +478,44 @@ static void *_service_connection(void *arg) ...@@ -484,10 +478,44 @@ static void *_service_connection(void *arg)
slurm_free_msg(msg); slurm_free_msg(msg);
xfree(arg); xfree(arg);
_free_server_thread();
return return_code;
}
/* Increment slurmctld_config.server_thread_count and don't return
* until its value is no larger than MAX_SERVER_THREADS */
static void _wait_for_server_thread(void)
{
bool print_it = true;
slurm_mutex_lock(&slurmctld_config.thread_count_lock); slurm_mutex_lock(&slurmctld_config.thread_count_lock);
slurmctld_config.server_thread_count--; while (1) {
if (slurmctld_config.server_thread_count <
MAX_SERVER_THREADS) {
slurmctld_config.server_thread_count++;
break;
} else { /* wait for state change and retry */
if (print_it) {
debug("server_thread_count over limit: %d",
slurmctld_config.server_thread_count);
print_it = false;
}
pthread_cond_wait(&server_thread_cond,
&slurmctld_config.thread_count_lock);
}
}
slurm_mutex_unlock(&slurmctld_config.thread_count_lock); slurm_mutex_unlock(&slurmctld_config.thread_count_lock);
return return_code; }
static void _free_server_thread(void)
{
slurm_mutex_lock(&slurmctld_config.thread_count_lock);
if (slurmctld_config.server_thread_count > 0)
slurmctld_config.server_thread_count--;
else
error("slurmctld_config.server_thread_count underflow");
slurm_mutex_unlock(&slurmctld_config.thread_count_lock);
pthread_cond_broadcast(&server_thread_cond);
} }
/* /*
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment