unlock_slurmctld(job_read_lock);
END_TIMER;
/* job_id:step_id not found or otherwise *
 * error message is printed elsewhere */
debug2("_slurm_rpc_job_step_get_info: %s",
slurm_strerror(error_code));
free_buf(buffer);
} else {
resp_buffer_size = get_buf_offset(buffer);
resp_buffer = xfer_buf_data(buffer);
debug2("_slurm_rpc_job_step_get_info size=%d %s",
resp_buffer_size, TIME_STR);
}
if (error_code)
slurm_send_rc_msg(msg, error_code);
else {
slurm_msg_t response_msg;
response_msg.address = msg->address;
response_msg.msg_type = RESPONSE_JOB_STEP_INFO;
response_msg.data = resp_buffer;
response_msg.data_size = resp_buffer_size;
slurm_send_node_msg(msg->conn_fd, &response_msg);
}
}
/* _slurm_rpc_job_will_run - process RPC to determine if job with given
 * configuration can be initiated */
static void _slurm_rpc_job_will_run(slurm_msg_t * msg)
{
	DEF_TIMERS;
	job_desc_msg_t *job_desc = (job_desc_msg_t *) msg->data;
	char *node_list_ptr = NULL;
	uint16_t num_cpu_groups = 0;
	uint32_t *cpus_per_node = NULL, *cpu_count_reps = NULL;
	uint32_t job_id;
	int rc = SLURM_SUCCESS;
	/* Locks: Write job, read node, read partition */
	slurmctld_lock_t job_write_lock = {
		NO_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: REQUEST_JOB_WILL_RUN");
	dump_job_desc(job_desc);

	/* Only the job's owner or a privileged user may ask */
	uid = g_slurm_auth_get_uid(msg->cred);
	if ((uid != job_desc->user_id) && !_is_super_user(uid)) {
		rc = ESLURM_USER_ID_MISSING;
		error("Security violation, JOB_WILL_RUN RPC from uid=%u",
		      (unsigned int) uid);
	}

	if (rc == SLURM_SUCCESS) {
		/* Test-allocate the job (will_run mode) under write lock */
		lock_slurmctld(job_write_lock);
		rc = job_allocate(job_desc, &job_id,
				  &node_list_ptr, &num_cpu_groups,
				  &cpus_per_node, &cpu_count_reps,
				  false, true, true, uid, NULL, NULL);
		unlock_slurmctld(job_write_lock);
		END_TIMER;
	}

	/* return result */
	if (rc == SLURM_SUCCESS) {
		debug2("_slurm_rpc_job_will_run success %s", TIME_STR);
		slurm_send_rc_msg(msg, SLURM_SUCCESS);
	} else {
		info("_slurm_rpc_job_will_run: %s",
		     slurm_strerror(rc));
		slurm_send_rc_msg(msg, rc);
	}
}
/* _slurm_rpc_node_registration - process RPC to determine if a node's
 * actual configuration satisfies the configured specification */
static void _slurm_rpc_node_registration(slurm_msg_t * msg)
{
	DEF_TIMERS;
	int rc = SLURM_SUCCESS;
	slurm_node_registration_status_msg_t *reg_msg =
		(slurm_node_registration_status_msg_t *) msg->data;
	/* Locks: Write job and node */
	slurmctld_lock_t job_write_lock = {
		NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: MESSAGE_NODE_REGISTRATION_STATUS");

	/* Node registration is restricted to privileged users */
	uid = g_slurm_auth_get_uid(msg->cred);
	if (!_is_super_user(uid)) {
		rc = ESLURM_USER_ID_MISSING;
		error("Security violation, NODE_REGISTER RPC from uid=%u",
		      (unsigned int) uid);
	}

	if (rc == SLURM_SUCCESS) {
		/* Reconcile the node's reported jobs with controller
		 * state, then validate its reported hardware specs */
		lock_slurmctld(job_write_lock);
		validate_jobs_on_node(reg_msg->node_name,
				      &reg_msg->job_count,
				      reg_msg->job_id,
				      reg_msg->step_id);
		rc = validate_node_specs(reg_msg->node_name,
					 reg_msg->cpus,
					 reg_msg->real_memory_size,
					 reg_msg->temporary_disk_space,
					 reg_msg->job_count,
					 reg_msg->status);
		unlock_slurmctld(job_write_lock);
		END_TIMER;
	}

	/* return result */
	if (rc == SLURM_SUCCESS) {
		debug2("_slurm_rpc_node_registration complete for %s %s",
		       reg_msg->node_name, TIME_STR);
		slurm_send_rc_msg(msg, SLURM_SUCCESS);
	} else {
		error("_slurm_rpc_node_registration node=%s: %s",
		      reg_msg->node_name, slurm_strerror(rc));
		slurm_send_rc_msg(msg, rc);
	}
}
/* _slurm_rpc_old_job_alloc - process RPC to get details on existing job */
static void _slurm_rpc_old_job_alloc(slurm_msg_t * msg)
{
	int error_code = SLURM_SUCCESS;
	slurm_msg_t response_msg;
	DEF_TIMERS;
	old_job_alloc_msg_t *job_desc_msg =
	    (old_job_alloc_msg_t *) msg->data;
	char *node_list_ptr = NULL;
	uint16_t num_cpu_groups = 0;
	uint32_t *cpus_per_node = NULL, *cpu_count_reps = NULL;
	resource_allocation_response_msg_t alloc_msg;
	/* Locks: Read job, read node */
	slurmctld_lock_t job_read_lock = {
	    NO_LOCK, READ_LOCK, READ_LOCK, NO_LOCK };
	uint16_t node_cnt;
	slurm_addr *node_addr;
	uid_t uid;
	bool do_unlock = false;	/* true once job_read_lock is held */
	START_TIMER;
	debug2("Processing RPC: REQUEST_OLD_JOB_RESOURCE_ALLOCATION");
	/* do RPC call */
	/* Only the job's owner or a privileged user may query */
	uid = g_slurm_auth_get_uid(msg->cred);
	if ( (uid != job_desc_msg->uid) && (!_is_super_user(uid)) ) {
		error_code = ESLURM_USER_ID_MISSING;
		error("Security violation, RESOURCE_ALLOCATE from uid=%u",
		      (unsigned int) uid);
	}
	if (error_code == SLURM_SUCCESS) {
		/* NOTE: the lock is deliberately NOT released here; the
		 * pointers old_job_info() returns appear to reference
		 * controller state, so the lock is held until the data
		 * has been copied (or the error path runs) below */
		do_unlock = true;
		lock_slurmctld(job_read_lock);
		error_code = old_job_info(job_desc_msg->uid,
					  job_desc_msg->job_id,
					  &node_list_ptr, &num_cpu_groups,
					  &cpus_per_node, &cpu_count_reps,
					  &node_cnt, &node_addr);
		END_TIMER;
	}
	/* return result */
	if (error_code) {
		/* do_unlock distinguishes a lookup failure (lock held)
		 * from a security failure (lock never taken) */
		if (do_unlock)
			unlock_slurmctld(job_read_lock);
		debug2("_slurm_rpc_old_job_alloc: JobId=%u, uid=%u: %s",
		       job_desc_msg->job_id, job_desc_msg->uid,
		       slurm_strerror(error_code));
		slurm_send_rc_msg(msg, error_code);
	} else {
		debug2("_slurm_rpc_old_job_alloc JobId=%u NodeList=%s %s",
		       job_desc_msg->job_id, node_list_ptr, TIME_STR);
		/* send job_ID and node_name_ptr */
		/* Deep-copy everything into alloc_msg so the lock can be
		 * dropped before the (potentially slow) network send */
		alloc_msg.cpu_count_reps = xmalloc(sizeof(uint32_t) *
						   num_cpu_groups);
		memcpy(alloc_msg.cpu_count_reps, cpu_count_reps,
		       (sizeof(uint32_t) * num_cpu_groups));
		alloc_msg.cpus_per_node = xmalloc(sizeof(uint32_t) *
						  num_cpu_groups);
		memcpy(alloc_msg.cpus_per_node, cpus_per_node,
		       (sizeof(uint32_t) * num_cpu_groups));
		alloc_msg.error_code = error_code;
		alloc_msg.job_id = job_desc_msg->job_id;
		alloc_msg.node_addr = xmalloc(sizeof(slurm_addr) *
					      node_cnt);
		memcpy(alloc_msg.node_addr, node_addr,
		       (sizeof(slurm_addr) * node_cnt));
		alloc_msg.node_cnt = node_cnt;
		alloc_msg.node_list = xstrdup(node_list_ptr);
		alloc_msg.num_cpu_groups = num_cpu_groups;
		/* copies complete -- safe to release the lock now */
		unlock_slurmctld(job_read_lock);
		response_msg.msg_type = RESPONSE_RESOURCE_ALLOCATION;
		response_msg.data = &alloc_msg;
		slurm_send_node_msg(msg->conn_fd, &response_msg);
		/* free the deep copies made above */
		xfree(alloc_msg.cpu_count_reps);
		xfree(alloc_msg.cpus_per_node);
		xfree(alloc_msg.node_addr);
		xfree(alloc_msg.node_list);
	}
}
/* _slurm_rpc_ping - process ping RPC */
static void _slurm_rpc_ping(slurm_msg_t * msg)
{
	/* No authentication is performed (the original notes it could
	 * be added); any caller gets a success reply confirming the
	 * controller is alive */
	slurm_send_rc_msg(msg, SLURM_SUCCESS);
}
/* _slurm_rpc_reconfigure_controller - process RPC to re-initialize
 * slurmctld from configuration file */
static void _slurm_rpc_reconfigure_controller(slurm_msg_t * msg)
{
	int error_code = SLURM_SUCCESS;
	DEF_TIMERS;
	/* Locks: Write configuration, job, node and partition */
	slurmctld_lock_t config_write_lock = {
		WRITE_LOCK, WRITE_LOCK, WRITE_LOCK, WRITE_LOCK };
	/* Locks: Read node */
	slurmctld_lock_t node_read_lock = {
		NO_LOCK, NO_LOCK, READ_LOCK, NO_LOCK };
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: REQUEST_RECONFIGURE");

	/* Reconfiguration is restricted to privileged users.
	 * (A stray line-continuation backslash after this call was
	 * removed; it spliced the following "if" onto this line.) */
	uid = g_slurm_auth_get_uid(msg->cred);
	if (!_is_super_user(uid)) {
		error("Security violation, RECONFIGURE RPC from uid=%u",
		      (unsigned int) uid);
		error_code = ESLURM_USER_ID_MISSING;
	}

	/* do RPC call */
	if (error_code == SLURM_SUCCESS) {
		/* Re-read the configuration with all state locked, then
		 * tell every slurmd to reconfigure as well */
		lock_slurmctld(config_write_lock);
		error_code = read_slurm_conf(0);
		unlock_slurmctld(config_write_lock);
		if (error_code == SLURM_SUCCESS) {
			lock_slurmctld(node_read_lock);
			msg_to_slurmd(REQUEST_RECONFIGURE);
			unlock_slurmctld(node_read_lock);
		}
	}
	if (error_code == SLURM_SUCCESS) {	/* Stuff to do after unlock */
		_update_cred_key();
		if (slurmctld_config.daemonize &&
		    (chdir(slurmctld_conf.state_save_location) < 0)) {
			error("chdir to %s error %m",
			      slurmctld_conf.state_save_location);
		}
	}
	END_TIMER;

	/* return result */
	if (error_code) {
		error("_slurm_rpc_reconfigure_controller: %s",
		      slurm_strerror(error_code));
		slurm_send_rc_msg(msg, error_code);
	} else {
		info("_slurm_rpc_reconfigure_controller: completed %s",
		     TIME_STR);
		slurm_send_rc_msg(msg, SLURM_SUCCESS);
		schedule();	/* has its own locks */
		save_all_state();
	}
}
/* _slurm_rpc_shutdown_controller - process RPC to shutdown slurmctld */
static void _slurm_rpc_shutdown_controller(slurm_msg_t * msg)
{
	int error_code = SLURM_SUCCESS, i;
	uint16_t core_arg = 0;
	shutdown_msg_t *shutdown_msg = (shutdown_msg_t *) msg->data;
	uid_t uid;
	/* Locks: Read node */
	slurmctld_lock_t node_read_lock = {
		NO_LOCK, NO_LOCK, READ_LOCK, NO_LOCK };

	/* Shutdown is restricted to privileged users */
	uid = g_slurm_auth_get_uid(msg->cred);
	if (!_is_super_user(uid)) {
		error("Security violation, SHUTDOWN RPC from uid=%u",
		      (unsigned int) uid);
		error_code = ESLURM_USER_ID_MISSING;
	}

	/* The original used "if (error_code); else ..." empty-statement
	 * chains; rewritten as explicit success tests, same behavior */
	if (error_code == SLURM_SUCCESS) {
		if (msg->msg_type == REQUEST_CONTROL) {
			info("Performing RPC: REQUEST_CONTROL");
			/* resume backup mode */
			slurmctld_config.resume_backup = true;
		} else {
			debug2("Performing RPC: REQUEST_SHUTDOWN");
			core_arg = shutdown_msg->core;
		}
	}

	/* do RPC call */
	if (error_code == SLURM_SUCCESS) {
		if (core_arg)
			/* typo fix: "immeditate" -> "immediate" */
			info("performing immediate shutdown without state save");
		else if (slurmctld_config.shutdown_time)
			debug2("shutdown RPC issued when already in progress");
		else {
			if (msg->msg_type == REQUEST_SHUTDOWN) {
				/* This means (msg->msg_type != REQUEST_CONTROL) */
				lock_slurmctld(node_read_lock);
				msg_to_slurmd(REQUEST_SHUTDOWN);
				unlock_slurmctld(node_read_lock);
			}
			if (slurmctld_config.thread_id_sig) /* signal clean-up */
				pthread_kill(slurmctld_config.thread_id_sig,
					     SIGTERM);
			else {
				error("thread_id_sig undefined, hard shutdown");
				slurmctld_config.shutdown_time = time(NULL);
				/* send REQUEST_SHUTDOWN_IMMEDIATE RPC */
				slurmctld_shutdown();
			}
		}
	}

	if (msg->msg_type == REQUEST_CONTROL) {
		/* Wait for workload to dry up before sending reply.
		 * One thread should remain, this one. */
		for (i = 1; i < CONTROL_TIMEOUT; i++) {
			if (slurmctld_config.server_thread_count <= 1)
				break;
			sleep(1);
		}
		if (slurmctld_config.server_thread_count > 1)
			error("REQUEST_CONTROL reply with %d active threads",
			      slurmctld_config.server_thread_count);
		/* save_all_state(); performed by _slurmctld_background */
	}

	slurm_send_rc_msg(msg, error_code);
	if ((error_code == SLURM_SUCCESS) && core_arg) {
		/* caller explicitly requested an immediate core dump */
		info("Aborting per RPC request");
		abort();
	}
}
/* _slurm_rpc_shutdown_controller_immediate - process RPC to shutdown
 * slurmctld */
static void _slurm_rpc_shutdown_controller_immediate(slurm_msg_t * msg)
{
	int error_code = SLURM_SUCCESS;
	uid_t uid;

	/* Immediate shutdown is restricted to privileged users */
	uid = g_slurm_auth_get_uid(msg->cred);
	if (!_is_super_user(uid)) {
		/* BUG FIX: the error() call was missing here, leaving a
		 * no-op parenthesized expression, so the security
		 * violation was never logged */
		error("Security violation, SHUTDOWN_IMMEDIATE RPC from uid=%u",
		      (unsigned int) uid);
		error_code = ESLURM_USER_ID_MISSING;
	}

	/* do RPC call */
	/* No op: just used to knock loose accept RPC thread */
	if (error_code == SLURM_SUCCESS)
		debug("Performing RPC: REQUEST_SHUTDOWN_IMMEDIATE");
}
/* _slurm_rpc_submit_batch_job - process RPC to submit a batch job */
static void _slurm_rpc_submit_batch_job(slurm_msg_t * msg)
{
	DEF_TIMERS;
	int rc = SLURM_SUCCESS;
	uint32_t job_id;
	slurm_msg_t response_msg;
	submit_response_msg_t submit_msg;
	job_desc_msg_t *job_desc = (job_desc_msg_t *) msg->data;
	/* Locks: Write job, read node, read partition */
	slurmctld_lock_t job_write_lock = {
		NO_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: REQUEST_SUBMIT_BATCH_JOB");
	dump_job_desc(job_desc);

	/* A user may only submit jobs as himself unless privileged */
	uid = g_slurm_auth_get_uid(msg->cred);
	if ((uid != job_desc->user_id) && !_is_super_user(uid)) {
		rc = ESLURM_USER_ID_MISSING;
		error("Security violation, SUBMIT_JOB from uid=%u",
		      (unsigned int) uid);
	}

	if (rc == SLURM_SUCCESS) {
		/* Queue the job; no resource details requested back */
		lock_slurmctld(job_write_lock);
		rc = job_allocate(job_desc, &job_id,
				  (char **) NULL,
				  (uint16_t *) NULL,
				  (uint32_t **) NULL,
				  (uint32_t **) NULL, false, false,
				  false, uid, NULL, NULL);
		unlock_slurmctld(job_write_lock);
		END_TIMER;
	}

	/* return result; a job queued against a currently unavailable
	 * partition still counts as an accepted submission */
	if ((rc == SLURM_SUCCESS) ||
	    (rc == ESLURM_REQUESTED_PART_CONFIG_UNAVAILABLE)) {
		info(
			"_slurm_rpc_submit_batch_job JobId=%u %s",
			job_id, TIME_STR);
		/* send job_ID */
		submit_msg.job_id = job_id;
		submit_msg.error_code = rc;
		response_msg.msg_type = RESPONSE_SUBMIT_BATCH_JOB;
		response_msg.data = &submit_msg;
		slurm_send_node_msg(msg->conn_fd, &response_msg);
		schedule();	/* has own locks */
		(void) dump_all_job_state();	/* has own locks */
	} else {
		info("_slurm_rpc_submit_batch_job: %s",
		     slurm_strerror(rc));
		slurm_send_rc_msg(msg, rc);
	}
}
/* _slurm_rpc_update_job - process RPC to update the configuration of a
 * job (e.g. priority) */
static void _slurm_rpc_update_job(slurm_msg_t * msg)
{
	DEF_TIMERS;
	int rc;
	job_desc_msg_t *job_desc = (job_desc_msg_t *) msg->data;
	/* Locks: Write job, read node, read partition */
	slurmctld_lock_t job_write_lock = {
		NO_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: REQUEST_UPDATE_JOB");

	/* No local authorization test: the uid is forwarded to
	 * update_job(), which presumably enforces permissions itself */
	uid = g_slurm_auth_get_uid(msg->cred);
	lock_slurmctld(job_write_lock);
	rc = update_job(job_desc, uid);
	unlock_slurmctld(job_write_lock);
	END_TIMER;

	/* return result */
	if (rc == SLURM_SUCCESS) {
		debug2("_slurm_rpc_update_job complete JobId=%u %s",
		       job_desc->job_id, TIME_STR);
		slurm_send_rc_msg(msg, SLURM_SUCCESS);
		/* Below functions provide their own locking */
		schedule();
		(void) dump_all_job_state();
	} else {
		error("_slurm_rpc_update_job JobId=%u: %s",
		      job_desc->job_id, slurm_strerror(rc));
		slurm_send_rc_msg(msg, rc);
	}
}
/* _slurm_rpc_update_node - process RPC to update the configuration of a
 * node (e.g. UP/DOWN) */
static void _slurm_rpc_update_node(slurm_msg_t * msg)
{
	DEF_TIMERS;
	int rc = SLURM_SUCCESS;
	update_node_msg_t *update_msg = (update_node_msg_t *) msg->data;
	/* Locks: Write job and write node */
	slurmctld_lock_t node_write_lock = {
		NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: REQUEST_UPDATE_NODE");

	/* Node updates are restricted to privileged users */
	uid = g_slurm_auth_get_uid(msg->cred);
	if (!_is_super_user(uid)) {
		rc = ESLURM_USER_ID_MISSING;
		error("Security violation, UPDATE_NODE RPC from uid=%u",
		      (unsigned int) uid);
	}

	if (rc == SLURM_SUCCESS) {
		lock_slurmctld(node_write_lock);
		rc = update_node(update_msg);
		unlock_slurmctld(node_write_lock);
		END_TIMER;
	}

	/* return result */
	if (rc == SLURM_SUCCESS) {
		debug2("_slurm_rpc_update_node complete for %s %s",
		       update_msg->node_names, TIME_STR);
		slurm_send_rc_msg(msg, SLURM_SUCCESS);
	} else {
		info("_slurm_rpc_update_node for %s: %s",
		     update_msg->node_names, slurm_strerror(rc));
		slurm_send_rc_msg(msg, rc);
	}

	/* Below functions provide their own locks; as in the original,
	 * they run regardless of the update's outcome */
	if (schedule())
		(void) dump_all_job_state();
	(void) dump_all_node_state();
}
/* _slurm_rpc_update_partition - process RPC to update the configuration
 * of a partition (e.g. UP/DOWN) */
static void _slurm_rpc_update_partition(slurm_msg_t * msg)
{
	DEF_TIMERS;
	int rc = SLURM_SUCCESS;
	update_part_msg_t *part_desc = (update_part_msg_t *) msg->data;
	/* Locks: Read node, write partition */
	slurmctld_lock_t part_write_lock = {
		NO_LOCK, NO_LOCK, READ_LOCK, WRITE_LOCK };
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: REQUEST_UPDATE_PARTITION");

	/* Partition updates are restricted to privileged users */
	uid = g_slurm_auth_get_uid(msg->cred);
	if (!_is_super_user(uid)) {
		rc = ESLURM_USER_ID_MISSING;
		error
		    ("Security violation, UPDATE_PARTITION RPC from uid=%u",
		     (unsigned int) uid);
	}

	if (rc == SLURM_SUCCESS) {
		lock_slurmctld(part_write_lock);
		rc = update_part(part_desc);
		unlock_slurmctld(part_write_lock);
		END_TIMER;
	}

	/* return result */
	if (rc == SLURM_SUCCESS) {
		debug2("_slurm_rpc_update_partition complete for %s %s",
		       part_desc->name, TIME_STR);
		slurm_send_rc_msg(msg, SLURM_SUCCESS);
		/* NOTE: These functions provide their own locks */
		(void) dump_all_part_state();
		if (schedule())
			(void) dump_all_job_state();
	} else {
		info("_slurm_rpc_update_partition partition=%s: %s",
		     part_desc->name, slurm_strerror(rc));
		slurm_send_rc_msg(msg, rc);
	}
}
/* _slurm_rpc_delete_partition - process RPC to delete a partition */
static void _slurm_rpc_delete_partition(slurm_msg_t * msg)
{
	DEF_TIMERS;
	int rc = SLURM_SUCCESS;
	delete_part_msg_t *part_desc = (delete_part_msg_t *) msg->data;
	/* Locks: Read job, write partition */
	slurmctld_lock_t part_write_lock = {
		NO_LOCK, WRITE_LOCK, NO_LOCK, WRITE_LOCK };
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: REQUEST_DELETE_PARTITION");

	/* Partition deletion is restricted to privileged users */
	uid = g_slurm_auth_get_uid(msg->cred);
	if (!_is_super_user(uid)) {
		rc = ESLURM_USER_ID_MISSING;
		error
		    ("Security violation, DELETE_PARTITION RPC from uid=%u",
		     (unsigned int) uid);
	}

	if (rc == SLURM_SUCCESS) {
		lock_slurmctld(part_write_lock);
		rc = delete_partition(part_desc);
		unlock_slurmctld(part_write_lock);
		END_TIMER;
	}

	/* return result */
	if (rc == SLURM_SUCCESS) {
		info("_slurm_rpc_delete_partition complete for %s %s",
		     part_desc->name, TIME_STR);
		slurm_send_rc_msg(msg, SLURM_SUCCESS);
		/* NOTE: These functions provide their own locks.
		 * Job state is dumped again after schedule() since
		 * scheduling may have changed it */
		(void) dump_all_job_state();
		(void) dump_all_part_state();
		if (schedule())
			(void) dump_all_job_state();
	} else {
		info("_slurm_rpc_delete_partition partition=%s: %s",
		     part_desc->name, slurm_strerror(rc));
		slurm_send_rc_msg(msg, rc);
	}
}
/* Reset the job credential key based upon configuration parameters */
static void _update_cred_key(void)
{
	/* Push the configured private-key path into the credential
	 * context (presumably causing the key file to be reloaded --
	 * see slurm_cred_ctx_key_update) */
	slurm_cred_ctx_key_update(slurmctld_config.cred_ctx,
				  slurmctld_conf.job_credential_private_key);
}