Skip to content
Snippets Groups Projects
Commit a1a7e335 authored by Moe Jette's avatar Moe Jette
Browse files

Agent framework complete, compiles, not tested.

parent c9e021da
No related branches found
No related tags found
No related merge requests found
...@@ -54,6 +54,12 @@ ...@@ -54,6 +54,12 @@
#include <signal.h> #include <signal.h>
#include <unistd.h> #include <unistd.h>
#include <src/common/log.h>
#include <src/common/slurm_protocol_defs.h>
#include <src/common/xmalloc.h>
#include <src/common/xstring.h>
#include <src/slurmctld/agent.h>
typedef struct task_info { typedef struct task_info {
pthread_mutex_t *thread_mutex; /* agent specific mutex */ pthread_mutex_t *thread_mutex; /* agent specific mutex */
pthread_cond_t *thread_cond; /* agent specific condition */ pthread_cond_t *thread_cond; /* agent specific condition */
...@@ -63,8 +69,8 @@ typedef struct task_info { ...@@ -63,8 +69,8 @@ typedef struct task_info {
void *msg_args; /* RPC data to be used */ void *msg_args; /* RPC data to be used */
} task_info_t; } task_info_t;
void *thread_revoke_job_cred (void *args); static void *thread_revoke_job_cred (void *args);
void *wdog (void *args); static void *wdog (void *args);
/* /*
* agent - party responsible for performing some task in parallel across a set of nodes * agent - party responsible for performing some task in parallel across a set of nodes
...@@ -76,26 +82,23 @@ agent (void *args) ...@@ -76,26 +82,23 @@ agent (void *args)
int i, rc; int i, rc;
pthread_attr_t attr_wdog; pthread_attr_t attr_wdog;
pthread_t thread_wdog; pthread_t thread_wdog;
revoke_credential_msg_t revoke_job_cred;
agent_info_t *agent_ptr = (agent_info_t *) args; agent_info_t *agent_ptr = (agent_info_t *) args;
thd_t *thread_ptr = agent_ptr->thread_struct; thd_t *thread_ptr = agent_ptr->thread_struct;
task_info_t *task_specific_ptr; task_info_t *task_specific_ptr;
/* basic argument value tests */ /* basic argument value tests */
if (agent_ptr->thread_count == 0) if (agent_ptr->thread_count == 0)
go to cleanup; goto cleanup;
if (thread_ptr == NULL) { if (thread_ptr == NULL)
error ("agent_revoke_job_cred passed null thread_struct"); error ("agent_revoke_job_cred passed null thread_struct");
go to cleanup; if (agent_ptr->msg_type != REQUEST_REVOKE_JOB_CREDENTIAL)
}
if (agent_ptr->msg_type != REQUEST_REVOKE_JOB_CREDENTIAL) {
fatal ("agent_revoke_job_cred passed invaid message type %d", agent_ptr->msg_type); fatal ("agent_revoke_job_cred passed invaid message type %d", agent_ptr->msg_type);
go to cleanup;
}
/* initialize the thread data structure */ /* initialize the thread data structure */
agent_ptr->thread_mutex = PTHREAD_MUTEX_INITIALZER; if (pthread_mutex_init (&agent_ptr->thread_mutex, NULL))
agent_ptr->thread_cond = PTHREAD_COND_INITIALZER; fatal ("agent_revoke_job_cred passed invaid invalid thread_mutex address");
if (pthread_cond_init (&agent_ptr->thread_cond, NULL))
fatal ("agent_revoke_job_cred passed invaid invalid thread_cond address");
agent_ptr->threads_active = 0; agent_ptr->threads_active = 0;
for (i = 0; i < agent_ptr->thread_count; i++) { for (i = 0; i < agent_ptr->thread_count; i++) {
thread_ptr[i].state = DSH_NEW; thread_ptr[i].state = DSH_NEW;
...@@ -107,13 +110,12 @@ agent (void *args) ...@@ -107,13 +110,12 @@ agent (void *args)
if (pthread_attr_setdetachstate (&attr_wdog, PTHREAD_CREATE_DETACHED)) if (pthread_attr_setdetachstate (&attr_wdog, PTHREAD_CREATE_DETACHED))
error ("pthread_attr_setdetachstate errno %d", errno); error ("pthread_attr_setdetachstate errno %d", errno);
#ifdef PTHREAD_SCOPE_SYSTEM #ifdef PTHREAD_SCOPE_SYSTEM
/* we want 1:1 threads if there is a choice */
pthread_attr_setscope (&attr_wdog, PTHREAD_SCOPE_SYSTEM); pthread_attr_setscope (&attr_wdog, PTHREAD_SCOPE_SYSTEM);
#endif #endif
if (pthread_create (&thread_wdog, &attr_wdog, wdog, args) { if (pthread_create (&thread_wdog, &attr_wdog, wdog, args)) {
error ("pthread_create errno %d", errno); error ("pthread_create errno %d", errno);
sleep (1); sleep (1); /* sleep and try once more */
if (pthread_create (&thread_wdog, &attr_wdog, wdog, args) if (pthread_create (&thread_wdog, &attr_wdog, wdog, args))
fatal ("pthread_create errno %d", errno); fatal ("pthread_create errno %d", errno);
} }
...@@ -126,18 +128,17 @@ agent (void *args) ...@@ -126,18 +128,17 @@ agent (void *args)
pthread_cond_wait (&agent_ptr->thread_cond, &agent_ptr->thread_mutex); pthread_cond_wait (&agent_ptr->thread_cond, &agent_ptr->thread_mutex);
/* create thread */ /* create thread */
task_specific_ptr = malloc (sizeof (task_info_t)); task_specific_ptr = malloc (sizeof (task_info_t));
task_specific_ptr->thread_mutex = &agent_ptr->thread_mutex; task_specific_ptr->thread_mutex = &agent_ptr->thread_mutex;
task_specific_ptr->thread_cond = &agent_ptr->thread_cond; task_specific_ptr->thread_cond = &agent_ptr->thread_cond;
task_specific_ptr->threads_active = &agent_ptr->thread_count; task_specific_ptr->threads_active = &agent_ptr->thread_count;
task_specific_ptr->thread_struct = &thread_ptr[i]; task_specific_ptr->thread_struct = &thread_ptr[i];
task_specific_ptr->msg_type = agent_ptr->msg_type; task_specific_ptr->msg_type = agent_ptr->msg_type;
task_specific_ptr->msg_args = &agent_ptr->msg_args; task_specific_ptr->msg_args = &agent_ptr->msg_args;
pthread_attr_init (&thread_ptr[i].attr); pthread_attr_init (&thread_ptr[i].attr);
pthread_attr_setdetachstate (&thread_ptr[i].attr, PTHREAD_CREATE_DETACHED); pthread_attr_setdetachstate (&thread_ptr[i].attr, PTHREAD_CREATE_JOINABLE);
#ifdef PTHREAD_SCOPE_SYSTEM #ifdef PTHREAD_SCOPE_SYSTEM
/* we want 1:1 threads if there is a choice */
pthread_attr_setscope (&thread_ptr[i].attr, PTHREAD_SCOPE_SYSTEM); pthread_attr_setscope (&thread_ptr[i].attr, PTHREAD_SCOPE_SYSTEM);
#endif #endif
if (agent_ptr->msg_type != REQUEST_REVOKE_JOB_CREDENTIAL) { if (agent_ptr->msg_type != REQUEST_REVOKE_JOB_CREDENTIAL) {
...@@ -159,6 +160,8 @@ agent (void *args) ...@@ -159,6 +160,8 @@ agent (void *args)
pthread_mutex_lock(&agent_ptr->thread_mutex); pthread_mutex_lock(&agent_ptr->thread_mutex);
while (agent_ptr->threads_active > 0) while (agent_ptr->threads_active > 0)
pthread_cond_wait(&agent_ptr->thread_cond, &agent_ptr->thread_mutex); pthread_cond_wait(&agent_ptr->thread_cond, &agent_ptr->thread_mutex);
pthread_join (thread_wdog, NULL);
return NULL;
cleanup: cleanup:
#if AGENT_IS_THREAD #if AGENT_IS_THREAD
...@@ -178,13 +181,14 @@ cleanup: ...@@ -178,13 +181,14 @@ cleanup:
static void * static void *
wdog (void *args) wdog (void *args)
{ {
int i, not_done; int i, fail_cnt, work_done;
agent_info_t *agent_ptr = (agent_info_t *) args; agent_info_t *agent_ptr = (agent_info_t *) args;
thd_t *thread_ptr = agent_ptr->thread_struct; thd_t *thread_ptr = agent_ptr->thread_struct;
time_t min_start; time_t min_start;
while (1) { while (1) {
not_done = 0; /* assume all threads complete for now */ work_done = 1; /* assume all threads complete for now */
fail_cnt = 0; /* assume all threads complete successfully for now */
sleep (WDOG_POLL); sleep (WDOG_POLL);
min_start = time(NULL) - COMMAND_TIMEOUT; min_start = time(NULL) - COMMAND_TIMEOUT;
...@@ -192,27 +196,46 @@ wdog (void *args) ...@@ -192,27 +196,46 @@ wdog (void *args)
for (i = 0; i < agent_ptr->thread_count; i++) { for (i = 0; i < agent_ptr->thread_count; i++) {
switch (thread_ptr[i].state) { switch (thread_ptr[i].state) {
case DSH_ACTIVE: case DSH_ACTIVE:
not_done = 1; work_done = 0;
if (thread_ptr[i].start < min_start) if (thread_ptr[i].start < min_start)
pthread_kill(thread_ptr[i].thread, SIGALRM); pthread_kill(thread_ptr[i].thread, SIGALRM);
break; break;
case DSH_NEW: case DSH_NEW:
not_done = 1; work_done = 0;
break; break;
case DSH_DONE: case DSH_DONE:
break;
case DSH_FAILED: case DSH_FAILED:
fail_cnt++;
break; break;
} }
} }
pthread_mutex_unlock (&agent_ptr->thread_mutex); pthread_mutex_unlock (&agent_ptr->thread_mutex);
if (not_done == 0) if (work_done)
return NULL; break;
} }
/* Notify slurmctld of non-responding nodes */
if (fail_cnt) {
char *node_list_ptr;
for (i = 0; i < agent_ptr->thread_count; i++) {
if (thread_ptr[i].state == DSH_FAILED)
xstrcat (node_list_ptr, thread_ptr[i].node_name);
}
/* send RPC */
/* the following nodes are not responding... */
xfree (node_list_ptr);
}
pthread_exit (NULL);
} }
/* thread_revoke_job_cred - thread to revoke a credential on a collection of nodes */ /* thread_revoke_job_cred - thread to revoke a credential on a collection of nodes */
void * static void *
thread_revoke_job_cred (void *arg) thread_revoke_job_cred (void *args)
{ {
sigset_t set; sigset_t set;
int msg_size ; int msg_size ;
...@@ -223,6 +246,7 @@ thread_revoke_job_cred (void *arg) ...@@ -223,6 +246,7 @@ thread_revoke_job_cred (void *arg)
return_code_msg_t * slurm_rc_msg ; return_code_msg_t * slurm_rc_msg ;
task_info_t *task_ptr = (task_info_t *) args; task_info_t *task_ptr = (task_info_t *) args;
thd_t *thread_ptr = task_ptr->thread_struct; thd_t *thread_ptr = task_ptr->thread_struct;
state_t thread_state = DSH_FAILED;
pthread_mutex_lock (task_ptr->thread_mutex); pthread_mutex_lock (task_ptr->thread_mutex);
thread_ptr->state = DSH_ACTIVE; thread_ptr->state = DSH_ACTIVE;
...@@ -237,7 +261,8 @@ thread_revoke_job_cred (void *arg) ...@@ -237,7 +261,8 @@ thread_revoke_job_cred (void *arg)
/* init message connection for message communication with slurmd */ /* init message connection for message communication with slurmd */
if ( ( sockfd = slurm_open_msg_conn (& thread_ptr->slurm_addr) ) == SLURM_SOCKET_ERROR ) { if ( ( sockfd = slurm_open_msg_conn (& thread_ptr->slurm_addr) ) == SLURM_SOCKET_ERROR ) {
error ("thread_revoke_job_cred/slurm_open_msg_conn error for %s", thread_ptr->node_name); error ("thread_revoke_job_cred/slurm_open_msg_conn error for %s",
thread_ptr->node_name);
goto cleanup; goto cleanup;
} }
...@@ -245,23 +270,27 @@ thread_revoke_job_cred (void *arg) ...@@ -245,23 +270,27 @@ thread_revoke_job_cred (void *arg)
request_msg . msg_type = task_ptr->msg_type ; request_msg . msg_type = task_ptr->msg_type ;
request_msg . data = task_ptr->msg_args ; request_msg . data = task_ptr->msg_args ;
if ( ( rc = slurm_send_node_msg ( sockfd , & request_msg ) ) == SLURM_SOCKET_ERROR ) { if ( ( rc = slurm_send_node_msg ( sockfd , & request_msg ) ) == SLURM_SOCKET_ERROR ) {
error ("thread_revoke_job_cred/slurm_send_node_msg error for %s", thread_ptr->node_name); error ("thread_revoke_job_cred/slurm_send_node_msg error for %s",
thread_ptr->node_name);
goto cleanup; goto cleanup;
} }
/* receive message */ /* receive message */
if ( ( msg_size = slurm_receive_msg ( sockfd , & response_msg ) ) == SLURM_SOCKET_ERROR ) { if ( ( msg_size = slurm_receive_msg ( sockfd , & response_msg ) ) == SLURM_SOCKET_ERROR ) {
error ("thread_revoke_job_cred/slurm_receive_msg error for %s", thread_ptr->node_name); error ("thread_revoke_job_cred/slurm_receive_msg error for %s",
thread_ptr->node_name);
goto cleanup; goto cleanup;
} }
/* shutdown message connection */ /* shutdown message connection */
if ( ( rc = slurm_shutdown_msg_conn ( sockfd ) ) == SLURM_SOCKET_ERROR ) { if ( ( rc = slurm_shutdown_msg_conn ( sockfd ) ) == SLURM_SOCKET_ERROR ) {
error ("thread_revoke_job_cred/slurm_shutdown_msg_conn error for %s", thread_ptr->node_nam); error ("thread_revoke_job_cred/slurm_shutdown_msg_conn error for %s",
thread_ptr->node_name);
goto cleanup; goto cleanup;
} }
if ( msg_size ) { if ( msg_size ) {
error ("thread_revoke_job_cred/msg_size error %d for %s", msg_size, thread_ptr->node_nam); error ("thread_revoke_job_cred/msg_size error %d for %s",
msg_size, thread_ptr->node_name);
goto cleanup; goto cleanup;
} }
...@@ -272,17 +301,21 @@ thread_revoke_job_cred (void *arg) ...@@ -272,17 +301,21 @@ thread_revoke_job_cred (void *arg)
rc = slurm_rc_msg->return_code; rc = slurm_rc_msg->return_code;
slurm_free_return_code_msg ( slurm_rc_msg ); slurm_free_return_code_msg ( slurm_rc_msg );
if (rc) if (rc)
error ("thread_revoke_job_cred/rc error %d for %s", rc, thread_ptr->node_nam); error ("thread_revoke_job_cred/rc error %d for %s",
rc, thread_ptr->node_name);
else
thread_state = DSH_DONE;
break ; break ;
default: default:
error ("thread_revoke_job_cred/msg_type error %d for %s", error ("thread_revoke_job_cred/msg_type error %d for %s",
response_msg.msg_type, thread_ptr->node_nam); response_msg.msg_type, thread_ptr->node_name);
break ; break ;
} }
cleanup: cleanup:
pthread_mutex_lock (task_ptr->thread_mutex); pthread_mutex_lock (task_ptr->thread_mutex);
thread_ptr->state = DSH_DONE; thread_ptr->state = thread_state;
thread_ptr->finish = time(NULL); thread_ptr->finish = time(NULL);
/* Signal completion so another thread can replace us */ /* Signal completion so another thread can replace us */
......
...@@ -24,7 +24,10 @@ ...@@ -24,7 +24,10 @@
* You should have received a copy of the GNU General Public License along * You should have received a copy of the GNU General Public License along
* with SLURM; if not, write to the Free Software Foundation, Inc., * with SLURM; if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
***************************************************************************** \*****************************************************************************/
#ifndef _AGENT_H
#define _AGENT_H
#include <src/slurmctld/agent.h> #include <src/slurmctld/agent.h>
#include <src/slurmctld/slurmctld.h> #include <src/slurmctld/slurmctld.h>
...@@ -58,3 +61,5 @@ typedef struct agent_info { ...@@ -58,3 +61,5 @@ typedef struct agent_info {
} agent_info_t; } agent_info_t;
void *agent (void *args); void *agent (void *args);
#endif /* !_AGENT_H */
0% Loading. Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment