Skip to content
Snippets Groups Projects
proc_req.c 48.9 KiB
Newer Older
		slurm_send_node_msg(msg->conn_fd, &response_msg);
		xfree(resp_buffer);
	}
}

/*
 * _slurm_rpc_job_will_run - process RPC to determine if job with given 
 *	configuration can be initiated
 * IN msg - request message; msg->data is the job_desc_msg_t to evaluate
 *
 * The requester must be the user named in the job description or a
 * super user. The reply is always a return-code message: the error code
 * on any failure (including the security check), SLURM_SUCCESS otherwise.
 */
static void _slurm_rpc_job_will_run(slurm_msg_t * msg)
{
	/* init */
	DEF_TIMERS;	/* timer state for START_TIMER/END_TIMER/TIME_STR */
	int error_code = SLURM_SUCCESS;
	uint16_t num_cpu_groups = 0;
	uint32_t *cpus_per_node = NULL, *cpu_count_reps = NULL;
	uint32_t job_id;
	job_desc_msg_t *job_desc_msg = (job_desc_msg_t *) msg->data;
	char *node_list_ptr = NULL;
	/* Locks: Write job, read node, read partition */
	slurmctld_lock_t job_write_lock = { 
		NO_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: REQUEST_JOB_WILL_RUN");

	/* do RPC call */
	dump_job_desc(job_desc_msg);
	uid = g_slurm_auth_get_uid(msg->cred);
	if ( (uid != job_desc_msg->user_id) && (!_is_super_user(uid)) ) {
		error_code = ESLURM_USER_ID_MISSING;
		error("Security violation, JOB_WILL_RUN RPC from uid=%u",
		      (unsigned int) uid);
	}

	/* Only an authorized request is evaluated */
	if (error_code == SLURM_SUCCESS) {
		lock_slurmctld(job_write_lock);
		error_code = job_allocate(job_desc_msg, &job_id,
					  &node_list_ptr, &num_cpu_groups,
					  &cpus_per_node, &cpu_count_reps,
					  false, true, true, uid, NULL,
					  NULL);
		unlock_slurmctld(job_write_lock);
		END_TIMER;
	}

	/* return result: a failed security check must never be
	 * reported to the requester as success */
	if (error_code) {
		info("_slurm_rpc_job_will_run: %s", 
			slurm_strerror(error_code));
		slurm_send_rc_msg(msg, error_code);
	} else {
		debug2("_slurm_rpc_job_will_run success %s", TIME_STR);
		slurm_send_rc_msg(msg, SLURM_SUCCESS);
	}
}

/*
 * _slurm_rpc_node_registration - process RPC to determine if a node's 
 *	actual configuration satisfies the configured specification
 * IN msg - request message; msg->data is a
 *	slurm_node_registration_status_msg_t
 *
 * Only a super user may register a node. The reply is a return-code
 * message carrying the error code, or SLURM_SUCCESS.
 */
static void _slurm_rpc_node_registration(slurm_msg_t * msg)
{
	/* init */
	DEF_TIMERS;	/* timer state for START_TIMER/END_TIMER/TIME_STR */
	int error_code = SLURM_SUCCESS;
	slurm_node_registration_status_msg_t *node_reg_stat_msg =
	    (slurm_node_registration_status_msg_t *) msg->data;
	/* Locks: Write job and node */
	slurmctld_lock_t job_write_lock = { 
		NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: MESSAGE_NODE_REGISTRATION_STATUS");
	uid = g_slurm_auth_get_uid(msg->cred);
	if (!_is_super_user(uid)) {
		error_code = ESLURM_USER_ID_MISSING;
		error("Security violation,  NODE_REGISTER RPC from uid=%u",
		      (unsigned int) uid);
	}
	if (error_code == SLURM_SUCCESS) {
		/* do RPC call */
		lock_slurmctld(job_write_lock);
		validate_jobs_on_node(node_reg_stat_msg->node_name,
				      &node_reg_stat_msg->job_count,
				      node_reg_stat_msg->job_id,
				      node_reg_stat_msg->step_id);
		error_code =
		    validate_node_specs(node_reg_stat_msg->node_name,
					node_reg_stat_msg->cpus,
					node_reg_stat_msg->
					real_memory_size,
					node_reg_stat_msg->
					temporary_disk_space,
					node_reg_stat_msg->job_count,
					node_reg_stat_msg->status);
		unlock_slurmctld(job_write_lock);
		END_TIMER;
	}

	/* return result */
	if (error_code) {
		error("_slurm_rpc_node_registration node=%s: %s",
			node_reg_stat_msg->node_name, 
			slurm_strerror(error_code));
		slurm_send_rc_msg(msg, error_code);
	} else {
		debug2("_slurm_rpc_node_registration complete for %s %s",
			node_reg_stat_msg->node_name, TIME_STR);
		slurm_send_rc_msg(msg, SLURM_SUCCESS);
	}
}

/*
 * _slurm_rpc_old_job_alloc - process RPC to get details on existing job
 * IN msg - request message; msg->data is an old_job_alloc_msg_t
 *
 * The requester must be the uid named in the request or a super user.
 * On failure a return-code message is sent; on success a
 * RESPONSE_RESOURCE_ALLOCATION message filled in from old_job_info().
 */
static void _slurm_rpc_old_job_alloc(slurm_msg_t * msg)
{
	DEF_TIMERS;	/* timer state for START_TIMER/END_TIMER/TIME_STR */
	int error_code = SLURM_SUCCESS;
	slurm_msg_t response_msg;
	old_job_alloc_msg_t *job_desc_msg =
	    (old_job_alloc_msg_t *) msg->data;
	char *node_list_ptr = NULL;
	uint16_t num_cpu_groups = 0;
	uint32_t *cpus_per_node = NULL, *cpu_count_reps = NULL;
	resource_allocation_response_msg_t alloc_msg;
	/* Locks: Read job, read node */
	slurmctld_lock_t job_read_lock = { 
		NO_LOCK, READ_LOCK, READ_LOCK, NO_LOCK };
	uint16_t node_cnt = 0;
	slurm_addr *node_addr = NULL;
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: REQUEST_OLD_JOB_RESOURCE_ALLOCATION");

	/* do RPC call */
	uid = g_slurm_auth_get_uid(msg->cred);
	if ( (uid != job_desc_msg->uid) && (!_is_super_user(uid)) ) {
		error_code = ESLURM_USER_ID_MISSING;
		error("Security violation, RESOURCE_ALLOCATE from uid=%u",
		      (unsigned int) uid);
	}
	if (error_code == SLURM_SUCCESS) {
		lock_slurmctld(job_read_lock);
		error_code = old_job_info(job_desc_msg->uid,
					  job_desc_msg->job_id,
					  &node_list_ptr, &num_cpu_groups,
					  &cpus_per_node, &cpu_count_reps,
					  &node_cnt, &node_addr);
		unlock_slurmctld(job_read_lock);
		END_TIMER;
	}

	/* return result: the allocation details are only sent when both
	 * the security check and the lookup succeeded */
	if (error_code) {
		debug2("_slurm_rpc_old_job_alloc: JobId=%u, uid=%u: %s",
			job_desc_msg->job_id, job_desc_msg->uid, 
			slurm_strerror(error_code));
		slurm_send_rc_msg(msg, error_code);
	} else {
		debug2("_slurm_rpc_old_job_alloc JobId=%u NodeList=%s %s",
			job_desc_msg->job_id, node_list_ptr, TIME_STR);

		/* send job_ID  and node_name_ptr */

		alloc_msg.job_id         = job_desc_msg->job_id;
		alloc_msg.node_list      = node_list_ptr;
		alloc_msg.num_cpu_groups = num_cpu_groups;
		alloc_msg.cpus_per_node  = cpus_per_node;
		alloc_msg.cpu_count_reps = cpu_count_reps;
		alloc_msg.node_cnt       = node_cnt;
		alloc_msg.node_addr      = node_addr;
		response_msg.msg_type    = RESPONSE_RESOURCE_ALLOCATION;
		response_msg.data        = &alloc_msg;

		slurm_send_node_msg(msg->conn_fd, &response_msg);
	}
}

/*
 * _slurm_rpc_ping - process ping RPC
 * IN msg - the ping request; it is answered unconditionally with a
 *	SLURM_SUCCESS return-code message
 */
static void _slurm_rpc_ping(slurm_msg_t * msg)
{
	/* No authentication is performed here, though it could be added */
	const int ping_rc = SLURM_SUCCESS;

	/* return result */
	slurm_send_rc_msg(msg, ping_rc);
}


/*
 * _slurm_rpc_reconfigure_controller - process RPC to re-initialize 
 *	slurmctld from configuration file
 * IN msg - request message; only msg->cred is examined
 *
 * Only a super user may reconfigure. On success the configuration is
 * re-read under the full write lock, REQUEST_RECONFIGURE is forwarded via
 * msg_to_slurmd(), the credential key is refreshed, and after the reply
 * is sent the scheduler is run and all state is saved.
 */
static void _slurm_rpc_reconfigure_controller(slurm_msg_t * msg)
{
	/* init */
	DEF_TIMERS;	/* timer state for START_TIMER/END_TIMER/TIME_STR */
	int error_code = SLURM_SUCCESS;
	/* Locks: Write configuration, job, node and partition */
	slurmctld_lock_t config_write_lock = { 
		WRITE_LOCK, WRITE_LOCK, WRITE_LOCK, WRITE_LOCK };
	/* Locks: Read node */
	slurmctld_lock_t node_read_lock = { 
		NO_LOCK, NO_LOCK, READ_LOCK, NO_LOCK };
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: REQUEST_RECONFIGURE");
	uid = g_slurm_auth_get_uid(msg->cred);
	if (!_is_super_user(uid)) {
		error("Security violation, RECONFIGURE RPC from uid=%u",
		      (unsigned int) uid);
		error_code = ESLURM_USER_ID_MISSING;
	}

	/* do RPC call */
	if (error_code == SLURM_SUCCESS) {
		lock_slurmctld(config_write_lock);
		error_code = read_slurm_conf(0);
		unlock_slurmctld(config_write_lock);
		if (error_code == SLURM_SUCCESS) {
			lock_slurmctld(node_read_lock);
			msg_to_slurmd(REQUEST_RECONFIGURE);
			unlock_slurmctld(node_read_lock);
		}
	}
	if (error_code == SLURM_SUCCESS) {  /* Stuff to do after unlock */
		_update_cred_key();
		if (slurmctld_config.daemonize && 
		    chdir(slurmctld_conf.state_save_location) < 0) {
			error("chdir to %s error %m",
			      slurmctld_conf.state_save_location);
		}
	}
	END_TIMER;

	/* return result */
	if (error_code) {
		error("_slurm_rpc_reconfigure_controller: %s",
			slurm_strerror(error_code));
		slurm_send_rc_msg(msg, error_code);
	} else {
		info("_slurm_rpc_reconfigure_controller: completed %s", 
			TIME_STR);
		slurm_send_rc_msg(msg, SLURM_SUCCESS);
		schedule();	/* has its own locks */
		save_all_state();
	}
}

/*
 * _slurm_rpc_shutdown_controller - process RPC to shutdown slurmctld
 * IN msg - request message of type REQUEST_CONTROL or REQUEST_SHUTDOWN;
 *	for the latter msg->data is a shutdown_msg_t whose "core" field
 *	requests an immediate abort() after the reply is sent
 *
 * Only a super user may issue this RPC. A rejected request is answered
 * immediately with the error code and has no other effect.
 */
static void _slurm_rpc_shutdown_controller(slurm_msg_t * msg)
{
	int error_code = SLURM_SUCCESS, i;
	uint16_t core_arg = 0;
	shutdown_msg_t *shutdown_msg = (shutdown_msg_t *) msg->data;
	uid_t uid;
	/* Locks: Read node */
	slurmctld_lock_t node_read_lock = { 
		NO_LOCK, NO_LOCK, READ_LOCK, NO_LOCK };

	uid = g_slurm_auth_get_uid(msg->cred);
	if (!_is_super_user(uid)) {
		error("Security violation, SHUTDOWN RPC from uid=%u",
		      (unsigned int) uid);
		error_code = ESLURM_USER_ID_MISSING;
	}

	if (error_code == SLURM_SUCCESS) {
		if (msg->msg_type == REQUEST_CONTROL) {
			info("Performing RPC: REQUEST_CONTROL");
			/* resume backup mode */
			slurmctld_config.resume_backup = true;	
		} else {
			debug2("Performing RPC: REQUEST_SHUTDOWN");
			core_arg = shutdown_msg->core;
		}

		/* do RPC call */
		if (core_arg) {
			/* nothing to do now: abort() follows the reply */
			info("performing immeditate shutdown without state save");
		} else if (slurmctld_config.shutdown_time) {
			debug2("shutdown RPC issued when already in progress");
		} else {
			if (msg->msg_type == REQUEST_SHUTDOWN) {
				/* This means (msg->msg_type != REQUEST_CONTROL) */
				lock_slurmctld(node_read_lock);
				msg_to_slurmd(REQUEST_SHUTDOWN);
				unlock_slurmctld(node_read_lock);
			}
			if (slurmctld_config.thread_id_sig) {
				/* signal clean-up */
				pthread_kill(slurmctld_config.thread_id_sig,
					     SIGTERM);
			} else {
				error("thread_id_sig undefined, hard shutdown");
				slurmctld_config.shutdown_time = time(NULL);
				/* send REQUEST_SHUTDOWN_IMMEDIATE RPC */
				slurmctld_shutdown();
			}
		}
	}

	/* Only an accepted REQUEST_CONTROL waits: a rejected request must
	 * not hold this server thread for up to CONTROL_TIMEOUT seconds */
	if ((error_code == SLURM_SUCCESS) &&
	    (msg->msg_type == REQUEST_CONTROL)) {
		/* Wait for workload to dry up before sending reply.
		 * One thread should remain, this one. */
		for (i = 1; i < CONTROL_TIMEOUT; i++) {
			if (slurmctld_config.server_thread_count <= 1)
				break;
			sleep(1);
		}
		if (slurmctld_config.server_thread_count > 1)
			error("REQUEST_CONTROL reply with %d active threads",
				slurmctld_config.server_thread_count);
		/* save_all_state();	performed by _slurmctld_background */
	}
	slurm_send_rc_msg(msg, error_code);
	if ((error_code == SLURM_SUCCESS) && core_arg) {
		info("Aborting per RPC request");
		abort();
	}
}

/*
 * _slurm_rpc_shutdown_controller_immediate - process RPC to shutdown 
 *	slurmctld
 * IN msg - request message; only msg->cred is examined
 *
 * Only a super user may issue this RPC. No reply is sent by this
 * function and no other action is taken here.
 */
static void _slurm_rpc_shutdown_controller_immediate(slurm_msg_t * msg)
{
	int error_code = SLURM_SUCCESS;
	uid_t uid;

	uid = g_slurm_auth_get_uid(msg->cred);
	if (!_is_super_user(uid)) {
		error
		    ("Security violation, SHUTDOWN_IMMEDIATE RPC from uid=%u",
		     (unsigned int) uid);
		error_code = ESLURM_USER_ID_MISSING;
	}

	/* do RPC call */
	/* No op: just used to knock loose accept RPC thread */
	if (error_code == SLURM_SUCCESS)
		debug("Performing RPC: REQUEST_SHUTDOWN_IMMEDIATE");
}

/*
 * _slurm_rpc_submit_batch_job - process RPC to submit a batch job
 * IN msg - request message; msg->data is the job_desc_msg_t to submit
 *
 * The requester must be the user named in the job description or a
 * super user. ESLURM_REQUESTED_PART_CONFIG_UNAVAILABLE is not treated as
 * a failure: like SLURM_SUCCESS it produces a RESPONSE_SUBMIT_BATCH_JOB
 * reply (carrying the code in submit_msg.error_code). Any other error is
 * answered with a return-code message.
 */
static void _slurm_rpc_submit_batch_job(slurm_msg_t * msg)
{
	/* init */
	DEF_TIMERS;	/* timer state for START_TIMER/END_TIMER/TIME_STR */
	int error_code = SLURM_SUCCESS;
	uint32_t job_id;
	slurm_msg_t response_msg;
	submit_response_msg_t submit_msg;
	job_desc_msg_t *job_desc_msg = (job_desc_msg_t *) msg->data;
	/* Locks: Write job, read node, read partition */
	slurmctld_lock_t job_write_lock = { 
		NO_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: REQUEST_SUBMIT_BATCH_JOB");

	/* do RPC call */
	dump_job_desc(job_desc_msg);
	uid = g_slurm_auth_get_uid(msg->cred);
	if ( (uid != job_desc_msg->user_id) && (!_is_super_user(uid)) ) {
		error_code = ESLURM_USER_ID_MISSING;
		error("Security violation, SUBMIT_JOB from uid=%u",
		      (unsigned int) uid);
	}
	if (error_code == SLURM_SUCCESS) {
		lock_slurmctld(job_write_lock);
		error_code = job_allocate(job_desc_msg, &job_id,
					  (char **) NULL,
					  (uint16_t *) NULL,
					  (uint32_t **) NULL,
					  (uint32_t **) NULL, false, false,
					  false, uid, NULL, NULL);
		unlock_slurmctld(job_write_lock);
		/* stop the timer so TIME_STR below reports this call */
		END_TIMER;
	}

	/* return result */
	if ((error_code != SLURM_SUCCESS) &&
	    (error_code != ESLURM_REQUESTED_PART_CONFIG_UNAVAILABLE)) {
		info("_slurm_rpc_submit_batch_job: %s",
		     slurm_strerror(error_code));
		slurm_send_rc_msg(msg, error_code);
	} else {
		info(
		   "_slurm_rpc_submit_batch_job JobId=%u %s", 
		   job_id, TIME_STR);
		/* send job_ID */
		submit_msg.job_id     = job_id;
		submit_msg.error_code = error_code;
		response_msg.msg_type = RESPONSE_SUBMIT_BATCH_JOB;
		response_msg.data = &submit_msg;
		slurm_send_node_msg(msg->conn_fd, &response_msg);
		schedule();	/* has own locks */
		(void) dump_all_job_state();	/* has own locks */
	}
}

/*
 * _slurm_rpc_update_job - process RPC to update the configuration of a 
 *	job (e.g. priority)
 * IN msg - request message; msg->data is the job_desc_msg_t with the
 *	requested changes
 *
 * The authenticated uid is handed to update_job(); no separate security
 * check is made in this function. The reply is a return-code message.
 * After a successful update the scheduler is run and job state is saved.
 */
static void _slurm_rpc_update_job(slurm_msg_t * msg)
{
	/* init */
	DEF_TIMERS;	/* timer state for START_TIMER/END_TIMER/TIME_STR */
	int error_code;
	job_desc_msg_t *job_desc_msg = (job_desc_msg_t *) msg->data;
	/* Locks: Write job, read node, read partition */
	slurmctld_lock_t job_write_lock = { 
		NO_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: REQUEST_UPDATE_JOB");

	/* do RPC call */
	uid = g_slurm_auth_get_uid(msg->cred);
	/* take the lock that is released below, around the update */
	lock_slurmctld(job_write_lock);
	error_code = update_job(job_desc_msg, uid);
	unlock_slurmctld(job_write_lock);
	END_TIMER;

	/* return result */
	if (error_code) {
		error("_slurm_rpc_update_job JobId=%u: %s",
		     job_desc_msg->job_id, slurm_strerror(error_code));
		slurm_send_rc_msg(msg, error_code);
	} else {
		debug2("_slurm_rpc_update_job complete JobId=%u %s", 
			job_desc_msg->job_id, TIME_STR);
		slurm_send_rc_msg(msg, SLURM_SUCCESS);
		/* Below functions provide their own locking */
		schedule();
		(void) dump_all_job_state();
	}
}

/*
 * _slurm_rpc_update_node - process RPC to update the configuration of a 
 *	node (e.g. UP/DOWN)
 * IN msg - request message; msg->data is an update_node_msg_t
 *
 * Only a super user may update a node. The reply is a return-code
 * message. Regardless of the outcome, the scheduler is run and node
 * state is saved afterwards (job state too if schedule() returns
 * non-zero).
 */
static void _slurm_rpc_update_node(slurm_msg_t * msg)
{
	/* init */
	DEF_TIMERS;	/* timer state for START_TIMER/END_TIMER/TIME_STR */
	int error_code = SLURM_SUCCESS;
	update_node_msg_t *update_node_msg_ptr =
	    			(update_node_msg_t *) msg->data;
	/* Locks: Write job and write node */
	slurmctld_lock_t node_write_lock = { 
		NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: REQUEST_UPDATE_NODE");
	uid = g_slurm_auth_get_uid(msg->cred);
	if (!_is_super_user(uid)) {
		error_code = ESLURM_USER_ID_MISSING;
		error("Security violation, UPDATE_NODE RPC from uid=%u",
		      (unsigned int) uid);
	}

	if (error_code == SLURM_SUCCESS) {
		/* do RPC call */
		lock_slurmctld(node_write_lock);
		error_code = update_node(update_node_msg_ptr);
		unlock_slurmctld(node_write_lock);
		END_TIMER;
	}

	/* return result */
	if (error_code) {
		info("_slurm_rpc_update_node for %s: %s",
		      update_node_msg_ptr->node_names,
		      slurm_strerror(error_code));
		slurm_send_rc_msg(msg, error_code);
	} else {
		debug2("_slurm_rpc_update_node complete for %s %s", 
			update_node_msg_ptr->node_names, TIME_STR);
		slurm_send_rc_msg(msg, SLURM_SUCCESS);
	}

	/* Below functions provide their own locks */
	if (schedule())
		(void) dump_all_job_state();
	(void) dump_all_node_state();
}

/*
 * _slurm_rpc_update_partition - process RPC to update the configuration 
 *	of a partition (e.g. UP/DOWN)
 * IN msg - request message; msg->data is an update_part_msg_t
 *
 * Only a super user may update a partition. The reply is a return-code
 * message. After a successful update partition state is saved and the
 * scheduler is run (job state is saved if schedule() returns non-zero).
 */
static void _slurm_rpc_update_partition(slurm_msg_t * msg)
{
	/* init */
	DEF_TIMERS;	/* timer state for START_TIMER/END_TIMER/TIME_STR */
	int error_code = SLURM_SUCCESS;
	update_part_msg_t *part_desc_ptr = (update_part_msg_t *) msg->data;
	/* Locks: Read node, write partition */
	slurmctld_lock_t part_write_lock = { 
		NO_LOCK, NO_LOCK, READ_LOCK, WRITE_LOCK };
	uid_t uid;

	START_TIMER;
	debug2("Processing RPC: REQUEST_UPDATE_PARTITION");
	uid = g_slurm_auth_get_uid(msg->cred);
	if (!_is_super_user(uid)) {
		error_code = ESLURM_USER_ID_MISSING;
		error
		    ("Security violation, UPDATE_PARTITION RPC from uid=%u",
		     (unsigned int) uid);
	}

	if (error_code == SLURM_SUCCESS) {
		/* do RPC call */
		lock_slurmctld(part_write_lock);
		error_code = update_part(part_desc_ptr);
		unlock_slurmctld(part_write_lock);
		END_TIMER;
	}

	/* return result */
	if (error_code) {
		info("_slurm_rpc_update_partition partition=%s: %s",
			part_desc_ptr->name, slurm_strerror(error_code));
		slurm_send_rc_msg(msg, error_code);
	} else {
		debug2("_slurm_rpc_update_partition complete for %s %s",
			part_desc_ptr->name, TIME_STR);
		slurm_send_rc_msg(msg, SLURM_SUCCESS);

		/* NOTE: These functions provide their own locks */
		(void) dump_all_part_state();
		if (schedule())
			(void) dump_all_job_state();
	}
}

/*
 * _slurm_rpc_delete_partition - process RPC to delete a partition
 * IN msg - request message; msg->data is a delete_part_msg_t naming the
 *	partition
 *
 * Only a super user may delete a partition. The reply is a return-code
 * message; after a successful delete the job and partition state are
 * saved and the scheduler is run.
 */
static void _slurm_rpc_delete_partition(slurm_msg_t * msg)
{
	int rc = SLURM_SUCCESS;
	DEF_TIMERS;
	delete_part_msg_t *del_req = (delete_part_msg_t *) msg->data;
	/* Locks: Write job, write partition */
	slurmctld_lock_t part_write_lock = { 
		NO_LOCK, WRITE_LOCK, NO_LOCK, WRITE_LOCK };
	uid_t requester;

	START_TIMER;
	debug2("Processing RPC: REQUEST_DELETE_PARTITION");

	requester = g_slurm_auth_get_uid(msg->cred);
	if (_is_super_user(requester)) {
		/* authorized: perform the delete under the write locks */
		lock_slurmctld(part_write_lock);
		rc = delete_partition(del_req);
		unlock_slurmctld(part_write_lock);
		END_TIMER;
	} else {
		rc = ESLURM_USER_ID_MISSING;
		error
		    ("Security violation, DELETE_PARTITION RPC from uid=%u",
		     (unsigned int) requester);
	}

	/* failure: report the error code and stop */
	if (rc) {
		info("_slurm_rpc_delete_partition partition=%s: %s",
			del_req->name, slurm_strerror(rc));
		slurm_send_rc_msg(msg, rc);
		return;
	}

	/* success: acknowledge, then persist state and reschedule */
	info("_slurm_rpc_delete_partition complete for %s %s",
		del_req->name, TIME_STR);
	slurm_send_rc_msg(msg, SLURM_SUCCESS);

	/* The functions below take their own locks */
	(void) dump_all_job_state();
	(void) dump_all_part_state();
	if (schedule())
		(void) dump_all_job_state();
}

/*
 * _update_cred_key - reset the job credential key based upon the
 *	configuration parameters: hands
 *	slurmctld_conf.job_credential_private_key to
 *	slurm_cred_ctx_key_update() for slurmctld_config.cred_ctx
 */
static void _update_cred_key(void)
{
	slurm_cred_ctx_key_update(
		slurmctld_config.cred_ctx,
		slurmctld_conf.job_credential_private_key);
}