From 7381eeea8a454a4c501fc8b6e8bb1e4816c60792 Mon Sep 17 00:00:00 2001 From: "Christopher J. Morrone" <morrone2@llnl.gov> Date: Wed, 19 Oct 2005 02:14:49 +0000 Subject: [PATCH] Fixes a flaw in client eio_obj_t setup. There are differences in how the initial client eio object and later "srun --attach" objects are handled due to the caching. Modifications to the eio engine to move the List of eio_obj_t into the opaque eio_handle_t. New eio calls "eio_new_initial_obj" and "eio_new_obj", both of which add an eio_obj_t to the eio_handle_t's internal object List. However, only the latter function is safe to call when the eio mainloop is running. Also caught in this commit is some initial code to implement the RPCs REQUEST_TERMINATE_* and REQUEST_SIGNAL_*. --- src/common/eio.c | 67 +++++++++++--- src/common/eio.h | 18 +++- src/slurmd/io.c | 73 +++++++++++++-- src/slurmd/io.h | 9 ++ src/slurmd/mgr.c | 15 ++-- src/slurmd/req.c | 195 ++++++++++++++++++++++++++++++---------- src/slurmd/slurmd_job.c | 10 +-- src/slurmd/slurmd_job.h | 1 - src/srun/io.c | 14 +-- src/srun/srun_job.c | 3 +- src/srun/srun_job.h | 1 - 11 files changed, 315 insertions(+), 91 deletions(-) diff --git a/src/common/eio.c b/src/common/eio.c index 8d98305e52c..55909788ad1 100644 --- a/src/common/eio.c +++ b/src/common/eio.c @@ -38,7 +38,11 @@ #include "src/common/fd.h" #include "src/common/eio.h" - +/* + * outside threads can stick new objects on the new_objs List and + * the eio thread will move them to the main obj_list the next time + * it wakes up. 
+ */ struct eio_handle_components { #ifndef NDEBUG # define EIO_MAGIC 0xe1e10 @@ -46,6 +50,7 @@ struct eio_handle_components { #endif int fds[2]; List obj_list; + List new_objs; }; @@ -60,7 +65,7 @@ static void _poll_dispatch(struct pollfd *, unsigned int, eio_obj_t **, static void _poll_handle_event(short revents, eio_obj_t *obj, List objList); -eio_handle_t *eio_handle_create(List eio_obj_list) +eio_handle_t *eio_handle_create(void) { eio_handle_t *eio = xmalloc(sizeof(*eio)); @@ -74,7 +79,8 @@ eio_handle_t *eio_handle_create(List eio_obj_list) xassert(eio->magic = EIO_MAGIC); - eio->obj_list = eio_obj_list; + eio->obj_list = list_create(NULL); /* FIXME! Needs destructor */ + eio->new_objs = list_create(NULL); return eio; } @@ -85,7 +91,7 @@ void eio_handle_destroy(eio_handle_t *eio) xassert(eio->magic == EIO_MAGIC); close(eio->fds[0]); close(eio->fds[1]); - /* FIXME - Destroy the eio object list here ? */ + /* FIXME - Destroy obj_list and new_objs */ xassert(eio->magic = ~EIO_MAGIC); xfree(eio); } @@ -118,16 +124,22 @@ static void _mark_shutdown_true(List obj_list) list_iterator_destroy(objs); } -static int _eio_clear(eio_handle_t *eio) +static int _eio_wakeup_handler(eio_handle_t *eio) { char c = 0; int rc = 0; + eio_obj_t *obj; while ((rc = (read(eio->fds[0], &c, 1)) > 0)) { if (c == 1) _mark_shutdown_true(eio->obj_list); } + /* move new eio objects from the new_objs to the obj_list */ + while (obj = list_dequeue(eio->new_objs)) { + list_enqueue(eio->obj_list, obj); + } + if (rc < 0) return error("eio_clear: read: %m"); return 0; @@ -152,8 +164,9 @@ _poll_loop_internal(eio_handle_t *eio, List objs) for (;;) { - /* Alloc memory for pfds and map if needed */ - if (maxnfds < (n = list_count(objs))) { + /* Alloc memory for pfds and map if needed */ + n = list_count(objs); + if (maxnfds < n) { maxnfds = n; xrealloc(pollfds, (maxnfds+1) * sizeof(struct pollfd)); xrealloc(map, maxnfds * sizeof(eio_obj_t * )); @@ -164,21 +177,18 @@ _poll_loop_internal(eio_handle_t 
*eio, List objs) debug3("eio: handling events for %d objects", list_count(objs)); - /* - * Clear any pending eio signals - */ - _eio_clear(eio); - if ((nfds = _poll_setup_pollfds(pollfds, map, objs)) <= 0) goto done; /* - * Setup eio handle poll fd + * Setup eio handle signalling fd */ pollfds[nfds].fd = eio->fds[0]; pollfds[nfds].events = POLLIN; nfds++; + debug("nfds = %d, n = %d, maxnfds = %d", + nfds, n, maxnfds); xassert(nfds <= maxnfds + 1); @@ -186,7 +196,7 @@ _poll_loop_internal(eio_handle_t *eio, List objs) goto error; if (pollfds[nfds-1].revents & POLLIN) - _eio_clear(eio); + _eio_wakeup_handler(eio); _poll_dispatch(pollfds, nfds-1, map, objs); } @@ -329,3 +339,32 @@ void eio_obj_destroy(eio_obj_t *obj) xfree(obj); } } + + +/* + * Add an eio_obj_t "obj" to an eio_handle_t "eio"'s internal object list. + * + * This function can only be used to initialize "eio"'s list before + * calling eio_handle_mainloop. If it is used after the eio engine's + * mainloop has started, segfaults are likely. + */ +void eio_new_initial_obj(eio_handle_t *eio, eio_obj_t *obj) +{ + xassert(eio != NULL); + xassert(eio->magic == EIO_MAGIC); + + list_enqueue(eio->obj_list, obj); +} + +/* + * Queue an eio_obj_t "obj" for inclusion in an already running + * eio_handle_t "eio"'s internal object list. + */ +void eio_new_obj(eio_handle_t *eio, eio_obj_t *obj) +{ + xassert(eio != NULL); + xassert(eio->magic == EIO_MAGIC); + + list_enqueue(eio->new_objs, obj); + eio_signal_wakeup(eio); +} diff --git a/src/common/eio.h b/src/common/eio.h index d1ab7daf9aa..6c1d34031d9 100644 --- a/src/common/eio.h +++ b/src/common/eio.h @@ -56,9 +56,25 @@ struct eio_obj { bool shutdown; }; -eio_handle_t *eio_handle_create(List eio_obj_list); +eio_handle_t *eio_handle_create(void); void eio_handle_destroy(eio_handle_t *eio); +/* + * Add an eio_obj_t "obj" to an eio_handle_t "eio"'s internal object list. + * + * This function can only be used to initialize "eio"'s list before + * calling eio_handle_mainloop. 
If it is used after the eio engine's + * mainloop has started, segfaults are likely. + */ +void eio_new_initial_obj(eio_handle_t *eio, eio_obj_t *obj); + +/* + * Queue an eio_obj_t "obj" for inclusion in an already running + * eio_handle_t "eio"'s internal object list. + */ +void eio_new_obj(eio_handle_t *eio, eio_obj_t *obj); + + /* This routine will watch for activtiy on the fd's as long * as obj->readable() or obj->writable() returns >0 * diff --git a/src/slurmd/io.c b/src/slurmd/io.c index 02fe5bc5039..4cc11f05a04 100644 --- a/src/slurmd/io.c +++ b/src/slurmd/io.c @@ -205,7 +205,8 @@ _client_writable(eio_obj_t *obj) } /* If this is a newly attached client its msg_queue needs - * to be intialized from the outgoing_cache + * to be intialized from the outgoing_cache, and then "obj" needs + * to be added to the List of clients. */ if (client->msg_queue == NULL) { ListIterator msgs; @@ -217,6 +218,8 @@ _client_writable(eio_obj_t *obj) list_enqueue(client->msg_queue, msg); } list_iterator_destroy(msgs); + /* and now make this object visible to tasks */ + list_append(client->job->clients, (void *)obj); } if (client->out_msg != NULL) @@ -656,7 +659,7 @@ _init_task_stdio_fds(slurmd_task_info_t *task, slurmd_job_t *job) fd_set_close_on_exec(task->to_stdin); fd_set_nonblocking(task->to_stdin); task->in = _create_task_in_eio(task->to_stdin, job); - list_append(job->objs, (void *)task->in); + eio_new_initial_obj(job->eio, (void *)task->in); } /* @@ -686,8 +689,8 @@ _init_task_stdio_fds(slurmd_task_info_t *task, slurmd_job_t *job) fd_set_nonblocking(task->from_stdout); task->out = _create_task_out_eio(task->from_stdout, SLURM_IO_STDOUT, job, task); - list_append(job->objs, (void *)task->out); list_append(job->stdout_eio_objs, (void *)task->out); + eio_new_initial_obj(job->eio, (void *)task->out); } /* @@ -717,8 +720,8 @@ _init_task_stdio_fds(slurmd_task_info_t *task, slurmd_job_t *job) fd_set_nonblocking(task->from_stderr); task->err = _create_task_out_eio(task->from_stderr, 
SLURM_IO_STDERR, job, task); - list_append(job->objs, (void *)task->err); list_append(job->stderr_eio_objs, (void *)task->err); + eio_new_initial_obj(job->eio, (void *)task->err); } } @@ -928,6 +931,62 @@ _io_thr(void *arg) return (void *)1; } +/* + * Create the initial TCP connection back to a waiting client (e.g. srun). + * + * Since this is the first client connection and the IO engine has not + * yet started, we initialize the msg_queue as an empty list and + * directly add the eio_obj_t to the eio handle with eio_new_initial_obj. + */ +int +io_initial_client_connect(srun_info_t *srun, slurmd_job_t *job) +{ + int i; + int sock = -1; + struct client_io_info *client; + eio_obj_t *obj; + + debug2 ("adding IO connection (logical node rank %d)", job->nodeid); + + if (srun->ioaddr.sin_addr.s_addr) { + char host[256]; + uint16_t port; + slurmd_get_addr(&srun->ioaddr, &port, host, sizeof(host)); + debug2("connecting IO back to %s:%d", host, ntohs(port)); + } + + if ((sock = (int) slurm_open_stream(&srun->ioaddr)) < 0) { + error("connect io: %m"); + /* XXX retry or silently fail? + * fail for now. + */ + return SLURM_ERROR; + } + + fd_set_blocking(sock); /* just in case... */ + + _send_io_init_msg(sock, srun->key, job); + + debug3(" back from _send_io_init_msg"); + fd_set_nonblocking(sock); + fd_set_close_on_exec(sock); + + /* Now set up the eio object */ + client = xmalloc(sizeof(struct client_io_info)); +#ifndef NDEBUG + client->magic = CLIENT_IO_MAGIC; +#endif + client->job = job; + client->msg_queue = list_create(NULL); /* FIXME - destructor */ + + obj = eio_obj_create(sock, &client_ops, (void *)client); + list_append(job->clients, (void *)obj); + eio_new_initial_obj(job->eio, (void *)obj); + debug3("Now handling %d IO Client object(s)", list_count(job->clients)); + + return SLURM_SUCCESS; +} + /* * Initiate a TCP connection back to a waiting client (e.g. srun). 
* @@ -974,10 +1033,10 @@ io_client_connect(srun_info_t *srun, slurmd_job_t *job) #endif client->job = job; client->msg_queue = NULL; /* initialized in _client_writable */ + /* client object adds itself to job->clients in _client_writable */ obj = eio_obj_create(sock, &client_ops, (void *)client); - list_append(job->clients, (void *)obj); - list_append(job->objs, (void *)obj); + eio_new_obj(job->eio, (void *)obj); debug3("Now handling %d IO Client object(s)", list_count(job->clients)); @@ -1068,7 +1127,7 @@ _send_eof_msg(struct task_read_info *out) clients = list_iterator_create(out->job->clients); while(eio = list_next(clients)) { client = (struct client_io_info *)eio->arg; - debug3("======================== Enqueued message"); + debug3("======================== Enqueued eof message"); xassert(client->magic == CLIENT_IO_MAGIC); if (list_enqueue(client->msg_queue, msg)) msg->ref_count++; diff --git a/src/slurmd/io.h b/src/slurmd/io.h index 4390ec47459..bb93b9c5072 100644 --- a/src/slurmd/io.h +++ b/src/slurmd/io.h @@ -47,6 +47,15 @@ struct io_buf { struct io_buf *alloc_io_buf(void); void free_io_buf(struct io_buf *buf); +/* + * Create a TCP connection back to the initial client (e.g. srun). + * + * Since this is the first client connection and the IO engine has not + * yet started, we initialize the msg_queue as an empty list and + * directly add the eio_obj_t to the eio handle with eio_new_initial_obj. + */ +int io_initial_client_connect(srun_info_t *srun, slurmd_job_t *job); + /* * Initiate a TCP connection back to a waiting client (e.g. srun). * diff --git a/src/slurmd/mgr.c b/src/slurmd/mgr.c index 3b47af90811..5a7b99e390e 100644 --- a/src/slurmd/mgr.c +++ b/src/slurmd/mgr.c @@ -447,6 +447,16 @@ _setup_io(slurmd_job_t *job) error("sete{u/g}id(%lu/%lu): %m", (u_long) spwd->pw_uid, (u_long) spwd->pw_gid); + /* + * MUST create the initial client object before starting + * the IO thread, or we risk losing stdout/err traffic. 
+ */ + if (!job->batch) { + srun_info_t *srun = list_peek(job->sruns); + xassert(srun != NULL); + rc = io_initial_client_connect(srun, job); + } + if (!job->batch) if (io_thread_start(job) < 0) return ESLURMD_IO_ERROR; @@ -456,11 +466,6 @@ _setup_io(slurmd_job_t *job) */ _slurmd_job_log_init(job); - if (!job->batch) { - srun_info_t *srun = list_peek(job->sruns); - xassert(srun != NULL); - rc = io_client_connect(srun, job); - } #ifndef NDEBUG # ifdef PR_SET_DUMPABLE diff --git a/src/slurmd/req.c b/src/slurmd/req.c index 2a94871120c..088fca5226c 100644 --- a/src/slurmd/req.c +++ b/src/slurmd/req.c @@ -76,10 +76,12 @@ static int _launch_tasks(launch_tasks_request_msg_t *, slurm_addr *, static void _rpc_launch_tasks(slurm_msg_t *, slurm_addr *); static void _rpc_spawn_task(slurm_msg_t *, slurm_addr *); static void _rpc_batch_job(slurm_msg_t *, slurm_addr *); -static void _rpc_kill_tasks(slurm_msg_t *, slurm_addr *); +static void _rpc_signal_tasks(slurm_msg_t *, slurm_addr *); +static void _rpc_terminate_tasks(slurm_msg_t *, slurm_addr *); static void _rpc_timelimit(slurm_msg_t *, slurm_addr *); static void _rpc_reattach_tasks(slurm_msg_t *, slurm_addr *); -static void _rpc_kill_job(slurm_msg_t *, slurm_addr *); +static void _rpc_signal_job(slurm_msg_t *, slurm_addr *); +static void _rpc_terminate_job(slurm_msg_t *, slurm_addr *); static void _rpc_update_time(slurm_msg_t *, slurm_addr *); static void _rpc_shutdown(slurm_msg_t *msg, slurm_addr *cli_addr); static void _rpc_reconfig(slurm_msg_t *msg, slurm_addr *cli_addr); @@ -130,11 +132,17 @@ slurmd_req(slurm_msg_t *msg, slurm_addr *cli) slurm_mutex_unlock(&launch_mutex); break; case REQUEST_SIGNAL_TASKS: + debug2("Processing RPC: REQUEST_SIGNAL_TASKS"); + _rpc_signal_tasks(msg, cli); + slurm_free_kill_tasks_msg(msg->data); + break; case REQUEST_TERMINATE_TASKS: - _rpc_kill_tasks(msg, cli); + debug2("Processing RPC: REQUEST_TERMINATE_TASKS"); + _rpc_terminate_tasks(msg, cli); slurm_free_kill_tasks_msg(msg->data); break; 
case REQUEST_KILL_TIMELIMIT: + debug2("Processing RPC: REQUEST_KILL_TIMELIMIT"); _rpc_timelimit(msg, cli); slurm_free_timelimit_msg(msg->data); break; @@ -143,8 +151,13 @@ slurmd_req(slurm_msg_t *msg, slurm_addr *cli) slurm_free_reattach_tasks_request_msg(msg->data); break; case REQUEST_SIGNAL_JOB: + debug2("Processing RPC: REQUEST_SIGNAL_JOB"); + _rpc_signal_job(msg, cli); + slurm_free_kill_job_msg(msg->data); + break; case REQUEST_TERMINATE_JOB: - _rpc_kill_job(msg, cli); + debug2("Processing RPC: REQUEST_TERMINATE_JOB"); + _rpc_terminate_job(msg, cli); slurm_free_kill_job_msg(msg->data); break; case REQUEST_UPDATE_JOB_TIME: @@ -904,7 +917,7 @@ _rpc_ping(slurm_msg_t *msg, slurm_addr *cli_addr) } static void -_rpc_kill_tasks(slurm_msg_t *msg, slurm_addr *cli_addr) +_rpc_signal_tasks(slurm_msg_t *msg, slurm_addr *cli_addr) { int rc = SLURM_SUCCESS; uid_t req_uid; @@ -928,7 +941,86 @@ _rpc_kill_tasks(slurm_msg_t *msg, slurm_addr *cli_addr) } if (step->state == SLURMD_JOB_STARTING) { - debug ("kill req for starting job step %u.%u", + debug ("signal req for starting job step %u.%u", + req->job_id, req->job_step_id); + rc = ESLURMD_JOB_NOTRUNNING; + goto done; + } + +#ifdef HAVE_AIX +# ifdef SIGMIGRATE +# ifdef SIGSOUND + /* SIGMIGRATE and SIGSOUND are used to initiate job checkpoint on AIX. + * These signals are not sent to the entire process group, but just a + * single process, namely the PMD. 
*/ + if (req->signal == SIGMIGRATE || req->signal == SIGSOUND) { + if (!step->task_list || step->task_list->pid <= (pid_t)1) { + verbose("Invalid pid for signal %d", req->signal); + goto done; + } + if (kill(step->task_list->pid, req->signal) == -1) { + rc = errno; + verbose("Error sending signal %d to pmd" + " for job %u.%u: %s", + req->signal, req->job_id, req->job_step_id, + slurm_strerror(errno)); + } + goto done; + } +# endif +# endif +#endif + + if (step->pgid <= (pid_t)1) { + debug ("step %u.%u invalid in shm [mpid:%d pgid:%u]", + req->job_id, req->job_step_id, + step->mpid, step->pgid); + rc = ESLURMD_JOB_NOTRUNNING; + } + + if (killpg(step->pgid, req->signal) == -1) { + rc = errno; + verbose("Error sending signal %d to %u.%u: %s", + req->signal, req->job_id, req->job_step_id, + slurm_strerror(rc)); + } else { + verbose("Sent signal %d to %u.%u", + req->signal, req->job_id, req->job_step_id); + } + + done: + if (step) + shm_free_step(step); + slurm_send_rc_msg(msg, rc); +} + +static void +_rpc_terminate_tasks(slurm_msg_t *msg, slurm_addr *cli_addr) +{ + int rc = SLURM_SUCCESS; + uid_t req_uid; + job_step_t *step = NULL; + kill_tasks_msg_t *req = (kill_tasks_msg_t *) msg->data; + + debug3("Entering _rpc_terminate_tasks"); + if (!(step = shm_get_step(req->job_id, req->job_step_id))) { + debug("kill for nonexistent job %u.%u requested", + req->job_id, req->job_step_id); + rc = ESLURM_INVALID_JOB_ID; + goto done; + } + + req_uid = g_slurm_auth_get_uid(msg->cred); + if ((req_uid != step->uid) && (!_slurm_authorized_user(req_uid))) { + debug("kill req from uid %ld for job %u.%u owned by uid %ld", + (long) req_uid, req->job_id, req->job_step_id, + (long) step->uid); + rc = ESLURM_USER_ID_MISSING; /* or bad in this case */ + goto done; + } + + if (step->state == SLURMD_JOB_STARTING) { + debug ("terminate req for starting job step %u.%u", req->job_id, req->job_step_id); rc = ESLURMD_JOB_NOTRUNNING; goto done; @@ -942,47 +1034,15 @@ _rpc_kill_tasks(slurm_msg_t *msg, 
slurm_addr *cli_addr) goto done; } -#if 0 - /* This code was used in an investigation of hung TotalView proceses */ - if ((req->signal == SIGKILL) - || (req->signal == SIGINT)) { /* for proctrack/linuxproc */ - /* - * Assume step termination request. - * Send SIGCONT just in case the processes are stopped. - */ - slurm_container_signal(step->cont_id, SIGCONT); - if (slurm_container_signal(step->cont_id, req->signal) < 0) - rc = errno; - } else -#endif - if (req->signal == 0) { - if (slurm_container_signal(step->cont_id, req->signal) < 0) - rc = errno; -/* SIGMIGRATE and SIGSOUND are used to initiate job checkpoint on AIX. - * These signals are not sent to the entire process group, but just a - * single process, namely the PMD. */ -#ifdef SIGMIGRATE -#ifdef SIGSOUND - } else if ((req->signal == SIGMIGRATE) || - (req->signal == SIGSOUND)) { - if (step->task_list - && (step->task_list->pid > (pid_t) 0) - && (kill(step->task_list->pid, req->signal) < 0)) - rc = errno; -#endif -#endif - } else { - if ((step->pgid > (pid_t) 0) - && (killpg(step->pgid, req->signal) < 0)) - rc = errno; - } - if (rc == SLURM_SUCCESS) - verbose("Sent signal %d to %u.%u", - req->signal, req->job_id, req->job_step_id); - else + if (slurm_container_signal(step->cont_id, req->signal) < 0) { + rc = errno; verbose("Error sending signal %d to %u.%u: %s", req->signal, req->job_id, req->job_step_id, slurm_strerror(rc)); + } else { + verbose("Sent signal %d to %u.%u", + req->signal, req->job_id, req->job_step_id); + } done: if (step) @@ -1002,7 +1062,6 @@ _rpc_timelimit(slurm_msg_t *msg, slurm_addr *cli_addr) kill_job_msg_t *req = msg->data; int nsteps; - debug2("Processing RPC: REQUEST_KILL_TIMELIMIT"); if (!_slurm_authorized_user(uid)) { error ("Security violation: rpc_timelimit req from uid %ld", (long) uid); @@ -1022,7 +1081,7 @@ _rpc_timelimit(slurm_msg_t *msg, slurm_addr *cli_addr) req->job_id, nsteps ); /* Revoke credential, send SIGKILL, run epilog, etc. 
*/ - _rpc_kill_job(msg, cli_addr); + _rpc_terminate_job(msg, cli_addr); } static void _rpc_pid2jid(slurm_msg_t *msg, slurm_addr *cli) @@ -1331,8 +1390,49 @@ _epilog_complete(uint32_t jobid, int rc) return ret; } + +/* FIXME - kill_job_msg_t doesn't have a signal number in the payload + * so it won't suffice for this RPC. Fortunately, no one calls + * this RPC. + */ +static void +_rpc_signal_job(slurm_msg_t *msg, slurm_addr *cli) +{ + int rc = SLURM_SUCCESS; + kill_job_msg_t *req = msg->data; + uid_t uid = g_slurm_auth_get_uid(msg->cred); + int nsteps = 0; + int delay; + char *bgl_part_id = NULL; + + error("_rpc_signal_job not yet implemented"); + /* + * check that requesting user ID is the SLURM UID + */ + if (!_slurm_authorized_user(uid)) { + error("Security violation: kill_job(%ld) from uid %ld", + req->job_id, (long) uid); + if (msg->conn_fd >= 0) + slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING); + return; + } + + /*nsteps = _kill_all_active_steps(req->job_id, SIGTERM, true);*/ + + /* + * At this point, if connection still open, we send controller + * a "success" reply to indicate that we've recvd the msg. 
+ */ + if (msg->conn_fd >= 0) { + slurm_send_rc_msg(msg, SLURM_SUCCESS); + if (slurm_close_accepted_conn(msg->conn_fd) < 0) + error ("_rpc_signal_job: close(%d): %m", msg->conn_fd); + msg->conn_fd = -1; + } +} + static void -_rpc_kill_job(slurm_msg_t *msg, slurm_addr *cli) +_rpc_terminate_job(slurm_msg_t *msg, slurm_addr *cli) { int rc = SLURM_SUCCESS; kill_job_msg_t *req = msg->data; @@ -1341,7 +1441,6 @@ _rpc_kill_job(slurm_msg_t *msg, slurm_addr *cli) int delay; char *bgl_part_id = NULL; - debug2("Processing RPC: REQUEST_SIGNAL_JOB"); /* * check that requesting user ID is the SLURM UID */ diff --git a/src/slurmd/slurmd_job.c b/src/slurmd/slurmd_job.c index f25f259867f..07a52d03d10 100644 --- a/src/slurmd/slurmd_job.c +++ b/src/slurmd/slurmd_job.c @@ -172,8 +172,7 @@ job_create(launch_tasks_request_msg_t *msg, slurm_addr *cli_addr) job->cwd = xstrdup(msg->cwd); job->env = _array_copy(msg->envc, msg->env); - job->objs = list_create(NULL); /* FIXME! Needs destructor */ - job->eio = eio_handle_create(job->objs); + job->eio = eio_handle_create(); job->sruns = list_create((ListDelF) _srun_info_destructor); job->clients = list_create(NULL); /* FIXME! Needs destructor */ job->stdout_eio_objs = list_create(NULL); /* FIXME! 
Needs destructor */ @@ -259,8 +258,7 @@ job_spawn_create(spawn_task_request_msg_t *msg, slurm_addr *cli_addr) job->cwd = xstrdup(msg->cwd); job->env = _array_copy(msg->envc, msg->env); - job->objs = list_create(NULL); /* Need destructor */ - job->eio = eio_handle_create(job->objs); + job->eio = eio_handle_create(); job->sruns = list_create((ListDelF) _srun_info_destructor); job->envtp = xmalloc(sizeof(env_t)); job->envtp->jobid = -1; @@ -346,8 +344,7 @@ job_batch_job_create(batch_job_launch_msg_t *msg) job->cwd = xstrdup(msg->work_dir); job->env = _array_copy(msg->envc, msg->environment); - job->objs = list_create(NULL); /* FIXME - Need desctructor */ - job->eio = eio_handle_create(job->objs); + job->eio = eio_handle_create(); job->sruns = list_create((ListDelF) _srun_info_destructor); job->envtp = xmalloc(sizeof(env_t)); job->envtp->jobid = -1; @@ -492,7 +489,6 @@ job_destroy(slurmd_job_t *job) for (i = 0; i < job->ntasks; i++) task_info_destroy(job->task[i]); list_destroy(job->sruns); - list_destroy(job->objs); xfree(job->envtp); xfree(job->task_prolog); xfree(job->task_epilog); diff --git a/src/slurmd/slurmd_job.h b/src/slurmd/slurmd_job.h index 3a22ef644ad..acbf9787201 100644 --- a/src/slurmd/slurmd_job.h +++ b/src/slurmd/slurmd_job.h @@ -125,7 +125,6 @@ typedef struct slurmd_job { struct passwd *pwd; /* saved passwd struct for user job */ slurmd_task_info_t **task; /* array of task information pointers */ eio_handle_t *eio; - List objs; /* List of io_obj_t pointers (see eio.h) */ List sruns; /* List of srun_info_t pointers */ List clients; /* List of struct client_io_info pointers */ List stdout_eio_objs; diff --git a/src/srun/io.c b/src/srun/io.c index a62a8debfdf..a305f8b8ac9 100644 --- a/src/srun/io.c +++ b/src/srun/io.c @@ -771,7 +771,7 @@ io_thr_create(srun_job_t *job) ntohs(job->listenport[i])); /*net_set_low_water(job->listensock[i], 140);*/ obj = _create_listensock_eio(job->listensock[i], job); - list_enqueue(job->eio_objs, obj); + 
eio_new_initial_obj(job->eio, obj); } /* FIXME - Need to open files here (or perhaps earlier) */ @@ -823,7 +823,11 @@ _read_io_init_msg(int fd, srun_job_t *job, char *host) job->ioserver[msg.nodeid] = _create_server_eio_obj(fd, job, msg.stdout_objs, msg.stderr_objs); - list_enqueue(job->eio_objs, job->ioserver[msg.nodeid]); + /* Normally using eio_new_initial_obj while the eio mainloop + * is running is not safe, but since this code is running + * inside of the eio mainloop there should be no problem. + */ + eio_new_initial_obj(job->eio, job->ioserver[msg.nodeid]); job->ioservers_ready++; return SLURM_SUCCESS; @@ -1034,7 +1038,7 @@ _init_stdio_eio_objs(srun_job_t *job) } job->stdin_obj = create_file_read_eio_obj(infd, job, type, destid); - list_enqueue(job->eio_objs, job->stdin_obj); + eio_new_initial_obj(job->eio, job->stdin_obj); } /* @@ -1059,7 +1063,7 @@ _init_stdio_eio_objs(srun_job_t *job) refcount = job->ntasks; } job->stdout_obj = create_file_write_eio_obj(outfd, job); - list_enqueue(job->eio_objs, job->stdout_obj); + eio_new_initial_obj(job->eio, job->stdout_obj); } /* @@ -1081,7 +1085,7 @@ _init_stdio_eio_objs(srun_job_t *job) } refcount = job->ntasks; job->stderr_obj = create_file_write_eio_obj(errfd, job); - list_enqueue(job->eio_objs, job->stderr_obj); + eio_new_initial_obj(job->eio, job->stderr_obj); } } diff --git a/src/srun/srun_job.c b/src/srun/srun_job.c index e6a56efe3ca..19260fc92f3 100644 --- a/src/srun/srun_job.c +++ b/src/srun/srun_job.c @@ -489,8 +489,7 @@ _job_create_internal(allocation_info_t *info) job->listensock = (int *) xmalloc(job->num_listen * sizeof(int)); job->listenport = (int *) xmalloc(job->num_listen * sizeof(int)); - job->eio_objs = list_create(NULL); /* FIXME - needs destructor */ - job->eio = eio_handle_create(job->eio_objs); + job->eio = eio_handle_create(); job->ioservers_ready = 0; /* "nhosts" number of IO protocol sockets */ job->ioserver = (eio_obj_t **)xmalloc(job->nhosts*sizeof(eio_obj_t *)); diff --git 
a/src/srun/srun_job.h b/src/srun/srun_job.h index c6f2a584cfb..802476d7d81 100644 --- a/src/srun/srun_job.h +++ b/src/srun/srun_job.h @@ -124,7 +124,6 @@ typedef struct srun_job { int *listensock; /* Array of stdio listen sockets */ int *listenport; /* Array of stdio listen ports */ eio_handle_t *eio; /* Event IO handle */ - List eio_objs; /* List of eio_obj_t pointers */ int ioservers_ready; /* Number of servers that established contact */ eio_obj_t **ioserver; /* Array of nhosts pointers to eio_obj_t */ eio_obj_t *stdin_obj; /* stdin eio_obj_t */ -- GitLab