Skip to content
Snippets Groups Projects
Commit 69de51d9 authored by Moe Jette's avatar Moe Jette
Browse files

General code clean-up

parent 82f8ef55
No related branches found
No related tags found
No related merge requests found
......@@ -77,7 +77,8 @@ typedef struct thd {
pthread_t thread; /* thread ID */
pthread_attr_t attr; /* thread attributes */
state_t state; /* thread state */
time_t time; /* time stamp for start or delta time */
time_t time; /* start time or delta time
* at termination */
struct sockaddr_in slurm_addr; /* network address */
char node_name[MAX_NAME_LEN];/* node's name */
} thd_t;
......@@ -86,7 +87,7 @@ typedef struct agent_info {
pthread_mutex_t thread_mutex; /* agent specific mutex */
pthread_cond_t thread_cond; /* agent specific condition */
uint32_t thread_count; /* number of threads records */
uint32_t threads_active; /* count of currently active threads */
uint32_t threads_active; /* currently active threads */
uint16_t retry; /* if set, keep trying */
thd_t *thread_struct; /* thread structures */
slurm_msg_type_t msg_type; /* RPC to be issued */
......@@ -94,32 +95,34 @@ typedef struct agent_info {
} agent_info_t;
typedef struct task_info {
pthread_mutex_t *thread_mutex_ptr; /* pointer to agent specific mutex */
pthread_cond_t *thread_cond_ptr; /* pointer to agent specific condition */
uint32_t *threads_active_ptr; /* pointer to count of currently active threads */
thd_t *thread_struct_ptr; /* pointer to thread structures */
pthread_mutex_t *thread_mutex_ptr; /* pointer to agent specific
* mutex */
pthread_cond_t *thread_cond_ptr; /* pointer to agent specific
* condition */
uint32_t *threads_active_ptr; /* currently active thread ptr */
thd_t *thread_struct_ptr; /* thread structures ptr */
slurm_msg_type_t msg_type; /* RPC to be issued */
void *msg_args_ptr; /* pointer to RPC data to be used */
void *msg_args_ptr; /* ptr to RPC data to be used */
} task_info_t;
static void alarm_handler(int dummy);
static void list_delete_retry (void *retry_entry);
static void queue_agent_retry (agent_info_t *agent_info_ptr, int count);
static void slurmctld_free_job_launch_msg(batch_job_launch_msg_t * msg);
static void spawn_retry_agent (agent_arg_t *agent_arg_ptr);
static void *thread_per_node_rpc (void *args);
static void *wdog (void *args);
static void xsignal(int signal, void (*handler)(int));
static void _alarm_handler(int dummy);
static void _list_delete_retry (void *retry_entry);
static void _queue_agent_retry (agent_info_t *agent_info_ptr, int count);
static void _slurmctld_free_job_launch_msg(batch_job_launch_msg_t * msg);
static void _spawn_retry_agent (agent_arg_t *agent_arg_ptr);
static void *_thread_per_node_rpc (void *args);
static void *_wdog (void *args);
static void _xsignal(int signal, void (*handler)(int));
/* retry RPC data structures */
pthread_mutex_t retry_mutex = PTHREAD_MUTEX_INITIALIZER;
List retry_list = NULL; /* agent_arg_t list for retry */
/*
* agent - party responsible for transmitting an common RPC in parallel across a set
* of nodes
* input: pointer to agent_arg_t, which is xfree'd (including slurm_addr, node_names,
* and msg_args) upon completion if AGENT_IS_THREAD is set
* agent - party responsible for transmitting a common RPC in parallel
* across a set of nodes
* input: pointer to agent_arg_t, which is xfree'd (including slurm_addr,
* node_names and msg_args) upon completion if AGENT_IS_THREAD is set
*/
void *
agent (void *args)
......@@ -145,7 +148,8 @@ agent (void *args)
(agent_arg_ptr->msg_type != REQUEST_NODE_REGISTRATION_STATUS) &&
(agent_arg_ptr->msg_type != REQUEST_PING) &&
(agent_arg_ptr->msg_type != REQUEST_BATCH_JOB_LAUNCH))
fatal ("agent passed invalid message type %d", agent_arg_ptr->msg_type);
fatal ("agent passed invalid message type %d",
agent_arg_ptr->msg_type);
/* initialize the data structures */
agent_info_ptr = xmalloc (sizeof (agent_info_t));
......@@ -164,7 +168,8 @@ agent (void *args)
thread_ptr[i].state = DSH_NEW;
thread_ptr[i].slurm_addr = agent_arg_ptr->slurm_addr[i];
strncpy (thread_ptr[i].node_name,
&agent_arg_ptr->node_names[i*MAX_NAME_LEN], MAX_NAME_LEN);
&agent_arg_ptr->node_names[i*MAX_NAME_LEN],
MAX_NAME_LEN);
}
/* start the watchdog thread */
......@@ -176,17 +181,18 @@ agent (void *args)
if (pthread_attr_setscope (&attr_wdog, PTHREAD_SCOPE_SYSTEM))
error ("pthread_attr_setscope error %m");
#endif
if (pthread_create (&thread_wdog, &attr_wdog, wdog, (void *)agent_info_ptr)) {
if (pthread_create (&thread_wdog, &attr_wdog, _wdog,
(void *)agent_info_ptr)) {
error ("pthread_create error %m");
sleep (1); /* sleep and try once more */
if (pthread_create (&thread_wdog, &attr_wdog, wdog, args))
if (pthread_create (&thread_wdog, &attr_wdog, _wdog, args))
fatal ("pthread_create error %m");
}
#if AGENT_THREAD_COUNT < 1
fatal ("AGENT_THREAD_COUNT value is invalid");
#endif
/* start all the other threads (up to AGENT_THREAD_COUNT active at once) */
/* start all the other threads (up to AGENT_THREAD_COUNT active) */
for (i = 0; i < agent_info_ptr->thread_count; i++) {
/* wait until "room" for another thread */
......@@ -196,26 +202,35 @@ agent (void *args)
&agent_info_ptr->thread_mutex);
}
/* create thread specific data, NOTE freed from thread_per_node_rpc() */
task_specific_ptr = xmalloc (sizeof (task_info_t));
task_specific_ptr->thread_mutex_ptr = &agent_info_ptr->thread_mutex;
task_specific_ptr->thread_cond_ptr = &agent_info_ptr->thread_cond;
task_specific_ptr->threads_active_ptr = &agent_info_ptr->threads_active;
/* create thread specific data, NOTE freed from
* _thread_per_node_rpc() */
task_specific_ptr =
xmalloc (sizeof (task_info_t));
task_specific_ptr->thread_mutex_ptr =
&agent_info_ptr->thread_mutex;
task_specific_ptr->thread_cond_ptr =
&agent_info_ptr->thread_cond;
task_specific_ptr->threads_active_ptr =
&agent_info_ptr->threads_active;
task_specific_ptr->thread_struct_ptr = &thread_ptr[i];
task_specific_ptr->msg_type = agent_info_ptr->msg_type;
task_specific_ptr->msg_args_ptr = *agent_info_ptr->msg_args_pptr;
task_specific_ptr->msg_type =
agent_info_ptr->msg_type;
task_specific_ptr->msg_args_ptr =
*agent_info_ptr->msg_args_pptr;
if (pthread_attr_init (&thread_ptr[i].attr))
fatal ("pthread_attr_init error %m");
if (pthread_attr_setdetachstate (&thread_ptr[i].attr, PTHREAD_CREATE_DETACHED))
if (pthread_attr_setdetachstate (&thread_ptr[i].attr,
PTHREAD_CREATE_DETACHED))
error ("pthread_attr_setdetachstate error %m");
#ifdef PTHREAD_SCOPE_SYSTEM
if (pthread_attr_setscope (&thread_ptr[i].attr, PTHREAD_SCOPE_SYSTEM))
if (pthread_attr_setscope (&thread_ptr[i].attr,
PTHREAD_SCOPE_SYSTEM))
error ("pthread_attr_setscope error %m");
#endif
while ( (rc = pthread_create (&thread_ptr[i].thread,
&thread_ptr[i].attr,
thread_per_node_rpc,
_thread_per_node_rpc,
(void *) task_specific_ptr)) ) {
error ("pthread_create error %m");
if (agent_info_ptr->threads_active)
......@@ -244,7 +259,7 @@ cleanup:
xfree (agent_arg_ptr->node_names);
if (agent_arg_ptr->msg_args) {
if (agent_arg_ptr->msg_type == REQUEST_BATCH_JOB_LAUNCH)
slurmctld_free_job_launch_msg (agent_arg_ptr->msg_args);
_slurmctld_free_job_launch_msg (agent_arg_ptr->msg_args);
else
xfree (agent_arg_ptr->msg_args);
}
......@@ -261,11 +276,11 @@ cleanup:
}
/*
* wdog - Watchdog thread. Send SIGALRM to threads which have been active for too long.
* _wdog - Watchdog thread. Send SIGALRM to threads which have been active for too long.
* Sleep for WDOG_POLL seconds between polls.
*/
static void *
wdog (void *args)
_wdog (void *args)
{
int i, fail_cnt, work_done, delay, max_delay = 0;
agent_info_t *agent_ptr = (agent_info_t *) args;
......@@ -288,9 +303,11 @@ wdog (void *args)
switch (thread_ptr[i].state) {
case DSH_ACTIVE:
work_done = 0;
delay = difftime (time (NULL), thread_ptr[i].time);
delay = difftime (time (NULL),
thread_ptr[i].time);
if ( delay >= COMMAND_TIMEOUT)
pthread_kill(thread_ptr[i].thread, SIGALRM);
pthread_kill(thread_ptr[i].thread,
SIGALRM);
break;
case DSH_NEW:
work_done = 0;
......@@ -320,14 +337,15 @@ wdog (void *args)
}
unlock_slurmctld (node_write_lock);
#else
/* Build a list of all non-responding nodes and send it to slurmctld */
/* Build a list of all non-responding nodes and send
* it to slurmctld */
slurm_names = xmalloc (fail_cnt * MAX_NAME_LEN);
fail_cnt = 0;
for (i = 0; i < agent_ptr->thread_count; i++) {
if (thread_ptr[i].state == DSH_FAILED) {
strncpy (&slurm_names[MAX_NAME_LEN * fail_cnt],
thread_ptr[i].node_name, MAX_NAME_LEN);
error ("agent/wdog: node %s failed to respond",
error ("agent/_wdog: node %s failed to respond",
thread_ptr[i].node_name);
fail_cnt++;
}
......@@ -339,7 +357,7 @@ wdog (void *args)
xfree (slurm_names);
#endif
if (agent_ptr->retry)
queue_agent_retry (agent_ptr, fail_cnt);
_queue_agent_retry (agent_ptr, fail_cnt);
}
#if AGENT_IS_THREAD
......@@ -351,7 +369,8 @@ wdog (void *args)
}
unlock_slurmctld (node_write_lock);
#else
/* Build a list of all responding nodes and send it to slurmctld to update time stamps */
/* Build a list of all responding nodes and send it to slurmctld to
* update time stamps */
done_cnt = agent_ptr->thread_count - fail_cnt;
slurm_names = xmalloc (done_cnt * MAX_NAME_LEN);
done_cnt = 0;
......@@ -375,10 +394,10 @@ wdog (void *args)
return (void *) NULL;
}
/* thread_per_node_rpc - thread to revoke a credential on a collection of nodes
/* _thread_per_node_rpc - thread to issue an RPC to a single node
* This xfrees the argument passed to it */
static void *
thread_per_node_rpc (void *args)
_thread_per_node_rpc (void *args)
{
int msg_size ;
int rc ;
......@@ -398,42 +417,45 @@ thread_per_node_rpc (void *args)
error ("sigaddset error on SIGALRM: %m");
if (sigprocmask (SIG_UNBLOCK, &set, NULL) != 0)
fatal ("sigprocmask error: %m");
xsignal(SIGALRM, alarm_handler);
_xsignal(SIGALRM, _alarm_handler);
if (args == NULL)
fatal ("thread_per_node_rpc has NULL argument");
fatal ("_thread_per_node_rpc has NULL argument");
pthread_mutex_lock (task_ptr->thread_mutex_ptr);
thread_ptr->state = DSH_ACTIVE;
thread_ptr->time = time (NULL);
pthread_mutex_unlock (task_ptr->thread_mutex_ptr);
/* init message connection for message communication */
if ( ( sockfd = slurm_open_msg_conn (& thread_ptr->slurm_addr) ) == SLURM_SOCKET_ERROR ) {
error ("thread_per_node_rpc/slurm_open_msg_conn error %m");
if ( ( sockfd = slurm_open_msg_conn (& thread_ptr->slurm_addr) )
== SLURM_SOCKET_ERROR ) {
error ("_thread_per_node_rpc/slurm_open_msg_conn error %m");
goto cleanup;
}
/* send request message */
request_msg . msg_type = task_ptr->msg_type ;
request_msg . data = task_ptr->msg_args_ptr ;
if ( ( rc = slurm_send_node_msg ( sockfd , & request_msg ) ) == SLURM_SOCKET_ERROR ) {
error ("thread_per_node_rpc/slurm_send_node_msg error %m");
if ( ( rc = slurm_send_node_msg ( sockfd , & request_msg ) )
== SLURM_SOCKET_ERROR ) {
error ("_thread_per_node_rpc/slurm_send_node_msg error %m");
goto cleanup;
}
/* receive message */
if ( ( msg_size = slurm_receive_msg ( sockfd , & response_msg ) ) == SLURM_SOCKET_ERROR ) {
error ("thread_per_node_rpc/slurm_receive_msg error %m");
if ( ( msg_size = slurm_receive_msg ( sockfd , & response_msg ) )
== SLURM_SOCKET_ERROR ) {
error ("_thread_per_node_rpc/slurm_receive_msg error %m");
goto cleanup;
}
/* shutdown message connection */
if ( ( rc = slurm_shutdown_msg_conn ( sockfd ) ) == SLURM_SOCKET_ERROR ) {
error ("thread_per_node_rpc/slurm_shutdown_msg_conn error %m");
error ("_thread_per_node_rpc/slurm_shutdown_msg_conn error %m");
goto cleanup;
}
if ( msg_size ) {
error ("thread_per_node_rpc/msg_size error %d", msg_size);
error ("_thread_per_node_rpc/msg_size error %d", msg_size);
goto cleanup;
}
......@@ -444,8 +466,8 @@ thread_per_node_rpc (void *args)
rc = slurm_rc_msg->return_code;
slurm_free_return_code_msg ( slurm_rc_msg );
if (rc)
error ("thread_per_node_rpc/rc error %s",
slurm_strerror (rc));
error ("_thread_per_node_rpc/rc error %s",
slurm_strerror (rc)); /* Don't use %m */
else {
debug3 ("agent sucessfully processed RPC to node %s",
thread_ptr->node_name);
......@@ -453,7 +475,8 @@ thread_per_node_rpc (void *args)
thread_state = DSH_DONE;
break ;
default:
error ("thread_per_node_rpc bad msg_type %d",response_msg.msg_type);
error ("_thread_per_node_rpc bad msg_type %d",
response_msg.msg_type);
break ;
}
......@@ -475,7 +498,7 @@ cleanup:
* Emulate signal() but with BSD semantics (i.e. don't restore signal to
* SIG_DFL prior to executing handler).
*/
static void xsignal(int signal, void (*handler)(int))
static void _xsignal(int signal, void (*handler)(int))
{
struct sigaction sa, old_sa;
......@@ -491,14 +514,14 @@ static void xsignal(int signal, void (*handler)(int))
* in interrupting connect() in k4cmd/rcmd or select() in rsh() below and
* causing them to return EINTR.
*/
static void alarm_handler(int dummy)
static void _alarm_handler(int dummy)
{
}
/* queue_agent_retry - Queue any failed RPCs for later replay */
void
queue_agent_retry (agent_info_t *agent_info_ptr, int count)
/* _queue_agent_retry - Queue any failed RPCs for later replay */
static void
_queue_agent_retry (agent_info_t *agent_info_ptr, int count)
{
agent_arg_t *agent_arg_ptr;
thd_t *thread_ptr = agent_info_ptr->thread_struct;
......@@ -511,7 +534,8 @@ queue_agent_retry (agent_info_t *agent_info_ptr, int count)
agent_arg_ptr = xmalloc (sizeof (agent_arg_t));
agent_arg_ptr -> node_count = count;
agent_arg_ptr -> retry = 1;
agent_arg_ptr -> slurm_addr = xmalloc (sizeof (struct sockaddr_in) * count);
agent_arg_ptr -> slurm_addr = xmalloc (sizeof (struct sockaddr_in)
* count);
agent_arg_ptr -> node_names = xmalloc (MAX_NAME_LEN * count);
agent_arg_ptr -> msg_type = agent_info_ptr -> msg_type;
agent_arg_ptr -> msg_args = *(agent_info_ptr -> msg_args_pptr);
......@@ -531,7 +555,7 @@ queue_agent_retry (agent_info_t *agent_info_ptr, int count)
/* add the request to a list */
pthread_mutex_lock (&retry_mutex);
if (retry_list == NULL) {
retry_list = list_create (&list_delete_retry);
retry_list = list_create (&_list_delete_retry);
if (retry_list == NULL)
fatal ("list_create failed");
}
......@@ -541,11 +565,10 @@ queue_agent_retry (agent_info_t *agent_info_ptr, int count)
}
/*
* list_delete_retry - delete an entry from the retry list,
* _list_delete_retry - delete an entry from the retry list,
* see common/list.h for documentation
*/
void
list_delete_retry (void *retry_entry)
static void _list_delete_retry (void *retry_entry)
{
agent_arg_t *agent_arg_ptr; /* pointer to part_record */
......@@ -564,8 +587,7 @@ list_delete_retry (void *retry_entry)
/* agent_retry - Agent for retrying pending RPCs (top one on the queue),
* argument is unused */
void *
agent_retry (void *args)
void * agent_retry (void *args)
{
agent_arg_t *agent_arg_ptr = NULL;
......@@ -575,14 +597,13 @@ agent_retry (void *args)
pthread_mutex_unlock (&retry_mutex);
if (agent_arg_ptr)
spawn_retry_agent (agent_arg_ptr);
_spawn_retry_agent (agent_arg_ptr);
return NULL;
}
/* retry_pending - retry all pending RPCs for the given node name */
void
retry_pending (char *node_name)
void retry_pending (char *node_name)
{
int list_size = 0, i, j, found;
agent_arg_t *agent_arg_ptr = NULL;
......@@ -602,16 +623,15 @@ retry_pending (char *node_name)
break;
}
if (found) /* issue this RPC */
spawn_retry_agent (agent_arg_ptr);
_spawn_retry_agent (agent_arg_ptr);
else /* put the RPC back on the queue */
list_enqueue (retry_list, (void*) agent_arg_ptr);
}
pthread_mutex_unlock (&retry_mutex);
}
/* spawn_retry_agent - pthread_crate an agent for the given task */
void
spawn_retry_agent (agent_arg_t *agent_arg_ptr)
/* _spawn_retry_agent - pthread_create an agent for the given task */
static void _spawn_retry_agent (agent_arg_t *agent_arg_ptr)
{
pthread_attr_t attr_agent;
pthread_t thread_agent;
......@@ -622,7 +642,8 @@ spawn_retry_agent (agent_arg_t *agent_arg_ptr)
debug3 ("Spawning RPC retry agent");
if (pthread_attr_init (&attr_agent))
fatal ("pthread_attr_init error %m");
if (pthread_attr_setdetachstate (&attr_agent, PTHREAD_CREATE_DETACHED))
if (pthread_attr_setdetachstate (&attr_agent,
PTHREAD_CREATE_DETACHED))
error ("pthread_attr_setdetachstate error %m");
#ifdef PTHREAD_SCOPE_SYSTEM
if (pthread_attr_setscope (&attr_agent, PTHREAD_SCOPE_SYSTEM))
......@@ -638,12 +659,12 @@ spawn_retry_agent (agent_arg_t *agent_arg_ptr)
}
}
/* slurmctld_free_job_launch_msg is a variant of slurm_free_job_launch_msg
/* _slurmctld_free_job_launch_msg is a variant of slurm_free_job_launch_msg
* because all environment variables are currently loaded in one xmalloc
* buffer (see get_job_env()), which is different from how slurmd
* assembles the data from a message */
void slurmctld_free_job_launch_msg(batch_job_launch_msg_t * msg)
static void _slurmctld_free_job_launch_msg(batch_job_launch_msg_t * msg)
{
if (msg) {
if (msg->environment) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment