Skip to content
Snippets Groups Projects
Commit 94c2dea6 authored by Moe Jette's avatar Moe Jette
Browse files

Added logic for checkpoint timeout, plugin has pthread and list of active

checkpoints. It sends an abort signal on timeout.
parent 73f46248
No related branches found
No related tags found
No related merge requests found
...@@ -35,6 +35,9 @@ ...@@ -35,6 +35,9 @@
#if HAVE_INTTYPES_H #if HAVE_INTTYPES_H
# include <inttypes.h> # include <inttypes.h>
#endif #endif
#ifdef WITH_PTHREADS
# include <pthread.h>
#endif
#include <signal.h> #include <signal.h>
#include <stdio.h> #include <stdio.h>
...@@ -42,6 +45,7 @@ ...@@ -42,6 +45,7 @@
#include <slurm/slurm.h> #include <slurm/slurm.h>
#include <slurm/slurm_errno.h> #include <slurm/slurm_errno.h>
#include "src/common/list.h"
#include "src/common/log.h" #include "src/common/log.h"
#include "src/common/pack.h" #include "src/common/pack.h"
#include "src/common/xassert.h" #include "src/common/xassert.h"
...@@ -51,7 +55,7 @@ ...@@ -51,7 +55,7 @@
#include "src/slurmctld/slurmctld.h" #include "src/slurmctld/slurmctld.h"
struct check_job_info { struct check_job_info {
uint16_t disabled; uint16_t disabled; /* counter, checkpointable only if zero */
uint16_t node_cnt; uint16_t node_cnt;
uint16_t reply_cnt; uint16_t reply_cnt;
uint16_t wait_time; uint16_t wait_time;
...@@ -62,8 +66,32 @@ struct check_job_info { ...@@ -62,8 +66,32 @@ struct check_job_info {
static void _comp_msg(struct step_record *step_ptr, static void _comp_msg(struct step_record *step_ptr,
struct check_job_info *check_ptr); struct check_job_info *check_ptr);
static int _step_sig(struct step_record * step_ptr, uint16_t wait, int signal); static void _send_sig(uint32_t job_id, uint32_t step_id, uint16_t signal,
char *node_name, slurm_addr node_addr);
static int _step_sig(struct step_record * step_ptr, uint16_t wait,
uint16_t signal, uint16_t sig_timeout);
/* checkpoint request timeout processing */
static pthread_t ckpt_agent_tid = 0;
static pthread_mutex_t ckpt_agent_mutex = PTHREAD_MUTEX_INITIALIZER;
static List ckpt_timeout_list = NULL;
struct ckpt_timeout_info {
uint32_t job_id;
uint32_t step_id;
uint16_t signal;
time_t start_time;
time_t end_time;
char *node_name;
slurm_addr node_addr;
};
static void *_ckpt_agent_thr(void *arg);
static void _ckpt_enqueue_timeout(uint32_t job_id, uint32_t step_id,
time_t start_time, uint16_t signal, uint16_t wait_time,
char *node_name, slurm_addr node_addr);
static void _ckpt_dequeue_timeout(uint32_t job_id, uint32_t step_id,
time_t start_time);
static void _ckpt_timeout_free(void *rec);
static void _ckpt_signal_step(struct ckpt_timeout_info *rec);
/* /*
* These variables are required by the generic plugin interface. If they * These variables are required by the generic plugin interface. If they
* are not found in the plugin, the plugin loader will ignore it. * are not found in the plugin, the plugin loader will ignore it.
...@@ -102,13 +130,36 @@ const uint32_t plugin_version = 90; ...@@ -102,13 +130,36 @@ const uint32_t plugin_version = 90;
*/ */
extern int init ( void ) extern int init ( void )
{ {
pthread_attr_t attr;
slurm_attr_init(&attr);
if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED))
error("pthread_attr_setdetachstate: %m");
if (pthread_create(&ckpt_agent_tid, &attr, _ckpt_agent_thr, NULL)) {
error("pthread_create: %m");
return SLURM_ERROR;
}
return SLURM_SUCCESS; return SLURM_SUCCESS;
} }
extern int fini ( void ) extern int fini ( void )
{ {
return SLURM_SUCCESS; int i;
if (!&ckpt_agent_tid)
return SLURM_SUCCESS;
for (i=0; i<4; i++) {
if (pthread_cancel(ckpt_agent_tid)) {
ckpt_agent_tid = 0;
return SLURM_SUCCESS;
}
usleep(1000);
}
error("Could not kill checkpoint pthread");
return SLURM_ERROR;
} }
/* /*
...@@ -131,15 +182,16 @@ extern int slurm_ckpt_op ( uint16_t op, uint16_t data, ...@@ -131,15 +182,16 @@ extern int slurm_ckpt_op ( uint16_t op, uint16_t data,
if (check_ptr->disabled) if (check_ptr->disabled)
rc = ESLURM_DISABLED; rc = ESLURM_DISABLED;
else { else {
*event_time = check_ptr->time_stamp; if (check_ptr->reply_cnt < check_ptr->node_cnt)
*event_time = check_ptr->time_stamp;
rc = SLURM_SUCCESS; rc = SLURM_SUCCESS;
} }
break; break;
case CHECK_DISABLE: case CHECK_DISABLE:
check_ptr->disabled = 1; check_ptr->disabled++;
break; break;
case CHECK_ENABLE: case CHECK_ENABLE:
check_ptr->disabled = 0; check_ptr->disabled--;
break; break;
case CHECK_CREATE: case CHECK_CREATE:
check_ptr->time_stamp = time(NULL); check_ptr->time_stamp = time(NULL);
...@@ -147,11 +199,11 @@ extern int slurm_ckpt_op ( uint16_t op, uint16_t data, ...@@ -147,11 +199,11 @@ extern int slurm_ckpt_op ( uint16_t op, uint16_t data,
check_ptr->error_code = 0; check_ptr->error_code = 0;
xfree(check_ptr->error_msg); xfree(check_ptr->error_msg);
#ifdef SIGSOUND #ifdef SIGSOUND
rc = _step_sig(step_ptr, data, SIGSOUND); rc = _step_sig(step_ptr, data, SIGSOUND, SIGWINCH);
#else #else
/* No checkpoint, SIGWINCH for testing purposes */ /* No checkpoint, SIGWINCH for testing purposes */
info("Checkpoint not supported, sending SIGWINCH"); info("Checkpoint not supported, sending SIGWINCH");
rc = _step_sig(step_ptr, data, SIGWINCH); rc = _step_sig(step_ptr, data, SIGWINCH, SIGWINCH);
#endif #endif
break; break;
case CHECK_VACATE: case CHECK_VACATE:
...@@ -160,11 +212,11 @@ extern int slurm_ckpt_op ( uint16_t op, uint16_t data, ...@@ -160,11 +212,11 @@ extern int slurm_ckpt_op ( uint16_t op, uint16_t data,
check_ptr->error_code = 0; check_ptr->error_code = 0;
xfree(check_ptr->error_msg); xfree(check_ptr->error_msg);
#ifdef SIGMIGRATE #ifdef SIGMIGRATE
rc = _step_sig(step_ptr, data, SIGMIGRATE); rc = _step_sig(step_ptr, data, SIGMIGRATE, SIGTERM);
#else #else
/* No checkpoint, kill job now, useful for testing */ /* No checkpoint, kill job now, useful for testing */
info("Checkpoint not supported, sending SIGTERM"); info("Checkpoint not supported, sending SIGTERM");
rc = _step_sig(step_ptr, data, SIGTERM); rc = _step_sig(step_ptr, data, SIGTERM, SIGTERM);
#endif #endif
break; break;
case CHECK_RESTART: case CHECK_RESTART:
...@@ -208,6 +260,8 @@ extern int slurm_ckpt_comp ( struct step_record * step_ptr, time_t event_time, ...@@ -208,6 +260,8 @@ extern int slurm_ckpt_comp ( struct step_record * step_ptr, time_t event_time,
info("Checkpoint complete for job %u.%u", info("Checkpoint complete for job %u.%u",
step_ptr->job_ptr->job_id, step_ptr->step_id); step_ptr->job_ptr->job_id, step_ptr->step_id);
check_ptr->time_stamp = time(NULL); check_ptr->time_stamp = time(NULL);
_ckpt_dequeue_timeout(step_ptr->job_ptr->job_id,
step_ptr->step_id, event_time);
} }
return SLURM_SUCCESS; return SLURM_SUCCESS;
...@@ -264,13 +318,37 @@ extern int slurm_ckpt_unpack_job(check_jobinfo_t jobinfo, Buf buffer) ...@@ -264,13 +318,37 @@ extern int slurm_ckpt_unpack_job(check_jobinfo_t jobinfo, Buf buffer)
return SLURM_ERROR; return SLURM_ERROR;
} }
/* Send specified signal only to the process launched on node 0 */ /* Send a signal RPC to a specific node */
static int _step_sig(struct step_record * step_ptr, uint16_t wait, int signal) static void _send_sig(uint32_t job_id, uint32_t step_id, uint16_t signal,
char *node_name, slurm_addr node_addr)
{
agent_arg_t *agent_args;
kill_tasks_msg_t *kill_tasks_msg;
kill_tasks_msg = xmalloc(sizeof(kill_tasks_msg_t));
kill_tasks_msg->job_id = job_id;
kill_tasks_msg->job_step_id = step_id;
kill_tasks_msg->signal = signal;
agent_args = xmalloc(sizeof(agent_arg_t));
agent_args->msg_type = REQUEST_KILL_TASKS;
agent_args->retry = 1;
agent_args->msg_args = kill_tasks_msg;
agent_args->slurm_addr = xmalloc(sizeof(slurm_addr));
agent_args->slurm_addr[0] = node_addr;
agent_args->node_names = xstrdup(node_name);
agent_args->node_count = 1;
agent_queue_request(agent_args);
}
/* Send specified signal only to the process launched on node 0.
* If the request times out, send sig_timeout. */
static int _step_sig(struct step_record * step_ptr, uint16_t wait,
uint16_t signal, uint16_t sig_timeout)
{ {
struct check_job_info *check_ptr; struct check_job_info *check_ptr;
struct job_record *job_ptr; struct job_record *job_ptr;
agent_arg_t *agent_args = NULL;
kill_tasks_msg_t *kill_tasks_msg;
int i; int i;
xassert(step_ptr); xassert(step_ptr);
...@@ -291,31 +369,23 @@ static int _step_sig(struct step_record * step_ptr, uint16_t wait, int signal) ...@@ -291,31 +369,23 @@ static int _step_sig(struct step_record * step_ptr, uint16_t wait, int signal)
continue; continue;
if (check_ptr->node_cnt++ > 0) if (check_ptr->node_cnt++ > 0)
continue; continue;
kill_tasks_msg = xmalloc(sizeof(kill_tasks_msg_t)); check_ptr->time_stamp = time(NULL);
kill_tasks_msg->job_id = step_ptr->job_ptr->job_id; check_ptr->wait_time = wait;
kill_tasks_msg->job_step_id = step_ptr->step_id; _send_sig(step_ptr->job_ptr->job_id, step_ptr->step_id,
kill_tasks_msg->signal = signal; signal, node_record_table_ptr[i].name,
agent_args = xmalloc(sizeof(agent_arg_t)); node_record_table_ptr[i].slurm_addr);
agent_args->msg_type = REQUEST_KILL_TASKS; _ckpt_enqueue_timeout(step_ptr->job_ptr->job_id,
agent_args->retry = 1; step_ptr->step_id, check_ptr->time_stamp,
agent_args->msg_args = kill_tasks_msg; sig_timeout, wait, node_record_table_ptr[i].name,
agent_args->slurm_addr = xmalloc(sizeof(struct sockaddr_in)); node_record_table_ptr[i].slurm_addr);
agent_args->node_names = xmalloc(MAX_NAME_LEN);
agent_args->slurm_addr[0] = node_record_table_ptr[i].slurm_addr;
strncpy(&agent_args->node_names[0],
node_record_table_ptr[i].name, MAX_NAME_LEN);
agent_args->node_count++;
} }
if (agent_args == NULL) { if (!check_ptr->node_cnt) {
error("_step_sig: job %u.%u has no nodes", job_ptr->job_id, error("_step_sig: job %u.%u has no nodes", job_ptr->job_id,
step_ptr->step_id); step_ptr->step_id);
return ESLURM_INVALID_NODE_NAME; return ESLURM_INVALID_NODE_NAME;
} }
agent_queue_request(agent_args);
check_ptr->time_stamp = time(NULL);
check_ptr->wait_time = wait;
info("checkpoint requested for job %u.%u", job_ptr->job_id, info("checkpoint requested for job %u.%u", job_ptr->job_id,
step_ptr->step_id); step_ptr->step_id);
return SLURM_SUCCESS; return SLURM_SUCCESS;
...@@ -329,3 +399,101 @@ static void _comp_msg(struct step_record *step_ptr, ...@@ -329,3 +399,101 @@ static void _comp_msg(struct step_record *step_ptr,
step_ptr->job_ptr->job_id, step_ptr->step_id, step_ptr->job_ptr->job_id, step_ptr->step_id,
delay, check_ptr->error_code); delay, check_ptr->error_code);
} }
/* Checkpoint processing pthread
* Never returns, but is cancelled on plugin termiantion */
static void *_ckpt_agent_thr(void *arg)
{
ListIterator iter;
struct ckpt_timeout_info *rec;
time_t now;
while (1) {
sleep(1);
if (!ckpt_timeout_list)
continue;
now = time(NULL);
iter = list_iterator_create(ckpt_timeout_list);
slurm_mutex_lock(&ckpt_agent_mutex);
/* look for and process any timeouts */
while (rec = list_next(iter)) {
if (rec->end_time > now)
continue;
info("checkpoint timeout for %u.%u",
rec->job_id, rec->step_id);
_ckpt_signal_step(rec);
list_delete(iter);
}
slurm_mutex_unlock(&ckpt_agent_mutex);
list_iterator_destroy(iter);
}
}
static void _ckpt_signal_step(struct ckpt_timeout_info *rec)
{
/* debug("signal %u.%u %u", rec->job_id, rec->step_id, rec->signal); */
_send_sig(rec->job_id, rec->step_id, rec->signal,
rec->node_name, rec->node_addr);
}
/* Queue a checkpoint request timeout */
static void _ckpt_enqueue_timeout(uint32_t job_id, uint32_t step_id,
time_t start_time, uint16_t signal, uint16_t wait_time,
char *node_name, slurm_addr node_addr)
{
struct ckpt_timeout_info *rec;
if ((wait_time == 0) || (signal == 0))
return;
slurm_mutex_lock(&ckpt_agent_mutex);
if (!ckpt_timeout_list)
ckpt_timeout_list = list_create(_ckpt_timeout_free);
rec = xmalloc(sizeof(struct ckpt_timeout_info));
rec->job_id = job_id;
rec->step_id = step_id;
rec->signal = signal;
rec->start_time = start_time;
rec->end_time = start_time + wait_time;
rec->node_name = xstrdup(node_name);
rec->node_addr = node_addr;
/* debug("enqueue %u.%u %u", job_id, step_id, wait_time); */
list_enqueue(ckpt_timeout_list, rec);
slurm_mutex_unlock(&ckpt_agent_mutex);
}
static void _ckpt_timeout_free(void *rec)
{
struct ckpt_timeout_info *ckpt_rec = (struct ckpt_timeout_info *)rec;
if (ckpt_rec) {
xfree(ckpt_rec->node_name);
xfree(ckpt_rec);
}
}
/* De-queue a checkpoint timeout request. The operation completed */
static void _ckpt_dequeue_timeout(uint32_t job_id, uint32_t step_id,
time_t start_time)
{
ListIterator iter;
struct ckpt_timeout_info *rec;
slurm_mutex_lock(&ckpt_agent_mutex);
if (!ckpt_timeout_list)
goto fini;
iter = list_iterator_create(ckpt_timeout_list);
while (rec = list_next(iter)) {
if ((rec->job_id != job_id) || (rec->step_id != step_id)
|| (start_time && (rec->start_time != start_time)))
continue;
/* debug("dequeue %u.%u", job_id, step_id); */
list_delete(iter);
break;
}
list_iterator_destroy(iter);
fini:
slurm_mutex_unlock(&ckpt_agent_mutex);
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment