/* controller.c 15.03 KiB */
/*
* controller.c - main control machine daemon for slurm
* see slurm.h for documentation on external functions and data structures
*
* NOTE: DEBUG_MODULE of read_config requires that it be loaded with
* bits_bytes, partition_mgr, read_config, and node_mgr
*
* author: moe jette, jette@llnl.gov
*/
#ifdef have_config_h
# include <config.h>
#endif
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <syslog.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include "slurmctld.h"
#include "pack.h"
/* size in bytes of the request buffer in slurmctld_req and of the
 * buffer dump_build allocates for the packed build parameters */
#define BUF_SIZE 1024
/* set once by main at startup; dump_build packs it as the data time stamp
 * and compares it against the caller supplied last_update */
time_t init_time;
/* pack the build parameters into an allocated buffer (defined below) */
int dump_build (char **buffer_ptr, int *buffer_size, time_t last_update);
/* NOTE(review): neither a definition nor a caller of msg_from_root is
 * visible in this part of the file - confirm it is still needed */
int msg_from_root (void);
/* read one request from sockfd, act on it and send back the reply
 * (defined below) */
void slurmctld_req (int sockfd);
/*
 * main - slurmctld entry point
 * Initializes logging, loads the configuration from SLURM_CONF, verifies
 * that this host is the configured control machine, then binds a TCP
 * socket to SLURMCTLD_PORT and serves requests one at a time forever.
 * input: argc - argument count (unused)
 *        argv - argument vector, argv[0] is handed to log_init
 * Every setup failure is reported through fatal(); the service loop
 * never exits on its own.
 */
int
main (int argc, char *argv[]) {
	int error_code;
	int newsockfd, sockfd;
	socklen_t cli_len;	/* accept() requires a socklen_t, not an int */
	struct sockaddr_in cli_addr, serv_addr;
	char node_name[MAX_NAME_LEN];
	log_options_t opts = LOG_OPTS_STDERR_ONLY;

	init_time = time (NULL);
	log_init (argv[0], opts, SYSLOG_FACILITY_DAEMON, NULL);

	error_code = init_slurm_conf ();
	if (error_code)
		fatal ("slurmctld: init_slurm_conf error %d", error_code);

	error_code = read_slurm_conf (SLURM_CONF);
	if (error_code)
		fatal ("slurmctld: error %d from read_slurm_conf reading %s",
		       error_code, SLURM_CONF);

	/* gethostname returns -1 on failure, the cause is in errno */
	error_code = gethostname (node_name, MAX_NAME_LEN);
	if (error_code != 0)
		fatal ("slurmctld: error %d from gethostname", errno);
	/* a truncated host name is not guaranteed to be terminated */
	node_name[MAX_NAME_LEN - 1] = '\0';

	if (strcmp (node_name, control_machine) != 0)
		fatal ("slurmctld: this machine (%s) is not the primary control machine (%s)",
		       node_name, control_machine);

	if ((sockfd = socket (AF_INET, SOCK_STREAM, 0)) < 0)
		fatal ("slurmctld: error %d from socket", errno);

	memset (&serv_addr, 0, sizeof (serv_addr));
	serv_addr.sin_family = AF_INET;
	serv_addr.sin_addr.s_addr = htonl (INADDR_ANY);
	serv_addr.sin_port = htons (SLURMCTLD_PORT);

	error_code = bind (sockfd, (struct sockaddr *) &serv_addr,
			   sizeof (serv_addr));
	if ((error_code < 0) && (errno == EADDRINUSE)) {
		/* the port may still be held by a previous instance,
		 * give it one chance to be released */
		printf("waiting to bind\n");
		sleep (10);
		error_code = bind (sockfd, (struct sockaddr *) &serv_addr,
				   sizeof (serv_addr));
	}
	if (error_code < 0)
		fatal ("slurmctld: error %d from bind\n", errno);

	if (listen (sockfd, 5) < 0)
		fatal ("slurmctld: error %d from listen", errno);
	info ("slurmctld ready for service\n");

	while (1) {
		cli_len = sizeof (cli_addr);
		newsockfd = accept (sockfd, (struct sockaddr *) &cli_addr,
				    &cli_len);
		if (newsockfd < 0) {
			/* an interrupted call or a client that went away
			 * before being accepted must not stop the daemon */
			if ((errno == EINTR) || (errno == ECONNABORTED))
				continue;
			fatal ("slurmctld: error %d from accept", errno);
		}

		/* convert to pthread, tbd */
		slurmctld_req (newsockfd);	/* process the request */
		close (newsockfd);		/* close the new socket */
	}
}
/*
 * dump_build - pack all build parameters into a newly allocated buffer
 * input: buffer_ptr - location into which a pointer to the packed data is
 *                     stored; the buffer is allocated here by dump_build
 *        buffer_size - location into which the size of the packed data,
 *                      in bytes, is stored
 *        last_update - time stamp of the data the caller already holds;
 *                      nothing is packed unless init_time is more recent
 * output: buffer_ptr - set to the allocated buffer, or NULL if nothing
 *                      was packed
 *         buffer_size - set to the number of bytes packed, or 0 if
 *                       nothing was packed
 * returns 0 (the code below has no other return value)
 * NOTE: the buffer at *buffer_ptr must be xfreed by the caller
 * NOTE: if you make any changes here be sure to increment the value of
 *	BUILD_STRUCT_VERSION and make the corresponding changes to
 *	load_build in api/build_info.c
 */
int
dump_build (char **buffer_ptr, int *buffer_size, time_t last_update)
{
	char *packed;		/* start of the allocated buffer */
	void *cursor;		/* next position to be written */
	int bytes_left;		/* space remaining after cursor */

	*buffer_ptr = NULL;
	*buffer_size = 0;

	/* the caller's copy is already as recent as ours */
	if (last_update >= init_time)
		return 0;

	packed = xmalloc (BUF_SIZE);
	cursor = packed;
	bytes_left = BUF_SIZE;

	/* header: structure version followed by the time stamp */
	pack32 ((uint32_t) BUILD_STRUCT_VERSION, &cursor, &bytes_left);
	pack32 ((uint32_t) init_time, &cursor, &bytes_left);

	/* the build parameters themselves, the order is part of the format */
	pack16 ((uint16_t) BACKUP_INTERVAL, &cursor, &bytes_left);
	packstr (BACKUP_LOCATION, &cursor, &bytes_left);
	packstr (backup_controller, &cursor, &bytes_left);
	packstr (CONTROL_DAEMON, &cursor, &bytes_left);
	packstr (control_machine, &cursor, &bytes_left);
	pack16 ((uint16_t) CONTROLLER_TIMEOUT, &cursor, &bytes_left);
	packstr (EPILOG, &cursor, &bytes_left);
	pack16 ((uint16_t) FAST_SCHEDULE, &cursor, &bytes_left);
	pack16 ((uint16_t) HASH_BASE, &cursor, &bytes_left);
	pack16 ((uint16_t) HEARTBEAT_INTERVAL, &cursor, &bytes_left);
	packstr (INIT_PROGRAM, &cursor, &bytes_left);
	pack16 ((uint16_t) KILL_WAIT, &cursor, &bytes_left);
	packstr (PRIORITIZE, &cursor, &bytes_left);
	packstr (PROLOG, &cursor, &bytes_left);
	packstr (SERVER_DAEMON, &cursor, &bytes_left);
	pack16 ((uint16_t) SERVER_TIMEOUT, &cursor, &bytes_left);
	packstr (SLURM_CONF, &cursor, &bytes_left);
	packstr (TMP_FS, &cursor, &bytes_left);

	/* bytes actually used = distance the cursor has advanced */
	*buffer_size = (char *) cursor - packed;
	/* NOTE(review): presumably xrealloc updates its first argument in
	 * place (no result is assigned) - confirm against xmalloc.h */
	xrealloc (packed, *buffer_size);

	*buffer_ptr = packed;
	return 0;
}
/*
* slurmctld_req - process a slurmctld request from the given socket
* input: sockfd - the socket with a request to be processed
*/
void
slurmctld_req (int sockfd) {
int error_code, in_size, i;
char in_line[BUF_SIZE], node_name[MAX_NAME_LEN];
int cpus, real_memory, tmp_disk;
char *node_name_ptr, *part_name, *time_stamp;
uint32_t job_id;
time_t last_update;
clock_t start_time;
char *dump;
int dump_size, dump_loc;
in_size = recv (sockfd, in_line, sizeof (in_line), 0);
start_time = clock ();
/* Allocate: allocate resources for a job */
if (strncmp ("Allocate", in_line, 8) == 0) {
node_name_ptr = NULL;
error_code = job_allocate(&in_line[8], /* skip "Allocate" */
&job_id, &node_name_ptr);
if (error_code)
info ("slurmctld_req: error %d allocating resources for %s, time=%ld",
error_code, &in_line[8], (long) (clock () - start_time));
else
info ("slurmctld_req: allocated nodes %s to %s, JobId=%u, time=%ld",
node_name_ptr, &in_line[8], job_id,
(long) (clock () - start_time));
if (error_code == 0) {
i = strlen(node_name_ptr) + 12;
dump = xmalloc(i);
sprintf(dump, "%s %u", node_name_ptr, job_id);
send (sockfd, dump, i, 0);
xfree(dump);
}
else if (error_code == EAGAIN)
send (sockfd, "EAGAIN", 7, 0);
else
send (sockfd, "EINVAL", 7, 0);
if (node_name_ptr)
xfree (node_name_ptr);
}
/* DumpBuild - dump the SLURM build parameters */
else if (strncmp ("DumpBuild", in_line, 9) == 0) {
time_stamp = NULL;
error_code =
load_string (&time_stamp, "LastUpdate=", in_line);
if (time_stamp) {
last_update = strtol (time_stamp, (char **) NULL, 10);
xfree (time_stamp);
}
else
last_update = (time_t) 0;
error_code = dump_build (&dump, &dump_size, last_update);
if (error_code)
info ("slurmctld_req: dump_build error %d, time=%ld",
error_code, (long) (clock () - start_time));
else
info ("slurmctld_req: dump_build returning %d bytes, time=%ld",
dump_size, (long) (clock () - start_time));
if (dump_size == 0)
send (sockfd, "nochange", 9, 0);
else if (error_code == 0) {
dump_loc = 0;
while (dump_size > 0) {
i = send (sockfd, &dump[dump_loc], dump_size, 0);
dump_loc += i;
dump_size -= i;
}
}
else
send (sockfd, "EINVAL", 7, 0);
if (dump)
xfree (dump);
}
/* DumpJob - dump the job state information */
else if (strncmp ("DumpJob", in_line, 7) == 0) {
time_stamp = NULL;
error_code =
load_string (&time_stamp, "LastUpdate=", in_line);
if (time_stamp) {
last_update = strtol (time_stamp, (char **) NULL, 10);
xfree (time_stamp);
}
else
last_update = (time_t) 0;
error_code = pack_all_jobs (&dump, &dump_size, &last_update);
if (error_code)
info ("slurmctld_req: pack_all_jobs error %d, time=%ld",
error_code, (long) (clock () - start_time));
else
info ("slurmctld_req: pack_all_jobs returning %d bytes, time=%ld",
dump_size, (long) (clock () - start_time));
if (dump_size == 0)
send (sockfd, "nochange", 9, 0);
else if (error_code == 0) {
dump_loc = 0;
while (dump_size > 0) {
i = send (sockfd, &dump[dump_loc], dump_size,
0);
dump_loc += i;
dump_size -= i;
}
}
else
send (sockfd, "EINVAL", 7, 0);
if (dump)
xfree (dump);
}
/* DumpNode - dump the node configurations */
else if (strncmp ("DumpNode", in_line, 8) == 0) {
time_stamp = NULL;
error_code =
load_string (&time_stamp, "LastUpdate=", in_line);
if (time_stamp) {
last_update = strtol (time_stamp, (char **) NULL, 10);
xfree (time_stamp);
}
else
last_update = (time_t) 0;
error_code = pack_all_node (&dump, &dump_size, &last_update);
if (error_code)
info ("slurmctld_req: part_all_node error %d, time=%ld",
error_code, (long) (clock () - start_time));
else
info ("slurmctld_req: part_all_node returning %d bytes, time=%ld",
dump_size, (long) (clock () - start_time));
if (dump_size == 0)
send (sockfd, "nochange", 9, 0);
else if (error_code == 0) {
dump_loc = 0;
while (dump_size > 0) {
i = send (sockfd, &dump[dump_loc], dump_size,
0);
dump_loc += i;
dump_size -= i;
}
}
else
send (sockfd, "EINVAL", 7, 0);
if (dump)
xfree (dump);
}
/* DumpPart - dump the partition configurations */
else if (strncmp ("DumpPart", in_line, 8) == 0) {
time_stamp = NULL;
error_code =
load_string (&time_stamp, "LastUpdate=", in_line);
if (time_stamp) {
last_update = strtol (time_stamp, (char **) NULL, 10);
xfree (time_stamp);
}
else
last_update = (time_t) 0;
error_code = pack_all_part (&dump, &dump_size, &last_update);
if (error_code)
info ("slurmctld_req: dump_part error %d, time=%ld",
error_code, (long) (clock () - start_time));
else
info ("slurmctld_req: dump_part returning %d bytes, time=%ld",
dump_size, (long) (clock () - start_time));
if (dump_size == 0)
send (sockfd, "nochange", 9, 0);
else if (error_code == 0) {
dump_loc = 0;
while (dump_size > 0) {
i = send (sockfd, &dump[dump_loc], dump_size,
0);
dump_loc += i;
dump_size -= i;
}
}
else
send (sockfd, "EINVAL", 7, 0);
if (dump)
xfree (dump);
}
/* JobCancel - cancel a slurm job or reservation */
else if (strncmp ("JobCancel", in_line, 9) == 0) {
job_id = (uint32_t) strtol (&in_line[10], (char **)NULL, 10);
error_code = job_cancel (job_id);
if (error_code)
info ("slurmctld_req: job_cancel error %d, time=%ld",
error_code, (long) (clock () - start_time));
else
info ("slurmctld_req: job_cancel success for %s, time=%ld",
&in_line[10], (long) (clock () - start_time));
if (error_code == 0)
send (sockfd, "Job killed", 11, 0);
else
send (sockfd, "EINVAL", 7, 0);
}
/* JobSubmit - submit a job to the slurm queue */
else if (strncmp ("JobSubmit", in_line, 9) == 0) {
struct job_record *job_rec_ptr;
error_code = job_create(&in_line[9], &job_id, 0, 0,
&job_rec_ptr); /* skip "JobSubmit" */
if (error_code)
info ("slurmctld_req: job_submit error %d, time=%ld",
error_code, (long) (clock () - start_time));
else
info ("slurmctld_req: job_submit success for %s, id=%u, time=%ld",
&in_line[9], job_id,
(long) (clock () - start_time));
if (error_code == 0) {
dump = xmalloc(12);
sprintf(dump, "%u", job_id);
send (sockfd, dump, strlen(dump) + 1, 0);
xfree (dump);
}
else
send (sockfd, "EINVAL", 7, 0);
schedule();
}
/* JobWillRun - determine if job with given configuration can be initiated now */
else if (strncmp ("JobWillRun", in_line, 10) == 0) {
error_code = EINVAL;
if (error_code)
info ("slurmctld_req: job_will_run error %d, time=%ld",
error_code, (long) (clock () - start_time));
else
info ("slurmctld_req: job_will_run success for %s, time=%ld",
&in_line[10], (long) (clock () - start_time));
if (error_code == 0)
send (sockfd, dump, dump_size, 0);
else
send (sockfd, "EINVAL", 7, 0);
}
/* NodeConfig - determine if a node's actual configuration satisfies the
* configured specification */
else if (strncmp ("NodeConfig", in_line, 10) == 0) {
node_name_ptr = NULL;
cpus = real_memory = tmp_disk = NO_VAL;
error_code = load_string (&node_name_ptr, "NodeName=", in_line);
if (node_name == NULL)
error_code = EINVAL;
if (error_code == 0)
error_code = load_integer (&cpus, "CPUs=", in_line);
if (error_code == 0)
error_code =
load_integer (&real_memory, "RealMemory=",
in_line);
if (error_code == 0)
error_code =
load_integer (&tmp_disk, "TmpDisk=",
in_line);
if (error_code == 0)
error_code =
validate_node_specs (node_name_ptr, cpus,
real_memory, tmp_disk);
if (error_code)
error ("slurmctld_req: node_config error %d for %s, time=%ld",
error_code, node_name_ptr, (long) (clock () - start_time));
else
info ("slurmctld_req: node_config for %s, time=%ld",
node_name_ptr, (long) (clock () - start_time));
if (error_code == 0)
send (sockfd, dump, dump_size, 0);
else
send (sockfd, "EINVAL", 7, 0);
if (node_name_ptr)
xfree (node_name_ptr);
}
/* Reconfigure - re-initialized from configuration files */
else if (strncmp ("Reconfigure", in_line, 11) == 0) {
error_code = init_slurm_conf ();
if (error_code == 0)
error_code = read_slurm_conf (SLURM_CONF);
reset_job_bitmaps ();
if (error_code)
error ("slurmctld_req: reconfigure error %d, time=%ld",
error_code, (long) (clock () - start_time));
else
info ("slurmctld_req: reconfigure completed successfully, time=%ld",
(long) (clock () - start_time));
sprintf (in_line, "%d", error_code);
send (sockfd, in_line, strlen (in_line) + 1, 0);
}
/* Update - modify node or partition configuration */
else if (strncmp ("Update", in_line, 6) == 0) {
node_name_ptr = part_name = NULL;
error_code = load_string (&node_name_ptr, "NodeName=", in_line);
if ((error_code == 0) && (node_name_ptr != NULL))
error_code = update_node (node_name_ptr, &in_line[6]); /* skip "Update" */
else {
error_code =
load_string (&part_name, "PartitionName=", in_line);
if ((error_code == 0) && (part_name != NULL))
error_code = update_part (part_name, &in_line[6]); /* skip "Update" */
else
error_code = EINVAL;
}
if (error_code) {
if (node_name_ptr)
error ("slurmctld_req: update error %d on node %s, time=%ld",
error_code, node_name_ptr, (long) (clock () - start_time));
else if (part_name)
error ("slurmctld_req: update error %d on partition %s, time=%ld",
error_code, part_name, (long) (clock () - start_time));
else
error ("slurmctld_req: update error %d on request %s, time=%ld",
error_code, in_line, (long) (clock () - start_time));
}
else {
if (node_name_ptr)
info ("slurmctld_req: updated node %s, time=%ld",
node_name_ptr, (long) (clock () - start_time));
else
info ("slurmctld_req: updated partition %s, time=%ld",
part_name, (long) (clock () - start_time));
}
sprintf (in_line, "%d", error_code);
send (sockfd, in_line, strlen (in_line) + 1, 0);
if (node_name_ptr)
xfree (node_name_ptr);
if (part_name)
xfree (part_name);
}
else {
error ("slurmctld_req: invalid request %s\n", in_line);
send (sockfd, "EINVAL", 7, 0);
}
return;
}