Skip to content
Snippets Groups Projects
Commit 5fdc44e7 authored by David Bigagli
Browse files

Job array requeue.

parent 6b210cc8
No related branches found
No related tags found
No related merge requests found
...@@ -47,8 +47,10 @@ static int _parse_restart_args(int argc, char **argv, ...@@ -47,8 +47,10 @@ static int _parse_restart_args(int argc, char **argv,
uint16_t *stick, char **image_dir); uint16_t *stick, char **image_dir);
static void _update_job_size(uint32_t job_id); static void _update_job_size(uint32_t job_id);
static int _parse_requeue_flags(char *, uint32_t *state_flags); static int _parse_requeue_flags(char *, uint32_t *state_flags);
static inline bool is_job_array(const char *); static inline bool _is_array_task_id(const char *jobid);
static uint32_t get_array_job_id(const char *); static job_info_msg_t *_get_job_info(const char *jobid, uint32_t *task_id);
static uint32_t *_get_job_ids(const char *jobid, uint32_t *num_ids);
/* /*
* scontrol_checkpoint - perform some checkpoint/resume operation * scontrol_checkpoint - perform some checkpoint/resume operation
* IN op - checkpoint operation * IN op - checkpoint operation
...@@ -343,31 +345,32 @@ extern int ...@@ -343,31 +345,32 @@ extern int
scontrol_requeue(int argc, char **argv) scontrol_requeue(int argc, char **argv)
{ {
int rc = SLURM_SUCCESS; int rc = SLURM_SUCCESS;
uint32_t job_id = 0; int i;
char *next_str; uint32_t *ids;
uint32_t num_ids;
if (! argv[0]) { if (! argv[0]) {
exit_code = 1; exit_code = 1;
return 0; return 0;
} }
if (is_job_array(argv[0])) { ids = _get_job_ids(argv[0], &num_ids);
job_id = get_array_job_id(argv[0]); if (ids == NULL) {
if (job_id == NO_VAL) { exit_code = 1;
fprintf(stderr, "Invalid array job id specified\n"); return 0;
exit_code = 1; }
return 0;
} for (i = 0; i < num_ids; i++) {
} else { rc = slurm_requeue(ids[i], 0);
job_id = (uint32_t)strtol(argv[0], &next_str, 10); if (rc != SLURM_SUCCESS) {
if (next_str[0] != '\0') { fprintf(stderr, "%s array job_id %u\n",
fprintf(stderr, "Invalid job id specified\n"); slurm_strerror(slurm_get_errno()), ids[i]);
exit_code = 1; exit_code = 1;
return 0; break;
} }
} }
rc = slurm_requeue(job_id, 0); xfree(ids);
return rc; return rc;
} }
...@@ -376,10 +379,11 @@ extern int ...@@ -376,10 +379,11 @@ extern int
scontrol_requeue_hold(int argc, char **argv) scontrol_requeue_hold(int argc, char **argv)
{ {
int rc = SLURM_SUCCESS; int rc = SLURM_SUCCESS;
uint32_t job_id = 0; int i;
char *next_str;
char *job_id_str;
uint32_t state_flag; uint32_t state_flag;
uint32_t *ids;
uint32_t num_ids;
char *job_id_str;
state_flag = 0; state_flag = 0;
...@@ -388,20 +392,10 @@ scontrol_requeue_hold(int argc, char **argv) ...@@ -388,20 +392,10 @@ scontrol_requeue_hold(int argc, char **argv)
else else
job_id_str = argv[1]; job_id_str = argv[1];
if (is_job_array(job_id_str)) { ids = _get_job_ids(job_id_str, &num_ids);
job_id = get_array_job_id(job_id_str); if (ids == NULL) {
if (job_id == NO_VAL) { exit_code = 1;
fprintf(stderr, "Invalid array job id specified\n"); return 0;
exit_code = 1;
return 0;
}
} else {
job_id = (uint32_t)strtol(job_id_str, &next_str, 10);
if (next_str[0] != '\0') {
fprintf(stderr, "Invalid job id specified\n");
exit_code = 1;
return 0;
}
} }
if (argc == 2) { if (argc == 2) {
...@@ -409,6 +403,7 @@ scontrol_requeue_hold(int argc, char **argv) ...@@ -409,6 +403,7 @@ scontrol_requeue_hold(int argc, char **argv)
if (rc < 0) { if (rc < 0) {
error("Invalid state specification %s", argv[0]); error("Invalid state specification %s", argv[0]);
exit_code = 1; exit_code = 1;
xfree(ids);
return 0; return 0;
} }
} }
...@@ -417,7 +412,17 @@ scontrol_requeue_hold(int argc, char **argv) ...@@ -417,7 +412,17 @@ scontrol_requeue_hold(int argc, char **argv)
/* Go and requeue the state either in /* Go and requeue the state either in
* JOB_SPECIAL_EXIT or HELD state. * JOB_SPECIAL_EXIT or HELD state.
*/ */
rc = slurm_requeue(job_id, state_flag); for (i = 0; i < num_ids; i++) {
rc = slurm_requeue(ids[i], state_flag);
if (rc != SLURM_SUCCESS) {
fprintf(stderr, "%s array job_id %u\n",
slurm_strerror(slurm_get_errno()), ids[i]);
exit_code = 1;
break;
}
}
xfree(ids);
return rc; return rc;
} }
...@@ -1032,7 +1037,7 @@ _parse_requeue_flags(char *s, uint32_t *state) ...@@ -1032,7 +1037,7 @@ _parse_requeue_flags(char *s, uint32_t *state)
* Detect the _ jobid separator. * Detect the _ jobid separator.
*/ */
static inline bool static inline bool
is_job_array(const char *jobid) _is_array_task_id(const char *jobid)
{ {
int cc; int cc;
...@@ -1042,58 +1047,127 @@ is_job_array(const char *jobid) ...@@ -1042,58 +1047,127 @@ is_job_array(const char *jobid)
++cc; ++cc;
++jobid; ++jobid;
} }
if (cc == 1) if (cc == 1)
return true; return true;
return false; return false;
} }
/* get_array_job_id() /* _get_job_info()
*/ */
static uint32_t static job_info_msg_t *
get_array_job_id(const char *jobid) _get_job_info(const char *jobid, uint32_t *task_id)
{ {
char job_id[64]; char buf[64];
char *taskid; char *taskid;
char *next_str; char *next_str;
int ntaskid; uint32_t job_id;
int njobid;
int cc; int cc;
int ujobid;
job_info_msg_t *job_info; job_info_msg_t *job_info;
if (strlen(jobid) > 63) if (strlen(jobid) > 63)
return NO_VAL; return NULL;
strcpy(buf, jobid);
taskid = strchr(buf, '_');
if (taskid) {
*taskid = 0;
++taskid;
*task_id = (uint32_t)strtol(taskid, &next_str, 10);
if (next_str[0] != '\0') {
fprintf(stderr, "Invalid task_id specified\n");
return NULL;
}
}
job_id = (uint32_t)strtol(buf, &next_str, 10);
if (next_str[0] != '\0') {
fprintf(stderr, "Invalid job_id specified\n");
return NULL;
}
cc = slurm_load_job(&job_info, job_id, SHOW_ALL);
if (cc < 0) {
slurm_perror("slurm_load_job");
return NULL;
}
return job_info;
}
strcpy(job_id, jobid); /* _get_job_ids()
*/
static uint32_t *
_get_job_ids(const char *jobid, uint32_t *num_ids)
{
job_info_msg_t *job_info;
uint32_t *job_ids;
uint32_t task_id;
int i;
int cc;
taskid = strchr(job_id, '_'); job_info = _get_job_info(jobid, &task_id);
if (taskid == NULL) if (job_info == NULL)
return NO_VAL; return NULL;
*taskid = 0; if (_is_array_task_id(jobid)) {
++taskid;
ntaskid = (uint32_t)strtol(taskid, &next_str, 10); job_ids = xmalloc(sizeof(uint32_t));
if (next_str[0] != '\0') *num_ids = 1;
return NO_VAL;
njobid = (uint32_t)strtol(job_id, &next_str, 10); /* Search for the job_id of the specified
if (next_str[0] != '\0') * task.
return NO_VAL; */
for (cc = 0; cc < job_info->record_count; cc++) {
if (task_id == job_info->job_array[cc].array_task_id) {
job_ids[0] = job_info->job_array[cc].job_id;
break;
}
}
cc = slurm_load_job(&job_info, njobid, SHOW_ALL); slurm_free_job_info_msg(job_info);
if (cc < 0) return job_ids;
return NO_VAL; }
ujobid = -1; if (job_info->record_count == 1) {
/* No task elements beside the
* job itself so it cannot be
* a job array.
*/
job_ids = xmalloc(sizeof(uint32_t));
*num_ids = 1;
job_ids[0] = job_info->job_array[0].job_id;
slurm_free_job_info_msg(job_info);
return job_ids;
}
*num_ids = job_info->record_count;
job_ids = xmalloc((*num_ids) * sizeof(uint32_t));
/* First save the pending jobs
*/
i = 0;
for (cc = 0; cc < job_info->record_count; cc++) { for (cc = 0; cc < job_info->record_count; cc++) {
if (ntaskid == job_info->job_array[cc].array_task_id) { if (job_info->job_array[cc].job_state == JOB_PENDING) {
ujobid = job_info->job_array[cc].job_id; job_ids[i] = job_info->job_array[cc].job_id;
break; ++i;
}
}
/* then the rest of the states
*/
for (cc = 0; cc < job_info->record_count; cc++) {
if (job_info->job_array[cc].job_state != JOB_PENDING) {
job_ids[i] = job_info->job_array[cc].job_id;
++i;
} }
} }
xassert(i == *num_ids);
slurm_free_job_info_msg(job_info); slurm_free_job_info_msg(job_info);
return ujobid; return job_ids;
} }
...@@ -3658,8 +3658,19 @@ inline static void _slurm_rpc_requeue(slurm_msg_t * msg) ...@@ -3658,8 +3658,19 @@ inline static void _slurm_rpc_requeue(slurm_msg_t * msg)
job_ptr = find_job_record(req_ptr->job_id); job_ptr = find_job_record(req_ptr->job_id);
if (job_ptr == NULL) { if (job_ptr == NULL) {
slurm_msg_t resp_msg;
return_code_msg_t rc_msg;
info("%s: %u: %s", __func__, req_ptr->job_id, info("%s: %u: %s", __func__, req_ptr->job_id,
slurm_strerror(ESLURM_INVALID_JOB_ID)); slurm_strerror(ESLURM_INVALID_JOB_ID));
slurm_msg_t_init(&resp_msg);
resp_msg.protocol_version = msg->protocol_version;
resp_msg.msg_type = RESPONSE_SLURM_RC;
rc_msg.return_code = ESLURM_INVALID_JOB_ID;
resp_msg.data = &rc_msg;
slurm_send_node_msg(msg->conn_fd, &resp_msg);
return; return;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment