Skip to content
Snippets Groups Projects
Commit c3f0e5c5 authored by Moe Jette
Browse files

Add alloc_sid and alloc_node to job submit/allocate descriptor.

Initialize/free/set alloc_sid and alloc_node in API functions.
Pack/unpack/free new elements in job descriptor RPCs.
Load/dump/pack new elements into job table records.
parent 9625e5ba
No related branches found
No related tags found
No related merge requests found
...@@ -459,7 +459,7 @@ static void *_wdog(void *args) ...@@ -459,7 +459,7 @@ static void *_wdog(void *args)
*/ */
static void *_thread_per_node_rpc(void *args) static void *_thread_per_node_rpc(void *args)
{ {
int msg_size; int msg_size = 0;
int rc; int rc;
slurm_fd sockfd; slurm_fd sockfd;
slurm_msg_t request_msg; slurm_msg_t request_msg;
......
...@@ -122,7 +122,6 @@ static void * _service_connection(void *arg); ...@@ -122,7 +122,6 @@ static void * _service_connection(void *arg);
static int _shutdown_backup_controller(void); static int _shutdown_backup_controller(void);
inline static void _slurm_rpc_allocate_resources(slurm_msg_t * msg); inline static void _slurm_rpc_allocate_resources(slurm_msg_t * msg);
inline static void _slurm_rpc_allocate_and_run(slurm_msg_t * msg); inline static void _slurm_rpc_allocate_and_run(slurm_msg_t * msg);
inline static void _slurm_rpc_batch_launch_resp(slurm_msg_t * msg);
inline static void _slurm_rpc_dump_conf(slurm_msg_t * msg); inline static void _slurm_rpc_dump_conf(slurm_msg_t * msg);
inline static void _slurm_rpc_dump_nodes(slurm_msg_t * msg); inline static void _slurm_rpc_dump_nodes(slurm_msg_t * msg);
inline static void _slurm_rpc_dump_partitions(slurm_msg_t * msg); inline static void _slurm_rpc_dump_partitions(slurm_msg_t * msg);
...@@ -764,10 +763,6 @@ static void _slurmctld_req (slurm_msg_t * msg) ...@@ -764,10 +763,6 @@ static void _slurmctld_req (slurm_msg_t * msg)
_slurm_rpc_update_partition(msg); _slurm_rpc_update_partition(msg);
slurm_free_update_part_msg(msg->data); slurm_free_update_part_msg(msg->data);
break; break;
case RESPONSE_BATCH_JOB_LAUNCH:
_slurm_rpc_batch_launch_resp(msg);
slurm_free_batch_resp_msg(msg->data);
break;
default: default:
error("invalid RPC message type %d", msg->msg_type); error("invalid RPC message type %d", msg->msg_type);
slurm_send_rc_msg(msg, EINVAL); slurm_send_rc_msg(msg, EINVAL);
...@@ -1952,26 +1947,6 @@ static void _slurm_rpc_node_registration(slurm_msg_t * msg) ...@@ -1952,26 +1947,6 @@ static void _slurm_rpc_node_registration(slurm_msg_t * msg)
} }
} }
/*
 * _slurm_rpc_batch_launch_resp - handle a RESPONSE_BATCH_JOB_LAUNCH RPC by
 *	recording the session id reported for a batch job
 * IN msg - the received message; msg->data is read as a
 *	batch_launch_response_msg_t and msg->cred supplies the sender's uid
 * NOTE: the return code of set_batch_job_sid() is deliberately discarded
 *	and no reply is sent from this function
 */
static void _slurm_rpc_batch_launch_resp(slurm_msg_t * msg)
{
	/* Locks: Write job */
	slurmctld_lock_t lock_set = { NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK };
	batch_launch_response_msg_t *resp;
	uid_t sender_uid;

	resp = (batch_launch_response_msg_t *) msg->data;

	debug("Processing RPC: RESPONSE_BATCH_JOB_LAUNCH");
	sender_uid = slurm_auth_uid(msg->cred);

	/* update the job record while holding the lock */
	lock_slurmctld(lock_set);
	(void) set_batch_job_sid(sender_uid, resp->job_id, resp->sid);
	unlock_slurmctld(lock_set);
}
/* /*
* _slurmctld_shutdown - issue RPC to have slurmctld shutdown, knocks * _slurmctld_shutdown - issue RPC to have slurmctld shutdown, knocks
* loose an slurm_accept_msg_conn() if we have a thread hung there * loose an slurm_accept_msg_conn() if we have a thread hung there
......
...@@ -386,7 +386,7 @@ static void _dump_job_state(struct job_record *dump_job_ptr, Buf buffer) ...@@ -386,7 +386,7 @@ static void _dump_job_state(struct job_record *dump_job_ptr, Buf buffer)
pack32(dump_job_ptr->user_id, buffer); pack32(dump_job_ptr->user_id, buffer);
pack32(dump_job_ptr->time_limit, buffer); pack32(dump_job_ptr->time_limit, buffer);
pack32(dump_job_ptr->priority, buffer); pack32(dump_job_ptr->priority, buffer);
pack32(dump_job_ptr->batch_sid, buffer); pack32(dump_job_ptr->alloc_sid, buffer);
pack_time(dump_job_ptr->start_time, buffer); pack_time(dump_job_ptr->start_time, buffer);
pack_time(dump_job_ptr->end_time, buffer); pack_time(dump_job_ptr->end_time, buffer);
...@@ -400,6 +400,7 @@ static void _dump_job_state(struct job_record *dump_job_ptr, Buf buffer) ...@@ -400,6 +400,7 @@ static void _dump_job_state(struct job_record *dump_job_ptr, Buf buffer)
packstr(dump_job_ptr->nodes, buffer); packstr(dump_job_ptr->nodes, buffer);
packstr(dump_job_ptr->partition, buffer); packstr(dump_job_ptr->partition, buffer);
packstr(dump_job_ptr->name, buffer); packstr(dump_job_ptr->name, buffer);
packstr(dump_job_ptr->alloc_node, buffer);
/* Dump job details, if available */ /* Dump job details, if available */
detail_ptr = dump_job_ptr->details; detail_ptr = dump_job_ptr->details;
...@@ -426,11 +427,12 @@ static void _dump_job_state(struct job_record *dump_job_ptr, Buf buffer) ...@@ -426,11 +427,12 @@ static void _dump_job_state(struct job_record *dump_job_ptr, Buf buffer)
/* Unpack a job's state information from a buffer */ /* Unpack a job's state information from a buffer */
static int _load_job_state(Buf buffer) static int _load_job_state(Buf buffer)
{ {
uint32_t job_id, user_id, time_limit, priority, batch_sid; uint32_t job_id, user_id, time_limit, priority, alloc_sid;
time_t start_time, end_time; time_t start_time, end_time;
uint16_t job_state, next_step_id, details, batch_flag, step_flag; uint16_t job_state, next_step_id, details, batch_flag, step_flag;
uint16_t kill_on_node_fail, kill_on_step_done, name_len; uint16_t kill_on_node_fail, kill_on_step_done, name_len;
char *nodes = NULL, *partition = NULL, *name = NULL; char *nodes = NULL, *partition = NULL, *name = NULL;
char *alloc_node = NULL;
bitstr_t *node_bitmap = NULL; bitstr_t *node_bitmap = NULL;
struct job_record *job_ptr; struct job_record *job_ptr;
struct part_record *part_ptr; struct part_record *part_ptr;
...@@ -440,7 +442,7 @@ static int _load_job_state(Buf buffer) ...@@ -440,7 +442,7 @@ static int _load_job_state(Buf buffer)
safe_unpack32(&user_id, buffer); safe_unpack32(&user_id, buffer);
safe_unpack32(&time_limit, buffer); safe_unpack32(&time_limit, buffer);
safe_unpack32(&priority, buffer); safe_unpack32(&priority, buffer);
safe_unpack32(&batch_sid, buffer); safe_unpack32(&alloc_sid, buffer);
safe_unpack_time(&start_time, buffer); safe_unpack_time(&start_time, buffer);
safe_unpack_time(&end_time, buffer); safe_unpack_time(&end_time, buffer);
...@@ -454,6 +456,7 @@ static int _load_job_state(Buf buffer) ...@@ -454,6 +456,7 @@ static int _load_job_state(Buf buffer)
safe_unpackstr_xmalloc(&nodes, &name_len, buffer); safe_unpackstr_xmalloc(&nodes, &name_len, buffer);
safe_unpackstr_xmalloc(&partition, &name_len, buffer); safe_unpackstr_xmalloc(&partition, &name_len, buffer);
safe_unpackstr_xmalloc(&name, &name_len, buffer); safe_unpackstr_xmalloc(&name, &name_len, buffer);
safe_unpackstr_xmalloc(&alloc_node, &name_len, buffer);
/* validity test as possible */ /* validity test as possible */
if ((job_state >= JOB_END) || (batch_flag > 1)) { if ((job_state >= JOB_END) || (batch_flag > 1)) {
...@@ -511,7 +514,7 @@ static int _load_job_state(Buf buffer) ...@@ -511,7 +514,7 @@ static int _load_job_state(Buf buffer)
job_ptr->user_id = user_id; job_ptr->user_id = user_id;
job_ptr->time_limit = time_limit; job_ptr->time_limit = time_limit;
job_ptr->priority = priority; job_ptr->priority = priority;
job_ptr->batch_sid = batch_sid; job_ptr->alloc_sid = alloc_sid;
job_ptr->start_time = start_time; job_ptr->start_time = start_time;
job_ptr->end_time = end_time; job_ptr->end_time = end_time;
job_ptr->time_last_active = time(NULL); job_ptr->time_last_active = time(NULL);
...@@ -521,6 +524,8 @@ static int _load_job_state(Buf buffer) ...@@ -521,6 +524,8 @@ static int _load_job_state(Buf buffer)
xfree(name); xfree(name);
job_ptr->nodes = nodes; job_ptr->nodes = nodes;
nodes = NULL; /* reused, nothing left to free */ nodes = NULL; /* reused, nothing left to free */
job_ptr->alloc_node = alloc_node;
alloc_node = NULL; /* reused, nothing left to free */
job_ptr->node_bitmap = node_bitmap; job_ptr->node_bitmap = node_bitmap;
xfree(partition); xfree(partition);
job_ptr->kill_on_node_fail = kill_on_node_fail; job_ptr->kill_on_node_fail = kill_on_node_fail;
...@@ -541,6 +546,7 @@ static int _load_job_state(Buf buffer) ...@@ -541,6 +546,7 @@ static int _load_job_state(Buf buffer)
xfree(nodes); xfree(nodes);
xfree(partition); xfree(partition);
xfree(name); xfree(name);
xfree(alloc_node);
FREE_NULL_BITMAP(node_bitmap); FREE_NULL_BITMAP(node_bitmap);
return SLURM_FAILURE; return SLURM_FAILURE;
} }
...@@ -963,9 +969,12 @@ void dump_job_desc(job_desc_msg_t * job_specs) ...@@ -963,9 +969,12 @@ void dump_job_desc(job_desc_msg_t * job_specs)
job_specs->environment[1], job_specs->environment[1],
job_specs->environment[2]); job_specs->environment[2]);
debug3(" in=%s out=%s err=%s work_dir=%s", debug3(" in=%s out=%s err=%s",
job_specs->in, job_specs->out, job_specs->err, job_specs->in, job_specs->out, job_specs->err);
job_specs->work_dir);
debug3(" work_dir=%s alloc_node:sid=%s:%u",
job_specs->work_dir,
job_specs->alloc_node, job_specs->alloc_sid);
} }
...@@ -1774,9 +1783,11 @@ _copy_job_desc_to_job_record(job_desc_msg_t * job_desc, ...@@ -1774,9 +1783,11 @@ _copy_job_desc_to_job_record(job_desc_msg_t * job_desc,
strncpy(job_ptr->name, job_desc->name, strncpy(job_ptr->name, job_desc->name,
sizeof(job_ptr->name)); sizeof(job_ptr->name));
} }
job_ptr->user_id = (uid_t) job_desc->user_id; job_ptr->user_id = (uid_t) job_desc->user_id;
job_ptr->job_state = JOB_PENDING; job_ptr->job_state = JOB_PENDING;
job_ptr->time_limit = job_desc->time_limit; job_ptr->time_limit = job_desc->time_limit;
job_ptr->alloc_sid = job_desc->alloc_sid;
job_ptr->alloc_node = xstrdup(job_desc->alloc_node);
if ((job_desc->priority != if ((job_desc->priority !=
NO_VAL) /* also check submit UID is root */ ) NO_VAL) /* also check submit UID is root */ )
job_ptr->priority = job_desc->priority; job_ptr->priority = job_desc->priority;
...@@ -2020,6 +2031,7 @@ static void _list_delete_job(void *job_entry) ...@@ -2020,6 +2031,7 @@ static void _list_delete_job(void *job_entry)
delete_job_details(job_record_point); delete_job_details(job_record_point);
xfree(job_record_point->alloc_node);
xfree(job_record_point->nodes); xfree(job_record_point->nodes);
FREE_NULL_BITMAP(job_record_point->node_bitmap); FREE_NULL_BITMAP(job_record_point->node_bitmap);
xfree(job_record_point->cpus_per_node); xfree(job_record_point->cpus_per_node);
...@@ -2145,7 +2157,7 @@ void pack_job(struct job_record *dump_job_ptr, Buf buffer) ...@@ -2145,7 +2157,7 @@ void pack_job(struct job_record *dump_job_ptr, Buf buffer)
pack16((uint16_t) dump_job_ptr->job_state, buffer); pack16((uint16_t) dump_job_ptr->job_state, buffer);
pack16((uint16_t) dump_job_ptr->batch_flag, buffer); pack16((uint16_t) dump_job_ptr->batch_flag, buffer);
pack32(dump_job_ptr->batch_sid, buffer); pack32(dump_job_ptr->alloc_sid, buffer);
pack32(dump_job_ptr->time_limit, buffer); pack32(dump_job_ptr->time_limit, buffer);
pack_time(dump_job_ptr->start_time, buffer); pack_time(dump_job_ptr->start_time, buffer);
...@@ -2155,6 +2167,7 @@ void pack_job(struct job_record *dump_job_ptr, Buf buffer) ...@@ -2155,6 +2167,7 @@ void pack_job(struct job_record *dump_job_ptr, Buf buffer)
packstr(dump_job_ptr->nodes, buffer); packstr(dump_job_ptr->nodes, buffer);
packstr(dump_job_ptr->partition, buffer); packstr(dump_job_ptr->partition, buffer);
packstr(dump_job_ptr->name, buffer); packstr(dump_job_ptr->name, buffer);
packstr(dump_job_ptr->alloc_node, buffer);
safe_pack_bit_fmt(dump_job_ptr->node_bitmap, safe_pack_bit_fmt(dump_job_ptr->node_bitmap,
MAX_STR_PACK, buffer); MAX_STR_PACK, buffer);
...@@ -2948,35 +2961,3 @@ _xmit_new_end_time(struct job_record *job_ptr) ...@@ -2948,35 +2961,3 @@ _xmit_new_end_time(struct job_record *job_ptr)
return; return;
} }
/*
 * set_batch_job_sid - set the batch_sid for a specified job_id
 * IN uid - originating uid of the RPC
 * IN job_id - the id of a batch job
 * IN batch_sid - local session id for the job
 * RET int - SLURM_SUCCESS, ESLURM_INVALID_JOB_ID if no job record matches
 *	job_id, or ESLURM_USER_ID_MISSING if uid is not allowed to update
 *	the job
 * global: job_list - global list of job entries is updated
 */
int set_batch_job_sid(uid_t uid, uint32_t job_id, uint32_t batch_sid)
{
	struct job_record *job_ptr;

	job_ptr = find_job_record(job_id);
	if (job_ptr == NULL) {
		error("set_batch_job_sid: job_id %u does not exist.", job_id);
		return ESLURM_INVALID_JOB_ID;
	}

	/* Accept the request only from uid 0, from the uid this process
	 * runs as, or from the owner of the job */
	if ((uid != 0) &&
	    (uid != getuid()) &&
	    (uid != job_ptr->user_id)) {
		/* uid_t may be unsigned; cast so the argument matches %u
		 * and large uid values are not logged as negative numbers */
		error("Security violation, RESPONSE_BATCH_JOB_LAUNCH RPC from uid %u",
		      (unsigned int) uid);
		return ESLURM_USER_ID_MISSING;
	}

	job_ptr->batch_sid = batch_sid;
	return SLURM_SUCCESS;
}
...@@ -242,7 +242,7 @@ struct job_record { ...@@ -242,7 +242,7 @@ struct job_record {
uint16_t kill_on_step_done; /* 1 if job should be killed when uint16_t kill_on_step_done; /* 1 if job should be killed when
the job step completes, 2 if kill the job step completes, 2 if kill
in progress */ in progress */
char *nodes; /* list of nodes allocated to job */ char *nodes; /* list of nodes allocated to job */
bitstr_t *node_bitmap; /* bitmap of nodes allocated to job */ bitstr_t *node_bitmap; /* bitmap of nodes allocated to job */
uint32_t time_limit; /* time_limit minutes or INFINITE */ uint32_t time_limit; /* time_limit minutes or INFINITE */
time_t start_time; /* time execution begins, time_t start_time; /* time execution begins,
...@@ -258,7 +258,8 @@ struct job_record { ...@@ -258,7 +258,8 @@ struct job_record {
uint32_t *cpus_per_node; /* array of cpus per node allocated */ uint32_t *cpus_per_node; /* array of cpus per node allocated */
uint32_t *cpu_count_reps; /* array of consecutive nodes with uint32_t *cpu_count_reps; /* array of consecutive nodes with
same cpu count */ same cpu count */
uint32_t batch_sid; /* local session id for batch job */ uint32_t alloc_sid; /* local sid making resource alloc */
char *alloc_node; /* local node making resource alloc */
uint16_t next_step_id; /* next step id to be used */ uint16_t next_step_id; /* next step id to be used */
uint16_t node_cnt; /* count of nodes allocated to job */ uint16_t node_cnt; /* count of nodes allocated to job */
slurm_addr *node_addr; /* addresses of the nodes allocated to slurm_addr *node_addr; /* addresses of the nodes allocated to
...@@ -877,16 +878,6 @@ extern int schedule (void); ...@@ -877,16 +878,6 @@ extern int schedule (void);
*/ */
extern int select_nodes (struct job_record *job_ptr, bool test_only); extern int select_nodes (struct job_record *job_ptr, bool test_only);
/*
* set_batch_job_sid - set the batch_sid for a specified job_id
* IN uid - originating uid of the RPC
* IN job_id - the id of a batch job
* IN batch_sid - local session id for the job
* RET int - 0 or an error code
* global: job_list - global list of job entries is updated
*/
extern int set_batch_job_sid(uid_t uid, uint32_t job_id, uint32_t batch_sid);
/* set_node_down - make the specified node's state DOWN /* set_node_down - make the specified node's state DOWN
* IN name - name of the node */ * IN name - name of the node */
extern void set_node_down (char *name); extern void set_node_down (char *name);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment