Skip to content
Snippets Groups Projects
Commit d2158351 authored by Moe Jette's avatar Moe Jette
Browse files

Major expansion of job start/termination logic. Change partition owner, kill jobs, etc.
parent e5aef0aa
No related branches found
No related tags found
No related merge requests found
......@@ -42,17 +42,28 @@
#include "src/common/list.h"
#include "src/common/macros.h"
#include "src/common/node_select.h"
#include "src/common/uid.h"
#include "src/slurmctld/proc_req.h"
#include "bgl_job_run.h"
#include "bluegene.h"
#ifdef HAVE_BGL_FILES
#define MAX_POLL_RETRIES 30
#define MAX_PTHREAD_RETRIES 1
#define POLL_INTERVAL 2
/*
 * One queued request for the partition agent.
 * Records are allocated by start_job()/term_job() and handed to _part_op(),
 * which places them on bgl_update_list.
 */
typedef struct bgl_update {
	bool start;			/* true=start, false=terminate */
	uid_t uid;			/* new owner */
	uint32_t job_id;		/* SLURM job id */
	pm_partition_id_t bgl_part_id;	/* id of the BGL partition to act upon */
} bgl_update_t;
/* Queue of pending bgl_update_t requests; filled by _part_op() and drained
 * by the agent thread (_part_agent) */
List bgl_update_list = NULL;
/* Held while enqueueing/dequeueing requests and while reading or
 * changing agent_cnt */
static pthread_mutex_t agent_cnt_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Set to 1 by _part_op() when it spawns an agent thread, reset to 0 by the
 * agent when it finds the queue empty and exits */
static int agent_cnt = 0;
/* Delete a bgl_update_t record */
static void _bgl_list_del(void *x)
......@@ -65,9 +76,239 @@ static void _bgl_list_del(void *x)
}
}
/* Perform an operation upon a BGL block for starting or terminating a job */
static void _block_op(void)
/*
 * Kill a job and remove its record from DB2
 * IN job_id - DB2 identifier of the job to remove
 * RET STATUS_OK once DB2 reports the job as not found, the status of the
 *	failing API call on any other error, or INTERNAL_ERROR if the job
 *	is still present after MAX_POLL_RETRIES polls
 *
 * Each poll looks the job up, reads its state and then either cancels it
 * (if running) or removes it. Polls after the first are preceded by a
 * POLL_INTERVAL second sleep.
 */
static int _remove_job(db_job_id_t job_id)
{
	int attempt, status;
	rm_job_t *job_ptr;
	rm_job_state_t state;
	bool running;

	debug("removing job %d from DB2", job_id);

	for (attempt = 0; attempt < MAX_POLL_RETRIES; attempt++) {
		if (attempt != 0)
			sleep(POLL_INTERVAL);

		/* Find the job */
		status = rm_get_job(job_id, &job_ptr);
		if (status != STATUS_OK) {
			if (status != JOB_NOT_FOUND) {
				error("rm_get_job(%d) errno=%d", job_id,
				      status);
				return status;
			}
			debug("job %d removed from DB2", job_id);
			return STATUS_OK;
		}

		/* Read its current state */
		status = rm_get_data(job_ptr, RM_JobState, &state);
		if (status != STATUS_OK) {
			if (status != JOB_NOT_FOUND) {
				error("rm_get_data(RM_JobState) for jobid=%d errno=%d",
				      job_id, status);
			} else {
				debug("job %d removed from DB2", job_id);
				status = STATUS_OK;
			}
			rm_free_job(job_ptr);
			return status;
		}
		rm_free_job(job_ptr);

		/* Cancel a running job, remove one in any other state */
		running = (state == RM_JOB_RUNNING);
		if (running)
			status = jm_cancel_job(job_id);
		else
			status = rm_remove_job(job_id);

		/* Request accepted: poll again to see whether the job is gone */
		if (status == STATUS_OK)
			continue;

		if (status == JOB_NOT_FOUND) {
			debug("job %d removed from DB2", job_id);
			return STATUS_OK;
		}
		if (running)
			error("jm_cancel_job(%d) errno=%d", job_id, status);
		else
			error("rm_remove_job(%d) errno=%d", job_id, status);
		return status;
	}

	/* Out of retries: make one last removal attempt and report failure */
	(void) rm_remove_job(job_id);
	error("Failed to remove job %d from DB2", job_id);
	return INTERNAL_ERROR;
}
/*
 * Set the owner of an existing partition
 * IN bgl_part_id - identifier of the partition to update
 * IN user - name of the new owner; NULL or "" is logged as clearing the owner
 * RET STATUS_OK on success, the status of the failing API call otherwise,
 *	or INTERNAL_ERROR if the partition is not in the FREE state
 *
 * The partition must be FREE before its owner is changed. With USE_BGL_FILES
 * defined the partition is destroyed on the first poll and its state is then
 * polled up to MAX_POLL_RETRIES times, POLL_INTERVAL seconds apart. Without
 * USE_BGL_FILES the state is checked only once and no owner change is issued.
 */
static int _set_part_owner(pm_partition_id_t bgl_part_id, char *user)
{
	int i, rc;
	rm_partition_t * part_elem;
	rm_partition_state_t part_state;

	if (user && user[0])
		info("Setting partition %s owner to %s", bgl_part_id, user);
	else
		info("Clearing partition %s owner", bgl_part_id);

	/* Make partition state FREE */
	for (i=0; i<MAX_POLL_RETRIES; i++) {
		if (i > 0)
			sleep(POLL_INTERVAL);

		/* find the partition */
		if ((rc = rm_get_partition(bgl_part_id, &part_elem)) !=
				STATUS_OK) {
			error("rm_get_partition(%s) errno=%d", bgl_part_id, rc);
			return rc;
		}

		/* find its state */
		rc = rm_get_data(part_elem, RM_PartitionState, &part_state);
		(void) rm_free_partition(part_elem);
		if (rc != STATUS_OK) {
			error("rm_get_data(RM_PartitionState) errno=%d", rc);
			return rc;
		}
		if (part_state == RM_PARTITION_FREE)
			break;
#ifdef USE_BGL_FILES
		/* Request the destroy only once, later passes just poll */
		if ((i == 0)
		&&  ((rc = pm_destroy_partition(bgl_part_id)) != STATUS_OK)) {
			error("pm_destroy_partition(%s) errno=%d",
				bgl_part_id, rc);
			return rc;
		}
#else
		break;
#endif
	}

	if (part_state != RM_PARTITION_FREE) {
		error("Could not free partition %s", bgl_part_id);
		return INTERNAL_ERROR;
	}

#ifdef USE_BGL_FILES
	/* The format has three conversions, so rc must be passed for the
	 * trailing %d (it was previously missing from the argument list) */
	if ((rc = rm_set_part_owner(bgl_part_id, user)) != STATUS_OK)
		error("rm_set_part_owner(%s,%s) errno=%d", bgl_part_id, user,
			rc);
#endif
	return rc;
}
/*
 * Perform job initiation work: make the job's user the owner of its
 * partition, failing the job if the owner can not be changed
 * IN bgl_update_ptr - the start request being processed
 */
static void _start_agent(bgl_update_t *bgl_update_ptr)
{
	char *owner = uid_to_string(bgl_update_ptr->uid);

	if (_set_part_owner(bgl_update_ptr->bgl_part_id, owner) == STATUS_OK)
		return;

	error("Could not change partition owner to start jobid=%u",
	      bgl_update_ptr->job_id);

	/* Wait for the slurmd to begin the batch script; failing the job
	 * is a no-op if issued prior to the script initiation */
	sleep(1);
	(void) slurm_fail_job(bgl_update_ptr->job_id);
}
/*
 * Perform job termination work: remove any DB2 jobs found in the request's
 * partition (only when built with USE_BGL_FILES), then clear the
 * partition's owner
 * IN bgl_update_ptr - the termination request being processed
 *
 * If the job list or its size can not be read, the function returns early
 * and the partition owner is left unchanged.
 */
static void _term_agent(bgl_update_t *bgl_update_ptr)
{
	int jobs, rc;
	rm_job_list_t *job_list;
	int live_states;
#ifdef USE_BGL_FILES
	int i;
#endif

	/* Every job state is requested; masking out RM_JOB_TERMINATED and
	 * RM_JOB_KILLED to see only live jobs is not currently enabled */
	live_states = JOB_ALL_FLAG;
	if ((rc = rm_get_jobs(live_states, &job_list)) != STATUS_OK) {
		error("rm_get_jobs() errno=%d", rc);
		return;
	}

	if ((rc = rm_get_data(job_list, RM_JobListSize, &jobs)) != STATUS_OK) {
		error("rm_get_data(RM_JobListSize) errno=%d", rc);
		/* Release the list before bailing out so it is not leaked */
		if ((rc = rm_free_job_list(job_list)) != STATUS_OK)
			error("rm_free_job_list() errno=%d", rc);
		return;
	}
	/* FIXME: job count is bogus */
	info("job count=%d", jobs);

#ifdef USE_BGL_FILES
	/* NOTE(review): the "Part" spelling of the list specifiers below is
	 * kept from the original code; confirm the names (and the
	 * rm_element_t handle type) against the bridge API header before
	 * enabling USE_BGL_FILES */
	for (i=0; i<jobs; i++) {
		rm_element_t *job_elem;
		pm_partition_id_t part_id;
		db_job_id_t job_id;

		if (i) {
			if ((rc = rm_get_data(job_list, RM_JobListNextPart,
					&job_elem)) != STATUS_OK) {
				error("rm_get_data(RM_JobListNextPart) "
					"errno=%d", rc);
				continue;
			}
		} else {
			if ((rc = rm_get_data(job_list, RM_JobListFirstPart,
					&job_elem)) != STATUS_OK) {
				error("rm_get_data(RM_JobListFirstPart) "
					"errno=%d", rc);
				continue;
			}
		}

		if ((rc = rm_get_data(job_elem, RM_JobPartitionID, &part_id))
				!= STATUS_OK) {
			error("rm_get_data(RM_JobPartitionID) errno=%d", rc);
			continue;
		}

		/* Only jobs in the partition being released are removed */
		if (strcmp(part_id, bgl_update_ptr->bgl_part_id) != 0)
			continue;

		if ((rc = rm_get_data(job_elem, RM_JobDBJobID, &job_id))
				!= STATUS_OK) {
			error("rm_get_data(RM_JobDBJobID) errno=%d", rc);
			continue;
		}
		(void) _remove_job(job_id);
	}
#endif

	/* The list is obtained in every build, so it is released in every
	 * build (previously it was freed only under USE_BGL_FILES) */
	if ((rc = rm_free_job_list(job_list)) != STATUS_OK)
		error("rm_free_job_list() errno=%d", rc);

	/* Change the block's owner */
	(void) _set_part_owner(bgl_update_ptr->bgl_part_id, "");
}
/* Process requests off the bgl_update_list queue and exit when done */
static void *_part_agent(void *args)
{
bgl_update_t *bgl_update_ptr;
while (1) {
slurm_mutex_lock(&agent_cnt_mutex);
bgl_update_ptr = list_dequeue(bgl_update_list);
if (!bgl_update_ptr) {
agent_cnt = 0;
slurm_mutex_unlock(&agent_cnt_mutex);
return NULL;
}
slurm_mutex_unlock(&agent_cnt_mutex);
if (bgl_update_ptr->start)
_start_agent(bgl_update_ptr);
else
_term_agent(bgl_update_ptr);
_bgl_list_del(bgl_update_ptr);
}
}
/*
 * Perform an operation upon a BGL partition (block) for starting or
 * terminating a job
 * IN bgl_update_ptr - request to queue; after being processed by the agent
 *	thread the record is deleted with _bgl_list_del()
 *
 * The request is appended to bgl_update_list and, if no agent thread is
 * currently active (agent_cnt == 0), one is spawned to drain the queue.
 * Both steps happen while holding agent_cnt_mutex.
 */
static void _part_op(bgl_update_t *bgl_update_ptr)
{
	slurm_mutex_lock(&agent_cnt_mutex);
	if (list_enqueue(bgl_update_list, bgl_update_ptr) == NULL)
		fatal("malloc failure in _part_op/list_enqueue");

	if (agent_cnt == 0) {
		/* spawn an agent */
		pthread_attr_t attr_agent;
		pthread_t thread_agent;
		int retries = 0;

		agent_cnt = 1;
		slurm_attr_init(&attr_agent);
		/* NOTE(review): pthread_create() reports failure through its
		 * return value rather than errno, so %m below may not describe
		 * the actual error */
		while (pthread_create(&thread_agent, &attr_agent, _part_agent,
				NULL)) {
			error("pthread_create error %m");
			if (++retries > MAX_PTHREAD_RETRIES)
				fatal("Can't create pthread");
			sleep(1);	/* sleep and retry */
		}
		/* The attribute object is no longer needed once the thread
		 * exists; release it rather than abandoning it */
		(void) pthread_attr_destroy(&attr_agent);
	}
	slurm_mutex_unlock(&agent_cnt_mutex);
}
#endif
......@@ -89,7 +330,7 @@ extern int start_job(struct job_record *job_ptr)
select_g_get_jobinfo(job_ptr->select_jobinfo,
SELECT_DATA_PART_ID, &bgl_part_id);
info("Starting job %u in BGL partition %s",
info("Queue start of job %u in BGL partition %s",
job_ptr->job_id, bgl_part_id);
if ((bgl_update_list == NULL)
......@@ -101,12 +342,9 @@ extern int start_job(struct job_record *job_ptr)
bgl_update_ptr = xmalloc(sizeof(bgl_update_t));
bgl_update_ptr->start = true;
bgl_update_ptr->uid = job_ptr->user_id;
bgl_update_ptr->job_id = job_ptr->job_id;
bgl_update_ptr->bgl_part_id = bgl_part_id;
if (list_push(bgl_update_list, bgl_update_ptr) == NULL) {
fatal("malloc failure in start_job/list_push");
return SLURM_ERROR;
}
_part_op(bgl_update_ptr);
#endif
return rc;
}
......@@ -130,24 +368,15 @@ extern int term_job(struct job_record *job_ptr)
/* Identify the BGL block */
select_g_get_jobinfo(job_ptr->select_jobinfo,
SELECT_DATA_PART_ID, &bgl_part_id);
info("Terminating job %u in BGL partition %s",
info("Queue termination of job %u in BGL partition %s",
job_ptr->job_id, bgl_part_id);
bgl_update_ptr = xmalloc(sizeof(bgl_update_t));
bgl_update_ptr->start = false;
bgl_update_ptr->uid = job_ptr->user_id;
bgl_update_ptr->job_id = job_ptr->job_id;
bgl_update_ptr->bgl_part_id = bgl_part_id;
if (list_push(bgl_update_list, bgl_update_ptr) == NULL) {
fatal("malloc failure in start_job/list_push");
return SLURM_ERROR;
}
/* Find and kill any jobs in this block */
/* Wait for termination of all jobs */
/* Change the block's owner */
_part_op(bgl_update_ptr);
#endif
return rc;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment