Commit 9caea315 authored by Moe Jette

apply gang2.patch from Chris Holmes/HP

parent 6a9ec7dd
@@ -184,6 +184,8 @@ _get_gr_type()
 	case CR_SOCKET_MEMORY:
 		return GS_SOCKET;
 	}
+	/* note that CR_MEMORY is node-level scheduling with
+	 * memory management */
 	return GS_NODE;
 }
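Note: `_get_gr_type()` translates the configured consumable-resource (CR) selector type into the granularity at which sched/gang builds its timeslice rows; the new comment records that `CR_MEMORY` deliberately falls through to node-level gang scheduling. A standalone sketch of that mapping, using stand-in enums (the real `CR_*` values live in slurm.h and the `GS_*` values in this plugin, so the exact member set here is an assumption):

    /* Stand-in enums only; not the real slurm.h / gang.c definitions. */
    #include <stdio.h>

    enum cr_type { CR_CPU, CR_SOCKET, CR_CORE, CR_MEMORY, CR_SOCKET_MEMORY };
    enum gs_type { GS_NODE, GS_SOCKET, GS_CPU };

    static enum gs_type get_gr_type(enum cr_type cr)
    {
        switch (cr) {
        case CR_SOCKET:
        case CR_SOCKET_MEMORY:
            return GS_SOCKET;
        default:
            break;
        }
        /* note that CR_MEMORY is node-level scheduling with
         * memory management */
        return GS_NODE;
    }

    int main(void)
    {
        /* CR_MEMORY maps to node-level gang scheduling */
        printf("CR_MEMORY -> %s\n",
               get_gr_type(CR_MEMORY) == GS_NODE ? "GS_NODE" : "other");
        return 0;
    }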
@@ -351,6 +353,17 @@ _find_gs_part(char *name)
 	return NULL;
 }
 
+/* find the job_list index of the given job_id in the given partition */
+static int
+_find_job_index(struct gs_part *p_ptr, uint32_t job_id) {
+	int i;
+	for (i = 0; i < p_ptr->num_jobs; i++) {
+		if (p_ptr->job_list[i]->job_id == job_id)
+			return i;
+	}
+	return -1;
+}
+
 /* return 1 if job fits in this row, else return 0 */
 static int
 _can_cpus_fit(bitstr_t *setmap, struct gs_job *j_ptr, struct gs_part *p_ptr)
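The new `_find_job_index()` helper consolidates the duplicate-detection loop that this patch otherwise repeats in `_add_job_to_part()`, `_remove_job_from_part()`, and `_scan_slurm_job_list()`. A minimal, self-contained illustration of the index-or-minus-one convention, with stand-in types in place of the real `gs_part`/`gs_job` structures:

    /* Stand-in types; the real struct gs_part / gs_job live in the plugin. */
    #include <stdint.h>
    #include <stdio.h>

    struct job  { uint32_t job_id; };
    struct part { struct job **job_list; int num_jobs; };

    /* linear search mirroring _find_job_index(): index, or -1 if absent */
    static int find_job_index(struct part *p, uint32_t job_id)
    {
        int i;
        for (i = 0; i < p->num_jobs; i++) {
            if (p->job_list[i]->job_id == job_id)
                return i;
        }
        return -1;
    }

    int main(void)
    {
        struct job a = { 42 }, b = { 7 };
        struct job *list[] = { &a, &b };
        struct part p = { list, 2 };

        if (find_job_index(&p, 7) >= 0)    /* found: handle the duplicate */
            printf("job 7 is tracked\n");
        if (find_job_index(&p, 99) < 0)    /* absent: safe to add */
            printf("job 99 is not tracked\n");
        return 0;
    }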
@@ -423,18 +436,18 @@ _add_job_to_active(struct gs_job *j_ptr, struct gs_part *p_ptr)
 	/* add job to active_bitmap */
 	if (!p_ptr->active_bitmap) {
 		/* allocate the active bitmap */
-		debug3("_add_job_to_active: using job %d as active base",
+		debug3("sched/gang: _add_job_to_active: using job %d as active base",
 			j_ptr->job_id);
 		p_ptr->active_bitmap = bit_copy(j_ptr->bitmap);
 	} else if (p_ptr->jobs_running == 0) {
 		/* if the active_bitmap exists but jobs_running is '0',
 		 * this means to overwrite the bitmap memory */
-		debug3("_add_job_to_active: copying job %d into active base",
+		debug3("sched/gang: _add_job_to_active: copying job %d into active base",
 			j_ptr->job_id);
 		bit_copybits(p_ptr->active_bitmap, j_ptr->bitmap);
 	} else {
 		/* add job to existing jobs in the active bitmap */
-		debug3("_add_job_to_active: merging job %d into active bitmap",
+		debug3("sched/gang: _add_job_to_active: merging job %d into active bitmap",
 			j_ptr->job_id);
 		bit_or(p_ptr->active_bitmap, j_ptr->bitmap);
 	}
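These three branches are the whole life cycle of a partition's active bitmap: allocate it from the first job (`bit_copy`), overwrite it when the row is being rebuilt from scratch (`bit_copybits`), or OR further jobs into it (`bit_or`). A self-contained sketch of the same three-way decision using a plain 64-bit mask instead of SLURM's `bitstr_t` (stand-in types; the `jobs_running` bookkeeping is simplified to live inside the helper here, which is an assumption about where the real code maintains it):

    #include <stdint.h>
    #include <stdio.h>

    /* Stand-in for the partition's active-row state: a 64-bit mask in
     * place of bitstr_t; "allocated" stands in for active_bitmap != NULL. */
    struct part_state {
        uint64_t active;       /* which resources the active row uses */
        int      allocated;    /* has the bitmap been allocated yet?  */
        int      jobs_running;
    };

    static void add_job_to_active(struct part_state *p, uint64_t job_bits)
    {
        if (!p->allocated) {
            p->active = job_bits;      /* allocate:  like bit_copy()     */
            p->allocated = 1;
        } else if (p->jobs_running == 0) {
            p->active = job_bits;      /* overwrite: like bit_copybits() */
        } else {
            p->active |= job_bits;     /* merge:     like bit_or()       */
        }
        p->jobs_running++;
    }

    int main(void)
    {
        struct part_state p = { 0, 0, 0 };
        add_job_to_active(&p, 0x0f);   /* first job becomes the base */
        add_job_to_active(&p, 0xf0);   /* second job is OR'd in      */
        printf("active row mask: 0x%llx\n",
               (unsigned long long)p.active);   /* prints 0xff */
        return 0;
    }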
@@ -604,6 +617,88 @@ _get_bitmap(bitstr_t *origmap, uint32_t job_id)
 	return newmap;
 }
 
+/* rebuild the active row BUT preserve the order and state of existing jobs.
+ * This is called after one or more jobs have been removed from the partition.
+ */
+static void
+_update_active_row(struct gs_part *p_ptr)
+{
+	int i;
+	struct gs_job *j_ptr;
+
+	/* we need to rebuild the active row BUT ensure that
+	 * we preserve any existing active or filler jobs. */
+	p_ptr->jobs_running = 0;
+	for (i = 0; i < p_ptr->num_jobs; i++) {
+		j_ptr = p_ptr->job_list[i];
+		if (j_ptr->row_state == GS_ACTIVE)
+			_add_job_to_active(j_ptr, p_ptr);
+		else if (j_ptr->row_state == GS_FILLER) {
+			_add_job_to_active(j_ptr, p_ptr);
+			j_ptr->row_state = GS_FILLER;
+		}
+	}
+	/* Now make a second pass to "fill in" any new jobs */
+	for (i = 0; i < p_ptr->num_jobs; i++) {
+		j_ptr = p_ptr->job_list[i];
+		if (j_ptr->row_state == GS_NO_ACTIVE &&
+		    (p_ptr->jobs_running == 0 ||
+		     _job_fits_in_active_row(j_ptr, p_ptr))) {
+			_add_job_to_active(j_ptr, p_ptr);
+			/* note that this job is a "filler" for this row */
+			j_ptr->row_state = GS_FILLER;
+			/* resume the job */
+			if (j_ptr->sig_state == GS_SUSPEND) {
+				_signal_job(j_ptr->job_id, GS_RESUME);
+				j_ptr->sig_state = GS_RESUME;
+			}
+		}
+	}
+	return;
+}
+
+/* remove the given job from the given partition */
+static void
+_remove_job_from_part(uint32_t job_id, struct gs_part *p_ptr)
+{
+	int i;
+	struct gs_job *j_ptr;
+
+	if (!job_id || !p_ptr)
+		return;
+
+	debug3("sched/gang: _remove_job_from_part: removing job %d", job_id);
+	/* find the job in the job_list */
+	i = _find_job_index(p_ptr, job_id);
+	if (i < 0)
+		/* job not found */
+		return;
+
+	j_ptr = p_ptr->job_list[i];
+
+	/* remove the job from the job_list by shifting everyone else down */
+	p_ptr->num_jobs -= 1;
+	for (; i < p_ptr->num_jobs; i++) {
+		p_ptr->job_list[i] = p_ptr->job_list[i+1];
+	}
+	p_ptr->job_list[i] = NULL;
+
+	/* make sure the job is not suspended, and then delete it */
+	if (j_ptr->sig_state == GS_SUSPEND) {
+		debug3("sched/gang: _remove_job_from_part: resuming suspended job %d",
+			j_ptr->job_id);
+		_signal_job(j_ptr->job_id, GS_RESUME);
+	}
+	bit_free(j_ptr->bitmap);
+	j_ptr->bitmap = NULL;
+	if (j_ptr->alloc_cpus)
+		xfree(j_ptr->alloc_cpus);
+	j_ptr->alloc_cpus = NULL;
+	xfree(j_ptr);
+
+	return;
+}
+
 /* add the given job to the appropriate partition */
 static int
 _add_job_to_part(struct gs_part *p_ptr, uint32_t job_id, bitstr_t *job_bitmap)
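The new `_update_active_row()` encodes a two-pass rebuild: pass one re-seats every job that was already in the active row (GS_ACTIVE or GS_FILLER) so timeslice order is preserved, and pass two promotes suspended GS_NO_ACTIVE jobs that still fit, resuming them as "fillers". A self-contained sketch of that pattern with simplified stand-in state (a scalar width/capacity replaces the real bitmap fit test, which is an assumption made purely for illustration):

    #include <stdio.h>

    enum row_state { NO_ACTIVE, ACTIVE, FILLER };

    struct job { int id; enum row_state row; int width; };

    int main(void)
    {
        struct job jobs[] = {
            { 10, ACTIVE,    2 },   /* re-seated in pass 1 */
            { 11, NO_ACTIVE, 2 },   /* candidate for pass 2 */
            { 12, FILLER,    1 },   /* re-seated in pass 1 */
        };
        int i, n = 3, capacity = 4, used = 0;

        for (i = 0; i < n; i++)          /* pass 1: preserve */
            if (jobs[i].row != NO_ACTIVE)
                used += jobs[i].width;

        for (i = 0; i < n; i++) {        /* pass 2: fill in  */
            if (jobs[i].row == NO_ACTIVE &&
                used + jobs[i].width <= capacity) {
                jobs[i].row = FILLER;    /* and resume it */
                used += jobs[i].width;
            }
        }
        for (i = 0; i < n; i++)          /* job 11 does not fit: 3+2 > 4 */
            printf("job %d: %s\n", jobs[i].id,
                   jobs[i].row == NO_ACTIVE ? "stays suspended"
                                            : "in active row");
        return 0;
    }

Its companion `_remove_job_from_part()` deletes by shifting the tail of the pointer array down one slot, which is exactly what keeps job order stable for the rebuild above. Sketched standalone:

    #include <stdint.h>
    #include <stdio.h>

    /* Order-preserving removal from a pointer array; stand-in job type. */
    struct job { uint32_t job_id; };

    static void remove_at(struct job **list, int *num, int i)
    {
        (*num)--;
        for (; i < *num; i++)
            list[i] = list[i + 1];
        list[i] = NULL;          /* NULL the vacated slot */
    }

    int main(void)
    {
        struct job a = {1}, b = {2}, c = {3};
        struct job *list[3] = { &a, &b, &c };
        int i, num = 3;

        remove_at(list, &num, 1);    /* drop job 2; 1 and 3 keep order */
        for (i = 0; i < num; i++)
            printf("job %u\n", list[i]->job_id);
        return 0;
    }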
@@ -625,18 +720,16 @@ _add_job_to_part(struct gs_part *p_ptr, uint32_t job_id, bitstr_t *job_bitmap)
 		/* job_list is initialized to be NULL filled */
 	}
 
 	/* protect against duplicates */
-	for (i = 0; i < p_ptr->num_jobs; i++) {
-		if (p_ptr->job_list[i]->job_id == job_id)
-			break;
-	}
-	if (i < p_ptr->num_jobs) {
-		/* This job already exists. If the bitmap is
-		 * the same, then there's nothing more to do.
-		 */
-		/* HELP!! Can this happen? I don't think it should,
-		   so we will just die for now if it does happen.
-		 */
-		fatal("sched/gang: unsupported duplicate job detected!");
+	i = _find_job_index(p_ptr, job_id);
+	if (i >= 0) {
+		/* This job already exists, but the resource allocation
+		 * may have changed. In any case, remove the existing
+		 * job before adding this new one.
+		 */
+		debug3("sched/gang: _add_job_to_part: duplicate job %d detected",
+			job_id);
+		_remove_job_from_part(job_id, p_ptr);
+		_update_active_row(p_ptr);
 	}
 
 	if (p_ptr->num_jobs+1 == p_ptr->job_list_size) {
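The duplicate branch now behaves like an upsert: instead of calling `fatal()`, the stale entry is removed (its allocation may have changed), the active row is repaired, and control falls through to the normal add path. Sketched standalone, reusing the hypothetical stand-in helpers from the earlier examples:

    #include <stdint.h>
    #include <stdio.h>

    struct job  { uint32_t job_id; };
    struct part { struct job **job_list; int num_jobs; };

    static int find_job_index(struct part *p, uint32_t job_id)
    {
        int i;
        for (i = 0; i < p->num_jobs; i++)
            if (p->job_list[i]->job_id == job_id)
                return i;
        return -1;
    }

    static void remove_at(struct part *p, int i)
    {
        p->num_jobs--;
        for (; i < p->num_jobs; i++)
            p->job_list[i] = p->job_list[i + 1];
        p->job_list[i] = NULL;
    }

    /* add, replacing any stale entry for the same job_id first */
    static void upsert_job(struct part *p, struct job *j)
    {
        int i = find_job_index(p, j->job_id);
        if (i >= 0)
            remove_at(p, i);    /* allocation may have changed */
        p->job_list[p->num_jobs++] = j;
    }

    int main(void)
    {
        struct job a = { 42 }, a2 = { 42 };
        struct job *list[4] = { 0 };
        struct part p = { list, 0 };

        upsert_job(&p, &a);
        upsert_job(&p, &a2);                     /* replaces, no duplicate */
        printf("num_jobs = %d\n", p.num_jobs);   /* prints 1 */
        return 0;
    }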
@@ -682,104 +775,16 @@ _add_job_to_part(struct gs_part *p_ptr, uint32_t job_id, bitstr_t *job_bitmap)
 	return GS_SUCCESS;
 }
 
-/* remove the given job from the given partition */
-static void
-_remove_job_from_part(uint32_t job_id, struct gs_part *p_ptr)
-{
-	int i;
-	struct gs_job *j_ptr;
-
-	if (!job_id || !p_ptr)
-		return;
-
-	debug3("sched/gang: _remove_job_from_part: removing job %d", job_id);
-	/* find the job in the job_list */
-	for (i = 0; i < p_ptr->num_jobs; i++) {
-		if (p_ptr->job_list[i]->job_id == job_id)
-			break;
-	}
-	if (i >= p_ptr->num_jobs)
-		/* job not found */
-		return;
-
-	j_ptr = p_ptr->job_list[i];
-
-	/* remove the job from the job_list by shifting everyone else down */
-	p_ptr->num_jobs -= 1;
-	for (; i < p_ptr->num_jobs; i++) {
-		p_ptr->job_list[i] = p_ptr->job_list[i+1];
-	}
-	p_ptr->job_list[i] = NULL;
-
-	/* make sure the job is not suspended, and then delete it */
-	if (j_ptr->sig_state == GS_SUSPEND) {
-		debug3("sched/gang: _remove_job_from_part: resuming suspended job %d",
-			j_ptr->job_id);
-		_signal_job(j_ptr->job_id, GS_RESUME);
-	}
-	bit_free(j_ptr->bitmap);
-	xfree(j_ptr->alloc_cpus);
-	xfree(j_ptr);
-
-	/* in order to remove this job from the active row,
-	 * we need to rebuild the active row BUT ensure that
-	 * we preserve any existing active or filler jobs. */
-	p_ptr->jobs_running = 0;
-	for (i = 0; i < p_ptr->num_jobs; i++) {
-		j_ptr = p_ptr->job_list[i];
-		if (j_ptr->row_state == GS_ACTIVE)
-			_add_job_to_active(j_ptr, p_ptr);
-		else if (j_ptr->row_state == GS_FILLER) {
-			_add_job_to_active(j_ptr, p_ptr);
-			j_ptr->row_state = GS_FILLER;
-		}
-	}
-	/* Now make a second pass to "fill in" any new jobs */
-	for (i = 0; i < p_ptr->num_jobs; i++) {
-		j_ptr = p_ptr->job_list[i];
-		if (j_ptr->row_state == GS_NO_ACTIVE &&
-		    (p_ptr->jobs_running == 0 ||
-		     _job_fits_in_active_row(j_ptr, p_ptr))) {
-			_add_job_to_active(j_ptr, p_ptr);
-			/* note that this job is a "filler" for this row */
-			j_ptr->row_state = GS_FILLER;
-			/* resume the job */
-			if (j_ptr->sig_state == GS_SUSPEND) {
-				_signal_job(j_ptr->job_id, GS_RESUME);
-				j_ptr->sig_state = GS_RESUME;
-			}
-		}
-	}
-	return;
-}
-
 /* ensure that all jobs running in SLURM are accounted for.
  * this procedure assumes that the gs data has already been
  * locked by the caller!
- *
- * concerns: make sure we don't add a job that we've just removed?
- *
- * FIXME: add support for retrieving signal state, in case of failover.
- * We're doing this instead of storing and retrieving sched/gang
- * state. Rationale below:
- *
- * This inspires the question: do we need to keep state info? Or can it
- * all be retrieved from existing state info? NO - we should preserve job
- * order and sig_state.
- *
- * State info is used only for failover scenarios. Without state info, jobs
- * would be re-ordered in _scan_slurm_job_list based on their order in the
- * current master job list. We could add code to try to reconstruct
- * timeslice order based on the state of the jobs during recovery. Sig state
- * could be recovered from _scan_slurm_job_list too...
  */
 static void
 _scan_slurm_job_list()
 {
 	struct job_record *job_ptr;
 	struct gs_part *p_ptr;
-	int i, found;
+	int i;
 	ListIterator job_iterator;
 
 	if (!job_list) {	/* no jobs */
@@ -790,23 +795,32 @@ _scan_slurm_job_list()
 	while ((job_ptr = (struct job_record *) list_next(job_iterator))) {
 		debug3("sched/gang: _scan_slurm_job_list: checking job %d",
 			job_ptr->job_id);
-		/* if the job is suspended and this plugin is not
-		   aware of it, then we'll just ignore it for now (?) */
-		if (job_ptr->job_state == JOB_PENDING ||
-		    job_ptr->job_state == JOB_SUSPENDED)
+		if (job_ptr->job_state == JOB_PENDING)
 			continue;
-		if (job_ptr->job_state == JOB_RUNNING) {
+		if (job_ptr->job_state == JOB_SUSPENDED ||
+		    job_ptr->job_state == JOB_RUNNING) {
 			/* are we tracking this job already? */
 			p_ptr = _find_gs_part(job_ptr->partition);
 			if (!p_ptr)	/* no partition */
 				continue;
-			found = 0;
-			for (i = 0; !found && i < p_ptr->num_jobs; i++) {
-				if (p_ptr->job_list[i]->job_id == job_ptr->job_id)
-					found = 1;
-			}
-			if (found)
+			i = _find_job_index(p_ptr, job_ptr->job_id);
+			if (i >= 0)
 				continue;
+			/* We're not tracking this job. Resume it if it's
+			 * suspended, and then add it to the job list. */
+			if (job_ptr->job_state == JOB_SUSPENDED)
+				/* The likely scenario here is that the slurmctld has
+				 * failed over, and this is a job that the sched/gang
+				 * plugin had previously suspended.
+				 * It's not possible to determine the previous order
+				 * of jobs without preserving sched/gang state, which
+				 * is not worth the extra infrastructure. Just resume
+				 * the job and then add it to the job list.
+				 */
+				_signal_job(job_ptr->job_id, GS_RESUME);
 			_add_job_to_part(p_ptr, job_ptr->job_id,
 					job_ptr->node_bitmap);
 			continue;
@@ -821,6 +835,12 @@ _scan_slurm_job_list()
 			_remove_job_from_part(job_ptr->job_id, p_ptr);
 	}
 	list_iterator_destroy(job_iterator);
+
+	/* now that all of the old jobs have been flushed out,
+	 * update the active row of all partitions */
+	for (p_ptr = gs_part_list; p_ptr; p_ptr = p_ptr->next)
+		_update_active_row(p_ptr);
+	return;
 }
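After this change the scan sorts every SLURM job into one of three outcomes: pending jobs are ignored, running/suspended jobs are added if untracked (resuming suspended ones first, the failover case described in the new comment), and anything else is flushed from gang state. A standalone sketch of that decision table (the `JOB_*` names mirror slurm.h's job states but are stand-ins here, as are the helper names):

    #include <stdio.h>

    enum state { JOB_PENDING, JOB_RUNNING, JOB_SUSPENDED, JOB_COMPLETE };

    static const char *scan_action(enum state s, int tracked)
    {
        if (s == JOB_PENDING)
            return "ignore";
        if (s == JOB_RUNNING || s == JOB_SUSPENDED) {
            if (tracked)
                return "already tracked";
            /* untracked + suspended: probable failover leftover */
            return s == JOB_SUSPENDED ? "resume, then add" : "add";
        }
        return "remove";    /* completed/failed: flush from gang state */
    }

    int main(void)
    {
        printf("%s\n", scan_action(JOB_SUSPENDED, 0)); /* resume, then add */
        printf("%s\n", scan_action(JOB_COMPLETE, 1));  /* remove */
        return 0;
    }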
@@ -946,28 +966,6 @@ gs_job_scan(void)
 	return SLURM_SUCCESS;
 }
 
-extern int
-gs_job_fini(struct job_record *job_ptr)
-{
-	struct gs_part *p_ptr;
-
-	debug3("sched/gang: entering gs_job_fini");
-	pthread_mutex_lock(&data_mutex);
-	p_ptr = _find_gs_part(job_ptr->partition);
-	if (!p_ptr) {
-		pthread_mutex_unlock(&data_mutex);
-		debug3("sched/gang: leaving gs_job_fini");
-		return SLURM_SUCCESS;
-	}
-
-	/*remove job from the partition */
-	_remove_job_from_part(job_ptr->job_id, p_ptr);
-	pthread_mutex_unlock(&data_mutex);
-	debug3("sched/gang: leaving gs_job_fini");
-	return SLURM_SUCCESS;
-}
-
 /* rebuild from scratch */
 /* A reconfigure can affect this plugin in these ways:
  * - partitions can be added or removed
@@ -1038,16 +1036,19 @@ gs_reconfig()
 		for (i = 0; i < p_ptr->num_jobs; i++) {
 			job_ptr = find_job_record(p_ptr->job_list[i]->job_id);
 			if (job_ptr == NULL) {
-				/* job doesn't exist in SLURM, so skip it */
-				/* FIXME: should we resume it if necessary?? */
+				/* job no longer exists in SLURM, so drop it */
 				continue;
 			}
-			_add_job_to_part(newp_ptr, job_ptr->job_id,
-					job_ptr->node_bitmap);
-			/* if a job in p_ptr was GS_SUSPEND but is now
-			 * GS_FILLER in new_ptr, then it needs to be resumed!
-			 * FIXME!!
-			 */
+			/* resume any job that is suspended */
+			if (job_ptr->job_state == JOB_SUSPENDED)
+				_signal_job(job_ptr->job_id, GS_RESUME);
+			/* transfer the job as long as it is still active */
+			if (job_ptr->job_state == JOB_SUSPENDED ||
+			    job_ptr->job_state == JOB_RUNNING) {
+				_add_job_to_part(newp_ptr, job_ptr->job_id,
+						job_ptr->node_bitmap);
+			}
 		}
 	}
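The reconfigure path now resolves the old FIXME by resuming any suspended job before transfer, and carrying over only jobs that are still live; completed jobs are simply dropped. A standalone sketch of that filter (`JOB_*` and the two helpers are stand-ins for the slurmctld API, not real calls):

    #include <stdio.h>

    enum state { JOB_RUNNING, JOB_SUSPENDED, JOB_COMPLETE };

    static void resume(int id)   { printf("resume job %d\n", id); }
    static void transfer(int id) { printf("transfer job %d\n", id); }

    static void reconfig_one(int id, enum state s)
    {
        if (s == JOB_SUSPENDED)
            resume(id);     /* never leave a job suspended forever */
        if (s == JOB_SUSPENDED || s == JOB_RUNNING)
            transfer(id);   /* completed jobs are simply dropped */
    }

    int main(void)
    {
        reconfig_one(1, JOB_SUSPENDED);  /* resume + transfer */
        reconfig_one(2, JOB_COMPLETE);   /* dropped */
        return 0;
    }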
@@ -1088,7 +1089,7 @@ _cycle_job_list(struct gs_part *p_ptr)
 	int i, j;
 	struct gs_job *j_ptr;
 
-	debug3("_sched/gang: entering _cycle_job_list");
+	debug3("sched/gang: entering _cycle_job_list");
 	_print_jobs(p_ptr);
 
 	/* re-prioritize the job_list and set all row_states to GS_NO_ACTIVE */
 	for (i = 0; i < p_ptr->num_jobs; i++) {
@@ -1105,11 +1106,11 @@ _cycle_job_list(struct gs_part *p_ptr)
 		p_ptr->job_list[i]->row_state = GS_NO_ACTIVE;
 	}
-	debug3("_sched/gang: _cycle_job_list reordered job list:");
+	debug3("sched/gang: _cycle_job_list reordered job list:");
 	_print_jobs(p_ptr);
 
 	/* Rebuild the active row. */
 	_build_active_row(p_ptr);
-	debug3("_sched/gang: _cycle_job_list new active job list:");
+	debug3("sched/gang: _cycle_job_list new active job list:");
 	_print_jobs(p_ptr);
 
 	/* Suspend running jobs that are GS_NO_ACTIVE */
@@ -1133,7 +1134,7 @@ _cycle_job_list(struct gs_part *p_ptr)
 			p_ptr->job_list[i]->sig_state = GS_RESUME;
 		}
 	}
-	debug3("_sched/gang: leaving _cycle_job_list");
+	debug3("sched/gang: leaving _cycle_job_list");
 }
 
 static void *