diff --git a/src/plugins/select/cray/select_cray.c b/src/plugins/select/cray/select_cray.c
index b72d4244ae11f0954c509ae4e6c66c689e557cfd..b2aba00f81e0a07e529c4a71bd28cb743bfb8d50 100644
--- a/src/plugins/select/cray/select_cray.c
+++ b/src/plugins/select/cray/select_cray.c
@@ -42,14 +42,19 @@
 # if HAVE_INTTYPES_H
 #  include <inttypes.h>
 # endif
+# if WITH_PTHREADS
+#  include <pthread.h>
+# endif
 #endif
 
 #include <stdio.h>
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <unistd.h>
+#include <stdlib.h>
 
 #include "src/common/slurm_xlator.h"	/* Must be first */
+#include "src/slurmctld/locks.h"
 #include "other_select.h"
 
 /**
@@ -58,6 +63,7 @@
  * @other_jobinfo: hook into attached, "other" node selection plugin.
  */
 struct select_jobinfo {
+	uint16_t cleaning;
 	uint16_t magic;
 	select_jobinfo_t *other_jobinfo;
 };
@@ -74,6 +80,11 @@ struct select_nodeinfo {
 };
 
 #define NODEINFO_MAGIC 0x85ad
+#define MAX_PTHREAD_RETRIES 1
+
+/* Change CRAY_STATE_VERSION value when changing the state save
+ * format i.e. state_save() */
+#define CRAY_STATE_VERSION "VER001"
 
 /* These are defined here so when we link with something other than
  * the slurmctld we will have these symbols defined.
They will get
@@ -132,6 +143,236 @@
 const char plugin_type[] = "select/cray";
 uint32_t plugin_id = 107;
 const uint32_t plugin_version = 100;
+extern int select_p_select_jobinfo_free(select_jobinfo_t *jobinfo);
+
+static int _run_nhc(uint64_t id, char *nodelist, bool step)
+{
+#ifdef HAVE_NATIVE_CRAY
+	int argc = 5, status = 1, wait_rc;
+	char *argv[argc];
+	pid_t cpid;
+	DEF_TIMERS;
+
+	START_TIMER;
+	argv[0] = "/opt/cray/nodehealth/default/bin/xtcleanup_after";
+	if (step)
+		argv[1] = "-a";
+	else
+		argv[1] = "-r";
+	argv[2] = xstrdup_printf("%"PRIu64"", id);
+	argv[3] = cray_nodelist2nids(NULL, nodelist);
+	argv[4] = NULL;
+
+	if (debug_flags & DEBUG_FLAG_SELECT_TYPE)
+		info("Calling NHC for id %"PRIu64" on nodes %s(%s)",
+		     id, nodelist, argv[3]);
+
+	if ((cpid = fork()) < 0) {
+		error("_run_nhc fork error: %m");
+		goto fini;
+	}
+	if (cpid == 0) {
+#ifdef SETPGRP_TWO_ARGS
+		setpgrp(0, 0);
+#else
+		setpgrp();
+#endif
+		execvp(argv[0], argv);
+		exit(127);
+	}
+
+	while (1) {
+		wait_rc = waitpid(cpid, &status, 0);
+		if (wait_rc < 0) {
+			if (errno == EINTR)
+				continue;
+			error("_run_nhc waitpid error: %m");
+			break;
+		} else if (wait_rc > 0) {
+			killpg(cpid, SIGKILL);	/* kill children too */
+			break;
+		}
+	}
+	END_TIMER;
+	if (status != 0) {
+		error("_run_nhc %s %"PRIu64" exit status %u:%u took: %s",
+		      step ? "step" : "job", id,
+		      WEXITSTATUS(status), WTERMSIG(status), TIME_STR);
+	} else if (debug_flags & DEBUG_FLAG_SELECT_TYPE)
+		info("_run_nhc %s %"PRIu64" completed took: %s",
+		     step ? "step" : "job", id, TIME_STR);
+
+ fini:
+	xfree(argv[2]);
+	xfree(argv[3]);
+	return status;
+#else
+	if (debug_flags & DEBUG_FLAG_SELECT_TYPE)
+		info("simulating calling NHC for id %"PRIu64" on nodes %s",
+		     id, nodelist);
+
+	/* simulate sleeping */
+	sleep(2);
+	return 0;
+#endif
+}
+
+static void *_job_fini(void *args)
+{
+	struct job_record *job_ptr = (struct job_record *)args;
+	uint32_t job_id = 0;
+	char *node_list = NULL;
+
+	/* Locks: Write job, write node */
+	slurmctld_lock_t job_write_lock = {
+		NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK
+	};
+	slurmctld_lock_t job_read_lock = {
+		NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK };
+
+
+	if (!job_ptr) {
+		error("_job_fini: no job ptr given, this should never happen");
+		return NULL;
+	}
+
+	lock_slurmctld(job_read_lock);
+	job_id = job_ptr->job_id;
+	node_list = xstrdup(job_ptr->nodes);
+	unlock_slurmctld(job_read_lock);
+
+	/* run NHC */
+	_run_nhc(job_id, node_list, 0);
+	/***********/
+	xfree(node_list);
+
+	lock_slurmctld(job_write_lock);
+	if (job_ptr->magic == JOB_MAGIC) {
+		select_jobinfo_t *jobinfo = NULL;
+		other_job_fini(job_ptr);
+
+		jobinfo = job_ptr->select_jobinfo->data;
+		jobinfo->cleaning = 0;
+	} else
+		error("_job_fini: job %u had a bad magic, "
+		      "this should never happen", job_id);
+
+	unlock_slurmctld(job_write_lock);
+
+	return NULL;
+}
+
+static void *_step_fini(void *args)
+{
+	struct step_record *step_ptr = (struct step_record *)args;
+	select_jobinfo_t *jobinfo = NULL;
+	uint64_t apid = 0;
+	char *node_list = NULL;
+
+	/* Locks: Write job, write node */
+	slurmctld_lock_t job_write_lock = {
+		NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK
+	};
+	slurmctld_lock_t job_read_lock = {
+		NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK };
+
+
+	if (!step_ptr) {
+		error("_step_fini: no step ptr given, "
+		      "this should never happen");
+		return NULL;
+	}
+
+	lock_slurmctld(job_read_lock);
+	apid = SLURM_ID_HASH(step_ptr->job_ptr->job_id, step_ptr->step_id);
+
+	if (!step_ptr->step_layout || !step_ptr->step_layout->node_list) {
+		if (step_ptr->job_ptr)
+			node_list = xstrdup(step_ptr->job_ptr->nodes);
+	} else
+		node_list = xstrdup(step_ptr->step_layout->node_list);
+	unlock_slurmctld(job_read_lock);
+
+	/* run NHC */
+	_run_nhc(apid, node_list, 1);
+	/***********/
+
+	xfree(node_list);
+
+	lock_slurmctld(job_write_lock);
+	if (!step_ptr->job_ptr || !step_ptr->step_node_bitmap) {
+		error("For some reason we don't have a step_node_bitmap or "
+		      "a job_ptr for %"PRIu64". This should never happen.",
+		      apid);
+	} else {
+		other_step_finish(step_ptr);
+
+		jobinfo = step_ptr->select_jobinfo->data;
+		jobinfo->cleaning = 0;
+
+		/* free resources on the job */
+		post_job_step(step_ptr);
+	}
+	unlock_slurmctld(job_write_lock);
+
+	return NULL;
+}
+
+static void _spawn_cleanup_thread(
+	void *obj_ptr, void *(*start_routine) (void *))
+{
+	pthread_attr_t attr_agent;
+	pthread_t thread_agent;
+	int retries;
+
+	/* spawn an agent */
+	slurm_attr_init(&attr_agent);
+	if (pthread_attr_setdetachstate(&attr_agent, PTHREAD_CREATE_DETACHED))
+		error("pthread_attr_setdetachstate error %m");
+
+	retries = 0;
+	while (pthread_create(&thread_agent, &attr_agent,
+			      start_routine, obj_ptr)) {
+		error("pthread_create error %m");
+		if (++retries > MAX_PTHREAD_RETRIES)
+			fatal("Can't create pthread");
+		usleep(1000);	/* sleep and retry */
+	}
+	slurm_attr_destroy(&attr_agent);
+}
+
+static void _select_jobinfo_pack(select_jobinfo_t *jobinfo, Buf buffer,
+				 uint16_t protocol_version)
+{
+	if (!jobinfo) {
+		pack16(0, buffer);
+	} else {
+		pack16(jobinfo->cleaning, buffer);
+	}
+}
+
+static int _select_jobinfo_unpack(select_jobinfo_t **jobinfo_pptr,
+				  Buf buffer, uint16_t protocol_version)
+{
+	select_jobinfo_t *jobinfo = xmalloc(sizeof(struct select_jobinfo));
+
+	*jobinfo_pptr = jobinfo;
+
+	jobinfo->magic = JOBINFO_MAGIC;
+
+	safe_unpack16(&jobinfo->cleaning, buffer);
+
+	return SLURM_SUCCESS;
+
+unpack_error:
+	select_p_select_jobinfo_free(jobinfo);
+	*jobinfo_pptr = NULL;
+
+	return SLURM_ERROR;
+
+
+}
+
 /*
  * init()
is called when the plugin is loaded, before any other functions
  * are called.  Put global initialization here.
@@ -167,6 +408,39 @@ extern int select_p_state_restore(char *dir_name)
 
 extern int select_p_job_init(List job_list)
 {
+	if (job_list && list_count(job_list)) {
+		ListIterator itr = list_iterator_create(job_list);
+		struct job_record *job_ptr;
+
+		if (debug_flags & DEBUG_FLAG_SELECT_TYPE)
+			info("select_p_job_init: syncing jobs");
+
+		while ((job_ptr = list_next(itr))) {
+			select_jobinfo_t *jobinfo =
+				job_ptr->select_jobinfo->data;
+
+			if (!jobinfo->cleaning && job_ptr->step_list
+			    && list_count(job_ptr->step_list)) {
+				ListIterator itr_step = list_iterator_create(
+					job_ptr->step_list);
+				struct step_record *step_ptr;
+				while ((step_ptr = list_next(itr_step))) {
+					jobinfo =
+						step_ptr->select_jobinfo->data;
+					if (!jobinfo->cleaning)
+						continue;
+					_spawn_cleanup_thread(step_ptr,
+							      _step_fini);
+				}
+				list_iterator_destroy(itr_step);
+				continue;
+			}
+			if (jobinfo->cleaning)
+				_spawn_cleanup_thread(job_ptr, _job_fini);
+		}
+		list_iterator_destroy(itr);
+	}
+
 	return other_job_init(job_list);
 }
 
@@ -272,7 +546,13 @@ extern int select_p_job_signal(struct job_record *job_ptr, int signal)
 
 extern int select_p_job_fini(struct job_record *job_ptr)
 {
-	return other_job_fini(job_ptr);
+	select_jobinfo_t *jobinfo = job_ptr->select_jobinfo->data;
+
+	jobinfo->cleaning = 1;
+
+	_spawn_cleanup_thread(job_ptr, _job_fini);
+
+	return SLURM_SUCCESS;
 }
 
 extern int select_p_job_suspend(struct job_record *job_ptr, bool indf_susp)
@@ -294,7 +574,22 @@ extern bitstr_t *select_p_step_pick_nodes(struct job_record *job_ptr,
 
 extern int select_p_step_finish(struct step_record *step_ptr)
 {
-	return other_step_finish(step_ptr);
+	select_jobinfo_t *jobinfo = step_ptr->select_jobinfo->data;
+
+	if (IS_JOB_COMPLETING(step_ptr->job_ptr)) {
+		debug3("step completion %u.%u was received after job "
+		       "allocation is already completing, no extra NHC needed.",
+		       step_ptr->job_ptr->job_id, step_ptr->step_id);
+		other_step_finish(step_ptr);
+		/* free resources on the job */
+		post_job_step(step_ptr);
+		return SLURM_SUCCESS;
+	}
+
+	jobinfo->cleaning = 1;
+	_spawn_cleanup_thread(step_ptr, _step_fini);
+
+	return SLURM_SUCCESS;
 }
 
 extern int select_p_pack_select_info(time_t last_query_time,
@@ -413,7 +708,7 @@ extern int select_p_select_jobinfo_set(select_jobinfo_t *jobinfo,
 					void *data)
 {
 	int rc = SLURM_SUCCESS;
-//	uint32_t *uint32 = (uint32_t *) data;
+	uint16_t *uint16 = (uint16_t *) data;
 //	uint64_t *uint64 = (uint64_t *) data;
 
 	if (jobinfo == NULL) {
@@ -426,6 +721,9 @@ extern int select_p_select_jobinfo_set(select_jobinfo_t *jobinfo,
 	}
 
 	switch (data_type) {
+	case SELECT_JOBDATA_CLEANING:
+		jobinfo->cleaning = *uint16;
+		break;
 	default:
 		rc = other_select_jobinfo_set(jobinfo, data_type, data);
 		break;
@@ -439,7 +737,7 @@ extern int select_p_select_jobinfo_get(select_jobinfo_t *jobinfo,
 					void *data)
 {
 	int rc = SLURM_SUCCESS;
-//	uint32_t *uint32 = (uint32_t *) data;
+	uint16_t *uint16 = (uint16_t *) data;
 //	uint64_t *uint64 = (uint64_t *) data;
 	select_jobinfo_t **select_jobinfo = (select_jobinfo_t **) data;
 
@@ -456,6 +754,9 @@ extern int select_p_select_jobinfo_get(select_jobinfo_t *jobinfo,
 	case SELECT_JOBDATA_PTR:
 		*select_jobinfo = jobinfo->other_jobinfo;
 		break;
+	case SELECT_JOBDATA_CLEANING:
+		*uint16 = jobinfo->cleaning;
+		break;
 	default:
 		rc = other_select_jobinfo_get(jobinfo, data_type, data);
 		break;
@@ -502,20 +803,28 @@ extern int select_p_select_jobinfo_pack(select_jobinfo_t *jobinfo, Buf buffer,
 {
 	int rc = SLURM_ERROR;
 
-	rc = other_select_jobinfo_pack(jobinfo->other_jobinfo, buffer,
-				       protocol_version);
+	_select_jobinfo_pack(jobinfo, buffer, protocol_version);
+	if (jobinfo)
+		rc = other_select_jobinfo_pack(jobinfo->other_jobinfo, buffer,
+					       protocol_version);
+	else
+		rc = other_select_jobinfo_pack(NULL, buffer, protocol_version);
 	return rc;
 }
 
 extern int select_p_select_jobinfo_unpack(select_jobinfo_t **jobinfo_pptr,
 					  Buf buffer, uint16_t protocol_version)
 {
-	int rc = SLURM_ERROR;
-	select_jobinfo_t *jobinfo = xmalloc(sizeof(struct select_jobinfo));
+	int rc;
+	select_jobinfo_t *jobinfo = NULL;
 
-	*jobinfo_pptr = jobinfo;
+	rc = _select_jobinfo_unpack(jobinfo_pptr, buffer, protocol_version);
+
+	if (rc != SLURM_SUCCESS)
+		return SLURM_ERROR;
+
+	jobinfo = *jobinfo_pptr;
 
-	jobinfo->magic = JOBINFO_MAGIC;
 	rc = other_select_jobinfo_unpack(&jobinfo->other_jobinfo, buffer,
 					 protocol_version);
diff --git a/src/slurmctld/step_mgr.c b/src/slurmctld/step_mgr.c
index d765f71c9df40c8871e7bcf644c32a1d38edd89d..93c9b6ebe237443465130195ab0590e81c6cf8f6 100644
--- a/src/slurmctld/step_mgr.c
+++ b/src/slurmctld/step_mgr.c
@@ -220,7 +220,12 @@ static void _internal_step_complete(
 
 	step_ptr->state = JOB_COMPLETING;
 	select_g_step_finish(step_ptr);
+#ifndef HAVE_NATIVE_CRAY
+	/* On a native cray this is run after the NHC is
+	   called which could take up to 3 minutes.
+	 */
 	post_job_step(step_ptr);
+#endif
 	}
 }