From a23f3c5fc75e10169b9fb0b2911d633668b22837 Mon Sep 17 00:00:00 2001 From: Morris Jette <jette@schedmd.com> Date: Fri, 1 Aug 2014 10:12:49 -0700 Subject: [PATCH] Return mulitple errors for job array update Issue single RPC to update a job array and return separate error codes as needed. --- slurm/slurm.h.in | 11 + src/api/update_config.c | 43 +++- src/plugins/slurmctld/nonstop/do_work.c | 19 +- src/scontrol/update_job.c | 288 ++++++++---------------- src/slurmctld/job_mgr.c | 125 ++++++---- src/slurmctld/proc_req.c | 10 +- src/slurmctld/slurmctld.h | 8 +- 7 files changed, 255 insertions(+), 249 deletions(-) diff --git a/slurm/slurm.h.in b/slurm/slurm.h.in index 2f915366e62..54e2bb88cd3 100644 --- a/slurm/slurm.h.in +++ b/slurm/slurm.h.in @@ -3207,6 +3207,17 @@ extern char *slurm_sprint_job_info PARAMS((slurm_job_info_t * job_ptr, */ extern int slurm_update_job PARAMS((job_desc_msg_t * job_msg)); +/* + * slurm_update_job2 - issue RPC to a job's configuration per request, + * only usable by user root or (for some parameters) the job's owner + * IN job_msg - description of job updates + * OUT resp - per task response to the request, + * free using slurm_free_job_array_resp() + * RET 0 on success, otherwise return -1 and set errno to indicate the error + */ +extern int slurm_update_job2 PARAMS((job_desc_msg_t * job_msg, + job_array_resp_msg_t **resp)); + /* * slurm_xlate_job_id - Translate a Slurm job ID string into a slurm job ID * number. If this job ID contains an array index, map this to the diff --git a/src/api/update_config.c b/src/api/update_config.c index c12d760e964..5a365a0efa5 100644 --- a/src/api/update_config.c +++ b/src/api/update_config.c @@ -72,11 +72,52 @@ slurm_update_front_end (update_front_end_msg_t * front_end_msg) * RET 0 on success, otherwise return -1 and set errno to indicate the error */ int -slurm_update_job ( job_desc_msg_t * job_msg) +slurm_update_job (job_desc_msg_t * job_msg) { + if (job_msg->job_id_str) { + error("Use slurm_update_job2() rather than slurm_update_job() " + "with job_msg->job_id_str to get multiple error codes " + "for various job array task and avoid memory leaks"); + } return _slurm_update ((void *) job_msg, REQUEST_UPDATE_JOB); } +/* + * slurm_update_job2 - issue RPC to a job's configuration per request, + * only usable by user root or (for some parameters) the job's owner + * IN job_msg - description of job updates + * OUT resp - per task response to the request, + * free using slurm_free_job_array_resp() + * RET 0 on success, otherwise return -1 and set errno to indicate the error + */ +extern int +slurm_update_job2 (job_desc_msg_t * job_msg, job_array_resp_msg_t **resp) +{ + int rc = SLURM_SUCCESS; + slurm_msg_t req_msg, resp_msg; + + slurm_msg_t_init(&req_msg); + slurm_msg_t_init(&resp_msg); + req_msg.msg_type = REQUEST_UPDATE_JOB; + req_msg.data = job_msg; + + rc = slurm_send_recv_controller_msg(&req_msg, &resp_msg); + switch (resp_msg.msg_type) { + case RESPONSE_JOB_ARRAY_ERRORS: + *resp = (job_array_resp_msg_t *) resp_msg.data; + break; + case RESPONSE_SLURM_RC: + rc = ((return_code_msg_t *) resp_msg.data)->return_code; + if (rc) + slurm_seterrno(rc); + break; + default: + slurm_seterrno(SLURM_UNEXPECTED_MSG_ERROR); + } + + return rc; +} + /* * slurm_update_node - issue RPC to a node's configuration per request, * only usable by user root diff --git a/src/plugins/slurmctld/nonstop/do_work.c b/src/plugins/slurmctld/nonstop/do_work.c index 0156bc8aec2..ce13aeb66f7 100644 --- a/src/plugins/slurmctld/nonstop/do_work.c +++ b/src/plugins/slurmctld/nonstop/do_work.c @@ -277,6 +277,15 @@ unpack_error: return SLURM_ERROR; } +static int _update_job(job_desc_msg_t * job_specs, uid_t uid) +{ + slurm_msg_t msg; + + msg.data= job_specs; + msg.conn_fd = -1; + return update_job(&msg, uid); +} + /* * Save all nonstop plugin state information */ @@ -1057,7 +1066,7 @@ extern char *drop_node(char *cmd_ptr, uid_t cmd_uid, job_alloc_req.job_id = job_id; job_alloc_req.req_nodes = hostlist_ranged_string_xmalloc(hl); hostlist_destroy(hl); - rc = update_job(&job_alloc_req, cmd_uid); + rc = _update_job(&job_alloc_req, cmd_uid); if (rc) { info("slurmctld/nonstop: can remove failing node %s " "from job %u: %s", @@ -1382,7 +1391,7 @@ merge: new_node_name = strdup(new_job_ptr->nodes); slurm_init_job_desc_msg(&job_alloc_req); job_alloc_req.job_id = new_job_ptr->job_id; job_alloc_req.min_nodes = 0; - rc = update_job(&job_alloc_req, cmd_uid); + rc = _update_job(&job_alloc_req, cmd_uid); /* Without unlock, the job_fini_callback() function will deadlock. * Not a great solution, but perhaps the least bad solution. */ @@ -1405,7 +1414,7 @@ merge: new_node_name = strdup(new_job_ptr->nodes); slurm_init_job_desc_msg(&job_alloc_req); job_alloc_req.job_id = job_id; job_alloc_req.min_nodes = INFINITE; - rc = update_job(&job_alloc_req, cmd_uid); + rc = _update_job(&job_alloc_req, cmd_uid); if (rc) { info("slurmctld/nonstop: can not grow job %u: %s", job_id, slurm_strerror(rc)); @@ -1441,7 +1450,7 @@ merge: new_node_name = strdup(new_job_ptr->nodes); job_alloc_req.job_id = job_id; job_alloc_req.req_nodes = hostlist_ranged_string_xmalloc(hl); hostlist_destroy(hl); - rc = update_job(&job_alloc_req, cmd_uid); + rc = _update_job(&job_alloc_req, cmd_uid); if (rc) { info("slurmctld/nonstop: can remove failing node %s " "from job %u: %s", @@ -1674,7 +1683,7 @@ extern char *time_incr(char *cmd_ptr, uid_t cmd_uid, uint32_t protocol_version) job_specs.job_id = job_id; job_specs.time_limit = job_fail_ptr->job_ptr->time_limit; job_specs.time_limit += minutes; - rc = update_job(&job_specs, cmd_uid); + rc = _update_job(&job_specs, cmd_uid); } if (rc) { xstrfmtcat(resp, "%s EJOBUPDATE %s", SLURM_VERSION_STRING, diff --git a/src/scontrol/update_job.c b/src/scontrol/update_job.c index 1cca1acaac4..739f792054c 100644 --- a/src/scontrol/update_job.c +++ b/src/scontrol/update_job.c @@ -55,10 +55,8 @@ static int _parse_restart_args(int argc, char **argv, uint16_t *stick, char **image_dir); static void _update_job_size(uint32_t job_id); static int _parse_requeue_flags(char *, uint32_t *state_flags); -static job_info_msg_t *_get_job_info(const char *jobid, uint32_t *task_id); -static job_ids_t *_get_job_ids2(const char *jobid, uint32_t *num_ids); -static void _free_job_ids(job_ids_t *job_ids, uint32_t num_ids); -static int _parse_job_ids(const char *, uint32_t *, uint32_t *); +static int _get_job_count(const char *job_id_str); +static uint32_t _get_job_time(const char *job_id_str); /* * scontrol_checkpoint - perform some checkpoint/resume operation @@ -212,31 +210,6 @@ _parse_restart_args(int argc, char **argv, uint16_t *stick, char **image_dir) return 0; } -/* Return the current time limit of the specified job_id or NO_VAL if the - * information is not available */ -static uint32_t _get_job_time(uint32_t job_id) -{ - uint32_t time_limit = NO_VAL; - int i, rc; - job_info_msg_t *resp; - - rc = slurm_load_job(&resp, job_id, SHOW_ALL); - if (rc == SLURM_SUCCESS) { - for (i = 0; i < resp->record_count; i++) { - if (resp->job_array[i].job_id != job_id) - continue; /* should not happen */ - time_limit = resp->job_array[i].time_limit; - break; - } - slurm_free_job_info_msg(resp); - } else { - error("Could not load state information for job %u: %m", - job_id); - } - - return time_limit; -} - /* * scontrol_hold - perform some job hold/release operation * IN op - hold/release operation @@ -507,9 +480,8 @@ scontrol_update_job (int argc, char *argv[]) char *tag, *val; int taglen, vallen; job_desc_msg_t job_msg; - job_ids_t *ids = NULL; - uint32_t num_ids = 0; - char *job_id_str; + int32_t num_ids = -1; + job_array_resp_msg_t *resp = NULL; slurm_init_job_desc_msg (&job_msg); @@ -533,17 +505,22 @@ scontrol_update_job (int argc, char *argv[]) exit_code = 1; fprintf (stderr, "Invalid input: %s\n", argv[i]); fprintf (stderr, "Request aborted\n"); - _free_job_ids(ids, num_ids); return -1; } if (strncasecmp(tag, "JobId", MAX(taglen, 3)) == 0) { - ids = _get_job_ids2(val, &num_ids); - if (ids == NULL) { + job_msg.job_id_str = val; + num_ids = _get_job_count(val); + if (num_ids < 0) { error ("Invalid JobId value: %s", val); exit_code = 1; return 0; } + if (num_ids == 0) { + error ("JobId not found"); + exit_code = 1; + return 0; + } } else if (strncasecmp(tag, "Comment", MAX(taglen, 3)) == 0) { job_msg.comment = val; @@ -561,29 +538,25 @@ scontrol_update_job (int argc, char *argv[]) if ((time_limit < 0) && (time_limit != INFINITE)) { error("Invalid TimeLimit value"); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } if (incr || decr) { - if (num_ids == 0) { + if (num_ids < 1) { error("JobId must preceed TimeLimit " "increment or decrement"); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } if (num_ids > 1) { error("TimeLimit increment/decrement " "not supported for job arrays"); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } - job_current_time = _get_job_time(ids[0]. - job_id); + job_current_time = _get_job_time(job_msg. + job_id_str); if (job_current_time == NO_VAL) { exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } if (incr) { @@ -593,7 +566,6 @@ scontrol_update_job (int argc, char *argv[]) " current time limit (%u > %u)", time_limit, job_current_time); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } else { time_limit = job_current_time - @@ -608,7 +580,6 @@ scontrol_update_job (int argc, char *argv[]) if ((time_min < 0) && (time_min != INFINITE)) { error("Invalid TimeMin value"); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } job_msg.time_min = time_min; @@ -618,7 +589,6 @@ scontrol_update_job (int argc, char *argv[]) if (parse_uint32(val, &job_msg.priority)) { error ("Invalid Priority value: %s", val); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } update_cnt++; @@ -631,7 +601,6 @@ scontrol_update_job (int argc, char *argv[]) "-%d and %d", NICE_OFFSET, NICE_OFFSET); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } job_msg.nice = NICE_OFFSET + nice; @@ -645,7 +614,6 @@ scontrol_update_job (int argc, char *argv[]) (max_cpus && (max_cpus < min_cpus))) { error ("Invalid NumCPUs value: %s", val); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } job_msg.min_cpus = min_cpus; @@ -659,7 +627,6 @@ scontrol_update_job (int argc, char *argv[]) if (parse_uint32(val, &job_msg.num_tasks)) { error ("Invalid NumTasks value: %s", val); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } update_cnt++; @@ -668,7 +635,6 @@ scontrol_update_job (int argc, char *argv[]) if (parse_uint16(val, &job_msg.requeue)) { error ("Invalid Requeue value: %s", val); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } update_cnt++; @@ -687,10 +653,8 @@ scontrol_update_job (int argc, char *argv[]) rc = get_resource_arg_range( val, "requested node count", &min_nodes, &max_nodes, false); - if (!rc) { - _free_job_ids(ids, num_ids); + if (!rc) return rc; - } job_msg.min_nodes = (uint32_t) min_nodes; job_msg.max_nodes = (uint32_t) max_nodes; } @@ -701,7 +665,6 @@ scontrol_update_job (int argc, char *argv[]) if (parse_uint16(val, &job_msg.sockets_per_node)) { error ("Invalid ReqSockets value: %s", val); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } update_cnt++; @@ -710,7 +673,6 @@ scontrol_update_job (int argc, char *argv[]) if (parse_uint16(val, &job_msg.cores_per_socket)) { error ("Invalid ReqCores value: %s", val); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } update_cnt++; @@ -719,7 +681,6 @@ scontrol_update_job (int argc, char *argv[]) if (parse_uint16(val, &job_msg.ntasks_per_node)) { error ("Invalid TasksPerNode value: %s", val); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } update_cnt++; @@ -728,7 +689,6 @@ scontrol_update_job (int argc, char *argv[]) if (parse_uint16(val, &job_msg.threads_per_core)) { error ("Invalid ReqThreads value: %s", val); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } update_cnt++; @@ -737,7 +697,6 @@ scontrol_update_job (int argc, char *argv[]) if (parse_uint16(val, &job_msg.pn_min_cpus)) { error ("Invalid MinCPUsNode value: %s", val); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } update_cnt++; @@ -747,7 +706,6 @@ scontrol_update_job (int argc, char *argv[]) if (parse_uint32(val, &job_msg.pn_min_memory)) { error ("Invalid MinMemoryNode value: %s", val); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } update_cnt++; @@ -757,7 +715,6 @@ scontrol_update_job (int argc, char *argv[]) if (parse_uint32(val, &job_msg.pn_min_memory)) { error ("Invalid MinMemoryCPU value: %s", val); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } job_msg.pn_min_memory |= MEM_PER_CPU; @@ -768,7 +725,6 @@ scontrol_update_job (int argc, char *argv[]) if (parse_uint32(val, &job_msg.pn_min_tmp_disk)) { error ("Invalid MinTmpDiskNode value: %s", val); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } update_cnt++; @@ -813,7 +769,6 @@ scontrol_update_job (int argc, char *argv[]) if (parse_uint32(val, &job_msg.wait4switch)) { error ("Invalid wait-for-switch value: %s", val); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } update_cnt++; @@ -826,7 +781,6 @@ scontrol_update_job (int argc, char *argv[]) else if (parse_uint16(val, &job_msg.shared)) { error ("Invalid wait-for-switch value: %s", val); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } update_cnt++; @@ -839,7 +793,6 @@ scontrol_update_job (int argc, char *argv[]) else if (parse_uint16(val, &job_msg.contiguous)) { error ("Invalid Contiguous value: %s", val); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } update_cnt++; @@ -850,7 +803,6 @@ scontrol_update_job (int argc, char *argv[]) else if (parse_uint16(val, &job_msg.core_spec)) { error ("Invalid CoreSpec value: %s", val); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } update_cnt++; @@ -935,7 +887,6 @@ scontrol_update_job (int argc, char *argv[]) else if (parse_uint16(val, &job_msg.rotate)) { error ("Invalid rotate value: %s", val); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } update_cnt++; @@ -969,7 +920,6 @@ scontrol_update_job (int argc, char *argv[]) else if (parse_uint16(val, &job_msg.reboot)) { error ("Invalid reboot value: %s", val); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } update_cnt++; @@ -979,7 +929,6 @@ scontrol_update_job (int argc, char *argv[]) fprintf (stderr, "Update of this parameter is not " "supported: %s\n", argv[i]); fprintf (stderr, "Request aborted\n"); - _free_job_ids(ids, num_ids); return 0; } } @@ -987,55 +936,47 @@ scontrol_update_job (int argc, char *argv[]) if (update_cnt == 0) { exit_code = 1; fprintf (stderr, "No changes specified\n"); - _free_job_ids(ids, num_ids); return 0; } if (num_ids == 0) { error("No job ID specified"); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } if ((num_ids > 1) && update_size) { error("Job resizing not supported for entire job array"); error("Each job array element must be re-sized individually"); exit_code = 1; - _free_job_ids(ids, num_ids); return 0; } - job_id_str = NULL; - for (i = 0; i < num_ids; i++) { - - if (ids[i].array_task_str) { - xstrfmtcat(job_id_str, "%u_%s", - ids[i].array_job_id, ids[i].array_task_str); - } else if (ids[i].array_task_id != NO_VAL) { - xstrfmtcat(job_id_str, "%u_%u", - ids[i].array_job_id, ids[i].array_task_id); - } else { - xstrfmtcat(job_id_str, "%u", ids[i].job_id); + rc = slurm_update_job2(&job_msg, &resp); + if (rc != SLURM_SUCCESS) { + exit_code = 1; + if (quiet_flag != 1) { + fprintf(stderr, "%s for job %s\n", + slurm_strerror(slurm_get_errno()), + job_msg.job_id_str); } - - job_msg.job_id_str = job_id_str; - rc = slurm_update_job(&job_msg); - - if (rc != SLURM_SUCCESS) + } else if (resp) { + for (i = 0; i < resp->job_array_count; i++) { + if ((resp->error_code[i] == SLURM_SUCCESS) && + (resp->job_array_count == 1)) + continue; exit_code = 1; - if ((rc != SLURM_SUCCESS) && (quiet_flag != 1)) { - fprintf(stderr, "%s for job %s\n", - slurm_strerror(slurm_get_errno()), job_id_str); + if (quiet_flag == 1) + continue; + fprintf(stderr, "%s: %s\n", + resp->job_array_id[i], + slurm_strerror(resp->error_code[i])); } - - xfree(job_id_str); + slurm_free_job_array_resp(resp); } if (update_size) /* See check above for one job ID */ _update_job_size(job_msg.job_id); - _free_job_ids(ids, num_ids); - return rc; } @@ -1196,124 +1137,83 @@ _parse_requeue_flags(char *s, uint32_t *state) return -1; } -/* _get_job_info() - */ -static job_info_msg_t * -_get_job_info(const char *jobid, uint32_t *task_id) +/* _get_job_count() - Return count of job records for given job_id */ +static int _get_job_count(const char *job_id_str) { uint32_t job_id; - int cc; - job_info_msg_t *job_info; - - if (_parse_job_ids(jobid, &job_id, task_id) < 0) - return NULL; - - cc = slurm_load_job(&job_info, job_id, SHOW_ALL); - if (cc < 0) { - slurm_perror("slurm_load_job"); - return NULL; + char *next_str = NULL, *under; + job_info_msg_t *job_info = NULL; + int rc; + + under = strchr(job_id_str, '_'); + if (under) { + if (strchr(under, ',') || strchr(under, '-')) + return 2; /* At least two tasks of job array */ + return 1; } - return job_info; -} - -/* _get_job_ids2() - Return array of job record for all incomplete jobs - * NOTE: Call _free_job_ids() to free allocated memory - */ -static job_ids_t *_get_job_ids2(const char *jobid, uint32_t *num_ids) -{ - job_info_msg_t *job_info; - job_ids_t *job_ids = NULL; - uint32_t job_state; - uint32_t task_id; - uint32_t job_id; - int i; - int cc; - - if (_parse_job_ids(jobid, &job_id, &task_id) < 0) - return NULL; - - if (task_id != NO_VAL) { - - *num_ids = 1; - job_ids = xmalloc(sizeof(job_ids_t)); - job_ids->array_job_id = job_id; - job_ids->array_task_id = task_id; - - return job_ids; + job_id = (uint32_t)strtol(job_id_str, &next_str, 10); + if (next_str[0] != '\0') { + fprintf(stderr, "Invalid JobID specified: %s\n", job_id_str); + return -1; } - *num_ids = 0; - task_id = 0; - job_info = _get_job_info(jobid, &task_id); - if (job_info == NULL) - return NULL; - - *num_ids = job_info->record_count; - job_ids = xmalloc(job_info->record_count * sizeof(job_ids_t)); - i = 0; - for (cc = 0; cc < job_info->record_count; cc++) { - job_state = job_info->job_array[cc].job_state & JOB_STATE_BASE; - if (job_state <= JOB_SUSPENDED) { - job_ids[i].array_job_id = - job_info->job_array[cc].array_job_id; - job_ids[i].array_task_id = - job_info->job_array[cc].array_task_id; - job_ids[i].array_task_str = - xstrdup(job_info-> - job_array[cc].array_task_str); - job_ids[i].job_id = job_info->job_array[cc].job_id; - ++i; - } + rc = slurm_load_job(&job_info, job_id, SHOW_ALL); + if (rc < 0) { + slurm_perror("slurm_load_job"); + return -1; } - *num_ids = i; + + rc = (int) job_info->record_count; slurm_free_job_info_msg(job_info); - return job_ids; + return rc; } -static void _free_job_ids(job_ids_t *job_ids, uint32_t num_ids) -{ - int i; - - if (!job_ids || (num_ids == 0)) - return; - for (i = 0; i < num_ids; i++) - xfree(job_ids[i].array_task_str); - xfree(job_ids); -} -static int -_parse_job_ids(const char *jobid, uint32_t *job_id, uint32_t *task_id) +/* Return the current time limit of the specified job_id or NO_VAL if the + * information is not available */ +static uint32_t _get_job_time(const char *job_id_str) { - char buf[64]; - char *next_str; - char *taskid; - - if (strlen(jobid) > 63) - return -1; - - strcpy(buf, jobid); + uint32_t job_id, task_id; + char *next_str = NULL; + uint32_t time_limit = NO_VAL; + int i, rc; + job_info_msg_t *resp; + bitstr_t *array_bitmap; - taskid = strchr(buf, '_'); - if (taskid) { - *taskid = 0; - ++taskid; + job_id = (uint32_t)strtol(job_id_str, &next_str, 10); + if (next_str[0] == '_') + task_id = (uint32_t)strtol(next_str+1, NULL, 10); + else + task_id = NO_VAL; - *task_id = (uint32_t)strtol(taskid, &next_str, 10); - if (next_str[0] != '\0') { - fprintf(stderr, "Invalid task_id specified\n"); - return -1; + rc = slurm_load_job(&resp, job_id, SHOW_ALL); + if (rc == SLURM_SUCCESS) { + for (i = 0; i < resp->record_count; i++) { + if (resp->job_array[i].job_id == job_id) { + /* Regular job match */ + time_limit = resp->job_array[i].time_limit; + break; + } + if (resp->job_array[i].array_job_id != job_id) + continue; + array_bitmap = (bitstr_t *) + resp->job_array[i].array_bitmap; + if ((task_id == NO_VAL) || + (resp->job_array[i].array_task_id == task_id) || + ((task_id < bit_size(array_bitmap)) && + bit_test(array_bitmap, task_id))) { + /* Array job with task_id match */ + time_limit = resp->job_array[i].time_limit; + break; + } } + slurm_free_job_info_msg(resp); } else { - *task_id = NO_VAL; + error("Could not load state information for job %s: %m", + job_id_str); } - *job_id = (uint32_t)strtol(buf, &next_str, 10); - if (next_str[0] != '\0') { - fprintf(stderr, "Invalid job_id specified\n"); - return -1; - } - - return 0; + return time_limit; } diff --git a/src/slurmctld/job_mgr.c b/src/slurmctld/job_mgr.c index c5617bd69cc..320f3316438 100644 --- a/src/slurmctld/job_mgr.c +++ b/src/slurmctld/job_mgr.c @@ -208,6 +208,8 @@ static int _reset_detail_bitmaps(struct job_record *job_ptr); static void _reset_step_bitmaps(struct job_record *job_ptr); static void _resp_array_add(resp_array_struct_t **resp, struct job_record *job_ptr, uint32_t rc); +static void _resp_array_add_id(resp_array_struct_t **resp, uint32_t job_id, + uint32_t task_id, uint32_t rc); static void _resp_array_free(resp_array_struct_t *resp); static job_array_resp_msg_t *_resp_array_xlate(resp_array_struct_t *resp, uint32_t job_id); @@ -339,6 +341,20 @@ static void _resp_array_add(resp_array_struct_t **resp, loc_resp->resp_array_cnt++; return; } +/* Add record to resp_array_struct_t, free with _resp_array_free(). + * This is a variant of _resp_array_add for the case where a job/task ID + * is not found, so we use a dummy job record based upon the input IDs. */ +static void _resp_array_add_id(resp_array_struct_t **resp, uint32_t job_id, + uint32_t task_id, uint32_t rc) +{ + struct job_record job_ptr; + + job_ptr.job_id = job_id; + job_ptr.array_job_id = job_id; + job_ptr.array_task_id = task_id; + job_ptr.array_recs = NULL; + _resp_array_add(resp, &job_ptr, rc); +} /* Free resp_array_struct_t built by _resp_array_add() */ static void _resp_array_free(resp_array_struct_t *resp) @@ -4013,13 +4029,13 @@ extern int job_str_signal(char *job_id_str, uint16_t signal, uint16_t flags, if (job_ptr && job_ptr->array_recs) { /* This is a job array */ + job_ptr_done = job_ptr; rc = _job_signal(job_ptr, signal, flags, uid, preempt); jobs_signalled++; if (rc == ESLURM_ALREADY_DONE) { jobs_done++; rc = SLURM_SUCCESS; } - job_ptr_done = job_ptr; } /* Signal all tasks of this job array */ @@ -9998,40 +10014,43 @@ fini: /* * update_job - update a job's parameters per the supplied specifications - * IN job_specs - a job's specification + * IN msg - RPC to update job, including change specification * IN uid - uid of user issuing RPC * RET returns an error code from slurm_errno.h * global: job_list - global list of job entries * last_job_update - time of last job table update */ -extern int update_job(job_desc_msg_t * job_specs, uid_t uid) +extern int update_job(slurm_msg_t *msg, uid_t uid) { + job_desc_msg_t *job_specs = (job_desc_msg_t *) msg->data; struct job_record *job_ptr; - - /* Make sure anything that may be put in the database will be - * lower case */ - xstrtolower(job_specs->account); - xstrtolower(job_specs->wckey); + int rc; job_ptr = find_job_record(job_specs->job_id); if (job_ptr == NULL) { error("update_job: job_id %u does not exist.", job_specs->job_id); - return ESLURM_INVALID_JOB_ID; + rc = ESLURM_INVALID_JOB_ID; + } else { + rc = _update_job(job_ptr, job_specs, uid); } - return _update_job(job_ptr, job_specs, uid); + + slurm_send_rc_msg(msg, rc); + return rc; } /* - * update_job_str - update a job's parameters per the supplied specifications + * IN msg - RPC to update job, including change specification * IN job_specs - a job's specification * IN uid - uid of user issuing RPC * RET returns an error code from slurm_errno.h * global: job_list - global list of job entries * last_job_update - time of last job table update */ -extern int update_job_str(job_desc_msg_t *job_specs, uid_t uid) +extern int update_job_str(slurm_msg_t *msg, uid_t uid) { + slurm_msg_t resp_msg; + job_desc_msg_t *job_specs = (job_desc_msg_t *) msg->data; struct job_record *job_ptr, *new_job_ptr; slurm_ctl_conf_t *conf; long int long_id; @@ -10039,16 +10058,14 @@ extern int update_job_str(job_desc_msg_t *job_specs, uid_t uid) bitstr_t *array_bitmap, *tmp_bitmap; bool valid = true; int32_t i, i_first, i_last; - int len, rc, rc2; + int len, rc = SLURM_SUCCESS, rc2; char *end_ptr, *tok, *tmp; - char buf[32], *job_id_str; + char *job_id_str; + resp_array_struct_t *resp_array = NULL; + job_array_resp_msg_t *resp_array_msg; + return_code_msg_t rc_msg; - if (job_specs->job_id_str) { - job_id_str = job_specs->job_id_str; - } else { - snprintf(buf, sizeof(buf),"%u",job_specs->job_id); - job_id_str = buf; - } + job_id_str = job_specs->job_id_str; if (max_array_size == NO_VAL) { conf = slurm_conf_lock(); @@ -10056,49 +10073,50 @@ extern int update_job_str(job_desc_msg_t *job_specs, uid_t uid) slurm_conf_unlock(); } - /* Make sure anything that may be put in the database will be - * lower case */ - xstrtolower(job_specs->account); - xstrtolower(job_specs->wckey); - long_id = strtol(job_id_str, &end_ptr, 10); if ((long_id <= 0) || (long_id == LONG_MAX) || ((end_ptr[0] != '\0') && (end_ptr[0] != '_'))) { info("update_job_str: invalid job id %s", job_id_str); - return ESLURM_INVALID_JOB_ID; + rc = ESLURM_INVALID_JOB_ID; + goto reply; } job_id = (uint32_t) long_id; if (end_ptr[0] == '\0') { /* Single job (or full job array) */ struct job_record *job_ptr_done = NULL; job_ptr = find_job_record(job_id); - if (job_ptr && (job_ptr->array_task_id == NO_VAL) && - (job_ptr->array_recs == NULL)) { - /* This is a regular job, not a job array */ - return _update_job(job_ptr, job_specs, uid); + if (job_ptr && + (((job_ptr->array_task_id == NO_VAL) && + (job_ptr->array_recs == NULL)) || + ((job_ptr->array_task_id != NO_VAL) && + (job_ptr->array_job_id != job_id)))) { + /* This is a regular job or single task of job array */ + rc = _update_job(job_ptr, job_specs, uid); + goto reply; } if (job_ptr && job_ptr->array_recs) { /* This is a job array */ - rc = _update_job(job_ptr, job_specs, uid); job_ptr_done = job_ptr; + rc2 = _update_job(job_ptr, job_specs, uid); + _resp_array_add(&resp_array, job_ptr, rc2); } /* Update all tasks of this job array */ job_ptr = job_array_hash_j[JOB_HASH_INX(job_id)]; if (!job_ptr && !job_ptr_done) { info("update_job_str: invalid job id %u", job_id); - return ESLURM_INVALID_JOB_ID; + rc = ESLURM_INVALID_JOB_ID; + goto reply; } while (job_ptr) { if ((job_ptr->array_job_id == job_id) && (job_ptr != job_ptr_done)) { rc2 = _update_job(job_ptr, job_specs, uid); - rc = MAX(rc, rc2); + _resp_array_add(&resp_array, job_ptr, rc2); } job_ptr = job_ptr->job_array_next_j; } - return rc; - + goto reply; } array_bitmap = bit_alloc(max_array_size); @@ -10117,7 +10135,8 @@ extern int update_job_str(job_desc_msg_t *job_specs, uid_t uid) } if (!valid) { info("update_job_str: invalid job id %s", job_id_str); - return ESLURM_INVALID_JOB_ID; + rc = ESLURM_INVALID_JOB_ID; + goto reply; } job_ptr = find_job_record(job_id); @@ -10139,7 +10158,8 @@ extern int update_job_str(job_desc_msg_t *job_specs, uid_t uid) } else if (bit_super_set(job_ptr->array_recs->task_id_bitmap, array_bitmap)) { /* Update the record with all pending tasks */ - rc = _update_job(job_ptr, job_specs, uid); + rc2 = _update_job(job_ptr, job_specs, uid); + _resp_array_add(&resp_array, job_ptr, rc2); bit_not(job_ptr->array_recs->task_id_bitmap); bit_and(array_bitmap, job_ptr->array_recs->task_id_bitmap); @@ -10180,14 +10200,32 @@ extern int update_job_str(job_desc_msg_t *job_specs, uid_t uid) job_ptr = find_job_array_rec(job_id, i); if (job_ptr == NULL) { info("update_job_str: invalid job id %u_%d", job_id, i); - rc = ESLURM_INVALID_JOB_ID; + _resp_array_add_id(&resp_array, job_id, i, + ESLURM_INVALID_JOB_ID); continue; } rc2 = _update_job(job_ptr, job_specs, uid); - rc = MAX(rc, rc2); + _resp_array_add(&resp_array, job_ptr, rc2); } +reply: + if (msg->conn_fd >= 0) { + slurm_msg_t_init(&resp_msg); + resp_msg.protocol_version = msg->protocol_version; + if (resp_array) { + resp_array_msg = _resp_array_xlate(resp_array, job_id); + resp_msg.msg_type = RESPONSE_JOB_ARRAY_ERRORS; + resp_msg.data = resp_array_msg; + } else { + resp_msg.msg_type = RESPONSE_SLURM_RC; + rc_msg.return_code = rc; + resp_msg.data = &rc_msg; + } + slurm_send_node_msg(msg->conn_fd, &resp_msg); + } + _resp_array_free(resp_array); + return rc; } @@ -11845,6 +11883,7 @@ extern int job_suspend2(suspend_msg_t *sus_ptr, uid_t uid, if (job_ptr && job_ptr->array_recs) { /* This is a job array */ + job_ptr_done = job_ptr; rc2 = _job_suspend(job_ptr, sus_ptr->op, indf_susp); _resp_array_add(&resp_array, job_ptr, rc2); } @@ -11898,7 +11937,8 @@ extern int job_suspend2(suspend_msg_t *sus_ptr, uid_t uid, job_ptr = find_job_array_rec(job_id, i); if (job_ptr == NULL) { info("job_suspend2: invalid job id %u_%d", job_id, i); - rc = ESLURM_INVALID_JOB_ID; + _resp_array_add_id(&resp_array, job_id, i, + ESLURM_INVALID_JOB_ID); continue; } rc2 = _job_suspend(job_ptr, sus_ptr->op, indf_susp); @@ -12151,9 +12191,9 @@ extern int job_requeue2(uid_t uid, requeue_msg_t *req_ptr, if (job_ptr && job_ptr->array_recs) { /* This is a job array */ + job_ptr_done = job_ptr; rc2 = _job_requeue(uid, job_ptr, preempt, state); _resp_array_add(&resp_array, job_ptr, rc2); - job_ptr_done = job_ptr; } /* Requeue all tasks of this job array */ @@ -12204,7 +12244,8 @@ extern int job_requeue2(uid_t uid, requeue_msg_t *req_ptr, job_ptr = find_job_array_rec(job_id, i); if (job_ptr == NULL) { info("job_requeue2: invalid job id %u_%d", job_id, i); - rc = ESLURM_INVALID_JOB_ID; + _resp_array_add_id(&resp_array, job_id, i, + ESLURM_INVALID_JOB_ID); continue; } diff --git a/src/slurmctld/proc_req.c b/src/slurmctld/proc_req.c index f7119d85829..4fd1d34d64a 100644 --- a/src/slurmctld/proc_req.c +++ b/src/slurmctld/proc_req.c @@ -3191,8 +3191,14 @@ static void _slurm_rpc_update_job(slurm_msg_t * msg) /* do RPC call */ dump_job_desc(job_desc_msg); + /* Insure everything that may be written to database is lower case */ + xstrtolower(job_desc_msg->account); + xstrtolower(job_desc_msg->wckey); lock_slurmctld(job_write_lock); - error_code = update_job_str(job_desc_msg, uid); + if (job_desc_msg->job_id_str) + error_code = update_job_str(msg, uid); + else + error_code = update_job(msg, uid); unlock_slurmctld(job_write_lock); END_TIMER2("_slurm_rpc_update_job"); @@ -3200,11 +3206,9 @@ static void _slurm_rpc_update_job(slurm_msg_t * msg) if (error_code) { info("_slurm_rpc_update_job JobId=%u uid=%d: %s", job_desc_msg->job_id, uid, slurm_strerror(error_code)); - slurm_send_rc_msg(msg, error_code); } else { info("_slurm_rpc_update_job complete JobId=%u uid=%d %s", job_desc_msg->job_id, uid, TIME_STR); - slurm_send_rc_msg(msg, SLURM_SUCCESS); /* Below functions provide their own locking */ schedule_job_save(); schedule_node_save(); diff --git a/src/slurmctld/slurmctld.h b/src/slurmctld/slurmctld.h index 979dd88f908..6f6dac0fb1c 100644 --- a/src/slurmctld/slurmctld.h +++ b/src/slurmctld/slurmctld.h @@ -1972,23 +1972,23 @@ extern void sync_job_priorities(void); /* * update_job - update a job's parameters per the supplied specifications - * IN job_specs - a job's specification + * IN msg - RPC to update job, including change specification * IN uid - uid of user issuing RPC * RET returns an error code from slurm_errno.h * global: job_list - global list of job entries * last_job_update - time of last job table update */ -extern int update_job(job_desc_msg_t * job_specs, uid_t uid); +extern int update_job(slurm_msg_t *msg, uid_t uid); /* - * update_job_str - update a job's parameters per the supplied specifications + * IN msg - RPC to update job, including change specification * IN job_specs - a job's specification * IN uid - uid of user issuing RPC * RET returns an error code from slurm_errno.h * global: job_list - global list of job entries * last_job_update - time of last job table update */ -extern int update_job_str(job_desc_msg_t * job_specs, uid_t uid); +extern int update_job_str(slurm_msg_t *msg, uid_t uid); /* * Modify the account associated with a pending job -- GitLab