Newer
Older
NO_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };
uid_t uid;
START_TIMER;
debug2("Processing RPC: REQUEST_JOB_WILL_RUN");
/* do RPC call */
dump_job_desc(job_desc_msg);
uid = g_slurm_auth_get_uid(msg->cred);
if ( (uid != job_desc_msg->user_id) && (!_is_super_user(uid)) ) {
error_code = ESLURM_USER_ID_MISSING;
error("Security violation, JOB_WILL_RUN RPC from uid=%u",
(unsigned int) uid);
}
if (error_code == SLURM_SUCCESS) {
lock_slurmctld(job_write_lock);
error_code = job_allocate(job_desc_msg, &job_id,
&node_list_ptr, &num_cpu_groups,
&cpus_per_node, &cpu_count_reps,
false, true, true, uid, NULL,
NULL);
unlock_slurmctld(job_write_lock);
END_TIMER;
}
/* return result */
if (error_code) {
info("_slurm_rpc_job_will_run: %s",
slurm_strerror(error_code));
slurm_send_rc_msg(msg, error_code);
} else {
debug2("_slurm_rpc_job_will_run success %s", TIME_STR);
slurm_send_rc_msg(msg, SLURM_SUCCESS);
}
}
/*
 * _slurm_rpc_node_registration - handle a MESSAGE_NODE_REGISTRATION_STATUS
 *	RPC: validate the jobs a node reports and check the node's reported
 *	configuration against the configured specification
 * IN msg - the received message; msg->data is the node's registration
 *	record and msg->cred identifies the sender
 *
 * Only a super user may issue this RPC. A return-code message is always
 * sent back to the sender.
 */
static void _slurm_rpc_node_registration(slurm_msg_t * msg)
{
	DEF_TIMERS;
	int rc;
	slurm_node_registration_status_msg_t *reg_msg =
		(slurm_node_registration_status_msg_t *) msg->data;
	/* Locks: Write job and node */
	slurmctld_lock_t job_node_write_lock = {
		NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: MESSAGE_NODE_REGISTRATION_STATUS");
	uid = g_slurm_auth_get_uid(msg->cred);

	if (_is_super_user(uid)) {
		/* do RPC call */
		lock_slurmctld(job_node_write_lock);
		/* job_count is passed by address here and then read again
		 * below, so the order of these two calls matters */
		validate_jobs_on_node(reg_msg->node_name,
				      &reg_msg->job_count,
				      reg_msg->job_id,
				      reg_msg->step_id);
		rc = validate_node_specs(reg_msg->node_name,
					 reg_msg->cpus,
					 reg_msg->real_memory_size,
					 reg_msg->temporary_disk_space,
					 reg_msg->job_count,
					 reg_msg->status);
		unlock_slurmctld(job_node_write_lock);
		END_TIMER;
	} else {
		rc = ESLURM_USER_ID_MISSING;
		error("Security violation, NODE_REGISTER RPC from uid=%u",
		      (unsigned int) uid);
	}

	/* return result */
	if (!rc) {
		debug2("_slurm_rpc_node_registration complete for %s %s",
		       reg_msg->node_name, TIME_STR);
		slurm_send_rc_msg(msg, SLURM_SUCCESS);
	} else {
		error("_slurm_rpc_node_registration node=%s: %s",
		      reg_msg->node_name, slurm_strerror(rc));
		slurm_send_rc_msg(msg, rc);
	}
}
/*
 * _slurm_rpc_old_job_alloc - process RPC to get details on existing job
 * IN msg - the received request; msg->data is an old_job_alloc_msg_t naming
 *	the job (job_id) and its owner (uid), msg->cred identifies the sender
 *
 * The sender must be the user named in the request or a super user.
 * On success a RESPONSE_RESOURCE_ALLOCATION message filled in from
 * old_job_info() is sent on msg->conn_fd; otherwise a return-code message
 * carrying the error is sent.
 */
static void _slurm_rpc_old_job_alloc(slurm_msg_t * msg)
{
	int error_code = SLURM_SUCCESS;
	slurm_msg_t response_msg;
	DEF_TIMERS;
	old_job_alloc_msg_t *job_desc_msg =
		(old_job_alloc_msg_t *) msg->data;
	char *node_list_ptr = NULL;
	uint16_t num_cpu_groups = 0;
	uint32_t *cpus_per_node = NULL, *cpu_count_reps = NULL;
	resource_allocation_response_msg_t alloc_msg;
	/* Locks: Read job, read node */
	slurmctld_lock_t job_read_lock = {
		NO_LOCK, READ_LOCK, READ_LOCK, NO_LOCK };
	/* Initialized like the other out-parameters so they never hold
	 * indeterminate values */
	uint16_t node_cnt = 0;
	slurm_addr *node_addr = NULL;
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: REQUEST_OLD_JOB_RESOURCE_ALLOCATION");

	/* do RPC call */
	uid = g_slurm_auth_get_uid(msg->cred);
	if ( (uid != job_desc_msg->uid) && (!_is_super_user(uid)) ) {
		error_code = ESLURM_USER_ID_MISSING;
		error("Security violation, RESOURCE_ALLOCATE from uid=%u",
		      (unsigned int) uid);
	}
	if (error_code == SLURM_SUCCESS) {
		lock_slurmctld(job_read_lock);
		error_code = old_job_info(job_desc_msg->uid,
					  job_desc_msg->job_id,
					  &node_list_ptr, &num_cpu_groups,
					  &cpus_per_node, &cpu_count_reps,
					  &node_cnt, &node_addr);
		unlock_slurmctld(job_read_lock);
		END_TIMER;
	}

	/* return result */
	if (error_code) {
		debug2("_slurm_rpc_old_job_alloc: JobId=%u, uid=%u: %s",
		       job_desc_msg->job_id, job_desc_msg->uid,
		       slurm_strerror(error_code));
		slurm_send_rc_msg(msg, error_code);
	} else {
		debug2("_slurm_rpc_old_job_alloc JobId=%u NodeList=%s %s",
		       job_desc_msg->job_id, node_list_ptr, TIME_STR);

		/* send job_ID and node_name_ptr */
		alloc_msg.job_id = job_desc_msg->job_id;
		alloc_msg.node_list = node_list_ptr;
		alloc_msg.num_cpu_groups = num_cpu_groups;
		alloc_msg.cpus_per_node = cpus_per_node;
		alloc_msg.cpu_count_reps = cpu_count_reps;
		alloc_msg.node_cnt = node_cnt;
		alloc_msg.node_addr = node_addr;
		response_msg.msg_type = RESPONSE_RESOURCE_ALLOCATION;
		response_msg.data = &alloc_msg;
		slurm_send_node_msg(msg->conn_fd, &response_msg);
	}
}
/* _slurm_rpc_ping - process ping RPC
 * IN msg - the received ping request; used only to address the reply
 * Replies to the sender with a SLURM_SUCCESS return-code message. The
 * sender's credential is not examined. */
static void _slurm_rpc_ping(slurm_msg_t * msg)
{
/* We could authenticate here, if desired */
/* return result */
slurm_send_rc_msg(msg, SLURM_SUCCESS);
}
/*
 * _slurm_rpc_reconfigure_controller - process RPC to re-initialize
 *	slurmctld from configuration file
 * IN msg - the received request; msg->cred identifies the sender
 *
 * Only a super user may issue this RPC. A return-code message is always
 * sent back; on success schedule() and save_all_state() are run after
 * the reply has been sent.
 */
static void _slurm_rpc_reconfigure_controller(slurm_msg_t * msg)
{
	/* init */
	int error_code = SLURM_SUCCESS;
	DEF_TIMERS;
	/* Locks: Write configuration, job, node and partition */
	slurmctld_lock_t config_write_lock = {
		WRITE_LOCK, WRITE_LOCK, WRITE_LOCK, WRITE_LOCK };
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: REQUEST_RECONFIGURE");
	uid = g_slurm_auth_get_uid(msg->cred);
	if (!_is_super_user(uid)) {
		error("Security violation, RECONFIGURE RPC from uid=%u",
		      (unsigned int) uid);
		error_code = ESLURM_USER_ID_MISSING;
	}

	/* do RPC call */
	if (error_code == SLURM_SUCCESS) {
		lock_slurmctld(config_write_lock);
		error_code = read_slurm_conf(0);
		/* Only forward the reconfigure request to the slurmd
		 * daemons if the configuration was read successfully */
		if (error_code == SLURM_SUCCESS)
			msg_to_slurmd(REQUEST_RECONFIGURE);
		unlock_slurmctld(config_write_lock);
	}
	if (error_code == SLURM_SUCCESS) {  /* Stuff to do after unlock */
		_update_cred_key();
		/* A failed chdir is logged but does not fail the RPC */
		if (slurmctld_config.daemonize &&
		    chdir(slurmctld_conf.state_save_location) < 0) {
			error("chdir to %s error %m",
			      slurmctld_conf.state_save_location);
		}
	}
	END_TIMER;

	/* return result */
	if (error_code) {
		error("_slurm_rpc_reconfigure_controller: %s",
		      slurm_strerror(error_code));
		slurm_send_rc_msg(msg, error_code);
	} else {
		info("_slurm_rpc_reconfigure_controller: completed %s",
		     TIME_STR);
		slurm_send_rc_msg(msg, SLURM_SUCCESS);
		schedule();
		save_all_state();
	}
}
/*
 * _slurm_rpc_shutdown_controller - process RPC to shutdown slurmctld
 * IN msg - the received request (REQUEST_CONTROL or REQUEST_SHUTDOWN);
 *	msg->data is a shutdown_msg_t, msg->cred identifies the sender
 *
 * Only a super user may issue this RPC. For an authorized request:
 *  - REQUEST_CONTROL sets slurmctld_config.resume_backup;
 *  - otherwise the "core" field of the request is honoured: if non-zero
 *    the daemon calls fatal() right after replying, without the normal
 *    shutdown sequence;
 *  - if no shutdown is already in progress, the signal handling thread
 *    is sent SIGTERM (or, if that thread is undefined, a hard shutdown
 *    is started directly).
 * A return-code message is always sent back. An unauthorized request is
 * answered immediately without waiting for other server threads.
 */
static void _slurm_rpc_shutdown_controller(slurm_msg_t * msg)
{
	int error_code = SLURM_SUCCESS, i;
	uint16_t core_arg = 0;
	shutdown_msg_t *shutdown_msg = (shutdown_msg_t *) msg->data;
	uid_t uid;
	/* Locks: Read node */
	slurmctld_lock_t node_read_lock = {
		NO_LOCK, NO_LOCK, READ_LOCK, NO_LOCK };

	uid = g_slurm_auth_get_uid(msg->cred);
	if (!_is_super_user(uid)) {
		error("Security violation, SHUTDOWN RPC from uid=%u",
		      (unsigned int) uid);
		error_code = ESLURM_USER_ID_MISSING;
	}

	if (error_code) {
		/* request rejected above, nothing to record */
	} else if (msg->msg_type == REQUEST_CONTROL) {
		info("Performing RPC: REQUEST_CONTROL");
		/* resume backup mode */
		slurmctld_config.resume_backup = true;
	} else {
		debug2("Performing RPC: REQUEST_SHUTDOWN");
		core_arg = shutdown_msg->core;
	}

	/* do RPC call */
	if (error_code) {
		/* request rejected above, nothing to do */
	} else if (core_arg) {
		info("performing immeditate shutdown without state save");
	} else if (slurmctld_config.shutdown_time) {
		debug2("shutdown RPC issued when already in progress");
	} else {
		if (msg->msg_type == REQUEST_SHUTDOWN) {
			/* This means (msg->msg_type != REQUEST_CONTROL) */
			lock_slurmctld(node_read_lock);
			msg_to_slurmd(REQUEST_SHUTDOWN);
			unlock_slurmctld(node_read_lock);
		}
		if (slurmctld_config.thread_id_sig) {	/* signal clean-up */
			pthread_kill(slurmctld_config.thread_id_sig, SIGTERM);
		} else {
			error("thread_id_sig undefined, hard shutdown");
			slurmctld_config.shutdown_time = time(NULL);
			/* send REQUEST_SHUTDOWN_IMMEDIATE RPC */
			slurmctld_shutdown();
		}
	}

	/* Only an authorized REQUEST_CONTROL waits for the other server
	 * threads; a rejected request must not tie up this thread for up
	 * to CONTROL_TIMEOUT seconds before the error reply goes out. */
	if ((!error_code) && (msg->msg_type == REQUEST_CONTROL)) {
		/* Wait for workload to dry up before sending reply.
		 * One thread should remain, this one. */
		for (i = 1; i < CONTROL_TIMEOUT; i++) {
			if (slurmctld_config.server_thread_count <= 1)
				break;
			sleep(1);
		}
		if (slurmctld_config.server_thread_count > 1)
			error("REQUEST_CONTROL reply with %d active threads",
			      slurmctld_config.server_thread_count);
		/* save_all_state(); performed by _slurmctld_background */
	}

	slurm_send_rc_msg(msg, error_code);
	/* Abort only after the reply has been sent */
	if ((error_code == SLURM_SUCCESS) && core_arg)
		fatal("Aborting per RPC request");
}
/*
 * _slurm_rpc_shutdown_controller_immediate - process RPC to shutdown
 *	slurmctld
 * IN msg - the received request; msg->cred identifies the sender
 *
 * Only a super user may issue this RPC; a request from anyone else is
 * logged as a security violation. No reply is sent in either case.
 */
static void _slurm_rpc_shutdown_controller_immediate(slurm_msg_t * msg)
{
	int error_code = SLURM_SUCCESS;
	uid_t uid;

	uid = g_slurm_auth_get_uid(msg->cred);
	if (!_is_super_user(uid)) {
		error("Security violation, SHUTDOWN_IMMEDIATE RPC from uid=%u",
		      (unsigned int) uid);
		error_code = ESLURM_USER_ID_MISSING;
	}

	/* do RPC call */
	/* No op: just used to knock loose accept RPC thread */
	if (error_code == SLURM_SUCCESS)
		debug("Performing RPC: REQUEST_SHUTDOWN_IMMEDIATE");
}
/*
 * _slurm_rpc_submit_batch_job - handle a REQUEST_SUBMIT_BATCH_JOB RPC
 * IN msg - the received request; msg->data is the job description and
 *	msg->cred identifies the sender
 *
 * The sender must be the user named in the job description or a super
 * user. A result of ESLURM_REQUESTED_PART_CONFIG_UNAVAILABLE is treated
 * like success: a RESPONSE_SUBMIT_BATCH_JOB message carrying the job id
 * and that code is returned. Any other error is returned as a plain
 * return-code message.
 */
static void _slurm_rpc_submit_batch_job(slurm_msg_t * msg)
{
	DEF_TIMERS;
	int rc = SLURM_SUCCESS;
	uint32_t job_id;
	slurm_msg_t reply;
	submit_response_msg_t submit_reply;
	job_desc_msg_t *job_desc_msg = (job_desc_msg_t *) msg->data;
	/* Locks: Write job, read node, read partition */
	slurmctld_lock_t job_write_lock = {
		NO_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: REQUEST_SUBMIT_BATCH_JOB");

	/* do RPC call */
	dump_job_desc(job_desc_msg);
	uid = g_slurm_auth_get_uid(msg->cred);
	if ((uid == job_desc_msg->user_id) || _is_super_user(uid)) {
		lock_slurmctld(job_write_lock);
		rc = job_allocate(job_desc_msg, &job_id,
				  (char **) NULL,
				  (uint16_t *) NULL,
				  (uint32_t **) NULL,
				  (uint32_t **) NULL,
				  false, false, false,
				  uid, NULL, NULL);
		unlock_slurmctld(job_write_lock);
		END_TIMER;
	} else {
		rc = ESLURM_USER_ID_MISSING;
		error("Security violation, SUBMIT_JOB from uid=%u",
		      (unsigned int) uid);
	}

	/* return result */
	if ((rc == SLURM_SUCCESS) ||
	    (rc == ESLURM_REQUESTED_PART_CONFIG_UNAVAILABLE)) {
		info("_slurm_rpc_submit_batch_job JobId=%u %s",
		     job_id, TIME_STR);

		/* send job_ID */
		submit_reply.job_id = job_id;
		submit_reply.error_code = rc;
		reply.msg_type = RESPONSE_SUBMIT_BATCH_JOB;
		reply.data = &submit_reply;
		slurm_send_node_msg(msg->conn_fd, &reply);

		schedule();			/* has own locks */
		(void) dump_all_job_state();	/* has own locks */
	} else {
		info("_slurm_rpc_submit_batch_job: %s",
		     slurm_strerror(rc));
		slurm_send_rc_msg(msg, rc);
	}
}
/*
 * _slurm_rpc_update_job - handle a REQUEST_UPDATE_JOB RPC, which changes
 *	the configuration of a job (e.g. priority)
 * IN msg - the received request; msg->data is the job description with
 *	the requested changes and msg->cred identifies the sender
 *
 * The sender's uid is handed to update_job() together with the request.
 * A return-code message is always sent back; on success schedule() and
 * dump_all_job_state() are run after the reply.
 */
static void _slurm_rpc_update_job(slurm_msg_t * msg)
{
	DEF_TIMERS;
	int rc;
	job_desc_msg_t *job_desc_msg = (job_desc_msg_t *) msg->data;
	/* Locks: Write job, read node, read partition */
	slurmctld_lock_t job_write_lock = {
		NO_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: REQUEST_UPDATE_JOB");

	/* do RPC call */
	uid = g_slurm_auth_get_uid(msg->cred);
	lock_slurmctld(job_write_lock);
	rc = update_job(job_desc_msg, uid);
	unlock_slurmctld(job_write_lock);
	END_TIMER;

	/* return result */
	if (rc) {
		error("_slurm_rpc_update_job JobId=%u: %s",
		      job_desc_msg->job_id, slurm_strerror(rc));
		slurm_send_rc_msg(msg, rc);
		return;
	}

	debug2("_slurm_rpc_update_job complete JobId=%u %s",
	       job_desc_msg->job_id, TIME_STR);
	slurm_send_rc_msg(msg, SLURM_SUCCESS);

	/* Below functions provide their own locking */
	schedule();
	(void) dump_all_job_state();
}
/*
 * _slurm_rpc_update_node - handle a REQUEST_UPDATE_NODE RPC, which changes
 *	the configuration of a node (e.g. UP/DOWN)
 * IN msg - the received request; msg->data is the node update record and
 *	msg->cred identifies the sender
 *
 * Only a super user may issue this RPC. A return-code message is always
 * sent back. schedule() and the node state dump are then run whatever
 * the outcome of the request was.
 */
static void _slurm_rpc_update_node(slurm_msg_t * msg)
{
	DEF_TIMERS;
	int rc;
	update_node_msg_t *node_update = (update_node_msg_t *) msg->data;
	/* Locks: Write job and write node */
	slurmctld_lock_t job_node_write_lock = {
		NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: REQUEST_UPDATE_NODE");
	uid = g_slurm_auth_get_uid(msg->cred);

	if (_is_super_user(uid)) {
		/* do RPC call */
		lock_slurmctld(job_node_write_lock);
		rc = update_node(node_update);
		unlock_slurmctld(job_node_write_lock);
		END_TIMER;
	} else {
		rc = ESLURM_USER_ID_MISSING;
		error("Security violation, UPDATE_NODE RPC from uid=%u",
		      (unsigned int) uid);
	}

	/* return result */
	if (!rc) {
		debug2("_slurm_rpc_update_node complete for %s %s",
		       node_update->node_names, TIME_STR);
		slurm_send_rc_msg(msg, SLURM_SUCCESS);
	} else {
		info("_slurm_rpc_update_node for %s: %s",
		     node_update->node_names, slurm_strerror(rc));
		slurm_send_rc_msg(msg, rc);
	}

	/* Below functions provide their own locks */
	if (schedule()) {
		/* job state is dumped only when schedule() returns
		 * non-zero */
		(void) dump_all_job_state();
	}
	/* node state is dumped unconditionally */
	(void) dump_all_node_state();
}
/*
 * _slurm_rpc_update_partition - handle a REQUEST_UPDATE_PARTITION RPC,
 *	which changes the configuration of a partition (e.g. UP/DOWN)
 * IN msg - the received request; msg->data is the partition update record
 *	and msg->cred identifies the sender
 *
 * Only a super user may issue this RPC. A return-code message is always
 * sent back; on success the partition state is dumped and schedule() is
 * run after the reply.
 */
static void _slurm_rpc_update_partition(slurm_msg_t * msg)
{
	DEF_TIMERS;
	int rc;
	update_part_msg_t *part_update = (update_part_msg_t *) msg->data;
	/* Locks: Read node, write partition */
	slurmctld_lock_t part_write_lock = {
		NO_LOCK, NO_LOCK, READ_LOCK, WRITE_LOCK };
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: REQUEST_UPDATE_PARTITION");
	uid = g_slurm_auth_get_uid(msg->cred);

	if (_is_super_user(uid)) {
		/* do RPC call */
		lock_slurmctld(part_write_lock);
		rc = update_part(part_update);
		unlock_slurmctld(part_write_lock);
		END_TIMER;
	} else {
		rc = ESLURM_USER_ID_MISSING;
		error("Security violation, UPDATE_PARTITION RPC from uid=%u",
		      (unsigned int) uid);
	}

	/* return result */
	if (rc) {
		info("_slurm_rpc_update_partition partition=%s: %s",
		     part_update->name, slurm_strerror(rc));
		slurm_send_rc_msg(msg, rc);
		return;
	}

	debug2("_slurm_rpc_update_partition complete for %s %s",
	       part_update->name, TIME_STR);
	slurm_send_rc_msg(msg, SLURM_SUCCESS);

	/* NOTE: These functions provide their own locks */
	(void) dump_all_part_state();
	if (schedule()) {
		(void) dump_all_job_state();
	}
}
/* Reset the job credential key based upon configuration parameters:
 * hands slurmctld_conf.job_credential_private_key to
 * slurm_cred_ctx_key_update() for the credential context held in
 * slurmctld_config.cred_ctx. The result of that call, if any, is not
 * examined. */
static void _update_cred_key(void)
{
slurm_cred_ctx_key_update(slurmctld_config.cred_ctx,
slurmctld_conf.job_credential_private_key);
}