Commit 2fb0357d authored by Moe Jette

major change in gang's data structures, make better use of list.c infrastructure for cleaner and tighter code
parent 1259a2a9
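Context note: the commit moves gang.c from a hand-rolled linked list/array of partitions to SLURM's list.c container, in which a List created by list_create() owns its elements through a destructor callback, and lookup and ordering are driven by caller-supplied callbacks passed to list_find_first() and list_sort(). The sketch below illustrates only that callback pattern, using just the list.c calls that appear in the diff; the struct is abbreviated to two fields, the helper names (_del_part, _match_part, _cmp_part, _demo) are illustrative rather than an excerpt of gang.c, and it assumes SLURM's usual src/common headers for List and the x-allocation wrappers.

/* Minimal sketch of the list.c callback pattern adopted below
 * (abbreviated struct, illustrative names -- not a drop-in excerpt of gang.c). */
#include <string.h>
#include "src/common/list.h"	/* List, list_create, list_append, ... (assumed path) */
#include "src/common/xmalloc.h"	/* xmalloc, xfree (assumed path) */
#include "src/common/xstring.h"	/* xstrdup (assumed path) */

struct demo_part {
	char	*part_name;
	int	 priority;
};

/* destructor: invoked by list_destroy() for every element */
static void _del_part(void *x)
{
	struct demo_part *p = (struct demo_part *) x;
	xfree(p->part_name);
	xfree(p);
}

/* match function for list_find_first(): non-zero means "found" */
static int _match_part(void *x, void *key)
{
	return !strcmp(((struct demo_part *) x)->part_name, (char *) key);
}

/* comparator for list_sort() */
static int _cmp_part(void *a, void *b)
{
	return ((struct demo_part *) a)->priority -
	       ((struct demo_part *) b)->priority;
}

static void _demo(void)
{
	List parts = list_create(_del_part);
	struct demo_part *p = xmalloc(sizeof(struct demo_part));

	p->part_name = xstrdup("debug");
	p->priority  = 10;
	list_append(parts, p);

	p = list_find_first(parts, _match_part, "debug");
	list_sort(parts, _cmp_part);
	list_destroy(parts);	/* frees every element via _del_part */
}

Owning the elements through the destructor is what lets the diff below replace the manual teardown in _build_parts(), gs_fini() and gs_reconfig() with a single list_destroy() call.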
@@ -144,13 +144,11 @@ static uint32_t timeslicer_seconds = 0;
 static uint16_t gr_type = GS_NODE;
 static uint32_t gs_debug_flags = 0;
 static uint16_t gs_fast_schedule = 0;
-static struct gs_part *gs_part_list = NULL;
+static List gs_part_list = NULL;
 static uint32_t default_job_list_size = 64;
 static pthread_mutex_t data_mutex = PTHREAD_MUTEX_INITIALIZER;
 static uint16_t *gs_bits_per_node = NULL;
-static struct gs_part **gs_part_sorted = NULL;
 static uint32_t num_sorted_part = 0;
 
 /* function declarations */
@@ -294,31 +292,19 @@ static uint16_t _get_socket_cnt(int node_index)
 	return node_ptr->sockets;
 }
 
-/* The gs_part_list is a single large array of gs_part entities.
- * To destroy it, step down the array and destroy the pieces of
- * each gs_part entity, and then delete the whole array.
- * To destroy a gs_part entity, you need to delete the name, the
- * list of jobs, the shadow list, and the active_resmap.
- */
-static void _destroy_parts(void)
+static void _destroy_parts(void *x)
 {
 	int i;
-	struct gs_part *tmp, *ptr = gs_part_list;
+	struct gs_part *gs_part_ptr = (struct gs_part *) x;
 
-	while (ptr) {
-		tmp = ptr;
-		ptr = ptr->next;
-
-		xfree(tmp->part_name);
-		for (i = 0; i < tmp->num_jobs; i++) {
-			xfree(tmp->job_list[i]);
-		}
-		xfree(tmp->shadow);
-		FREE_NULL_BITMAP(tmp->active_resmap);
-		xfree(tmp->active_cpus);
-		xfree(tmp->job_list);
-	}
-	xfree(gs_part_list);
+	xfree(gs_part_ptr->part_name);
+	for (i = 0; i < gs_part_ptr->num_jobs; i++)
+		xfree(gs_part_ptr->job_list[i]);
+	xfree(gs_part_ptr->shadow);
+	FREE_NULL_BITMAP(gs_part_ptr->active_resmap);
+	xfree(gs_part_ptr->active_cpus);
+	xfree(gs_part_ptr->job_list);
+	xfree(gs_part_ptr);
 }
 
 /* Build the gs_part_list. The job_list will be created later,
@@ -327,45 +313,44 @@ static void _build_parts(void)
 {
 	ListIterator part_iterator;
 	struct part_record *p_ptr;
-	int i, num_parts;
+	struct gs_part *gs_part_ptr;
+	int num_parts;
 
-	if (gs_part_list)
-		_destroy_parts();
+	if (gs_part_list) {
+		list_destroy(gs_part_list);
+		gs_part_list = NULL;
+	}
 
 	/* reset the sorted list, since it's currently
 	 * pointing to partitions we just destroyed */
 	num_sorted_part = 0;
 
 	num_parts = list_count(part_list);
-	if (!num_parts)
+	if (num_parts == 0)
 		return;
 
+	gs_part_list = list_create(_destroy_parts);
 	part_iterator = list_iterator_create(part_list);
 	if (part_iterator == NULL)
 		fatal ("memory allocation failure");
 
-	gs_part_list = xmalloc(num_parts * sizeof(struct gs_part));
-	i = -1;
 	while ((p_ptr = (struct part_record *) list_next(part_iterator))) {
-		i++;
-		gs_part_list[i].part_name = xstrdup(p_ptr->name);
-		gs_part_list[i].priority = p_ptr->priority;
+		gs_part_ptr = xmalloc(sizeof(struct gs_part));
+		gs_part_ptr->part_name = xstrdup(p_ptr->name);
+		gs_part_ptr->priority = p_ptr->priority;
 		/* everything else is already set to zero/NULL */
-		gs_part_list[i].next = &(gs_part_list[i+1]);
+		list_append(gs_part_list, gs_part_ptr);
 	}
-	gs_part_list[i].next = NULL;
 	list_iterator_destroy(part_iterator);
 }
 
 /* Find the gs_part entity with the given name */
-static struct gs_part *_find_gs_part(char *name)
+static int _find_gs_part(void *x, void *key)
 {
-	struct gs_part *p_ptr = gs_part_list;
-	for (; p_ptr; p_ptr = p_ptr->next) {
-		if (strcmp(name, p_ptr->part_name) == 0)
-			return p_ptr;
-	}
-	return NULL;
+	struct gs_part *gs_part_ptr = (struct gs_part *) x;
+	char *name = (char *) key;
+
+	if (!strcmp(name, gs_part_ptr->part_name))
+		return 1;
+	return 0;
 }
 
 /* Find the job_list index of the given job_id in the given partition */
@@ -639,54 +624,27 @@ static void _preempt_job_dequeue(void)
 	return;
 }
 
-/* construct gs_part_sorted as a sorted list of the current partitions */
-static void _sort_partitions(void)
+/* This is the reverse order defined by list.h so to generated a list in
+ * descending order rather than ascending order */
+static int _sort_partitions(void *part1, void *part2)
 {
-	struct gs_part *p_ptr;
-	int i, j, size = 0;
-
-	/* sort all partitions by priority */
-	for (p_ptr = gs_part_list; p_ptr; p_ptr = p_ptr->next, size++);
-
-	/* sorted array is new, or number of partitions has changed */
-	if (size != num_sorted_part) {
-		xfree(gs_part_sorted);
-		gs_part_sorted = xmalloc(size * sizeof(struct gs_part *));
-		num_sorted_part = size;
-		/* load the array */
-		i = 0;
-		for (p_ptr = gs_part_list; p_ptr; p_ptr = p_ptr->next)
-			gs_part_sorted[i++] = p_ptr;
-	}
-
-	if (size <= 1) {
-		gs_part_sorted[0] = gs_part_list;
-		return;
-	}
-
-	/* sort array (new array or priorities may have changed) */
-	for (j = 0; j < size; j++) {
-		for (i = j+1; i < size; i++) {
-			if (gs_part_sorted[i]->priority >
-			    gs_part_sorted[j]->priority) {
-				struct gs_part *tmp_ptr;
-				tmp_ptr = gs_part_sorted[j];
-				gs_part_sorted[j] = gs_part_sorted[i];
-				gs_part_sorted[i] = tmp_ptr;
-			}
-		}
-	}
+	int prio1 = ((struct gs_part *) part1)->priority;
+	int prio2 = ((struct gs_part *) part2)->priority;
+
+	return prio1 - prio2;
 }
 
 /* Scan the partition list. Add the given job as a "shadow" to every
  * partition with a lower priority than the given partition */
 static void _cast_shadow(struct gs_job *j_ptr, uint16_t priority)
 {
+	ListIterator part_iterator;
 	struct gs_part *p_ptr;
 	int i;
 
-	for (p_ptr = gs_part_list; p_ptr; p_ptr = p_ptr->next) {
+	part_iterator = list_iterator_create(gs_part_list);
+	if (part_iterator == NULL)
+		fatal("memory allocation failure");
+
+	while ((p_ptr = (struct gs_part *) list_next(part_iterator))) {
 		if (p_ptr->priority >= priority)
 			continue;
@@ -714,17 +672,21 @@ static void _cast_shadow(struct gs_job *j_ptr, uint16_t priority)
 		}
 		p_ptr->shadow[p_ptr->num_shadows++] = j_ptr;
 	}
+	list_iterator_destroy(part_iterator);
 }
 
 /* Remove the given job as a "shadow" from all partitions */
 static void _clear_shadow(struct gs_job *j_ptr)
 {
+	ListIterator part_iterator;
 	struct gs_part *p_ptr;
 	int i;
 
-	for (p_ptr = gs_part_list; p_ptr; p_ptr = p_ptr->next) {
+	part_iterator = list_iterator_create(gs_part_list);
+	if (part_iterator == NULL)
+		fatal("memory allocation failure");
+
+	while ((p_ptr = (struct gs_part *) list_next(part_iterator))) {
 		if (!p_ptr->shadow)
 			continue;
@@ -743,6 +705,7 @@ static void _clear_shadow(struct gs_job *j_ptr)
 			p_ptr->shadow[i] = p_ptr->shadow[i+1];
 		p_ptr->shadow[p_ptr->num_shadows] = NULL;
 	}
+	list_iterator_destroy(part_iterator);
 }
@@ -842,16 +805,20 @@ static void _update_active_row(struct gs_part *p_ptr, int add_new_jobs)
  */
 static void _update_all_active_rows(void)
 {
-	int i;
+	ListIterator part_iterator;
+	struct gs_part *p_ptr;
 
 	/* Sort the partitions. This way the shadows of any high-priority
 	 * jobs are appropriately adjusted before the lower priority
 	 * partitions are updated */
-	_sort_partitions();
+	list_sort(gs_part_list, _sort_partitions);
 
-	for (i = 0; i < num_sorted_part; i++) {
-		_update_active_row(gs_part_sorted[i], 1);
-	}
+	part_iterator = list_iterator_create(gs_part_list);
+	if (part_iterator == NULL)
+		fatal("memory allocation failure");
+	while ((p_ptr = (struct gs_part *) list_next(part_iterator)))
+		_update_active_row(p_ptr, 1);
+	list_iterator_destroy(part_iterator);
 }
 
 /* remove the given job from the given partition
@@ -1028,7 +995,8 @@ static void _scan_slurm_job_list(void)
 		if (IS_JOB_SUSPENDED(job_ptr) || IS_JOB_RUNNING(job_ptr)) {
 			/* are we tracking this job already? */
-			p_ptr = _find_gs_part(job_ptr->partition);
+			p_ptr = list_find_first(gs_part_list, _find_gs_part,
+						job_ptr->partition);
 			if (!p_ptr) /* no partition */
 				continue;
 			i = _find_job_index(p_ptr, job_ptr->job_id);
@@ -1058,7 +1026,8 @@ static void _scan_slurm_job_list(void)
 		/* if the job is not pending, suspended, or running, then
 		 * it's completing or completed. Make sure we've released
 		 * this job */
-		p_ptr = _find_gs_part(job_ptr->partition);
+		p_ptr = list_find_first(gs_part_list, _find_gs_part,
+					job_ptr->partition);
 		if (!p_ptr) /* no partition */
 			continue;
 		_remove_job_from_part(job_ptr->job_id, p_ptr, false);
@@ -1165,9 +1134,8 @@ extern int gs_fini(void)
 	list_destroy(preempt_job_list);
 
 	pthread_mutex_lock(&data_mutex);
-	_destroy_parts();
-	xfree(gs_part_sorted);
-	gs_part_sorted = NULL;
+	list_destroy(gs_part_list);
+	gs_part_list = NULL;
 	xfree(gs_bits_per_node);
 	pthread_mutex_unlock(&data_mutex);
 	if (gs_debug_flags & DEBUG_FLAG_GANG)
@@ -1186,7 +1154,8 @@ extern int gs_job_start(struct job_record *job_ptr)
 		info("gang: entering gs_job_start for job %u", job_ptr->job_id);
 	/* add job to partition */
 	pthread_mutex_lock(&data_mutex);
-	p_ptr = _find_gs_part(job_ptr->partition);
+	p_ptr = list_find_first(gs_part_list, _find_gs_part,
+				job_ptr->partition);
 	if (p_ptr) {
 		job_state = _add_job_to_part(p_ptr, job_ptr);
 		/* if this job is running then check for preemption */
@@ -1255,7 +1224,8 @@ extern int gs_job_fini(struct job_record *job_ptr)
 	if (gs_debug_flags & DEBUG_FLAG_GANG)
 		info("gang: entering gs_job_fini for job %u", job_ptr->job_id);
 	pthread_mutex_lock(&data_mutex);
-	p_ptr = _find_gs_part(job_ptr->partition);
+	p_ptr = list_find_first(gs_part_list, _find_gs_part,
+				job_ptr->partition);
 	if (!p_ptr) {
 		pthread_mutex_unlock(&data_mutex);
 		if (gs_debug_flags & DEBUG_FLAG_GANG)
@@ -1299,7 +1269,9 @@ extern int gs_job_fini(struct job_record *job_ptr)
 extern int gs_reconfig(void)
 {
 	int i;
-	struct gs_part *p_ptr, *old_part_list, *newp_ptr;
+	ListIterator part_iterator;
+	struct gs_part *p_ptr, *newp_ptr;
+	List old_part_list;
 	struct job_record *job_ptr;
 
 	if (!timeslicer_thread_id) {
@@ -1323,8 +1295,13 @@ extern int gs_reconfig(void)
 	_build_parts();
 
 	/* scan the old part list and add existing jobs to the new list */
-	for (p_ptr = old_part_list; p_ptr; p_ptr = p_ptr->next) {
-		newp_ptr = _find_gs_part(p_ptr->part_name);
+	part_iterator = list_iterator_create(old_part_list);
+	if (part_iterator == NULL)
+		fatal ("memory allocation failure");
+	while ((p_ptr = (struct gs_part *) list_next(part_iterator))) {
+		newp_ptr = (struct gs_part *) list_find_first(gs_part_list,
+							      _find_gs_part,
+							      p_ptr->part_name);
 		if (!newp_ptr) {
 			/* this partition was removed, so resume
 			 * any suspended jobs and continue */
@@ -1378,17 +1355,13 @@ extern int gs_reconfig(void)
 			}
 		}
 	}
+	list_iterator_destroy(part_iterator);
 
 	/* confirm all jobs. Scan the master job_list and confirm that we
 	 * are tracking all jobs */
 	_scan_slurm_job_list();
 
-	/* Finally, destroy the old data */
-	p_ptr = gs_part_list;
-	gs_part_list = old_part_list;
-	_destroy_parts();
-	gs_part_list = p_ptr;
+	list_destroy(old_part_list);
 
 	pthread_mutex_unlock(&data_mutex);
 	_preempt_job_dequeue(); /* MUST BE OUTSIDE OF data_mutex lock */
@@ -1528,8 +1501,8 @@ static void *_timeslicer_thread(void *arg)
 	/* Write locks on job and read lock on nodes */
 	slurmctld_lock_t job_write_lock = {
 		NO_LOCK, WRITE_LOCK, READ_LOCK, NO_LOCK };
+	ListIterator part_iterator;
 	struct gs_part *p_ptr;
-	int i;
 
 	if (gs_debug_flags & DEBUG_FLAG_GANG)
 		info("gang: starting timeslicer loop");
@@ -1540,22 +1513,26 @@ static void *_timeslicer_thread(void *arg)
 		lock_slurmctld(job_write_lock);
 		pthread_mutex_lock(&data_mutex);
-		_sort_partitions();
+		list_sort(gs_part_list, _sort_partitions);
 
 		/* scan each partition... */
 		if (gs_debug_flags & DEBUG_FLAG_GANG)
 			info("gang: _timeslicer_thread: scanning partitions");
-		for (i = 0; i < num_sorted_part; i++) {
-			p_ptr = gs_part_sorted[i];
+		part_iterator = list_iterator_create(gs_part_list);
+		if (part_iterator == NULL)
+			fatal("memory allocation failure");
+		while ((p_ptr = (struct gs_part *) list_next(part_iterator))) {
 			if (gs_debug_flags & DEBUG_FLAG_GANG) {
 				info("gang: _timeslicer_thread: part %s: "
				     "run %u total %u", p_ptr->part_name,
				     p_ptr->jobs_active, p_ptr->num_jobs);
 			}
 			if (p_ptr->jobs_active <
-			    p_ptr->num_jobs + p_ptr->num_shadows)
+			    (p_ptr->num_jobs + p_ptr->num_shadows)) {
 				_cycle_job_list(p_ptr);
+			}
 		}
+		list_iterator_destroy(part_iterator);
 		pthread_mutex_unlock(&data_mutex);
 
 		/* Preempt jobs that were formerly only suspended */
......