/*****************************************************************************\
* agent.c - parallel background communication functions. This is where
* logic could be placed for broadcast communications.
*****************************************************************************
* Copyright (C) 2002 The Regents of the University of California.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
 * Written by Moe Jette <jette@llnl.gov>, et al.
* Derived from pdsh written by Jim Garlick <garlick1@llnl.gov>
* UCRL-CODE-2002-040.
*
* This file is part of SLURM, a resource management program.
* For details, see <http://www.llnl.gov/linux/slurm/>.
*
* SLURM is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with SLURM; if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
*****************************************************************************
* Theory of operation:
*
 * The functions below permit slurmctld to initiate parallel tasks as a
 * detached thread and let the agent machinery below make sure the work
 * happens.
* For example, when a job's time limit is to be changed slurmctld needs
* to notify the slurmd on every node to which the job was allocated.
 * We don't want to hang slurmctld's primary functions (e.g. the job update
 * RPC) while this work takes place, so it just initiates an agent to
 * perform the work.
* The agent is passed all details required to perform the work, so it will
 * be possible to execute the agent as a pthread, process, or even a daemon
* on some other computer.
* The main agent thread creates a separate thread for each node to be
* communicated with up to AGENT_THREAD_COUNT. A special watchdog thread
 * sends SIGALRM to any threads that have been active (in DSH_ACTIVE state)
* for more than COMMAND_TIMEOUT seconds.
* The agent responds to slurmctld via a function call or an RPC as required.
* For example, informing slurmctld that some node is not responding.
 * All of the state for each thread is maintained in a thd_t struct, which is
* used by the watchdog thread as well as the communication threads.
\*****************************************************************************/
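/*
 * Illustrative sketch (not compiled): one plausible way for slurmctld
 * code to hand work to an agent, modeled on _spawn_retry_agent() below.
 * The helper name and field values here are hypothetical.
 */
#if 0
static void _example_dispatch(struct sockaddr_in *addrs, char *names,
			      int node_cnt, void *msg_args)
{
	pthread_attr_t attr;
	pthread_t tid;
	agent_arg_t *arg = xmalloc(sizeof(agent_arg_t));

	arg->node_count = node_cnt;	/* one RPC per node */
	arg->retry      = 1;		/* requeue RPCs that get no response */
	arg->slurm_addr = addrs;	/* xmalloc'd, freed with the agent args */
	arg->node_names = names;	/* MAX_NAME_LEN bytes per node */
	arg->msg_type   = REQUEST_PING;	/* any type _valid_agent_arg() allows */
	arg->msg_args   = msg_args;

	/* detached: the caller never blocks on the RPC fan-out */
	pthread_attr_init(&attr);
	pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
	pthread_create(&tid, &attr, agent, (void *) arg);
}
#endif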
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <unistd.h>
#include "src/common/list.h"
#include "src/common/log.h"
#include "src/common/macros.h"

#include "src/common/xsignal.h"
#include "src/common/xassert.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"

#include "src/common/slurm_protocol_api.h"
#include "src/slurmctld/agent.h"
#include "src/slurmctld/locks.h"
#include "src/slurmctld/ping_nodes.h"
#include "src/slurmctld/slurmctld.h"
#include "src/slurmctld/srun_comm.h"
#if COMMAND_TIMEOUT == 1
#  define WDOG_POLL 1	/* secs */
#else
#  define WDOG_POLL 2	/* secs */
#endif
#define MAX_RETRIES 10
typedef enum {
DSH_NEW, /* Request not yet started */
DSH_ACTIVE, /* Request in progress */
DSH_DONE, /* Request completed normally */
DSH_NO_RESP, /* Request timed out */
DSH_FAILED /* Request resulted in error */
} state_t;
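/*
 * Illustrative state flow (derived from _wdog() and
 * _thread_per_node_rpc() below):
 *
 *	DSH_NEW -> DSH_ACTIVE -> DSH_DONE	(RPC completed)
 *	                      -> DSH_NO_RESP	(timed out; queued for retry)
 *	                      -> DSH_FAILED	(RPC returned an error)
 */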
typedef struct thd {
pthread_t thread; /* thread ID */
pthread_attr_t attr; /* thread attributes */
state_t state; /* thread state */
time_t start_time; /* start time */
time_t end_time; /* end time or delta time
* upon termination */
struct sockaddr_in slurm_addr; /* network address */
char node_name[MAX_NAME_LEN]; /* node's name */
} thd_t;
typedef struct agent_info {
	pthread_mutex_t thread_mutex;	/* agent specific mutex */
	pthread_cond_t thread_cond;	/* agent specific condition */
	uint32_t thread_count;		/* number of thread records */
	uint32_t threads_active;	/* currently active threads */
	uint16_t retry;			/* if set, keep trying */
	thd_t *thread_struct;		/* thread structures */
	bool get_reply;			/* flag if reply expected */
	slurm_msg_type_t msg_type;	/* RPC to be issued */
	void **msg_args_pptr;		/* RPC data to be used */
} agent_info_t;

typedef struct task_info {
	pthread_mutex_t *thread_mutex_ptr; /* pointer to agent specific
					    * mutex */
	pthread_cond_t *thread_cond_ptr; /* pointer to agent specific
					  * condition */
	uint32_t *threads_active_ptr;	/* currently active thread ptr */
	thd_t *thread_struct_ptr;	/* thread structures ptr */
	bool get_reply;			/* flag if reply expected */
	slurm_msg_type_t msg_type;	/* RPC to be issued */
	void *msg_args_ptr;		/* ptr to RPC data to be used */
} task_info_t;
typedef struct queued_request {
agent_arg_t* agent_arg_ptr; /* The queued request */
time_t last_attempt; /* Time of last xmit attempt */
} queued_request_t;
static void _alarm_handler(int dummy);
static inline void _comm_err(char *node_name);
static void _list_delete_retry(void *retry_entry);
static agent_info_t *_make_agent_info(agent_arg_t *agent_arg_ptr);
static task_info_t *_make_task_data(agent_info_t *agent_info_ptr, int inx);
static void _notify_slurmctld_jobs(agent_info_t *agent_ptr);
static void _notify_slurmctld_nodes(agent_info_t *agent_ptr,
int no_resp_cnt, int retry_cnt);
static void _purge_agent_args(agent_arg_t *agent_arg_ptr);
static void _queue_agent_retry(agent_info_t * agent_info_ptr, int count);
static void _slurmctld_free_job_launch_msg(batch_job_launch_msg_t * msg);
static void _spawn_retry_agent(agent_arg_t * agent_arg_ptr);
static void *_thread_per_node_rpc(void *args);
static int _valid_agent_arg(agent_arg_t *agent_arg_ptr);
static void *_wdog(void *args);
static pthread_mutex_t retry_mutex = PTHREAD_MUTEX_INITIALIZER;
static List retry_list = NULL; /* agent_arg_t list for retry */
static bool run_scheduler = false;
/*
 * agent - party responsible for transmitting a common RPC in parallel
 *	across a set of nodes
 * IN pointer to agent_arg_t, which is xfree'd (including slurm_addr,
 *	node_names and msg_args) upon completion if AGENT_IS_THREAD is set
 * RET always NULL (function format just for use as pthread)
 */
void *agent(void *args)
{
	int i, rc, retries = 0;
pthread_attr_t attr_wdog;
pthread_t thread_wdog;
agent_arg_t *agent_arg_ptr = args;
agent_info_t *agent_info_ptr = NULL;
thd_t *thread_ptr;
task_info_t *task_specific_ptr;
/* basic argument value tests */
if (_valid_agent_arg(agent_arg_ptr))
goto cleanup;
xsignal(SIGALRM, _alarm_handler);

/* initialize the agent data structures */
agent_info_ptr = _make_agent_info(agent_arg_ptr);
thread_ptr = agent_info_ptr->thread_struct;
/* start the watchdog thread */
if (pthread_attr_init(&attr_wdog))
fatal("pthread_attr_init error %m");
if (pthread_attr_setdetachstate
(&attr_wdog, PTHREAD_CREATE_JOINABLE))
error("pthread_attr_setdetachstate error %m");
if (pthread_attr_setscope(&attr_wdog, PTHREAD_SCOPE_SYSTEM))
error("pthread_attr_setscope error %m");
while (pthread_create(&thread_wdog, &attr_wdog, _wdog,
(void *) agent_info_ptr)) {
error("pthread_create error %m");
if (++retries > MAX_RETRIES)
fatal("Can't create pthread");
		sleep(1);	/* sleep and try again */
	}
#if AGENT_THREAD_COUNT < 1
fatal("AGENT_THREAD_COUNT value is invalid");
#endif
/* start all the other threads (up to AGENT_THREAD_COUNT active) */
for (i = 0; i < agent_info_ptr->thread_count; i++) {
/* wait until "room" for another thread */
slurm_mutex_lock(&agent_info_ptr->thread_mutex);
while (agent_info_ptr->threads_active >=
AGENT_THREAD_COUNT) {
pthread_cond_wait(&agent_info_ptr->thread_cond,
				  &agent_info_ptr->thread_mutex);
		}
/* create thread specific data, NOTE: freed from
* _thread_per_node_rpc() */
task_specific_ptr = _make_task_data(agent_info_ptr, i);
if (pthread_attr_init(&thread_ptr[i].attr))
fatal("pthread_attr_init error %m");
if (pthread_attr_setdetachstate(&thread_ptr[i].attr,
PTHREAD_CREATE_DETACHED))
error("pthread_attr_setdetachstate error %m");
#ifdef PTHREAD_SCOPE_SYSTEM
if (pthread_attr_setscope(&thread_ptr[i].attr,
PTHREAD_SCOPE_SYSTEM))
error("pthread_attr_setscope error %m");
#endif
while ((rc = pthread_create(&thread_ptr[i].thread,
&thread_ptr[i].attr,
_thread_per_node_rpc,
(void *) task_specific_ptr))) {
error("pthread_create error %m");
if (agent_info_ptr->threads_active)
pthread_cond_wait(&agent_info_ptr->
thread_cond,
&agent_info_ptr->
thread_mutex);
else {
slurm_mutex_unlock(&agent_info_ptr->
thread_mutex);
sleep(1);
				slurm_mutex_lock(&agent_info_ptr->
						 thread_mutex);
			}
		}
		agent_info_ptr->threads_active++;
		slurm_mutex_unlock(&agent_info_ptr->thread_mutex);
	}
/* wait for termination of remaining threads */
pthread_join(thread_wdog, NULL);
cleanup:
	_purge_agent_args(agent_arg_ptr);
	if (agent_info_ptr) {
		xfree(agent_info_ptr->thread_struct);
		xfree(agent_info_ptr);
	}
	return NULL;
}
/* Basic validity test of agent argument */
static int _valid_agent_arg(agent_arg_t *agent_arg_ptr)
{
xassert(agent_arg_ptr);
xassert(agent_arg_ptr->slurm_addr);
xassert(agent_arg_ptr->node_names);
xassert((agent_arg_ptr->msg_type == SRUN_PING) ||
(agent_arg_ptr->msg_type == SRUN_TIMEOUT) ||
(agent_arg_ptr->msg_type == SRUN_NODE_FAIL) ||
(agent_arg_ptr->msg_type == REQUEST_KILL_JOB) ||
(agent_arg_ptr->msg_type == REQUEST_KILL_TIMELIMIT) ||
(agent_arg_ptr->msg_type == REQUEST_UPDATE_JOB_TIME) ||
(agent_arg_ptr->msg_type == REQUEST_KILL_TASKS) ||
(agent_arg_ptr->msg_type == REQUEST_PING) ||
(agent_arg_ptr->msg_type == REQUEST_BATCH_JOB_LAUNCH) ||
(agent_arg_ptr->msg_type == REQUEST_SHUTDOWN) ||
(agent_arg_ptr->msg_type == REQUEST_RECONFIGURE) ||
(agent_arg_ptr->msg_type == RESPONSE_RESOURCE_ALLOCATION) ||
(agent_arg_ptr->msg_type == REQUEST_NODE_REGISTRATION_STATUS));
if (agent_arg_ptr->node_count == 0)
return SLURM_FAILURE; /* no messages to be sent */
return SLURM_SUCCESS;
}
static agent_info_t *_make_agent_info(agent_arg_t *agent_arg_ptr)
{
int i;
agent_info_t *agent_info_ptr;
thd_t *thread_ptr;
agent_info_ptr = xmalloc(sizeof(agent_info_t));
slurm_mutex_init(&agent_info_ptr->thread_mutex);
if (pthread_cond_init(&agent_info_ptr->thread_cond, NULL))
fatal("pthread_cond_init error %m");
agent_info_ptr->thread_count = agent_arg_ptr->node_count;
agent_info_ptr->retry = agent_arg_ptr->retry;
agent_info_ptr->threads_active = 0;
thread_ptr = xmalloc(agent_arg_ptr->node_count * sizeof(thd_t));
agent_info_ptr->thread_struct = thread_ptr;
agent_info_ptr->msg_type = agent_arg_ptr->msg_type;
agent_info_ptr->msg_args_pptr = &agent_arg_ptr->msg_args;
if ((agent_arg_ptr->msg_type != REQUEST_SHUTDOWN) &&
(agent_arg_ptr->msg_type != REQUEST_RECONFIGURE))
agent_info_ptr->get_reply = true;
for (i = 0; i < agent_info_ptr->thread_count; i++) {
thread_ptr[i].state = DSH_NEW;
thread_ptr[i].slurm_addr = agent_arg_ptr->slurm_addr[i];
strncpy(thread_ptr[i].node_name,
&agent_arg_ptr->node_names[i * MAX_NAME_LEN],
MAX_NAME_LEN);
}
return agent_info_ptr;
}
static task_info_t *_make_task_data(agent_info_t *agent_info_ptr, int inx)
{
task_info_t *task_info_ptr;
task_info_ptr = xmalloc(sizeof(task_info_t));
task_info_ptr->thread_mutex_ptr = &agent_info_ptr->thread_mutex;
task_info_ptr->thread_cond_ptr = &agent_info_ptr->thread_cond;
task_info_ptr->threads_active_ptr= &agent_info_ptr->threads_active;
task_info_ptr->thread_struct_ptr = &agent_info_ptr->thread_struct[inx];
task_info_ptr->get_reply = agent_info_ptr->get_reply;
task_info_ptr->msg_type = agent_info_ptr->msg_type;
task_info_ptr->msg_args_ptr = *agent_info_ptr->msg_args_pptr;
return task_info_ptr;
}
/*
 * _wdog - Watchdog thread. Send SIGALRM to threads which have been active
 *	for too long.
 * IN args - pointer to agent_info_t with info on threads to watch
 * Sleep for WDOG_POLL seconds between polls.
 */
static void *_wdog(void *args)
{
int fail_cnt, no_resp_cnt, retry_cnt;
bool work_done, srun_agent = false;
int i, max_delay = 0;
agent_info_t *agent_ptr = (agent_info_t *) args;
thd_t *thread_ptr = agent_ptr->thread_struct;
time_t now;
if ( (agent_ptr->msg_type == SRUN_PING) ||
(agent_ptr->msg_type == SRUN_TIMEOUT) ||
(agent_ptr->msg_type == RESPONSE_RESOURCE_ALLOCATION) ||
(agent_ptr->msg_type == SRUN_NODE_FAIL) )
srun_agent = true;
	while (1) {
		work_done   = true;	/* assume all threads complete */
		fail_cnt    = 0;	/* assume no thread failures */
		no_resp_cnt = 0;	/* assume all threads respond */
		retry_cnt   = 0;	/* assume no required retries */
		sleep(WDOG_POLL);
now = time(NULL);
slurm_mutex_lock(&agent_ptr->thread_mutex);
for (i = 0; i < agent_ptr->thread_count; i++) {
switch (thread_ptr[i].state) {
case DSH_ACTIVE:
work_done = false;
			if (thread_ptr[i].end_time <= now) {
				debug3("agent thread %lu timed out",
				       (unsigned long) thread_ptr[i].thread);
				if (pthread_kill(thread_ptr[i].thread,
						 SIGALRM) == ESRCH)
					thread_ptr[i].state = DSH_NO_RESP;
			}
			break;
case DSH_NEW:
work_done = false;
break;
case DSH_DONE:
			if (max_delay < (int)thread_ptr[i].end_time)
				max_delay = (int)thread_ptr[i].end_time;
			break;
		case DSH_NO_RESP:
			no_resp_cnt++;
			retry_cnt++;
			break;
case DSH_FAILED:
fail_cnt++;
break;
			}
		}
		if (work_done)
			break;
		slurm_mutex_unlock(&agent_ptr->thread_mutex);
	}
if (srun_agent)
_notify_slurmctld_jobs(agent_ptr);
else
_notify_slurmctld_nodes(agent_ptr, no_resp_cnt, retry_cnt);
if (max_delay)
debug2("agent maximum delay %d seconds", max_delay);
slurm_mutex_unlock(&agent_ptr->thread_mutex);
return (void *) NULL;
}
static void _notify_slurmctld_jobs(agent_info_t *agent_ptr)
{
#if AGENT_IS_THREAD
/* Locks: Write job */
slurmctld_lock_t job_write_lock =
{ NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK };
uint32_t job_id = 0, step_id = 0;
thd_t *thread_ptr = agent_ptr->thread_struct;
if (agent_ptr->msg_type == SRUN_PING) {
srun_ping_msg_t *msg = *agent_ptr->msg_args_pptr;
job_id = msg->job_id;
step_id = msg->step_id;
} else if (agent_ptr->msg_type == SRUN_TIMEOUT) {
srun_timeout_msg_t *msg = *agent_ptr->msg_args_pptr;
job_id = msg->job_id;
step_id = msg->step_id;
} else if (agent_ptr->msg_type == SRUN_NODE_FAIL) {
srun_node_fail_msg_t *msg = *agent_ptr->msg_args_pptr;
job_id = msg->job_id;
step_id = msg->step_id;
} else if (agent_ptr->msg_type == RESPONSE_RESOURCE_ALLOCATION) {
resource_allocation_response_msg_t *msg =
*agent_ptr->msg_args_pptr;
job_id = msg->job_id;
step_id = NO_VAL;
} else {
error("_notify_slurmctld_jobs invalid msg_type %u",
agent_ptr->msg_type);
return;
}
lock_slurmctld(job_write_lock);
if (thread_ptr[0].state == DSH_DONE)
srun_response(job_id, step_id);
unlock_slurmctld(job_write_lock);
#else
fatal("Code development needed here if agent is not thread");
#endif
}
static void _notify_slurmctld_nodes(agent_info_t *agent_ptr,
int no_resp_cnt, int retry_cnt)
{
#if AGENT_IS_THREAD
/* Locks: Read config, write job, write node */
slurmctld_lock_t node_write_lock =
{ READ_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };
#endif
thd_t *thread_ptr = agent_ptr->thread_struct;
int i;
/* Notify slurmctld of non-responding nodes */
if (no_resp_cnt) {
#if AGENT_IS_THREAD
/* Update node table data for non-responding nodes */
lock_slurmctld(node_write_lock);
		for (i = 0; i < agent_ptr->thread_count; i++) {
			if (thread_ptr[i].state == DSH_NO_RESP)
				node_not_resp(thread_ptr[i].node_name,
					      thread_ptr[i].start_time);
		}
		if (agent_ptr->msg_type == REQUEST_BATCH_JOB_LAUNCH) {
			/* Requeue the request */
			batch_job_launch_msg_t *launch_msg_ptr =
				*agent_ptr->msg_args_pptr;
			uint32_t job_id = launch_msg_ptr->job_id;
			info("Non-responding node, requeue JobId=%u", job_id);
			job_complete(job_id, 0, true, 0);
		}
		unlock_slurmctld(node_write_lock);
#else
		fatal("Code development needed here if agent is not thread");
#endif
	}
if (retry_cnt && agent_ptr->retry)
_queue_agent_retry(agent_ptr, retry_cnt);
/* Update last_response on responding nodes */
#if AGENT_IS_THREAD
lock_slurmctld(node_write_lock);
for (i = 0; i < agent_ptr->thread_count; i++) {
if (thread_ptr[i].state == DSH_FAILED)
set_node_down(thread_ptr[i].node_name,
"Prolog/epilog failure");
		if (thread_ptr[i].state == DSH_DONE)
			node_did_resp(thread_ptr[i].node_name);
	}
	unlock_slurmctld(node_write_lock);
if (run_scheduler) {
run_scheduler = false;
schedule(); /* has own locks */
}
if ((agent_ptr->msg_type == REQUEST_PING) ||
(agent_ptr->msg_type == REQUEST_NODE_REGISTRATION_STATUS))
ping_end();
#else
fatal("Code development needed here if agent is not thread");
#endif
}
/* Report a communications error for specified node */
static inline void _comm_err(char *node_name)
{
#if AGENT_IS_THREAD
if (is_node_resp (node_name))
#endif
error("agent/send_recv_msg: %s: %m", node_name);
}
/*
 * _thread_per_node_rpc - thread to issue an RPC on a collection of nodes
* IN/OUT args - pointer to task_info_t, xfree'd on completion
*/
static void *_thread_per_node_rpc(void *args)
{
	int rc = SLURM_SUCCESS, timeout = 0;

slurm_msg_t msg;
task_info_t *task_ptr = (task_info_t *) args;
thd_t *thread_ptr = task_ptr->thread_struct_ptr;
state_t thread_state = DSH_NO_RESP;
slurm_msg_type_t msg_type = task_ptr->msg_type;
bool is_kill_msg, srun_agent;
#if AGENT_IS_THREAD
/* Locks: Write job, write node */
slurmctld_lock_t job_write_lock = {
NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };
/* Locks: Read job */
slurmctld_lock_t job_read_lock = {
		NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK };
#endif

xassert(args != NULL);
is_kill_msg = ( (msg_type == REQUEST_KILL_TIMELIMIT) ||
(msg_type == REQUEST_KILL_JOB) );
srun_agent = ( (msg_type == SRUN_PING) ||
(msg_type == SRUN_TIMEOUT) ||
(msg_type == RESPONSE_RESOURCE_ALLOCATION) ||
(msg_type == SRUN_NODE_FAIL) );
thread_ptr->start_time = time(NULL);
/* don't try to communicate with defunct job */
#if AGENT_IS_THREAD
if (srun_agent) {
uint32_t job_id = 0;
enum job_states state = JOB_END;
struct job_record *job_ptr = NULL;
if (msg_type == SRUN_PING) {
srun_ping_msg_t *msg = task_ptr->msg_args_ptr;
job_id = msg->job_id;
} else if (msg_type == SRUN_TIMEOUT) {
srun_timeout_msg_t *msg = task_ptr->msg_args_ptr;
job_id = msg->job_id;
} else if (msg_type == SRUN_NODE_FAIL) {
srun_node_fail_msg_t *msg = task_ptr->msg_args_ptr;
job_id = msg->job_id;
} else if (msg_type == RESPONSE_RESOURCE_ALLOCATION) {
resource_allocation_response_msg_t *msg =
task_ptr->msg_args_ptr;
job_id = msg->job_id;
}
lock_slurmctld(job_read_lock);
if (job_id)
job_ptr = find_job_record(job_id);
if (job_ptr)
state = job_ptr->job_state;
unlock_slurmctld(job_read_lock);
if ((state == JOB_RUNNING) ||
((state & JOB_COMPLETING) && (msg_type == SRUN_NODE_FAIL))) {
; /* proceed with the communication */
} else {
thread_state = DSH_DONE;
goto cleanup;
}
}
#endif
slurm_mutex_lock(task_ptr->thread_mutex_ptr);
thread_ptr->state = DSH_ACTIVE;
slurm_mutex_unlock(task_ptr->thread_mutex_ptr);
/* send request message */

msg.address = thread_ptr->slurm_addr;
msg.msg_type = msg_type;

msg.data = task_ptr->msg_args_ptr;
thread_ptr->end_time = thread_ptr->start_time + COMMAND_TIMEOUT;
if (task_ptr->get_reply) {
if (slurm_send_recv_rc_msg(&msg, &rc, timeout) < 0) {
if (!srun_agent)
_comm_err(thread_ptr->node_name);
goto cleanup;
}
} else {
if (slurm_send_only_node_msg(&msg) < 0) {
if (!srun_agent)
_comm_err(thread_ptr->node_name);
} else
thread_state = DSH_DONE;
goto cleanup;
}

/* SPECIAL CASE: Mark node as IDLE if job already complete */
if (is_kill_msg && (rc == ESLURMD_KILL_JOB_ALREADY_COMPLETE)) {
kill_job_msg_t *kill_job;
kill_job = (kill_job_msg_t *) task_ptr->msg_args_ptr;
rc = SLURM_SUCCESS;
lock_slurmctld(job_write_lock);
if (job_epilog_complete(kill_job->job_id,
thread_ptr->node_name, rc))
run_scheduler = true;
unlock_slurmctld(job_write_lock);
}
/* SPECIAL CASE: Kill non-startable batch job */
if ((msg_type == REQUEST_BATCH_JOB_LAUNCH) && rc) {
batch_job_launch_msg_t *launch_msg_ptr = task_ptr->msg_args_ptr;
uint32_t job_id = launch_msg_ptr->job_id;
info("Killing non-startable batch job %u: %s",
job_id, slurm_strerror(rc));
thread_state = DSH_DONE;
lock_slurmctld(job_write_lock);
job_complete(job_id, 0, false, 1);
unlock_slurmctld(job_write_lock);
goto cleanup;
}

switch (rc) {
case SLURM_SUCCESS:
/* debug3("agent processed RPC to node %s",
thread_ptr->node_name); */

		thread_state = DSH_DONE;
		break;
case ESLURMD_EPILOG_FAILED:
error("Epilog failure on host %s, setting DOWN",
thread_ptr->node_name);
		thread_state = DSH_FAILED;
		break;
case ESLURMD_PROLOG_FAILED:
error("Prolog failure on host %s, setting DOWN",
thread_ptr->node_name);
thread_state = DSH_FAILED;
break;
case ESLURM_INVALID_JOB_ID: /* Not indicative of a real error */
case ESLURMD_JOB_NOTRUNNING: /* Not indicative of a real error */
debug2("agent processed RPC to node %s: %s",
thread_ptr->node_name, slurm_strerror(rc));

thread_state = DSH_DONE;
break;
default:
error("agent error from host %s for msg type %d: %s",
thread_ptr->node_name, task_ptr->msg_type,
slurm_strerror(rc));

thread_state = DSH_DONE;
}
cleanup:
slurm_mutex_lock(task_ptr->thread_mutex_ptr);
thread_ptr->state = thread_state;
thread_ptr->end_time = (time_t) difftime(time(NULL),
thread_ptr->start_time);
/* Signal completion so another thread can replace us */
(*task_ptr->threads_active_ptr)--;
slurm_mutex_unlock(task_ptr->thread_mutex_ptr);
pthread_cond_signal(task_ptr->thread_cond_ptr);
	xfree(args);	/* task_info_t, freed here (see note in agent()) */
	return (void *) NULL;
}

/*
 * SIGALRM handler. We are really interested in interrupting hung
 * communications and causing them to return EINTR. Multiple interrupts
 * might be required.
 */
static void _alarm_handler(int dummy)
{
	xsignal(SIGALRM, _alarm_handler);
}
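/*
 * Illustrative sketch (not compiled) of the effect relied upon above:
 * when the watchdog's pthread_kill(..., SIGALRM) lands, a blocking
 * read() inside the communication layer fails with errno == EINTR,
 * letting the thread abandon a hung connection. The helper below is
 * hypothetical, not part of the communication layer.
 */
#if 0
static ssize_t _example_interruptible_read(int fd, void *buf, size_t len)
{
	ssize_t n;

	while ((n = read(fd, buf, len)) < 0) {
		if (errno == EINTR)
			return -1;	/* watchdog interrupt, give up */
		if (errno == EAGAIN)
			continue;	/* transient, retry */
		return -1;		/* real I/O error */
	}
	return n;
}
#endif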
/*
* _queue_agent_retry - Queue any failed RPCs for later replay
* IN agent_info_ptr - pointer to info on completed agent requests
* IN count - number of agent requests which failed, count to requeue
*/
static void _queue_agent_retry(agent_info_t * agent_info_ptr, int count)
{
agent_arg_t *agent_arg_ptr;
queued_request_t *queued_req_ptr = NULL;
thd_t *thread_ptr = agent_info_ptr->thread_struct;
	int i, j = 0;
if (count == 0)
return;
/* build agent argument with just the RPCs to retry */
agent_arg_ptr = xmalloc(sizeof(agent_arg_t));
agent_arg_ptr->node_count = count;
agent_arg_ptr->retry = 1;
agent_arg_ptr->slurm_addr = xmalloc(sizeof(struct sockaddr_in)
* count);
agent_arg_ptr->node_names = xmalloc(MAX_NAME_LEN * count);
agent_arg_ptr->msg_type = agent_info_ptr->msg_type;
agent_arg_ptr->msg_args = *(agent_info_ptr->msg_args_pptr);
*(agent_info_ptr->msg_args_pptr) = NULL;
for (i = 0; i < agent_info_ptr->thread_count; i++) {
if (thread_ptr[i].state != DSH_NO_RESP)
continue;
agent_arg_ptr->slurm_addr[j] = thread_ptr[i].slurm_addr;
strncpy(&agent_arg_ptr->node_names[j * MAX_NAME_LEN],
thread_ptr[i].node_name, MAX_NAME_LEN);
if ((++j) == count)
break;
}
if (count != j) {
error("agent: Retry count (%d) != actual count (%d)",
count, j);
agent_arg_ptr->node_count = j;
}
debug2("Queue RPC msg_type=%u, nodes=%d for retry",
/* add the requeust to a list */
queued_req_ptr = xmalloc(sizeof(queued_request_t));
queued_req_ptr->agent_arg_ptr = agent_arg_ptr;
queued_req_ptr->last_attempt = time(NULL);
slurm_mutex_lock(&retry_mutex);
	if (retry_list == NULL) {
		retry_list = list_create(&_list_delete_retry);
		if (retry_list == NULL)
			fatal("list_create failed");
	}
if (list_append(retry_list, (void *) queued_req_ptr) == 0)
fatal("list_append failed");
	slurm_mutex_unlock(&retry_mutex);
}
/*
* _list_delete_retry - delete an entry from the retry list,
* see common/list.h for documentation
*/
static void _list_delete_retry(void *retry_entry)
{
queued_request_t *queued_req_ptr;
if (! retry_entry)
return;
queued_req_ptr = (queued_request_t *) retry_entry;
_purge_agent_args(queued_req_ptr->agent_arg_ptr);
xfree(queued_req_ptr);
}
/*
 * agent_retry - Agent for retrying pending RPCs. One pending request is
 *	issued if it has been pending for at least min_wait seconds
 * IN min_wait - Minimum wait time between re-issue of a pending RPC
 * RET count of queued requests remaining (zero if none are old)
 */
extern int agent_retry (int min_wait)
{
	int list_size = 0;
time_t now = time(NULL);
queued_request_t *queued_req_ptr = NULL;
slurm_mutex_lock(&retry_mutex);
if (retry_list) {
double age = 0;
queued_req_ptr = (queued_request_t *) list_peek(retry_list);
if (queued_req_ptr) {
age = difftime(now, queued_req_ptr->last_attempt);
if (age > min_wait) {
queued_req_ptr = (queued_request_t *)
list_pop(retry_list);
				list_size = list_count(retry_list);
} else /* too new */
queued_req_ptr = NULL;
}
}
slurm_mutex_unlock(&retry_mutex);
if (queued_req_ptr) {
agent_arg_t *agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
xfree(queued_req_ptr);
if (agent_arg_ptr)
_spawn_retry_agent(agent_arg_ptr);
else
error("agent_retry found record with no agent_args");
}
return list_size;
}
/*
* agent_queue_request - put a new request on the queue for later execution
* IN agent_arg_ptr - the request to enqueue
*/
void agent_queue_request(agent_arg_t *agent_arg_ptr)
{
queued_request_t *queued_req_ptr = NULL;
queued_req_ptr = xmalloc(sizeof(queued_request_t));
queued_req_ptr->agent_arg_ptr = agent_arg_ptr;
/* queued_req_ptr->last_attempt = 0; Implicit */
slurm_mutex_lock(&retry_mutex);
if (retry_list == NULL) {
retry_list = list_create(&_list_delete_retry);
if (retry_list == NULL)
fatal("list_create failed");
}
list_prepend(retry_list, (void *)queued_req_ptr);
	slurm_mutex_unlock(&retry_mutex);
}
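/*
 * Illustrative sketch (not compiled): how the retry queue might be
 * driven. A background loop in slurmctld (hypothetical here) calls
 * agent_retry() periodically; each call re-issues at most one queued
 * request once it is at least min_wait seconds old.
 */
#if 0
static void *_example_retry_loop(void *no_data)
{
	while (1) {
		(void) agent_retry(10);	/* re-issue one RPC >= 10 secs old */
		sleep(2);
	}
	return NULL;
}
#endif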
/* _spawn_retry_agent - pthread_create an agent for the given task */
static void _spawn_retry_agent(agent_arg_t * agent_arg_ptr)
{
	int retries = 0;
pthread_attr_t attr_agent;
pthread_t thread_agent;
if (agent_arg_ptr == NULL)
return;
debug2("Spawning RPC agent for msg_type %u",
agent_arg_ptr->msg_type);
if (pthread_attr_init(&attr_agent))
fatal("pthread_attr_init error %m");
if (pthread_attr_setdetachstate(&attr_agent,
PTHREAD_CREATE_DETACHED))
error("pthread_attr_setdetachstate error %m");
if (pthread_attr_setscope(&attr_agent, PTHREAD_SCOPE_SYSTEM))
error("pthread_attr_setscope error %m");
while (pthread_create(&thread_agent, &attr_agent,
agent, (void *) agent_arg_ptr)) {
error("pthread_create error %m");
if (++retries > MAX_RETRIES)
fatal("Can't create pthread");
		sleep(1);	/* sleep and try again */
	}
}
/*
 * _slurmctld_free_job_launch_msg is a variant of slurm_free_job_launch_msg
 * because all environment variables are currently loaded in one xmalloc
 * buffer (see get_job_env()), which is different from how slurmd
 * assembles the data from a message
 */
static void _slurmctld_free_job_launch_msg(batch_job_launch_msg_t * msg)
{
if (msg) {
		if (msg->environment) {
			xfree(msg->environment[0]);
			xfree(msg->environment);
		}
		slurm_free_job_launch_msg(msg);
	}
}
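/*
 * Illustrative sketch (not compiled) of the environment layout assumed
 * above: get_job_env() is understood to pack every string into one
 * xmalloc'd buffer, with environment[0] pointing at its start, so
 * xfree(environment[0]) plus xfree(environment) releases everything.
 * The values below are hypothetical.
 */
#if 0
static char **_example_env_layout(void)
{
	char *buf  = xmalloc(32);		/* one buffer for all strings */
	char **env = xmalloc(3 * sizeof(char *));

	strcpy(buf, "PATH=/bin");		/* string 1 at buf + 0 */
	strcpy(buf + 10, "HOME=/root");		/* string 2 packed after it */
	env[0] = buf;				/* env[0] == buffer start */
	env[1] = buf + 10;
	env[2] = NULL;
	return env;	/* free via xfree(env[0]); xfree(env); */
}
#endif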
/* agent_purge - purge all pending RPC requests */
void agent_purge(void)
{
if (retry_list == NULL)
return;
slurm_mutex_lock(&retry_mutex);
list_destroy(retry_list);
retry_list = NULL;
slurm_mutex_unlock(&retry_mutex);
}
static void _purge_agent_args(agent_arg_t *agent_arg_ptr)
{
if (agent_arg_ptr == NULL)
return;
xfree(agent_arg_ptr->slurm_addr);
xfree(agent_arg_ptr->node_names);
if (agent_arg_ptr->msg_args) {
if (agent_arg_ptr->msg_type == REQUEST_BATCH_JOB_LAUNCH)
_slurmctld_free_job_launch_msg(agent_arg_ptr->msg_args);
else if (agent_arg_ptr->msg_type ==
RESPONSE_RESOURCE_ALLOCATION)
slurm_free_resource_allocation_response_msg(
agent_arg_ptr->msg_args);
else
			xfree(agent_arg_ptr->msg_args);
	}
	xfree(agent_arg_ptr);
}