From 5fdc44e7d9e1ed85282a38a018b790d76b2eba6a Mon Sep 17 00:00:00 2001
From: David Bigagli <david@schedmd.com>
Date: Tue, 12 Nov 2013 11:08:16 -0800
Subject: [PATCH] Job array requeue.

---
Review notes (text between the "---" line and the first "diff --git"
line is not part of the commit message):
 - _get_job_ids() returns NULL with an error message when the requested
   array task is not among the loaded records, instead of handing back
   a job id that was never set.
 - _get_job_info() rejects empty job id and task id fields.
 - _get_job_info() and _get_job_ids() carry header comments describing
   their contract.
 - Line breaks and indentation were restored; the diffstat and the last
   update_job.c hunk header were recounted for the added lines. The
   "index" lines still show the blob ids of the upstream commit.

 src/scontrol/update_job.c | 224 ++++++++++++++++++++++++++++++---------
 src/slurmctld/proc_req.c  |  11 +++
 2 files changed, 171 insertions(+), 64 deletions(-)

diff --git a/src/scontrol/update_job.c b/src/scontrol/update_job.c
index d807fbacabd..2337f32908a 100644
--- a/src/scontrol/update_job.c
+++ b/src/scontrol/update_job.c
@@ -47,8 +47,10 @@ static int _parse_restart_args(int argc, char **argv,
 				uint16_t *stick, char **image_dir);
 static void _update_job_size(uint32_t job_id);
 static int _parse_requeue_flags(char *, uint32_t *state_flags);
-static inline bool is_job_array(const char *);
-static uint32_t get_array_job_id(const char *);
+static inline bool _is_array_task_id(const char *jobid);
+static job_info_msg_t *_get_job_info(const char *jobid, uint32_t *task_id);
+static uint32_t *_get_job_ids(const char *jobid, uint32_t *num_ids);
+
 /*
  * scontrol_checkpoint - perform some checkpoint/resume operation
  * IN op - checkpoint operation
@@ -343,31 +345,32 @@ extern int
 scontrol_requeue(int argc, char **argv)
 {
 	int rc = SLURM_SUCCESS;
-	uint32_t job_id = 0;
-	char *next_str;
+	int i;
+	uint32_t *ids;
+	uint32_t num_ids;
 
 	if (! argv[0]) {
 		exit_code = 1;
 		return 0;
 	}
 
-	if (is_job_array(argv[0])) {
-		job_id = get_array_job_id(argv[0]);
-		if (job_id == NO_VAL) {
-			fprintf(stderr, "Invalid array job id specified\n");
-			exit_code = 1;
-			return 0;
-		}
-	} else {
-		job_id = (uint32_t)strtol(argv[0], &next_str, 10);
-		if (next_str[0] != '\0') {
-			fprintf(stderr, "Invalid job id specified\n");
+	ids = _get_job_ids(argv[0], &num_ids);
+	if (ids == NULL) {
+		exit_code = 1;
+		return 0;
+	}
+
+	for (i = 0; i < num_ids; i++) {
+		rc = slurm_requeue(ids[i], 0);
+		if (rc != SLURM_SUCCESS) {
+			fprintf(stderr, "%s array job_id %u\n",
+				slurm_strerror(slurm_get_errno()), ids[i]);
 			exit_code = 1;
-			return 0;
+			break;
 		}
 	}
 
-	rc = slurm_requeue(job_id, 0);
+	xfree(ids);
 
 	return rc;
 }
@@ -376,10 +379,11 @@ extern int
 scontrol_requeue_hold(int argc, char **argv)
 {
 	int rc = SLURM_SUCCESS;
-	uint32_t job_id = 0;
-	char *next_str;
-	char *job_id_str;
+	int i;
 	uint32_t state_flag;
+	uint32_t *ids;
+	uint32_t num_ids;
+	char *job_id_str;
 
 	state_flag = 0;
 
@@ -388,20 +392,10 @@ scontrol_requeue_hold(int argc, char **argv)
 	else
 		job_id_str = argv[1];
 
-	if (is_job_array(job_id_str)) {
-		job_id = get_array_job_id(job_id_str);
-		if (job_id == NO_VAL) {
-			fprintf(stderr, "Invalid array job id specified\n");
-			exit_code = 1;
-			return 0;
-		}
-	} else {
-		job_id = (uint32_t)strtol(job_id_str, &next_str, 10);
-		if (next_str[0] != '\0') {
-			fprintf(stderr, "Invalid job id specified\n");
-			exit_code = 1;
-			return 0;
-		}
+	ids = _get_job_ids(job_id_str, &num_ids);
+	if (ids == NULL) {
+		exit_code = 1;
+		return 0;
 	}
 
 	if (argc == 2) {
@@ -409,6 +403,7 @@ scontrol_requeue_hold(int argc, char **argv)
 		if (rc < 0) {
 			error("Invalid state specification %s", argv[0]);
 			exit_code = 1;
+			xfree(ids);
 			return 0;
 		}
 	}
@@ -417,7 +412,17 @@ scontrol_requeue_hold(int argc, char **argv)
 	/* Go and requeue the state either in
 	 * JOB_SPECIAL_EXIT or HELD state.
 	 */
-	rc = slurm_requeue(job_id, state_flag);
+	for (i = 0; i < num_ids; i++) {
+		rc = slurm_requeue(ids[i], state_flag);
+		if (rc != SLURM_SUCCESS) {
+			fprintf(stderr, "%s array job_id %u\n",
+				slurm_strerror(slurm_get_errno()), ids[i]);
+			exit_code = 1;
+			break;
+		}
+	}
+
+	xfree(ids);
 
 	return rc;
 }
@@ -1032,7 +1037,7 @@ _parse_requeue_flags(char *s, uint32_t *state)
  * Detect the _ jobid separator.
  */
 static inline bool
-is_job_array(const char *jobid)
+_is_array_task_id(const char *jobid)
 {
 	int cc;
 
@@ -1042,58 +1047,149 @@ is_job_array(const char *jobid)
 			++cc;
 		++jobid;
 	}
+
 	if (cc == 1)
 		return true;
+
 	return false;
 }
 
-/* get_array_job_id()
+/* _get_job_info()
+ *
+ * Parse a "<job_id>" or "<job_id>_<task_id>" string and load the
+ * records of that job with slurm_load_job().
+ * IN jobid - job id string, at most 63 characters
+ * OUT task_id - parsed task id, written only if jobid contains a '_'
+ * RET job records to be released with slurm_free_job_info_msg(),
+ *	NULL on a malformed id or a load failure
  */
-static uint32_t
-get_array_job_id(const char *jobid)
+static job_info_msg_t *
+_get_job_info(const char *jobid, uint32_t *task_id)
 {
-	char job_id[64];
+	char buf[64];
 	char *taskid;
 	char *next_str;
-	int ntaskid;
-	int njobid;
+	uint32_t job_id;
 	int cc;
-	int ujobid;
 	job_info_msg_t *job_info;
 
 	if (strlen(jobid) > 63)
-		return NO_VAL;
+		return NULL;
+
+	strcpy(buf, jobid);
+
+	taskid = strchr(buf, '_');
+	if (taskid) {
+
+		*taskid = 0;
+		++taskid;
+
+		*task_id = (uint32_t)strtol(taskid, &next_str, 10);
+		if (next_str == taskid || next_str[0] != '\0') {
+			fprintf(stderr, "Invalid task_id specified\n");
+			return NULL;
+		}
+	}
+
+	job_id = (uint32_t)strtol(buf, &next_str, 10);
+	if (next_str == buf || next_str[0] != '\0') {
+		fprintf(stderr, "Invalid job_id specified\n");
+		return NULL;
+	}
+
+	cc = slurm_load_job(&job_info, job_id, SHOW_ALL);
+	if (cc < 0) {
+		slurm_perror("slurm_load_job");
+		return NULL;
+	}
+
+	return job_info;
+}
 
-	strcpy(job_id, jobid);
+/* _get_job_ids()
+ *
+ * Build the list of job ids addressed by a job id string.
+ * A "<job_id>_<task_id>" string yields the job id of that one task,
+ * otherwise every record loaded for the job is returned, pending
+ * jobs first.
+ * IN jobid - job id string
+ * OUT num_ids - number of entries in the returned array
+ * RET xmalloc'ed array to be released with xfree(), NULL on error
+ */
+static uint32_t *
+_get_job_ids(const char *jobid, uint32_t *num_ids)
+{
+	job_info_msg_t *job_info;
+	uint32_t *job_ids;
+	uint32_t task_id = NO_VAL;
+	int i;
+	int cc;
 
-	taskid = strchr(job_id, '_');
-	if (taskid == NULL)
-		return NO_VAL;
+	job_info = _get_job_info(jobid, &task_id);
+	if (job_info == NULL)
+		return NULL;
 
-	*taskid = 0;
-	++taskid;
+	if (_is_array_task_id(jobid)) {
+		/* Search for the job_id of the specified
+		 * task.
+		 */
+		for (cc = 0; cc < job_info->record_count; cc++) {
+			if (task_id == job_info->job_array[cc].array_task_id)
+				break;
+		}
 
-	ntaskid = (uint32_t)strtol(taskid, &next_str, 10);
-	if (next_str[0] != '\0')
-		return NO_VAL;
+		if (cc == job_info->record_count) {
+			/* No such task: fail instead of returning
+			 * an unset job id to the caller.
+			 */
+			fprintf(stderr, "Invalid task_id specified\n");
+			slurm_free_job_info_msg(job_info);
+			return NULL;
+		}
 
-	njobid = (uint32_t)strtol(job_id, &next_str, 10);
-	if (next_str[0] != '\0')
-		return NO_VAL;
+		job_ids = xmalloc(sizeof(uint32_t));
+		*num_ids = 1;
+		job_ids[0] = job_info->job_array[cc].job_id;
 
-	cc = slurm_load_job(&job_info, njobid, SHOW_ALL);
-	if (cc < 0)
-		return NO_VAL;
+		slurm_free_job_info_msg(job_info);
+		return job_ids;
+	}
 
-	ujobid = -1;
+	if (job_info->record_count == 1) {
+		/* No task elements besides the
+		 * job itself so it cannot be
+		 * a job array.
+		 */
+		job_ids = xmalloc(sizeof(uint32_t));
+		*num_ids = 1;
+		job_ids[0] = job_info->job_array[0].job_id;
+		slurm_free_job_info_msg(job_info);
+
+		return job_ids;
+	}
+
+	*num_ids = job_info->record_count;
+	job_ids = xmalloc((*num_ids) * sizeof(uint32_t));
+	/* First save the pending jobs
+	 */
+	i = 0;
 	for (cc = 0; cc < job_info->record_count; cc++) {
-		if (ntaskid == job_info->job_array[cc].array_task_id) {
-			ujobid = job_info->job_array[cc].job_id;
-			break;
+		if (job_info->job_array[cc].job_state == JOB_PENDING) {
+			job_ids[i] = job_info->job_array[cc].job_id;
+			++i;
+		}
+	}
+	/* then the rest of the states
+	 */
+	for (cc = 0; cc < job_info->record_count; cc++) {
+		if (job_info->job_array[cc].job_state != JOB_PENDING) {
+			job_ids[i] = job_info->job_array[cc].job_id;
+			++i;
 		}
 	}
 
+	xassert(i == *num_ids);
 	slurm_free_job_info_msg(job_info);
 
-	return ujobid;
+	return job_ids;
 }
diff --git a/src/slurmctld/proc_req.c b/src/slurmctld/proc_req.c
index 0844bca3261..7c0dc3b5bae 100644
--- a/src/slurmctld/proc_req.c
+++ b/src/slurmctld/proc_req.c
@@ -3658,8 +3658,19 @@ inline static void _slurm_rpc_requeue(slurm_msg_t * msg)
 
 	job_ptr = find_job_record(req_ptr->job_id);
 	if (job_ptr == NULL) {
+		slurm_msg_t resp_msg;
+		return_code_msg_t rc_msg;
+
 		info("%s: %u: %s", __func__, req_ptr->job_id,
 		     slurm_strerror(ESLURM_INVALID_JOB_ID));
+
+		slurm_msg_t_init(&resp_msg);
+		resp_msg.protocol_version = msg->protocol_version;
+		resp_msg.msg_type = RESPONSE_SLURM_RC;
+		rc_msg.return_code = ESLURM_INVALID_JOB_ID;
+		resp_msg.data = &rc_msg;
+		slurm_send_node_msg(msg->conn_fd, &resp_msg);
+
 		return;
 	}
 
-- 
GitLab