Skip to content
Snippets Groups Projects
Commit e1591ee3 authored by Danny Auble's avatar Danny Auble
Browse files

CRAY - handle NHC on a native cray system. If the slurmctld is restarted

the process is rerun for a step/job that was running it when the slurmctld
was shut down.
parent c7dda394
No related branches found
No related tags found
No related merge requests found
...@@ -42,14 +42,19 @@ ...@@ -42,14 +42,19 @@
# if HAVE_INTTYPES_H # if HAVE_INTTYPES_H
# include <inttypes.h> # include <inttypes.h>
# endif # endif
# if WITH_PTHREADS
# include <pthread.h>
# endif
#endif #endif
#include <stdio.h> #include <stdio.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <unistd.h> #include <unistd.h>
#include <stdlib.h>
#include "src/common/slurm_xlator.h" /* Must be first */ #include "src/common/slurm_xlator.h" /* Must be first */
#include "src/slurmctld/locks.h"
#include "other_select.h" #include "other_select.h"
/** /**
...@@ -58,6 +63,7 @@ ...@@ -58,6 +63,7 @@
* @other_jobinfo: hook into attached, "other" node selection plugin. * @other_jobinfo: hook into attached, "other" node selection plugin.
*/ */
struct select_jobinfo { struct select_jobinfo {
uint16_t cleaning;
uint16_t magic; uint16_t magic;
select_jobinfo_t *other_jobinfo; select_jobinfo_t *other_jobinfo;
}; };
...@@ -74,6 +80,11 @@ struct select_nodeinfo { ...@@ -74,6 +80,11 @@ struct select_nodeinfo {
}; };
#define NODEINFO_MAGIC 0x85ad #define NODEINFO_MAGIC 0x85ad
#define MAX_PTHREAD_RETRIES 1
/* Change CRAY_STATE_VERSION value when changing the state save
* format i.e. state_save() */
#define CRAY_STATE_VERSION "VER001"
/* These are defined here so when we link with something other than /* These are defined here so when we link with something other than
* the slurmctld we will have these symbols defined. They will get * the slurmctld we will have these symbols defined. They will get
...@@ -132,6 +143,236 @@ const char plugin_type[] = "select/cray"; ...@@ -132,6 +143,236 @@ const char plugin_type[] = "select/cray";
uint32_t plugin_id = 107; uint32_t plugin_id = 107;
const uint32_t plugin_version = 100; const uint32_t plugin_version = 100;
extern int select_p_select_jobinfo_free(select_jobinfo_t *jobinfo);
/*
 * _run_nhc - run the Cray node health check (NHC) cleanup program for a
 *	finished job or step and wait for it to complete.
 *
 * IN id       - identifier handed to the NHC program (printed as a decimal
 *		 unsigned 64-bit value)
 * IN nodelist - node list the job/step ran on; on a native Cray it is
 *		 converted with cray_nodelist2nids() before being passed on
 * IN step     - true selects the "-a" flag, false selects the "-r" flag
 *
 * RET on a native Cray: the raw wait status filled in by waitpid(), or 1
 *	if fork() or waitpid() failed before a status was collected.
 *	On every other build the call is only simulated (2 second sleep) and
 *	0 is returned.
 */
static int _run_nhc(uint64_t id, char *nodelist, bool step)
{
#ifdef HAVE_NATIVE_CRAY
	int status = 1, wait_rc;
	char *argv[5];
	pid_t cpid;
	DEF_TIMERS;

	START_TIMER;

	argv[0] = "/opt/cray/nodehealth/default/bin/xtcleanup_after";
	argv[1] = step ? "-a" : "-r";
	argv[2] = xstrdup_printf("%"PRIu64"", id);
	argv[3] = cray_nodelist2nids(NULL, nodelist);
	argv[4] = NULL;

	if (debug_flags & DEBUG_FLAG_SELECT_TYPE)
		info("Calling NHC for id %"PRIu64" on nodes %s(%s)",
		     id, nodelist, argv[3]);

	if ((cpid = fork()) < 0) {
		error("_run_nhc fork error: %m");
		goto fini;
	}
	if (cpid == 0) {
		/* Child: become a process group leader so the parent can
		 * signal the whole group with killpg() afterwards. */
#ifdef SETPGRP_TWO_ARGS
		setpgrp(0, 0);
#else
		setpgrp();
#endif
		execvp(argv[0], argv);
		/* exec failed.  Use _exit() rather than exit(): this is a
		 * forked copy of a threaded daemon and must not run the
		 * parent's atexit handlers or flush its stdio buffers. */
		_exit(127);
	}

	while (1) {
		wait_rc = waitpid(cpid, &status, 0);
		if (wait_rc < 0) {
			if (errno == EINTR)
				continue;
			error("_run_nhc waitpid error: %m");
			break;
		} else if (wait_rc > 0) {
			killpg(cpid, SIGKILL);	/* kill children too */
			break;
		}
	}
	END_TIMER;

	if (status != 0) {
		error("_run_nhc %s %"PRIu64" exit status %u:%u took: %s",
		      step ? "step" : "job", id,
		      WEXITSTATUS(status), WTERMSIG(status), TIME_STR);
	} else if (debug_flags & DEBUG_FLAG_SELECT_TYPE)
		info("_run_nhc %s %"PRIu64" completed took: %s",
		     step ? "step" : "job", id, TIME_STR);

fini:
	/* argv[0] and argv[1] are string literals; only these two were
	 * allocated above. */
	xfree(argv[2]);
	xfree(argv[3]);

	return status;
#else
	if (debug_flags & DEBUG_FLAG_SELECT_TYPE)
		info("simluating calling NHC for id %"PRIu64" on nodes %s",
		     id, nodelist);

	/* simulate sleeping */
	sleep(2);
	return 0;
#endif
}
/*
 * _job_fini - thread entry point that cleans up after a finished job.
 *
 * Copies the job id and node list under a job read lock, runs the NHC
 * with no slurmctld lock held, then takes the job/node write locks to
 * call other_job_fini() and clear the job's "cleaning" flag.
 *
 * IN args - pointer to the struct job_record being cleaned up
 * RET always NULL
 */
static void *_job_fini(void *args)
{
	/* Locks: Read job */
	slurmctld_lock_t read_job = {
		NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK };
	/* Locks: Write job, write node */
	slurmctld_lock_t write_job_node = {
		NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK
	};
	struct job_record *job = (struct job_record *)args;
	char *nodes_copy = NULL;
	uint32_t id = 0;

	if (job == NULL) {
		error("_job_fini: no job ptr given, this should never happen");
		return NULL;
	}

	/* Snapshot what the NHC needs so it can run without any lock. */
	lock_slurmctld(read_job);
	id = job->job_id;
	nodes_copy = xstrdup(job->nodes);
	unlock_slurmctld(read_job);

	/* run NHC */
	_run_nhc(id, nodes_copy, 0);
	/***********/
	xfree(nodes_copy);

	lock_slurmctld(write_job_node);
	if (job->magic != JOB_MAGIC) {
		error("_job_fini: job %u had a bad magic, "
		      "this should never happen", id);
	} else {
		select_jobinfo_t *cray_jobinfo = NULL;

		other_job_fini(job);

		cray_jobinfo = job->select_jobinfo->data;
		cray_jobinfo->cleaning = 0;
	}
	unlock_slurmctld(write_job_node);

	return NULL;
}
/*
 * _step_fini - thread entry point that cleans up after a finished step.
 *
 * Builds the step's apid and copies its node list under a job read lock,
 * runs the NHC in step mode with no slurmctld lock held, then takes the
 * job/node write locks to call other_step_finish(), clear the step's
 * "cleaning" flag and release the step's resources with post_job_step().
 *
 * IN args - pointer to the struct step_record being cleaned up
 * RET always NULL
 */
static void *_step_fini(void *args)
{
	struct step_record *step_ptr = (struct step_record *)args;
	select_jobinfo_t *jobinfo = NULL;
	uint64_t apid = 0;
	char *node_list = NULL;

	/* Locks: Write job, write node */
	slurmctld_lock_t job_write_lock = {
		NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK
	};
	/* Locks: Read job */
	slurmctld_lock_t job_read_lock = {
		NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK };

	if (!step_ptr) {
		error("_step_fini: no step ptr given, "
		      "this should never happen");
		return NULL;
	}

	lock_slurmctld(job_read_lock);
	/* The apid is derived from the owning job's id, so the job pointer
	 * must be validated before it is dereferenced. */
	if (!step_ptr->job_ptr) {
		error("_step_fini: step %u has no job_ptr, "
		      "this should never happen", step_ptr->step_id);
		unlock_slurmctld(job_read_lock);
		return NULL;
	}
	apid = SLURM_ID_HASH(step_ptr->job_ptr->job_id, step_ptr->step_id);

	/* Prefer the step's own node list; fall back to the job's nodes
	 * when the step has no layout. */
	if (!step_ptr->step_layout || !step_ptr->step_layout->node_list)
		node_list = xstrdup(step_ptr->job_ptr->nodes);
	else
		node_list = xstrdup(step_ptr->step_layout->node_list);
	unlock_slurmctld(job_read_lock);

	/* run NHC (step mode: the id passed is the step's apid) */
	_run_nhc(apid, node_list, 1);
	/***********/

	xfree(node_list);

	lock_slurmctld(job_write_lock);
	/* Re-check under the write lock: no lock was held while the NHC
	 * was running. */
	if (!step_ptr->job_ptr || !step_ptr->step_node_bitmap) {
		error("For some reason we don't have a step_node_bitmap or "
		      "a job_ptr for %"PRIu64".  This should never happen.",
		      apid);
	} else {
		other_step_finish(step_ptr);

		jobinfo = step_ptr->select_jobinfo->data;
		jobinfo->cleaning = 0;

		/* free resources on the job */
		post_job_step(step_ptr);
	}
	unlock_slurmctld(job_write_lock);

	return NULL;
}
/*
 * _spawn_cleanup_thread - start a detached thread running start_routine
 *	with obj_ptr as its only argument.
 *
 * Each failed pthread_create() is logged and followed by a 1000
 * microsecond sleep before the next attempt; fatal() is called once the
 * number of failures exceeds MAX_PTHREAD_RETRIES.
 *
 * IN obj_ptr       - argument handed to start_routine
 * IN start_routine - thread entry point
 */
static void _spawn_cleanup_thread(
	void *obj_ptr, void *(*start_routine) (void *))
{
	pthread_attr_t detached_attr;
	pthread_t tid;
	int failures = 0;

	/* spawn an agent */
	slurm_attr_init(&detached_attr);
	if (pthread_attr_setdetachstate(&detached_attr,
					PTHREAD_CREATE_DETACHED) != 0)
		error("pthread_attr_setdetachstate error %m");

	for (;;) {
		if (pthread_create(&tid, &detached_attr,
				   start_routine, obj_ptr) == 0)
			break;

		error("pthread_create error %m");
		failures++;
		if (failures > MAX_PTHREAD_RETRIES)
			fatal("Can't create pthread");
		usleep(1000);	/* sleep and retry */
	}

	slurm_attr_destroy(&detached_attr);
}
/*
 * _select_jobinfo_pack - pack the Cray-specific part of a jobinfo record.
 *
 * Writes a single 16-bit value: the record's "cleaning" flag, or 0 when
 * no record is supplied.
 *
 * IN jobinfo          - record to pack, may be NULL
 * IN/OUT buffer       - destination buffer
 * IN protocol_version - not used by this function
 */
static void _select_jobinfo_pack(select_jobinfo_t *jobinfo, Buf buffer,
				 uint16_t protocol_version)
{
	uint16_t cleaning_flag = jobinfo ? jobinfo->cleaning : 0;

	pack16(cleaning_flag, buffer);
}
/*
 * _select_jobinfo_unpack - allocate a jobinfo record and unpack the
 *	Cray-specific part (the 16-bit "cleaning" flag) into it.
 *
 * OUT jobinfo_pptr    - set to the new record on success, NULL on failure
 * IN/OUT buffer       - source buffer
 * IN protocol_version - not used by this function
 * RET SLURM_SUCCESS, or SLURM_ERROR if the unpack failed (the record is
 *	released with select_p_select_jobinfo_free() in that case)
 */
static int _select_jobinfo_unpack(select_jobinfo_t **jobinfo_pptr,
				  Buf buffer, uint16_t protocol_version)
{
	select_jobinfo_t *new_jobinfo = xmalloc(sizeof(struct select_jobinfo));

	new_jobinfo->magic = JOBINFO_MAGIC;
	*jobinfo_pptr = new_jobinfo;

	/* NOTE(review): safe_unpack16 is presumably a macro that jumps to
	 * the unpack_error label on failure - confirm against pack.h */
	safe_unpack16(&new_jobinfo->cleaning, buffer);

	return SLURM_SUCCESS;

unpack_error:
	select_p_select_jobinfo_free(new_jobinfo);
	*jobinfo_pptr = NULL;

	return SLURM_ERROR;
}
/* /*
* init() is called when the plugin is loaded, before any other functions * init() is called when the plugin is loaded, before any other functions
* are called. Put global initialization here. * are called. Put global initialization here.
...@@ -167,6 +408,39 @@ extern int select_p_state_restore(char *dir_name) ...@@ -167,6 +408,39 @@ extern int select_p_state_restore(char *dir_name)
/*
 * select_p_job_init - resume any NHC cleanup that was still in progress
 *	for the given jobs, then hand the list to the "other" select plugin.
 *
 * For every job in job_list:
 *  - if the job itself is flagged as cleaning, a _job_fini thread is
 *    spawned for it (its steps are not examined);
 *  - otherwise a _step_fini thread is spawned for each of its steps that
 *    is flagged as cleaning.
 * Jobs and steps that are not flagged as cleaning are left alone.
 *
 * IN job_list - list of struct job_record, may be NULL or empty
 * RET result of other_job_init()
 */
extern int select_p_job_init(List job_list)
{
	if (job_list && list_count(job_list)) {
		ListIterator itr = list_iterator_create(job_list);
		struct job_record *job_ptr;

		if (debug_flags & DEBUG_FLAG_SELECT_TYPE)
			info("select_p_job_init: syncing jobs");

		while ((job_ptr = list_next(itr))) {
			select_jobinfo_t *jobinfo =
				job_ptr->select_jobinfo->data;
			ListIterator itr_step;
			struct step_record *step_ptr;

			if (jobinfo->cleaning) {
				/* Whole job was being cleaned up. */
				_spawn_cleanup_thread(job_ptr, _job_fini);
				continue;
			}

			/* Job is not cleaning: only its steps can need a
			 * cleanup.  Never start a job cleanup here, the job
			 * may simply have no steps. */
			if (!job_ptr->step_list
			    || !list_count(job_ptr->step_list))
				continue;

			itr_step = list_iterator_create(job_ptr->step_list);
			while ((step_ptr = list_next(itr_step))) {
				jobinfo = step_ptr->select_jobinfo->data;
				if (!jobinfo->cleaning)
					continue;
				_spawn_cleanup_thread(step_ptr, _step_fini);
			}
			list_iterator_destroy(itr_step);
		}
		list_iterator_destroy(itr);
	}

	return other_job_init(job_list);
}
...@@ -272,7 +546,13 @@ extern int select_p_job_signal(struct job_record *job_ptr, int signal) ...@@ -272,7 +546,13 @@ extern int select_p_job_signal(struct job_record *job_ptr, int signal)
extern int select_p_job_fini(struct job_record *job_ptr) extern int select_p_job_fini(struct job_record *job_ptr)
{ {
return other_job_fini(job_ptr); select_jobinfo_t *jobinfo = job_ptr->select_jobinfo->data;
jobinfo->cleaning = 1;
_spawn_cleanup_thread(job_ptr, _job_fini);
return SLURM_SUCCESS;
} }
extern int select_p_job_suspend(struct job_record *job_ptr, bool indf_susp) extern int select_p_job_suspend(struct job_record *job_ptr, bool indf_susp)
...@@ -294,7 +574,22 @@ extern bitstr_t *select_p_step_pick_nodes(struct job_record *job_ptr, ...@@ -294,7 +574,22 @@ extern bitstr_t *select_p_step_pick_nodes(struct job_record *job_ptr,
extern int select_p_step_finish(struct step_record *step_ptr) extern int select_p_step_finish(struct step_record *step_ptr)
{ {
return other_step_finish(step_ptr); select_jobinfo_t *jobinfo = step_ptr->select_jobinfo->data;
if (IS_JOB_COMPLETING(step_ptr->job_ptr)) {
debug3("step completion %u.%u was received after job "
"allocation is already completing, no extra NHC needed.",
step_ptr->job_ptr->job_id, step_ptr->step_id);
other_step_finish(step_ptr);
/* free resources on the job */
post_job_step(step_ptr);
return SLURM_SUCCESS;
}
jobinfo->cleaning = 1;
_spawn_cleanup_thread(step_ptr, _step_fini);
return SLURM_SUCCESS;
} }
extern int select_p_pack_select_info(time_t last_query_time, extern int select_p_pack_select_info(time_t last_query_time,
...@@ -413,7 +708,7 @@ extern int select_p_select_jobinfo_set(select_jobinfo_t *jobinfo, ...@@ -413,7 +708,7 @@ extern int select_p_select_jobinfo_set(select_jobinfo_t *jobinfo,
void *data) void *data)
{ {
int rc = SLURM_SUCCESS; int rc = SLURM_SUCCESS;
// uint32_t *uint32 = (uint32_t *) data; uint16_t *uint16 = (uint16_t *) data;
// uint64_t *uint64 = (uint64_t *) data; // uint64_t *uint64 = (uint64_t *) data;
if (jobinfo == NULL) { if (jobinfo == NULL) {
...@@ -426,6 +721,9 @@ extern int select_p_select_jobinfo_set(select_jobinfo_t *jobinfo, ...@@ -426,6 +721,9 @@ extern int select_p_select_jobinfo_set(select_jobinfo_t *jobinfo,
} }
switch (data_type) { switch (data_type) {
case SELECT_JOBDATA_CLEANING:
jobinfo->cleaning = *uint16;
break;
default: default:
rc = other_select_jobinfo_set(jobinfo, data_type, data); rc = other_select_jobinfo_set(jobinfo, data_type, data);
break; break;
...@@ -439,7 +737,7 @@ extern int select_p_select_jobinfo_get(select_jobinfo_t *jobinfo, ...@@ -439,7 +737,7 @@ extern int select_p_select_jobinfo_get(select_jobinfo_t *jobinfo,
void *data) void *data)
{ {
int rc = SLURM_SUCCESS; int rc = SLURM_SUCCESS;
// uint32_t *uint32 = (uint32_t *) data; uint16_t *uint16 = (uint16_t *) data;
// uint64_t *uint64 = (uint64_t *) data; // uint64_t *uint64 = (uint64_t *) data;
select_jobinfo_t **select_jobinfo = (select_jobinfo_t **) data; select_jobinfo_t **select_jobinfo = (select_jobinfo_t **) data;
...@@ -456,6 +754,9 @@ extern int select_p_select_jobinfo_get(select_jobinfo_t *jobinfo, ...@@ -456,6 +754,9 @@ extern int select_p_select_jobinfo_get(select_jobinfo_t *jobinfo,
case SELECT_JOBDATA_PTR: case SELECT_JOBDATA_PTR:
*select_jobinfo = jobinfo->other_jobinfo; *select_jobinfo = jobinfo->other_jobinfo;
break; break;
case SELECT_JOBDATA_CLEANING:
*uint16 = jobinfo->cleaning;
break;
default: default:
rc = other_select_jobinfo_get(jobinfo, data_type, data); rc = other_select_jobinfo_get(jobinfo, data_type, data);
break; break;
...@@ -502,20 +803,28 @@ extern int select_p_select_jobinfo_pack(select_jobinfo_t *jobinfo, Buf buffer, ...@@ -502,20 +803,28 @@ extern int select_p_select_jobinfo_pack(select_jobinfo_t *jobinfo, Buf buffer,
{ {
int rc = SLURM_ERROR; int rc = SLURM_ERROR;
rc = other_select_jobinfo_pack(jobinfo->other_jobinfo, buffer, _select_jobinfo_pack(jobinfo, buffer, protocol_version);
protocol_version); if (jobinfo)
rc = other_select_jobinfo_pack(jobinfo->other_jobinfo, buffer,
protocol_version);
else
rc = other_select_jobinfo_pack(NULL, buffer, protocol_version);
return rc; return rc;
} }
extern int select_p_select_jobinfo_unpack(select_jobinfo_t **jobinfo_pptr, extern int select_p_select_jobinfo_unpack(select_jobinfo_t **jobinfo_pptr,
Buf buffer, uint16_t protocol_version) Buf buffer, uint16_t protocol_version)
{ {
int rc = SLURM_ERROR; int rc;
select_jobinfo_t *jobinfo = xmalloc(sizeof(struct select_jobinfo)); select_jobinfo_t *jobinfo = NULL;
*jobinfo_pptr = jobinfo; rc = _select_jobinfo_unpack(jobinfo_pptr, buffer, protocol_version);
if (rc != SLURM_SUCCESS)
return SLURM_ERROR;
jobinfo = *jobinfo_pptr;
jobinfo->magic = JOBINFO_MAGIC;
rc = other_select_jobinfo_unpack(&jobinfo->other_jobinfo, rc = other_select_jobinfo_unpack(&jobinfo->other_jobinfo,
buffer, protocol_version); buffer, protocol_version);
......
...@@ -220,7 +220,12 @@ static void _internal_step_complete( ...@@ -220,7 +220,12 @@ static void _internal_step_complete(
step_ptr->state = JOB_COMPLETING; step_ptr->state = JOB_COMPLETING;
select_g_step_finish(step_ptr); select_g_step_finish(step_ptr);
#ifndef HAVE_NATIVE_CRAY
/* On a native cray this is run after the NHC is
called, which could take up to 3 minutes.
*/
post_job_step(step_ptr); post_job_step(step_ptr);
#endif
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment