From ec542b8a8babdefd091a8252ad25701c5b8b6f6f Mon Sep 17 00:00:00 2001 From: "Christopher J. Morrone" <morrone2@llnl.gov> Date: Fri, 18 Nov 2005 23:08:58 +0000 Subject: [PATCH] Redo the stepd_api to allow persisent connections. A connection with a slurmstepd is established with the stepd_connect() call, and then all of the following stepd_* calls take the file descriptor that was returned by stepd_connect. --- src/slurmd/common/run_script.c | 1 + src/slurmd/common/stepd_api.c | 218 ++++++++++----------- src/slurmd/common/stepd_api.h | 47 +++-- src/slurmd/slurmd/req.c | 220 +++++++++++++++------ src/slurmd/slurmd/slurmd.c | 11 +- src/slurmd/slurmstepd/mgr.c | 2 +- src/slurmd/slurmstepd/req.c | 338 +++++++++++++++++---------------- 7 files changed, 487 insertions(+), 350 deletions(-) diff --git a/src/slurmd/common/run_script.c b/src/slurmd/common/run_script.c index ffc8c170d49..8140d7a4858 100644 --- a/src/slurmd/common/run_script.c +++ b/src/slurmd/common/run_script.c @@ -24,6 +24,7 @@ * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. \*****************************************************************************/ +#include <stdlib.h> #include <sys/wait.h> #include <sys/errno.h> diff --git a/src/slurmd/common/stepd_api.c b/src/slurmd/common/stepd_api.c index e234e9101bf..1c318e8815b 100644 --- a/src/slurmd/common/stepd_api.c +++ b/src/slurmd/common/stepd_api.c @@ -46,7 +46,7 @@ #include "src/slurmd/common/stepd_api.h" static int -step_connect(step_loc_t step) +_step_connect(char *directory, char *nodename, uint32_t jobid, uint32_t stepid) { int fd; int len; @@ -54,19 +54,18 @@ step_connect(step_loc_t step) char *name = NULL; if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { - debug("step_connect: socket: %m"); + debug("_step_connect: socket: %m"); return -1; } memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_UNIX; - xstrfmtcat(name, "%s/%s_%u.%u", step.directory, step.nodename, - step.jobid, step.stepid); + xstrfmtcat(name, "%s/%s_%u.%u", directory, nodename, jobid, stepid); strcpy(addr.sun_path, name); len = strlen(addr.sun_path)+1 + sizeof(addr.sun_family); if (connect(fd, (struct sockaddr *) &addr, len) < 0) { - debug("step_connect: connect: %m"); + debug("_step_connect: connect: %m"); xfree(name); close(fd); return -1; @@ -76,61 +75,125 @@ step_connect(step_loc_t step) return fd; } -slurmstepd_state_t -stepd_state(step_loc_t step) + +/* + * Connect to a slurmstepd proccess by way of its unix domain socket. + * + * Returns a socket descriptor for the opened socket on success, + * and -1 on error. + */ +int +stepd_connect(char *directory, char *nodename, uint32_t jobid, uint32_t stepid) { - int req = REQUEST_STATE; - int fd; - slurmstepd_state_t status = SLURMSTEPD_NOT_RUNNING; + int req = REQUEST_CONNECT; + int fd = -1; + int rc; + void *auth_cred; + Buf buffer; + int len; - fd = step_connect(step); + buffer = init_buf(0); + /* Create an auth credential */ + auth_cred = g_slurm_auth_create(NULL, 2); + if (auth_cred == NULL) { + error("Creating authentication credential: %s", + g_slurm_auth_errstr(g_slurm_auth_errno(NULL))); + slurm_seterrno(SLURM_PROTOCOL_AUTHENTICATION_ERROR); + goto fail1; + } + + /* Pack the auth credential */ + rc = g_slurm_auth_pack(auth_cred, buffer); + (void) g_slurm_auth_destroy(auth_cred); + if (rc) { + error("Packing authentication credential: %s", + g_slurm_auth_errstr(g_slurm_auth_errno(auth_cred))); + slurm_seterrno(SLURM_PROTOCOL_AUTHENTICATION_ERROR); + goto fail1; + } + + /* Connect to the step */ + fd = _step_connect(directory, nodename, jobid, stepid); if (fd == -1) - return status; + goto fail1; safe_write(fd, &req, sizeof(int)); - safe_read(fd, &status, sizeof(slurmstepd_state_t)); + len = size_buf(buffer); + safe_write(fd, &len, sizeof(int)); + safe_write(fd, get_buf_data(buffer), len); + + safe_read(fd, &rc, sizeof(int)); + if (rc < 0) { + error("slurmstepd refused authentication: %m"); + slurm_seterrno(SLURM_PROTOCOL_AUTHENTICATION_ERROR); + goto rwfail; + } + + free_buf(buffer); + return fd; rwfail: close(fd); +fail1: + free_buf(buffer); + return -1; +} + + +/* + * Retrieve a job step's current state. + */ +slurmstepd_state_t +stepd_state(int fd) +{ + int req = REQUEST_STATE; + slurmstepd_state_t status = SLURMSTEPD_NOT_RUNNING; + + safe_write(fd, &req, sizeof(int)); + safe_read(fd, &status, sizeof(slurmstepd_state_t)); +rwfail: return status; } +/* + * Retrieve slurmstepd_info_t structure for a job step. + * + * Must be xfree'd by the caller. + */ +slurmstepd_info_t * +stepd_get_info(int fd) +{ + int req = REQUEST_INFO; + slurmstepd_info_t *info; + + info = xmalloc(sizeof(slurmstepd_info_t)); + safe_write(fd, &req, sizeof(int)); + safe_read(fd, &info->uid, sizeof(uid_t)); + safe_read(fd, &info->jobid, sizeof(uint32_t)); + safe_read(fd, &info->stepid, sizeof(uint32_t)); + + return info; +rwfail: + return NULL; +} + /* * Send a signal to the process group of a job step. */ int -stepd_signal(step_loc_t step, void *auth_cred, int signal) +stepd_signal(int fd, int signal) { int req = REQUEST_SIGNAL_PROCESS_GROUP; - int fd; - Buf buf; - int buf_len; int rc; - fd = step_connect(step); - if (fd == -1) - return -1; safe_write(fd, &req, sizeof(int)); - - /* pack auth credential */ - buf = init_buf(0); - if (g_slurm_auth_pack(auth_cred, buf) == SLURM_ERROR) - error("g_slurm_auth_pack failed!: %m"); - buf_len = size_buf(buf); - debug("buf_len = %d", buf_len); - safe_write(fd, &signal, sizeof(int)); - safe_write(fd, &buf_len, sizeof(int)); - safe_write(fd, get_buf_data(buf), buf_len); /* Receive the return code */ safe_read(fd, &rc, sizeof(int)); - free_buf(buf); - close(fd); return rc; rwfail: - close(fd); return -1; } @@ -138,40 +201,20 @@ rwfail: * Send a signal to a single task in a job step. */ int -stepd_signal_task_local(step_loc_t step, void *auth_cred, - int signal, int ltaskid) +stepd_signal_task_local(int fd, int signal, int ltaskid) { int req = REQUEST_SIGNAL_PROCESS_GROUP; - int fd; - Buf buf; - int buf_len; int rc; - fd = step_connect(step); - if (fd == -1) - return -1; safe_write(fd, &req, sizeof(int)); - - /* pack auth credential */ - buf = init_buf(0); - if (g_slurm_auth_pack(auth_cred, buf) == SLURM_ERROR) - error("g_slurm_auth_pack failed!: %m"); - buf_len = size_buf(buf); - debug("buf_len = %d", buf_len); - safe_write(fd, &signal, sizeof(int)); safe_write(fd, <askid, sizeof(int)); - safe_write(fd, &buf_len, sizeof(int)); - safe_write(fd, get_buf_data(buf), buf_len); /* Receive the return code */ safe_read(fd, &rc, sizeof(int)); - free_buf(buf); - close(fd); return rc; rwfail: - close(fd); return -1; } @@ -179,43 +222,22 @@ rwfail: * Send a signal to the proctrack container of a job step. */ int -stepd_signal_container(step_loc_t step, void *auth_cred, int signal) +stepd_signal_container(int fd, int signal) { int req = REQUEST_SIGNAL_CONTAINER; - int fd; - Buf buf; - int buf_len; int rc; int errnum = 0; - fd = step_connect(step); - if (fd == -1) - return -1; safe_write(fd, &req, sizeof(int)); - - /* pack auth credential */ - buf = init_buf(0); - if (g_slurm_auth_pack(auth_cred, buf) == SLURM_ERROR) { - error("g_slurm_auth_pack: %s", - g_slurm_auth_errstr(g_slurm_auth_errno(NULL))); - } - buf_len = size_buf(buf); - debug("buf_len = %d", buf_len); - safe_write(fd, &signal, sizeof(int)); - safe_write(fd, &buf_len, sizeof(int)); - safe_write(fd, get_buf_data(buf), buf_len); /* Receive the return code and errno */ safe_read(fd, &rc, sizeof(int)); safe_read(fd, &errnum, sizeof(int)); - free_buf(buf); - close(fd); errno = errnum; return rc; rwfail: - close(fd); return -1; } @@ -227,33 +249,16 @@ rwfail: * resp->gtids, resp->ntasks, and resp->executable. */ int -stepd_attach(step_loc_t step, slurm_addr *ioaddr, slurm_addr *respaddr, - void *auth_cred, slurm_cred_t job_cred, - reattach_tasks_response_msg_t *resp) +stepd_attach(int fd, slurm_addr *ioaddr, slurm_addr *respaddr, + void *job_cred_sig, reattach_tasks_response_msg_t *resp) { int req = REQUEST_ATTACH; - int fd; - Buf buf; - int buf_len; int rc = SLURM_SUCCESS; - fd = step_connect(step); - if (fd == -1) - return SLURM_ERROR; safe_write(fd, &req, sizeof(int)); - - /* pack auth and job credentials */ - buf = init_buf(0); - if (g_slurm_auth_pack(auth_cred, buf) == SLURM_ERROR) - error("g_slurm_auth_pack failed!: %m"); - slurm_cred_pack(job_cred, buf); - buf_len = size_buf(buf); - debug("buf_len = %d", buf_len); - safe_write(fd, ioaddr, sizeof(slurm_addr)); safe_write(fd, respaddr, sizeof(slurm_addr)); - safe_write(fd, &buf_len, sizeof(int)); - safe_write(fd, get_buf_data(buf), buf_len); + safe_write(fd, job_cred_sig, SLURM_CRED_SIGLEN); /* Receive the return code */ safe_read(fd, &rc, sizeof(int)); @@ -278,12 +283,8 @@ stepd_attach(step_loc_t step, slurm_addr *ioaddr, slurm_addr *respaddr, safe_read(fd, resp->executable_name, len); } - free_buf(buf); - close(fd); return rc; - rwfail: - close(fd); return SLURM_ERROR; } @@ -455,16 +456,11 @@ done: * the proctrack container of the slurmstepd "step". */ bool -stepd_pid_in_container(step_loc_t step, pid_t pid) +stepd_pid_in_container(int fd, pid_t pid) { int req = REQUEST_PID_IN_CONTAINER; - int fd; bool rc; - fd = step_connect(step); - if (fd == -1) - return false; - safe_write(fd, &req, sizeof(int)); safe_write(fd, &pid, sizeof(pid_t)); @@ -472,10 +468,8 @@ stepd_pid_in_container(step_loc_t step, pid_t pid) safe_read(fd, &rc, sizeof(bool)); debug("Leaving stepd_pid_in_container"); - close(fd); return rc; rwfail: - close(fd); return false; } @@ -483,21 +477,15 @@ rwfail: * Return the process ID of the slurmstepd. */ pid_t -stepd_daemon_pid(step_loc_t step) +stepd_daemon_pid(int fd) { int req = REQUEST_DAEMON_PID; - int fd; pid_t pid; - fd = step_connect(step); - if (fd == -1) - return (pid_t)-1; safe_write(fd, &req, sizeof(int)); safe_read(fd, &pid, sizeof(pid_t)); - close(fd); return pid; rwfail: - close(fd); return (pid_t)-1; } diff --git a/src/slurmd/common/stepd_api.h b/src/slurmd/common/stepd_api.h index d4946e3ac4b..66d12287e74 100644 --- a/src/slurmd/common/stepd_api.h +++ b/src/slurmd/common/stepd_api.h @@ -42,11 +42,13 @@ typedef struct step_location { } step_loc_t; typedef enum { - REQUEST_SIGNAL_PROCESS_GROUP = 0, + REQUEST_CONNECT = 0, + REQUEST_SIGNAL_PROCESS_GROUP, REQUEST_SIGNAL_TASK_LOCAL, REQUEST_SIGNAL_TASK_GLOBAL, REQUEST_SIGNAL_CONTAINER, REQUEST_STATE, + REQUEST_INFO, REQUEST_ATTACH, REQUEST_PID_IN_CONTAINER, REQUEST_DAEMON_PID @@ -59,32 +61,52 @@ typedef enum { SLURMSTEPD_STEP_ENDING } slurmstepd_state_t; +typedef struct { + uid_t uid; + uint32_t jobid; + uint32_t stepid; +} slurmstepd_info_t; + +/* + * Connect to a slurmstepd proccess by way of its unix domain socket. + * + * Returns a socket descriptor for the opened socket on success, + * and -1 on error. + */ +int stepd_connect(char *directory, char *nodename, + uint32_t jobid, uint32_t stepid); + /* * Retrieve a job step's current state. */ -slurmstepd_state_t stepd_state(step_loc_t step); +slurmstepd_state_t stepd_state(int fd); + +/* + * Retrieve slurmstepd_info_t structure for a job step. + * + * Must be xfree'd by the caller. + */ +slurmstepd_info_t *stepd_get_info(int fd); /* * Send a signal to the process group of a job step. */ -int stepd_signal(step_loc_t step, void *auth_cred, int signal); +int stepd_signal(int fd, int signal); /* * Send a signal to a single task in a job step. */ -int stepd_signal_task_local(step_loc_t step, void *auth_cred, - int signal, int ltaskid); +int stepd_signal_task_local(int fd, int signal, int ltaskid); /* * Send a signal to a single task in a job step. */ -int stepd_signal_task_global(step_loc_t step, void *auth_cred, - int signal, int gtaskid); +int stepd_signal_task_global(int fd, int signal, int gtaskid); /* * Send a signal to the proctrack container of a job step. */ -int stepd_signal_container(step_loc_t step, void *auth_cred, int signal); +int stepd_signal_container(int fd, int signal); /* * Attach a client to a running job step. @@ -96,9 +118,8 @@ int stepd_signal_container(step_loc_t step, void *auth_cred, int signal); * probably be moved into a more generic stepd_api call so that * this header does not need to include slurm_protocol_defs.h. */ -int stepd_attach(step_loc_t step, slurm_addr *ioaddr, slurm_addr *respaddr, - void *auth_cred, slurm_cred_t job_cred, - reattach_tasks_response_msg_t *resp); +int stepd_attach(int fd, slurm_addr *ioaddr, slurm_addr *respaddr, + void *job_cred_sig, reattach_tasks_response_msg_t *resp); /* * Scan for available running slurm step daemons by checking @@ -112,12 +133,12 @@ List stepd_available(const char *directory, const char *nodename); * Return true if the process with process ID "pid" is found in * the proctrack container of the slurmstepd "step". */ -bool stepd_pid_in_container(step_loc_t step, pid_t pid); +bool stepd_pid_in_container(int fd, pid_t pid); /* * Return the process ID of the slurmstepd. */ -pid_t stepd_daemon_pid(step_loc_t step); +pid_t stepd_daemon_pid(int fd); #define safe_read(fd, ptr, size) do { \ if (read(fd, ptr, size) != size) { \ diff --git a/src/slurmd/slurmd/req.c b/src/slurmd/slurmd/req.c index 0e6af2cf16c..3cae24ef322 100644 --- a/src/slurmd/slurmd/req.c +++ b/src/slurmd/slurmd/req.c @@ -846,19 +846,34 @@ _rpc_ping(slurm_msg_t *msg, slurm_addr *cli_addr) static void _rpc_signal_tasks(slurm_msg_t *msg, slurm_addr *cli_addr) { + int fd; int rc = SLURM_SUCCESS; uid_t req_uid; kill_tasks_msg_t *req = (kill_tasks_msg_t *) msg->data; - step_loc_t loc; + slurmstepd_info_t *step; - /* - * Use slurmstepd API to request that the slurmdstepd handle - * the signalling. - */ - loc.directory = conf->spooldir; - loc.nodename = conf->node_name; - loc.jobid = req->job_id; - loc.stepid = req->job_step_id; + fd = stepd_connect(conf->spooldir, conf->node_name, + req->job_id, req->job_step_id); + if (fd == -1) { + error("stepd_connect failed: %m"); + rc = EPERM; + goto done; + } + if ((step = stepd_get_info(fd)) == NULL) { + debug("kill for nonexistent job %u.%u requested", + req->job_id, req->job_step_id); + rc = ESLURM_INVALID_JOB_ID; + goto done2; + } + + req_uid = g_slurm_auth_get_uid(msg->cred); + if ((req_uid != step->uid) && (!_slurm_authorized_user(req_uid))) { + debug("kill req from uid %ld for job %u.%u owned by uid %ld", + (long) req_uid, req->job_id, req->job_step_id, + (long) step->uid); + rc = ESLURM_USER_ID_MISSING; /* or bad in this case */ + goto done3; + } #ifdef HAVE_AIX # ifdef SIGMIGRATE @@ -867,18 +882,22 @@ _rpc_signal_tasks(slurm_msg_t *msg, slurm_addr *cli_addr) * These signals are not sent to the entire process group, but just a * single process, namely the PMD. */ if (req->signal == SIGMIGRATE || req->signal == SIGSOUND) { - rc = stepd_signal_task_local(loc, msg->cred, req->signal, 0); + rc = stepd_signal_task_local(fd, req->signal, 0); goto done; } # endif # endif #endif - rc = stepd_signal(loc, msg->cred, req->signal); + rc = stepd_signal(fd, req->signal); if (rc == -1) rc = ESLURMD_JOB_NOTRUNNING; - done: +done3: + xfree(step); +done2: + close(fd); +done: slurm_send_rc_msg(msg, rc); } @@ -886,23 +905,44 @@ static void _rpc_terminate_tasks(slurm_msg_t *msg, slurm_addr *cli_addr) { kill_tasks_msg_t *req = (kill_tasks_msg_t *) msg->data; - step_loc_t loc; int rc = SLURM_SUCCESS; + int fd; + uid_t req_uid; + slurmstepd_info_t *step; debug3("Entering _rpc_terminate_tasks"); - /* - * Use slurmstepd API to request that the slurmdstepd handle - * the signalling. - */ - loc.directory = conf->spooldir; - loc.nodename = conf->node_name; - loc.jobid = req->job_id; - loc.stepid = req->job_step_id; - rc = stepd_signal_container(loc, msg->cred, req->signal); + fd = stepd_connect(conf->spooldir, conf->node_name, + req->job_id, req->job_step_id); + if (fd == -1) { + error("stepd_connect failed: %m"); + rc = EPERM; + goto done; + } + if (!(step = stepd_get_info(fd))) { + debug("kill for nonexistent job %u.%u requested", + req->job_id, req->job_step_id); + rc = ESLURM_INVALID_JOB_ID; + goto done2; + } + + req_uid = g_slurm_auth_get_uid(msg->cred); + if ((req_uid != step->uid) && (!_slurm_authorized_user(req_uid))) { + debug("kill req from uid %ld for job %u.%u owned by uid %ld", + (long) req_uid, req->job_id, req->job_step_id, + (long) step->uid); + rc = ESLURM_USER_ID_MISSING; /* or bad in this case */ + goto done3; + } + + rc = stepd_signal_container(fd, req->signal); if (rc == -1) rc = ESLURMD_JOB_NOTRUNNING; - done: +done3: + xfree(step); +done2: + close(fd); +done: slurm_send_rc_msg(msg, rc); } @@ -953,12 +993,19 @@ static void _rpc_pid2jid(slurm_msg_t *msg, slurm_addr *cli) steps = stepd_available(conf->spooldir, conf->node_name); i = list_iterator_create(steps); while (stepd = list_next(i)) { - if (stepd_pid_in_container(*stepd, req->job_pid) - || req->job_pid == stepd_daemon_pid(*stepd)) { + int fd; + fd = stepd_connect(stepd->directory, stepd->nodename, + stepd->jobid, stepd->stepid); + if (fd == -1) + continue; + if (stepd_pid_in_container(fd, req->job_pid) + || req->job_pid == stepd_daemon_pid(fd)) { resp.job_id = stepd->jobid; found = true; + close(fd); break; } + close(fd); } list_iterator_destroy(i); list_destroy(steps); @@ -981,58 +1028,93 @@ static void _rpc_reattach_tasks(slurm_msg_t *msg, slurm_addr *cli) { reattach_tasks_request_msg_t *req = msg->data; - reattach_tasks_response_msg_t resp; + reattach_tasks_response_msg_t *resp; slurm_msg_t resp_msg; int rc = SLURM_SUCCESS; uint16_t port = 0; char host[MAXHOSTNAMELEN]; int i; slurm_addr ioaddr; - step_loc_t loc; + void *job_cred_sig; + int len; + int fd; + uid_t req_uid; + slurmstepd_info_t *step; + + resp = xmalloc(sizeof(reattach_tasks_response_msg_t)); + memset(&resp_msg, 0, sizeof(slurm_msg_t)); + fd = stepd_connect(conf->spooldir, conf->node_name, + req->job_id, req->job_step_id); + if (fd == -1) { + error("stepd_connect failed: %m"); + rc = EPERM; + goto done; + } + if ((step = stepd_get_info(fd)) == NULL) { + debug("kill for nonexistent job %u.%u requested", + req->job_id, req->job_step_id); + rc = ESLURM_INVALID_JOB_ID; + goto done2; + } + + if ((req_uid != step->uid) && (!_slurm_authorized_user(req_uid))) { + error("uid %ld attempt to attach to job %u.%u owned by %ld", + (long) req_uid, req->job_id, req->job_step_id, + (long) step->uid); + rc = EPERM; + goto done3; + } - memset(&resp, 0, sizeof(reattach_tasks_response_msg_t)); + memset(resp, 0, sizeof(reattach_tasks_response_msg_t)); slurm_get_ip_str(cli, &port, host, sizeof(host)); /* - * Set response addr by resp_port and client address + * Set response address by resp_port and client address */ memcpy(&resp_msg.address, cli, sizeof(slurm_addr)); slurm_set_addr(&resp_msg.address, req->resp_port, NULL); /* - * Set IO address and by io_port and client address + * Set IO address by io_port and client address */ memcpy(&ioaddr, cli, sizeof(slurm_addr)); slurm_set_addr(&ioaddr, req->io_port, NULL); /* - * Use slurmstepd API to request that the slurmdstepd handle - * the attach. + * Get the signature of the job credential. slurmstepd will need + * this to prove its identity when it connects back to srun. */ - loc.directory = conf->spooldir; - loc.nodename = conf->node_name; - loc.jobid = req->job_id; - loc.stepid = req->job_step_id; - rc = stepd_attach(loc, &ioaddr, &resp_msg.address, - msg->cred, req->cred, &resp); + slurm_cred_get_signature(req->cred, (char **)(&job_cred_sig), &len); + xassert(len == SLURM_CRED_SIGLEN); + + resp->gtids = NULL; + resp->local_pids = NULL; + /* Following call fills in gtids and local_pids when successful */ + rc = stepd_attach(fd, &ioaddr, &resp_msg.address, job_cred_sig, resp); if (rc != SLURM_SUCCESS) { debug2("stepd_attach call failed"); - goto done; + goto done3; } - done: +done3: + xfree(step); +done2: + close(fd); +done: debug2("update step addrs rc = %d", rc); - resp_msg.data = &resp; + resp_msg.data = resp; resp_msg.msg_type = RESPONSE_REATTACH_TASKS; - resp.node_name = conf->node_name; - resp.srun_node_id = req->srun_node_id; - resp.return_code = rc; + resp->node_name = conf->node_name; + resp->srun_node_id = req->srun_node_id; + resp->return_code = rc; slurm_send_only_node_msg(&resp_msg); - xfree(resp.gtids); - xfree(resp.local_pids); - + if (resp->gtids) + xfree(resp->gtids); + if (resp->local_pids) + xfree(resp->local_pids); + xfree(resp); } /* @@ -1049,6 +1131,7 @@ _kill_all_active_steps(void *auth_cred, uint32_t jobid, int sig, bool batch) ListIterator i; step_loc_t *stepd; int step_cnt = 0; + int fd; steps = stepd_available(conf->spooldir, conf->node_name); i = list_iterator_create(steps); @@ -1065,10 +1148,19 @@ _kill_all_active_steps(void *auth_cred, uint32_t jobid, int sig, bool batch) step_cnt++; + fd = stepd_connect(stepd->directory, stepd->nodename, + stepd->jobid, stepd->stepid); + if (fd == -1) { + debug3("Unable to connect to step %u.%u", + stepd->jobid, stepd->stepid); + continue; + } + debug2("container signal %d to job %u.%u", sig, jobid, stepd->stepid); - if (stepd_signal_container(*stepd, auth_cred, sig) < 0) + if (stepd_signal_container(fd, sig) < 0) debug("kill jobid=%u failed: %m", jobid); + close(fd); } list_iterator_destroy(i); list_destroy(steps); @@ -1088,10 +1180,18 @@ _job_still_running(uint32_t job_id) steps = stepd_available(conf->spooldir, conf->node_name); i = list_iterator_create(steps); while ((s = list_next(i))) { - if (s->jobid == job_id - && stepd_state(*s) != SLURMSTEPD_NOT_RUNNING) { - retval = true; - break; + if (s->jobid == job_id) { + int fd; + fd = stepd_connect(s->directory, s->nodename, + s->jobid, s->stepid); + if (fd == -1) + continue; + if (stepd_state(fd) != SLURMSTEPD_NOT_RUNNING) { + retval = true; + close(fd); + break; + } + close(fd); } } list_iterator_destroy(i); @@ -1137,10 +1237,18 @@ _steps_completed_now(uint32_t jobid) steps = stepd_available(conf->spooldir, conf->node_name); i = list_iterator_create(steps); while (stepd = list_next(i)) { - if (stepd->jobid == jobid - && stepd_state(*stepd) != SLURMSTEPD_NOT_RUNNING) { - rc = false; - break; + if (stepd->jobid == jobid) { + int fd; + fd = stepd_connect(stepd->directory, stepd->nodename, + stepd->jobid, stepd->stepid); + if (fd == -1) + continue; + if (stepd_state(fd) != SLURMSTEPD_NOT_RUNNING) { + rc = false; + close(fd); + break; + } + close(fd); } } list_iterator_destroy(i); diff --git a/src/slurmd/slurmd/slurmd.c b/src/slurmd/slurmd/slurmd.c index 64053ba502e..1aa2175c07d 100644 --- a/src/slurmd/slurmd/slurmd.c +++ b/src/slurmd/slurmd/slurmd.c @@ -424,12 +424,21 @@ _fill_registration_msg(slurm_node_registration_status_msg_t *msg) i = list_iterator_create(steps); n = 0; while (stepd = list_next(i)) { - if (stepd_state(*stepd) == SLURMSTEPD_NOT_RUNNING) { + int fd; + fd = stepd_connect(stepd->directory, stepd->nodename, + stepd->jobid, stepd->stepid); + if (fd == -1) { + --(msg->job_count); + continue; + } + if (stepd_state(fd) == SLURMSTEPD_NOT_RUNNING) { debug("stale domain socket for stepd %u.%u ", stepd->jobid, stepd->stepid); --(msg->job_count); + close(fd); continue; } + close(fd); if (stepd->stepid == NO_VAL) debug("found apparently running job %u", stepd->jobid); else diff --git a/src/slurmd/slurmstepd/mgr.c b/src/slurmd/slurmstepd/mgr.c index defbe0f4f53..2fdfa51b1af 100644 --- a/src/slurmd/slurmstepd/mgr.c +++ b/src/slurmd/slurmstepd/mgr.c @@ -649,7 +649,7 @@ _fork_all_tasks(slurmd_job_t *job) * Now it's ok to unblock the tasks, so they may call exec. */ for (i = 0; i < job->ntasks; i++) { - char c; + char c = '\0'; debug3("Unblocking %u.%u task %d, writefd = %d", job->jobid, job->stepid, i, writefds[i]); diff --git a/src/slurmd/slurmstepd/req.c b/src/slurmd/slurmstepd/req.c index c6940639900..875a68456ee 100644 --- a/src/slurmd/slurmstepd/req.c +++ b/src/slurmd/slurmstepd/req.c @@ -46,14 +46,16 @@ #include "src/slurmd/slurmstepd/slurmstepd_job.h" #include "src/slurmd/slurmstepd/req.h" -static void *_handle_request(void *arg); -static void _handle_state(int fd, slurmd_job_t *job); -static void _handle_signal_process_group(int fd, slurmd_job_t *job); -static void _handle_signal_task_local(int fd, slurmd_job_t *job); -static void _handle_signal_container(int fd, slurmd_job_t *job); -static void _handle_attach(int fd, slurmd_job_t *job); -static void _handle_pid_in_container(int fd, slurmd_job_t *job); -static void _handle_daemon_pid(int fd, slurmd_job_t *job); +static void *_handle_accept(void *arg); +static int _handle_request(int fd, slurmd_job_t *job, uid_t uid, gid_t gid); +static int _handle_state(int fd, slurmd_job_t *job); +static int _handle_info(int fd, slurmd_job_t *job); +static int _handle_signal_process_group(int fd, slurmd_job_t *job, uid_t uid); +static int _handle_signal_task_local(int fd, slurmd_job_t *job, uid_t uid); +static int _handle_signal_container(int fd, slurmd_job_t *job, uid_t uid); +static int _handle_attach(int fd, slurmd_job_t *job, uid_t uid); +static int _handle_pid_in_container(int fd, slurmd_job_t *job); +static int _handle_daemon_pid(int fd, slurmd_job_t *job); static bool _msg_socket_readable(eio_obj_t *obj); static int _msg_socket_accept(eio_obj_t *obj, List objs); @@ -193,6 +195,8 @@ msg_thr_create(slurmd_job_t *job) eio_new_initial_obj(job->msg_handle, eio_obj); slurm_attr_init(&attr); + if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)) + error("pthread_attr_setdetachstate: %m"); if (pthread_create(&job->msgid, &attr, &_msg_thr_internal, (void *)job) != 0) { error("pthread_create: %m"); @@ -230,7 +234,7 @@ _msg_socket_accept(eio_obj_t *obj, List objs) pthread_attr_t attr; pthread_t id; - debug3("Called _msg_socket_read"); + debug3("Called _msg_socket_accept"); while ((fd = accept(obj->fd, (struct sockaddr *)&addr, (socklen_t *)&len)) < 0) { @@ -246,8 +250,7 @@ _msg_socket_accept(eio_obj_t *obj, List objs) return SLURM_SUCCESS; } - /* FIXME should really create a pthread to handle the message */ - + fd_set_close_on_exec(fd); fd_set_blocking(fd); slurm_attr_init(&attr); @@ -260,116 +263,191 @@ _msg_socket_accept(eio_obj_t *obj, List objs) param = xmalloc(sizeof(struct request_params)); param->fd = fd; param->job = job; - if (pthread_create(&id, &attr, &_handle_request, (void *)param) != 0) { + if (pthread_create(&id, &attr, &_handle_accept, (void *)param) != 0) { error("stepd_api message engine pthread_create: %m"); - _handle_request((void *)param); + _handle_accept((void *)param); } + param = NULL; + debug3("Leaving _msg_socket_accept"); return SLURM_SUCCESS; } static void * -_handle_request(void *arg) +_handle_accept(void *arg) { - struct request_params *param = (struct request_params *)arg; + /*struct request_params *param = (struct request_params *)arg;*/ + int fd = ((struct request_params *)arg)->fd; + slurmd_job_t *job = ((struct request_params *)arg)->job; int req; + int len; + Buf buffer; + void *auth_cred; + int rc; + uid_t uid; + gid_t gid; - debug3("Entering _handle_message"); + debug3("Entering _handle_accept (new thread)"); + xfree(arg); - if (read(param->fd, &req, sizeof(req)) != sizeof(req)) { - error("Could not read request type: %m"); + safe_read(fd, &req, sizeof(int)); + if (req != REQUEST_CONNECT) { + error("First message must be REQUEST_CONNECT"); goto fail; } + safe_read(fd, &len, sizeof(int)); + buffer = init_buf(len); + safe_read(fd, get_buf_data(buffer), len); + + /* Unpack and verify the auth credential */ + auth_cred = g_slurm_auth_unpack(buffer); + if (auth_cred == NULL) { + error("Unpacking authentication credential: %s", + g_slurm_auth_errstr(g_slurm_auth_errno(NULL))); + free_buf(buffer); + goto fail; + } + rc = g_slurm_auth_verify(auth_cred, NULL, 2); + if (rc != SLURM_SUCCESS) { + error("Verifying authentication credential: %s", + g_slurm_auth_errstr(g_slurm_auth_errno(auth_cred))); + (void) g_slurm_auth_destroy(auth_cred); + free_buf(buffer); + goto fail; + } + + /* Get the uid & gid from the credential, then destroy it. */ + uid = g_slurm_auth_get_uid(auth_cred); + gid = g_slurm_auth_get_gid(auth_cred); + debug3(" Identity: uid=%d, gid=%d", uid, gid); + g_slurm_auth_destroy(auth_cred); + free_buf(buffer); + + rc = SLURM_SUCCESS; + safe_write(fd, &rc, sizeof(int)); + + while (1) { + rc = _handle_request(fd, job, uid, gid); + if (rc != SLURM_SUCCESS) + break; + } + + if (close(fd) == -1) + error("Closing accepted fd: %m"); + debug("Leaving _handle_accept"); + return NULL; + +fail: + rc = SLURM_FAILURE; + safe_write(fd, &rc, sizeof(int)); +rwfail: + if (close(fd) == -1) + error("Closing accepted fd after error: %m"); + debug("Leaving _handle_accept on an error"); + return NULL; +} + + +int +_handle_request(int fd, slurmd_job_t *job, uid_t uid, gid_t gid) +{ + int rc = SLURM_SUCCESS; + int req; + + debug3("Entering _handle_request"); + safe_read(fd, &req, sizeof(int)); + debug3("Got request"); switch (req) { case REQUEST_SIGNAL_PROCESS_GROUP: debug("Handling REQUEST_SIGNAL_PROCESS_GROUP"); - _handle_signal_process_group(param->fd, param->job); + rc = _handle_signal_process_group(fd, job, uid); break; case REQUEST_SIGNAL_TASK_LOCAL: debug("Handling REQUEST_SIGNAL_TASK_LOCAL"); - _handle_signal_task_local(param->fd, param->job); + rc = _handle_signal_task_local(fd, job, uid); break; case REQUEST_SIGNAL_TASK_GLOBAL: debug("Handling REQUEST_SIGNAL_TASK_LOCAL (not implemented)"); break; case REQUEST_SIGNAL_CONTAINER: debug("Handling REQUEST_SIGNAL_CONTAINER"); - _handle_signal_container(param->fd, param->job); + rc = _handle_signal_container(fd, job, uid); break; case REQUEST_STATE: debug("Handling REQUEST_STATE"); - _handle_state(param->fd, param->job); + rc = _handle_state(fd, job); + break; + case REQUEST_INFO: + debug("Handling REQUEST_INFO"); + rc = _handle_info(fd, job); break; case REQUEST_ATTACH: debug("Handling REQUEST_ATTACH"); - _handle_attach(param->fd, param->job); + rc = _handle_attach(fd, job, uid); break; case REQUEST_PID_IN_CONTAINER: debug("Handling REQUEST_PID_IN_CONTAINER"); - _handle_pid_in_container(param->fd, param->job); + rc = _handle_pid_in_container(fd, job); break; case REQUEST_DAEMON_PID: debug("Handling REQUEST_DAEMON_PID"); - _handle_daemon_pid(param->fd, param->job); + rc = _handle_daemon_pid(fd, job); break; default: error("Unrecognized request: %d", req); + rc = SLURM_FAILURE; break; } -fail: - close(param->fd); - xfree(arg); - debug3("Leaving _handle_message"); + debug3("Leaving _handle_request: %s", + rc ? "SLURM_FAILURE" : "SLURM_SUCCESS"); + return rc; + +rwfail: + debug3("Leaving _handle_request on read error"); + return SLURM_FAILURE; } -static void +static int _handle_state(int fd, slurmd_job_t *job) { - int status = 0; - safe_write(fd, &job->state, sizeof(slurmstepd_state_t)); + + return SLURM_SUCCESS; rwfail: - return; + return SLURM_FAILURE; } -static void -_handle_signal_process_group(int fd, slurmd_job_t *job) +static int +_handle_info(int fd, slurmd_job_t *job) +{ + safe_write(fd, &job->uid, sizeof(uid_t)); + safe_write(fd, &job->jobid, sizeof(uint32_t)); + safe_write(fd, &job->stepid, sizeof(uint32_t)); + + return SLURM_SUCCESS; +rwfail: + return SLURM_FAILURE; +} + +static int +_handle_signal_process_group(int fd, slurmd_job_t *job, uid_t uid) { int rc = SLURM_SUCCESS; int signal; - int buf_len = 0; - Buf buf; - void *auth_cred; - int uid; debug("_handle_signal_process_group for job %u.%u", job->jobid, job->stepid); safe_read(fd, &signal, sizeof(int)); - safe_read(fd, &buf_len, sizeof(int)); - buf = init_buf(buf_len); - safe_read(fd, get_buf_data(buf), buf_len); - - debug3(" buf_len = %d", buf_len); - if ((auth_cred = g_slurm_auth_unpack(buf)) == NULL) { - error("g_slurm_auth_unpack: %s", - g_slurm_auth_errstr(g_slurm_auth_errno(NULL))); - rc = EPERM; - goto done; - } - /* - * Authenticate the user using the auth credential. - */ - uid = g_slurm_auth_get_uid(auth_cred); debug3(" uid = %d", uid); if (uid != job->uid && !_slurm_authorized_user(uid)) { debug("kill req from uid %ld for job %u.%u owned by uid %ld", (long)uid, job->jobid, job->stepid, (long)job->uid); rc = EPERM; - goto done2; + goto done; } /* @@ -379,7 +457,7 @@ _handle_signal_process_group(int fd, slurmd_job_t *job) debug ("step %u.%u invalid [jmgr_pid:%d pgid:%u]", job->jobid, job->stepid, job->jmgr_pid, job->pgid); rc = ESLURMD_JOB_NOTRUNNING; - goto done2; + goto done; } if (killpg(job->pgid, signal) == -1) { @@ -392,53 +470,33 @@ _handle_signal_process_group(int fd, slurmd_job_t *job) signal, job->jobid, job->stepid, job->pgid); } -done2: - g_slurm_auth_destroy(auth_cred); done: - free_buf(buf); /* takes care of xfree'ing data as well */ /* Send the return code */ safe_write(fd, &rc, sizeof(int)); + return SLURM_SUCCESS; rwfail: - return; + return SLURM_FAILURE; } -static void -_handle_signal_task_local(int fd, slurmd_job_t *job) +static int +_handle_signal_task_local(int fd, slurmd_job_t *job, uid_t uid) { int rc = SLURM_SUCCESS; int signal; int ltaskid; /* local task index */ - int buf_len = 0; - Buf buf; - void *auth_cred; - int uid; debug("_handle_signal_task_local for job %u.%u", job->jobid, job->stepid); safe_read(fd, &signal, sizeof(int)); safe_read(fd, <askid, sizeof(int)); - safe_read(fd, &buf_len, sizeof(int)); - buf = init_buf(buf_len); - safe_read(fd, get_buf_data(buf), buf_len); - - debug3(" buf_len = %d", buf_len); - if ((auth_cred = g_slurm_auth_unpack(buf)) == NULL) { - error("unpack of the auth_cred unsuccessful"); - rc = EPERM; - goto done; - } - /* - * Authenticate the user using the auth credential. - */ - uid = g_slurm_auth_get_uid(auth_cred); debug3(" uid = %d", uid); if (uid != job->uid && !_slurm_authorized_user(uid)) { debug("kill req from uid %ld for job %u.%u owned by uid %ld", (long)uid, job->jobid, job->stepid, (long)job->uid); rc = EPERM; - goto done2; + goto done; } /* @@ -448,21 +506,21 @@ _handle_signal_task_local(int fd, slurmd_job_t *job) debug("step %u.%u invalid local task id %d", job->jobid, job->stepid, ltaskid); rc = SLURM_ERROR; - goto done2; + goto done; } if (!job->task || !job->task[ltaskid]) { debug("step %u.%u no task info for task id %d", job->jobid, job->stepid, ltaskid); rc = SLURM_ERROR; - goto done2; + goto done; } if (job->task[ltaskid]->pid <= 1) { debug("step %u.%u invalid pid %d for task %d", job->jobid, job->stepid, job->task[ltaskid]->pid, ltaskid); rc = SLURM_ERROR; - goto done2; + goto done; } /* @@ -479,47 +537,25 @@ _handle_signal_task_local(int fd, slurmd_job_t *job) job->task[ltaskid]->pid); } -done2: - g_slurm_auth_destroy(auth_cred); done: - free_buf(buf); /* takes care of xfree'ing data as well */ /* Send the return code */ safe_write(fd, &rc, sizeof(int)); + return SLURM_SUCCESS; rwfail: - return; + return SLURM_FAILURE; } -static void -_handle_signal_container(int fd, slurmd_job_t *job) +static int +_handle_signal_container(int fd, slurmd_job_t *job, uid_t uid) { int rc = SLURM_SUCCESS; int signal; - int buf_len = 0; - Buf buf; - void *auth_cred; - int uid; debug("_handle_signal_container for job %u.%u", job->jobid, job->stepid); safe_read(fd, &signal, sizeof(int)); - safe_read(fd, &buf_len, sizeof(int)); - buf = init_buf(buf_len); - safe_read(fd, get_buf_data(buf), buf_len); - - debug3(" buf_len = %d", buf_len); - if ((auth_cred = g_slurm_auth_unpack(buf)) == NULL) { - error("g_slurm_auth_unpack: %s", - g_slurm_auth_errstr(g_slurm_auth_errno(NULL))); - rc = -1; - errno = EPERM; - goto done; - } - /* - * Authenticate the user using the auth credential. - */ - uid = g_slurm_auth_get_uid(auth_cred); debug3(" uid = %d", uid); if (uid != job->uid && !_slurm_authorized_user(uid)) { debug("kill container req from uid %ld for job %u.%u " @@ -527,7 +563,7 @@ _handle_signal_container(int fd, slurmd_job_t *job) (long)uid, job->jobid, job->stepid, (long)job->uid); rc = -1; errno = EPERM; - goto done2; + goto done; } /* @@ -538,7 +574,7 @@ _handle_signal_container(int fd, slurmd_job_t *job) job->jobid, job->stepid, job->cont_id); rc = -1; errno = ESLURMD_JOB_NOTRUNNING; - goto done2; + goto done; } if (slurm_container_signal(job->cont_id, signal) < 0) { @@ -551,100 +587,69 @@ _handle_signal_container(int fd, slurmd_job_t *job) signal, job->jobid, job->stepid); } -done2: - g_slurm_auth_destroy(auth_cred); done: - free_buf(buf); /* takes care of xfree'ing data as well */ /* Send the return code and errno */ safe_write(fd, &rc, sizeof(int)); safe_write(fd, &errno, sizeof(int)); + return SLURM_SUCCESS; rwfail: - return; + return SLURM_FAILURE; } -static void -_handle_attach(int fd, slurmd_job_t *job) +static int +_handle_attach(int fd, slurmd_job_t *job, uid_t uid) { srun_info_t *srun; int rc = SLURM_SUCCESS; - int buf_len = 0; - Buf buf; - void *auth_cred; - slurm_cred_t job_cred; int sig_len; - int uid, gid; debug("_handle_request_attach for job %u.%u", job->jobid, job->stepid); srun = xmalloc(sizeof(*srun)); - srun->key = xmalloc(sizeof(*srun->key)); + srun->key = (srun_key_t *)xmalloc(sizeof(SLURM_CRED_SIGLEN)); + debug("sizeof(srun_info_t) = %d, sizeof(slurm_addr) = %d", + sizeof(srun_info_t), sizeof(slurm_addr)); safe_read(fd, &srun->ioaddr, sizeof(slurm_addr)); safe_read(fd, &srun->resp_addr, sizeof(slurm_addr)); - safe_read(fd, &buf_len, sizeof(int)); - buf = init_buf(buf_len); - safe_read(fd, get_buf_data(buf), buf_len); - - debug3("buf_len = %d", buf_len); - if ((auth_cred = g_slurm_auth_unpack(buf)) == NULL) { - error("unpack of the auth_cred unsuccessful"); - rc = EPERM; - goto done; - } - job_cred = slurm_cred_unpack(buf); + safe_read(fd, srun->key, SLURM_CRED_SIGLEN); /* * Check if jobstep is actually running. */ if (job->state != SLURMSTEPD_STEP_RUNNING) { rc = ESLURMD_JOB_NOTRUNNING; - goto done2; + goto done; } - /* - * Authenticate the user using the auth credential. - */ - uid = g_slurm_auth_get_uid(auth_cred); - gid = g_slurm_auth_get_gid(auth_cred); - debug3(" uid = %d, gid = %d", uid, gid); - if (uid != job->uid && gid != job->gid) { + if (!_slurm_authorized_user(uid)) { error("uid %ld attempt to attach to job %u.%u owned by %ld", - (long) uid, job->jobid, job->stepid, - (long) job->uid); + (long) uid, job->jobid, job->stepid, (long)job->uid); rc = EPERM; - goto done2; + goto done; } - /* - * Get the signature of the job credential to send back to srun. - */ - slurm_cred_get_signature(job_cred, (void *)&srun->key, &sig_len); - xassert(sig_len <= SLURM_CRED_SIGLEN); - list_prepend(job->sruns, (void *) srun); - rc = io_client_connect(srun, job); -done2: - g_slurm_auth_destroy(auth_cred); + debug(" back from io_client_connect, rc = %d", rc); done: - free_buf(buf); /* takes care of xfree'ing data as well */ /* Send the return code */ safe_write(fd, &rc, sizeof(int)); - debug("In _handle_attach rc = %d", rc); + debug(" in _handle_attach rc = %d", rc); if (rc == SLURM_SUCCESS) { /* Send response info */ uint32_t *pids, *gtids; int len, i; - debug("In _handle_attach sending response info"); + debug(" in _handle_attach sending response info"); len = job->ntasks * sizeof(uint32_t); pids = xmalloc(len); gtids = xmalloc(len); if (job->task != NULL) { for (i = 0; i < job->ntasks; i++) { - if (job->task == NULL) + if (job->task[i] == NULL) continue; pids[i] = (uint32_t)job->task[i]->pid; gtids[i] = job->task[i]->gtid; @@ -662,11 +667,12 @@ done: safe_write(fd, job->argv[0], len); } + return SLURM_SUCCESS; rwfail: - return; + return SLURM_FAILURE; } -static void +static int _handle_pid_in_container(int fd, slurmd_job_t *job) { bool rc = false; @@ -687,15 +693,19 @@ _handle_pid_in_container(int fd, slurmd_job_t *job) /* Send the return code */ safe_write(fd, &rc, sizeof(bool)); -rwfail: debug("Leaving _handle_pid_in_container"); + return SLURM_SUCCESS; +rwfail: + return SLURM_FAILURE; } -static void +static int _handle_daemon_pid(int fd, slurmd_job_t *job) { safe_write(fd, &job->jmgr_pid, sizeof(pid_t)); + + return SLURM_SUCCESS; rwfail: - return; + return SLURM_FAILURE; } -- GitLab