diff --git a/src/common/eio.c b/src/common/eio.c
index 8d98305e52cd649a45b18fdb9642f9e7b52939d5..55909788ad1d962c1a86f9d027a7adf411c94d98 100644
--- a/src/common/eio.c
+++ b/src/common/eio.c
@@ -38,7 +38,11 @@
 #include "src/common/fd.h"
 #include "src/common/eio.h"
 
-
+/*
+ * Outside threads can stick new objects on the new_objs List and
+ * the eio thread will move them to the main obj_list the next time
+ * it wakes up.
+ */
 struct eio_handle_components {
 #ifndef NDEBUG
 #  define EIO_MAGIC 0xe1e10
@@ -46,6 +50,7 @@ struct eio_handle_components {
 #endif
 	int  fds[2];
 	List obj_list;
+	List new_objs;
 };
 
 
@@ -60,7 +65,7 @@ static void _poll_dispatch(struct pollfd *, unsigned int, eio_obj_t **,
 static void _poll_handle_event(short revents, eio_obj_t *obj,
 			       List objList);
 
-eio_handle_t *eio_handle_create(List eio_obj_list)
+eio_handle_t *eio_handle_create(void)
 {
 	eio_handle_t *eio = xmalloc(sizeof(*eio));
 
@@ -74,7 +79,8 @@ eio_handle_t *eio_handle_create(List eio_obj_list)
 
 	xassert(eio->magic = EIO_MAGIC);
 
-	eio->obj_list = eio_obj_list;
+	eio->obj_list = list_create(NULL); /* FIXME! Needs destructor */
+	eio->new_objs = list_create(NULL);
 
 	return eio;
 }
@@ -85,7 +91,7 @@ void eio_handle_destroy(eio_handle_t *eio)
 	xassert(eio->magic == EIO_MAGIC);
 	close(eio->fds[0]);
 	close(eio->fds[1]);
-	/* FIXME - Destroy the eio object list here ? */
+	/* FIXME - Destroy obj_list and new_objs */
 	xassert(eio->magic = ~EIO_MAGIC);
 	xfree(eio);
 }
@@ -118,16 +124,22 @@ static void _mark_shutdown_true(List obj_list)
 	list_iterator_destroy(objs);
 }
 
-static int _eio_clear(eio_handle_t *eio)
+static int _eio_wakeup_handler(eio_handle_t *eio)
 {
 	char c = 0;
 	int rc = 0;
+	eio_obj_t *obj;
 
 	while ((rc = (read(eio->fds[0], &c, 1)) > 0)) {
 		if (c == 1)
 			_mark_shutdown_true(eio->obj_list);
 	}
+	/* move new eio objects from the new_objs to the obj_list */
+	while ((obj = list_dequeue(eio->new_objs))) {
+		list_enqueue(eio->obj_list, obj);
+	}
+
 
 	if (rc < 0)
 		return error("eio_clear: read: %m");
 	return 0;
@@ -152,8 +164,9 @@ _poll_loop_internal(eio_handle_t *eio, List objs)
 
 	for (;;) {
 
-		/* Alloc memory for pfds and map if needed */
-		if (maxnfds < (n = list_count(objs))) {
+		/* Alloc memory for pfds and map if needed */
+		n = list_count(objs);
+		if (maxnfds < n) {
 			maxnfds = n;
 			xrealloc(pollfds, (maxnfds+1) * sizeof(struct pollfd));
 			xrealloc(map, maxnfds * sizeof(eio_obj_t * ));
@@ -164,21 +177,18 @@ _poll_loop_internal(eio_handle_t *eio, List objs)
 		debug3("eio: handling events for %d objects",
 		       list_count(objs));
 
-		/*
-		 * Clear any pending eio signals
-		 */
-		_eio_clear(eio);
-
 		if ((nfds = _poll_setup_pollfds(pollfds, map, objs)) <= 0)
 			goto done;
 
 		/*
-		 * Setup eio handle poll fd
+		 * Setup eio handle signalling fd
 		 */
 		pollfds[nfds].fd = eio->fds[0];
 		pollfds[nfds].events = POLLIN;
 		nfds++;
 
+		debug("nfds = %d, n = %d, maxnfds = %d",
+		      nfds, n, maxnfds);
 		xassert(nfds <= maxnfds + 1);
 
@@ -186,7 +196,7 @@ _poll_loop_internal(eio_handle_t *eio, List objs)
 			goto error;
 
 		if (pollfds[nfds-1].revents & POLLIN)
-			_eio_clear(eio);
+			_eio_wakeup_handler(eio);
 
 		_poll_dispatch(pollfds, nfds-1, map, objs);
 	}
@@ -329,3 +339,32 @@ void eio_obj_destroy(eio_obj_t *obj)
 		xfree(obj);
 	}
 }
+
+
+/*
+ * Add an eio_obj_t "obj" to an eio_handle_t "eio"'s internal object list.
+ *
+ * This function can only be used to initialize "eio"'s list before
+ * calling eio_handle_mainloop. If it is used after the eio engine's
+ * mainloop has started, segfaults are likely.
+ */
+void eio_new_initial_obj(eio_handle_t *eio, eio_obj_t *obj)
+{
+	xassert(eio != NULL);
+	xassert(eio->magic == EIO_MAGIC);
+
+	list_enqueue(eio->obj_list, obj);
+}
+
+/*
+ * Queue an eio_obj_t "obj" for inclusion in an already running
+ * eio_handle_t "eio"'s internal object list.
+ */
+void eio_new_obj(eio_handle_t *eio, eio_obj_t *obj)
+{
+	xassert(eio != NULL);
+	xassert(eio->magic == EIO_MAGIC);
+
+	list_enqueue(eio->new_objs, obj);
+	eio_signal_wakeup(eio);
+}
diff --git a/src/common/eio.h b/src/common/eio.h
index d1ab7daf9aae9188b0f209278208f8a60455d096..6c1d34031d9e52600ed56177f6a73786daadc32a 100644
--- a/src/common/eio.h
+++ b/src/common/eio.h
@@ -56,9 +56,25 @@ struct eio_obj {
 	bool shutdown;
 };
 
-eio_handle_t *eio_handle_create(List eio_obj_list);
+eio_handle_t *eio_handle_create(void);
 void eio_handle_destroy(eio_handle_t *eio);
 
+/*
+ * Add an eio_obj_t "obj" to an eio_handle_t "eio"'s internal object list.
+ *
+ * This function can only be used to initialize "eio"'s list before
+ * calling eio_handle_mainloop. If it is used after the eio engine's
+ * mainloop has started, segfaults are likely.
+ */
+void eio_new_initial_obj(eio_handle_t *eio, eio_obj_t *obj);
+
+/*
+ * Queue an eio_obj_t "obj" for inclusion in an already running
+ * eio_handle_t "eio"'s internal object list.
+ */
+void eio_new_obj(eio_handle_t *eio, eio_obj_t *obj);
+
+
 /* This routine will watch for activtiy on the fd's as long
  * as obj->readable() or obj->writable() returns >0
  *
diff --git a/src/slurmd/io.c b/src/slurmd/io.c
index 02fe5bc50397d5e15a39af704e3e9fb99416c398..4cc11f05a043e96d3c775cc8843ec1b3ddab79e0 100644
--- a/src/slurmd/io.c
+++ b/src/slurmd/io.c
@@ -205,7 +205,8 @@ _client_writable(eio_obj_t *obj)
 	}
 
 	/* If this is a newly attached client its msg_queue needs
-	 * to be intialized from the outgoing_cache
+	 * to be initialized from the outgoing_cache, and then "obj" needs
+	 * to be added to the List of clients.
 	 */
 	if (client->msg_queue == NULL) {
 		ListIterator msgs;
@@ -217,6 +218,8 @@ _client_writable(eio_obj_t *obj)
 			list_enqueue(client->msg_queue, msg);
 		}
 		list_iterator_destroy(msgs);
+		/* and now make this object visible to tasks */
+		list_append(client->job->clients, (void *)obj);
 	}
 
 	if (client->out_msg != NULL)
@@ -656,7 +659,7 @@ _init_task_stdio_fds(slurmd_task_info_t *task, slurmd_job_t *job)
 		fd_set_close_on_exec(task->to_stdin);
 		fd_set_nonblocking(task->to_stdin);
 		task->in = _create_task_in_eio(task->to_stdin, job);
-		list_append(job->objs, (void *)task->in);
+		eio_new_initial_obj(job->eio, (void *)task->in);
 	}
 
 	/*
@@ -686,8 +689,8 @@ _init_task_stdio_fds(slurmd_task_info_t *task, slurmd_job_t *job)
 		fd_set_nonblocking(task->from_stdout);
 		task->out = _create_task_out_eio(task->from_stdout,
 						 SLURM_IO_STDOUT, job, task);
-		list_append(job->objs, (void *)task->out);
 		list_append(job->stdout_eio_objs, (void *)task->out);
+		eio_new_initial_obj(job->eio, (void *)task->out);
 	}
 
 	/*
@@ -717,8 +720,8 @@ _init_task_stdio_fds(slurmd_task_info_t *task, slurmd_job_t *job)
 		fd_set_nonblocking(task->from_stderr);
 		task->err = _create_task_out_eio(task->from_stderr,
 						 SLURM_IO_STDERR, job, task);
-		list_append(job->objs, (void *)task->err);
 		list_append(job->stderr_eio_objs, (void *)task->err);
+		eio_new_initial_obj(job->eio, (void *)task->err);
 	}
 }
 
@@ -928,6 +931,62 @@ _io_thr(void *arg)
 	return (void *)1;
 }
 
+/*
+ * Create the initial TCP connection back to a waiting client (e.g. srun).
+ *
+ * Since this is the first client connection and the IO engine has not
+ * yet started, we initialize the msg_queue as an empty list and
+ * directly add the eio_obj_t to the eio handle with eio_new_initial_obj.
+ */
+int
+io_initial_client_connect(srun_info_t *srun, slurmd_job_t *job)
+{
+	int i;
+	int sock = -1;
+	struct client_io_info *client;
+	eio_obj_t *obj;
+
+	debug2 ("adding IO connection (logical node rank %d)", job->nodeid);
+
+	if (srun->ioaddr.sin_addr.s_addr) {
+		char host[256];
+		uint16_t port;
+		slurmd_get_addr(&srun->ioaddr, &port, host, sizeof(host));
+		debug2("connecting IO back to %s:%d", host, ntohs(port));
+	}
+
+	if ((sock = (int) slurm_open_stream(&srun->ioaddr)) < 0) {
+		error("connect io: %m");
+		/* XXX retry or silently fail?
+		 *     fail for now.
+		 */
+		return SLURM_ERROR;
+	}
+
+	fd_set_blocking(sock);  /* just in case... */
+
+	_send_io_init_msg(sock, srun->key, job);
+
+	debug3("  back from _send_io_init_msg");
+	fd_set_nonblocking(sock);
+	fd_set_close_on_exec(sock);
+
+	/* Now set up the eio object */
+	client = xmalloc(sizeof(struct client_io_info));
+#ifndef NDEBUG
+	client->magic = CLIENT_IO_MAGIC;
+#endif
+	client->job = job;
+	client->msg_queue = list_create(NULL); /* FIXME - destructor */
+
+	obj = eio_obj_create(sock, &client_ops, (void *)client);
+	list_append(job->clients, (void *)obj);
+	eio_new_initial_obj(job->eio, (void *)obj);
+	debug3("Now handling %d IO Client object(s)", list_count(job->clients));
+
+	return SLURM_SUCCESS;
+}
+
 /*
  * Initiate a TCP connection back to a waiting client (e.g. srun).
  *
@@ -974,10 +1033,10 @@ io_client_connect(srun_info_t *srun, slurmd_job_t *job)
 #endif
 	client->job = job;
 	client->msg_queue = NULL; /* initialized in _client_writable */
+	/* client object adds itself to job->clients in _client_writable */
 
 	obj = eio_obj_create(sock, &client_ops, (void *)client);
-	list_append(job->clients, (void *)obj);
-	list_append(job->objs, (void *)obj);
+	eio_new_obj(job->eio, (void *)obj);
 
 	debug3("Now handling %d IO Client object(s)", list_count(job->clients));
 
@@ -1068,7 +1127,7 @@ _send_eof_msg(struct task_read_info *out)
 	clients = list_iterator_create(out->job->clients);
 	while(eio = list_next(clients)) {
 		client = (struct client_io_info *)eio->arg;
-		debug3("======================== Enqueued message");
+		debug3("======================== Enqueued eof message");
 		xassert(client->magic == CLIENT_IO_MAGIC);
 		if (list_enqueue(client->msg_queue, msg))
 			msg->ref_count++;
diff --git a/src/slurmd/io.h b/src/slurmd/io.h
index 4390ec4745981db88d2ae0a2c37968e1c588c3b0..bb93b9c5072ed7d8b5abe2f358be50d3a0a33f92 100644
--- a/src/slurmd/io.h
+++ b/src/slurmd/io.h
@@ -47,6 +47,15 @@ struct io_buf {
 struct io_buf *alloc_io_buf(void);
 void free_io_buf(struct io_buf *buf);
 
+/*
+ * Create a TCP connection back to the initial client (e.g. srun).
+ *
+ * Since this is the first client connection and the IO engine has not
+ * yet started, we initialize the msg_queue as an empty list and
+ * directly add the eio_obj_t to the eio handle with eio_new_initial_obj.
+ */
+int io_initial_client_connect(srun_info_t *srun, slurmd_job_t *job);
+
 /*
  * Initiate a TCP connection back to a waiting client (e.g. srun).
  *
diff --git a/src/slurmd/mgr.c b/src/slurmd/mgr.c
index 3b47af90811080778b5f4965c83a58bcf5f0c3ee..5a7b99e390e3616af85f473130546f3e35903edc 100644
--- a/src/slurmd/mgr.c
+++ b/src/slurmd/mgr.c
@@ -447,6 +447,16 @@ _setup_io(slurmd_job_t *job)
 		error("sete{u/g}id(%lu/%lu): %m",
 		      (u_long) spwd->pw_uid, (u_long) spwd->pw_gid);
 
+	/*
+	 * MUST create the initial client object before starting
+	 * the IO thread, or we risk losing stdout/err traffic.
+	 */
+	if (!job->batch) {
+		srun_info_t *srun = list_peek(job->sruns);
+		xassert(srun != NULL);
+		rc = io_initial_client_connect(srun, job);
+	}
+
 	if (!job->batch)
 		if (io_thread_start(job) < 0)
 			return ESLURMD_IO_ERROR;
@@ -456,11 +466,6 @@ _setup_io(slurmd_job_t *job)
 	 */
 	_slurmd_job_log_init(job);
 
-	if (!job->batch) {
-		srun_info_t *srun = list_peek(job->sruns);
-		xassert(srun != NULL);
-		rc = io_client_connect(srun, job);
-	}
 
 #ifndef NDEBUG
 #  ifdef PR_SET_DUMPABLE
diff --git a/src/slurmd/req.c b/src/slurmd/req.c
index 2a94871120c7848d6a7498b47edfd2866315bb4e..088fca5226cfcd1417a8850c76042655434c9e26 100644
--- a/src/slurmd/req.c
+++ b/src/slurmd/req.c
@@ -76,10 +76,12 @@ static int  _launch_tasks(launch_tasks_request_msg_t *, slurm_addr *,
 static void _rpc_launch_tasks(slurm_msg_t *, slurm_addr *);
 static void _rpc_spawn_task(slurm_msg_t *, slurm_addr *);
 static void _rpc_batch_job(slurm_msg_t *, slurm_addr *);
-static void _rpc_kill_tasks(slurm_msg_t *, slurm_addr *);
+static void _rpc_signal_tasks(slurm_msg_t *, slurm_addr *);
+static void _rpc_terminate_tasks(slurm_msg_t *, slurm_addr *);
 static void _rpc_timelimit(slurm_msg_t *, slurm_addr *);
 static void _rpc_reattach_tasks(slurm_msg_t *, slurm_addr *);
-static void _rpc_kill_job(slurm_msg_t *, slurm_addr *);
+static void _rpc_signal_job(slurm_msg_t *, slurm_addr *);
+static void _rpc_terminate_job(slurm_msg_t *, slurm_addr *);
 static void _rpc_update_time(slurm_msg_t *, slurm_addr *);
 static void _rpc_shutdown(slurm_msg_t *msg, slurm_addr *cli_addr);
 static void _rpc_reconfig(slurm_msg_t *msg, slurm_addr *cli_addr);
@@ -130,11 +132,17 @@ slurmd_req(slurm_msg_t *msg, slurm_addr *cli)
 		slurm_mutex_unlock(&launch_mutex);
 		break;
 	case REQUEST_SIGNAL_TASKS:
+		debug2("Processing RPC: REQUEST_SIGNAL_TASKS");
+		_rpc_signal_tasks(msg, cli);
+		slurm_free_kill_tasks_msg(msg->data);
+		break;
 	case REQUEST_TERMINATE_TASKS:
-		_rpc_kill_tasks(msg, cli);
+		debug2("Processing RPC: REQUEST_TERMINATE_TASKS");
+		_rpc_terminate_tasks(msg, cli);
 		slurm_free_kill_tasks_msg(msg->data);
 		break;
 	case REQUEST_KILL_TIMELIMIT:
+		debug2("Processing RPC: REQUEST_KILL_TIMELIMIT");
 		_rpc_timelimit(msg, cli);
 		slurm_free_timelimit_msg(msg->data);
 		break;
@@ -143,8 +151,13 @@ slurmd_req(slurm_msg_t *msg, slurm_addr *cli)
 		slurm_free_reattach_tasks_request_msg(msg->data);
 		break;
 	case REQUEST_SIGNAL_JOB:
+		debug2("Processing RPC: REQUEST_SIGNAL_JOB");
+		_rpc_signal_job(msg, cli);
+		slurm_free_kill_job_msg(msg->data);
+		break;
 	case REQUEST_TERMINATE_JOB:
-		_rpc_kill_job(msg, cli);
+		debug2("Processing RPC: REQUEST_TERMINATE_JOB");
+		_rpc_terminate_job(msg, cli);
 		slurm_free_kill_job_msg(msg->data);
 		break;
 	case REQUEST_UPDATE_JOB_TIME:
@@ -904,7 +917,7 @@ _rpc_ping(slurm_msg_t *msg, slurm_addr *cli_addr)
 }
 
 static void
-_rpc_kill_tasks(slurm_msg_t *msg, slurm_addr *cli_addr)
+_rpc_signal_tasks(slurm_msg_t *msg, slurm_addr *cli_addr)
 {
 	int               rc = SLURM_SUCCESS;
 	uid_t             req_uid;
@@ -928,7 +941,86 @@ _rpc_kill_tasks(slurm_msg_t *msg, slurm_addr *cli_addr)
 	}
 
 	if (step->state == SLURMD_JOB_STARTING) {
-		debug ("kill req for starting job step %u.%u",
("signal req for starting job step %u.%u", + req->job_id, req->job_step_id); + rc = ESLURMD_JOB_NOTRUNNING; + goto done; + } + +#ifdef HAVE_AIX +# ifdef SIGMIGRATE +# ifdef SIGSOUND + /* SIGMIGRATE and SIGSOUND are used to initiate job checkpoint on AIX. + * These signals are not sent to the entire process group, but just a + * single process, namely the PMD. */ + if (req->signal == SIGMIGRATE || req->signal == SIGSOUND) { + if (!step->task_list || step->task_list->pid <= (pid_t)1) { + verbose("Invalid pid for signal %d", req->signal); + goto done; + } + if (kill(step->task_list->pid, req->signal) == -1) { + rc = errno; + verbose("Error sending signal %d to pmd" + " for job %u.%u: %s", + req->signal, req->job_id, req->job_step_id, + slurm_strerror(errno)); + } + goto done; + } +# endif +# endif +#endif + + if (step->pgid <= (pid_t)1) { + debug ("step %u.%u invalid in shm [mpid:%d pgid:%u]", + req->job_id, req->job_step_id, + step->mpid, step->pgid); + rc = ESLURMD_JOB_NOTRUNNING; + } + + if (killpg(step->pgid, req->signal) == -1) { + rc = errno; + verbose("Error sending signal %d to %u.%u: %s", + req->signal, req->job_id, req->job_step_id, + slurm_strerror(rc)); + } else { + verbose("Sent signal %d to %u.%u", + req->signal, req->job_id, req->job_step_id); + } + + done: + if (step) + shm_free_step(step); + slurm_send_rc_msg(msg, rc); +} + +static void +_rpc_terminate_tasks(slurm_msg_t *msg, slurm_addr *cli_addr) +{ + int rc = SLURM_SUCCESS; + uid_t req_uid; + job_step_t *step = NULL; + kill_tasks_msg_t *req = (kill_tasks_msg_t *) msg->data; + + debug3("Entering _rpc_terminate_tasks"); + if (!(step = shm_get_step(req->job_id, req->job_step_id))) { + debug("kill for nonexistent job %u.%u requested", + req->job_id, req->job_step_id); + rc = ESLURM_INVALID_JOB_ID; + goto done; + } + + req_uid = g_slurm_auth_get_uid(msg->cred); + if ((req_uid != step->uid) && (!_slurm_authorized_user(req_uid))) { + debug("kill req from uid %ld for job %u.%u owned by uid %ld", + (long) req_uid, req->job_id, req->job_step_id, + (long) step->uid); + rc = ESLURM_USER_ID_MISSING; /* or bad in this case */ + goto done; + } + + if (step->state == SLURMD_JOB_STARTING) { + debug ("terminate req for starting job step %u.%u", req->job_id, req->job_step_id); rc = ESLURMD_JOB_NOTRUNNING; goto done; @@ -942,47 +1034,15 @@ _rpc_kill_tasks(slurm_msg_t *msg, slurm_addr *cli_addr) goto done; } -#if 0 - /* This code was used in an investigation of hung TotalView proceses */ - if ((req->signal == SIGKILL) - || (req->signal == SIGINT)) { /* for proctrack/linuxproc */ - /* - * Assume step termination request. - * Send SIGCONT just in case the processes are stopped. - */ - slurm_container_signal(step->cont_id, SIGCONT); - if (slurm_container_signal(step->cont_id, req->signal) < 0) - rc = errno; - } else -#endif - if (req->signal == 0) { - if (slurm_container_signal(step->cont_id, req->signal) < 0) - rc = errno; -/* SIGMIGRATE and SIGSOUND are used to initiate job checkpoint on AIX. - * These signals are not sent to the entire process group, but just a - * single process, namely the PMD. 
- * single process, namely the PMD. */
-#ifdef SIGMIGRATE
-#ifdef SIGSOUND
-	} else if ((req->signal == SIGMIGRATE) ||
-		   (req->signal == SIGSOUND)) {
-		if (step->task_list
-		    && (step->task_list->pid > (pid_t) 0)
-		    && (kill(step->task_list->pid, req->signal) < 0))
-			rc = errno;
-#endif
-#endif
-	} else {
-		if ((step->pgid > (pid_t) 0)
-		    && (killpg(step->pgid, req->signal) < 0))
-			rc = errno;
-	}
-	if (rc == SLURM_SUCCESS)
-		verbose("Sent signal %d to %u.%u",
-			req->signal, req->job_id, req->job_step_id);
-	else
+	if (slurm_container_signal(step->cont_id, req->signal) < 0) {
+		rc = errno;
 		verbose("Error sending signal %d to %u.%u: %s",
 			req->signal, req->job_id, req->job_step_id,
 			slurm_strerror(rc));
+	} else {
+		verbose("Sent signal %d to %u.%u",
+			req->signal, req->job_id, req->job_step_id);
+	}
 
   done:
 	if (step)
 		shm_free_step(step);
 	slurm_send_rc_msg(msg, rc);
 }
@@ -1002,7 +1062,6 @@ _rpc_timelimit(slurm_msg_t *msg, slurm_addr *cli_addr)
 	kill_job_msg_t *req = msg->data;
 	int    nsteps;
 
-	debug2("Processing RPC: REQUEST_KILL_TIMELIMIT");
 	if (!_slurm_authorized_user(uid)) {
 		error ("Security violation: rpc_timelimit req from uid %ld",
 		       (long) uid);
@@ -1022,7 +1081,7 @@ _rpc_timelimit(slurm_msg_t *msg, slurm_addr *cli_addr)
 	       req->job_id, nsteps );
 
 	/* Revoke credential, send SIGKILL, run epilog, etc. */
-	_rpc_kill_job(msg, cli_addr);
+	_rpc_terminate_job(msg, cli_addr);
 }
 
 static void  _rpc_pid2jid(slurm_msg_t *msg, slurm_addr *cli)
@@ -1331,8 +1390,49 @@ _epilog_complete(uint32_t jobid, int rc)
 	return ret;
 }
 
+
+/* FIXME - kill_job_msg_t doesn't have a signal number in the payload
+ * so it won't suffice for this RPC.  Fortunately, no one calls
+ * this RPC.
+ */
+static void
+_rpc_signal_job(slurm_msg_t *msg, slurm_addr *cli)
+{
+	int             rc      = SLURM_SUCCESS;
+	kill_job_msg_t *req     = msg->data;
+	uid_t           uid     = g_slurm_auth_get_uid(msg->cred);
+	int             nsteps  = 0;
+	int             delay;
+	char           *bgl_part_id = NULL;
+
+	error("_rpc_signal_job not yet implemented");
+	/*
+	 * check that requesting user ID is the SLURM UID
+	 */
+	if (!_slurm_authorized_user(uid)) {
+		error("Security violation: kill_job(%ld) from uid %ld",
+		      req->job_id, (long) uid);
+		if (msg->conn_fd >= 0)
+			slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING);
+		return;
+	}
+
+	/*nsteps = _kill_all_active_steps(req->job_id, SIGTERM, true);*/
+
+	/*
+	 * At this point, if connection still open, we send controller
+	 * a "success" reply to indicate that we've recvd the msg.
+	 */
+	if (msg->conn_fd >= 0) {
+		slurm_send_rc_msg(msg, SLURM_SUCCESS);
+		if (slurm_close_accepted_conn(msg->conn_fd) < 0)
+			error ("_rpc_signal_job: close(%d): %m", msg->conn_fd);
+		msg->conn_fd = -1;
+	}
+}
+
 static void
-_rpc_kill_job(slurm_msg_t *msg, slurm_addr *cli)
+_rpc_terminate_job(slurm_msg_t *msg, slurm_addr *cli)
 {
 	int             rc      = SLURM_SUCCESS;
 	kill_job_msg_t *req     = msg->data;
@@ -1341,7 +1441,6 @@ _rpc_kill_job(slurm_msg_t *msg, slurm_addr *cli)
 	int             delay;
 	char           *bgl_part_id = NULL;
 
-	debug2("Processing RPC: REQUEST_SIGNAL_JOB");
 	/*
 	 * check that requesting user ID is the SLURM UID
 	 */
diff --git a/src/slurmd/slurmd_job.c b/src/slurmd/slurmd_job.c
index f25f259867f4cf587b7c6d02bf72ede5e569a7ad..07a52d03d10b997567e61f786516f147cbd07166 100644
--- a/src/slurmd/slurmd_job.c
+++ b/src/slurmd/slurmd_job.c
@@ -172,8 +172,7 @@ job_create(launch_tasks_request_msg_t *msg, slurm_addr *cli_addr)
 	job->cwd     = xstrdup(msg->cwd);
 
 	job->env     = _array_copy(msg->envc, msg->env);
-	job->objs    = list_create(NULL); /* FIXME! Needs destructor */
-	job->eio     = eio_handle_create(job->objs);
+	job->eio     = eio_handle_create();
 	job->sruns   = list_create((ListDelF) _srun_info_destructor);
 	job->clients = list_create(NULL); /* FIXME! Needs destructor */
 	job->stdout_eio_objs = list_create(NULL); /* FIXME! Needs destructor */
@@ -259,8 +258,7 @@ job_spawn_create(spawn_task_request_msg_t *msg, slurm_addr *cli_addr)
 	job->cwd     = xstrdup(msg->cwd);
 
 	job->env     = _array_copy(msg->envc, msg->env);
-	job->objs    = list_create(NULL); /* Need destructor */
-	job->eio     = eio_handle_create(job->objs);
+	job->eio     = eio_handle_create();
 	job->sruns   = list_create((ListDelF) _srun_info_destructor);
 	job->envtp   = xmalloc(sizeof(env_t));
 	job->envtp->jobid = -1;
@@ -346,8 +344,7 @@ job_batch_job_create(batch_job_launch_msg_t *msg)
 	job->cwd     = xstrdup(msg->work_dir);
 
 	job->env     = _array_copy(msg->envc, msg->environment);
-	job->objs    = list_create(NULL); /* FIXME - Need desctructor */
-	job->eio     = eio_handle_create(job->objs);
+	job->eio     = eio_handle_create();
 	job->sruns   = list_create((ListDelF) _srun_info_destructor);
 	job->envtp   = xmalloc(sizeof(env_t));
 	job->envtp->jobid = -1;
@@ -492,7 +489,6 @@ job_destroy(slurmd_job_t *job)
 	for (i = 0; i < job->ntasks; i++)
 		task_info_destroy(job->task[i]);
 	list_destroy(job->sruns);
-	list_destroy(job->objs);
 	xfree(job->envtp);
 	xfree(job->task_prolog);
 	xfree(job->task_epilog);
diff --git a/src/slurmd/slurmd_job.h b/src/slurmd/slurmd_job.h
index 3a22ef644ad6b461766658fd9887a577a437d7de..acbf978720127cbd7a1bcd24e33f31a77a39a5e4 100644
--- a/src/slurmd/slurmd_job.h
+++ b/src/slurmd/slurmd_job.h
@@ -125,7 +125,6 @@ typedef struct slurmd_job {
 	struct passwd *pwd;   /* saved passwd struct for user job */
 	slurmd_task_info_t  **task;  /* array of task information pointers */
 	eio_handle_t  *eio;
-	List           objs;  /* List of io_obj_t pointers (see eio.h) */
 	List           sruns; /* List of srun_info_t pointers */
 	List           clients; /* List of struct client_io_info pointers */
 	List           stdout_eio_objs;
diff --git a/src/srun/io.c b/src/srun/io.c
index a62a8debfdfcb8769f3ffffad202b3c3a108beef..a305f8b8ac92d382bce126e68e4829412dac4f7f 100644
--- a/src/srun/io.c
+++ b/src/srun/io.c
@@ -771,7 +771,7 @@ io_thr_create(srun_job_t *job)
 			       ntohs(job->listenport[i]));
 		/*net_set_low_water(job->listensock[i], 140);*/
 		obj = _create_listensock_eio(job->listensock[i], job);
-		list_enqueue(job->eio_objs, obj);
+		eio_new_initial_obj(job->eio, obj);
 	}
 
 	/* FIXME - Need to open files here (or perhaps earlier) */
@@ -823,7 +823,11 @@ _read_io_init_msg(int fd, srun_job_t *job, char *host)
 
 	job->ioserver[msg.nodeid] = _create_server_eio_obj(fd, job,
 					   msg.stdout_objs, msg.stderr_objs);
-	list_enqueue(job->eio_objs, job->ioserver[msg.nodeid]);
+	/* Normally using eio_new_initial_obj while the eio mainloop
+	 * is running is not safe, but since this code is running
+	 * inside of the eio mainloop there should be no problem.
+	 */
+	eio_new_initial_obj(job->eio, job->ioserver[msg.nodeid]);
 	job->ioservers_ready++;
 
 	return SLURM_SUCCESS;
@@ -1034,7 +1038,7 @@ _init_stdio_eio_objs(srun_job_t *job)
 		}
 
 		job->stdin_obj = create_file_read_eio_obj(infd, job, type, destid);
-		list_enqueue(job->eio_objs, job->stdin_obj);
+		eio_new_initial_obj(job->eio, job->stdin_obj);
 	}
 
 	/*
@@ -1059,7 +1063,7 @@ _init_stdio_eio_objs(srun_job_t *job)
 			refcount = job->ntasks;
 		}
 		job->stdout_obj = create_file_write_eio_obj(outfd, job);
-		list_enqueue(job->eio_objs, job->stdout_obj);
+		eio_new_initial_obj(job->eio, job->stdout_obj);
 	}
 
 	/*
@@ -1081,7 +1085,7 @@ _init_stdio_eio_objs(srun_job_t *job)
 		}
 		refcount = job->ntasks;
 		job->stderr_obj = create_file_write_eio_obj(errfd, job);
-		list_enqueue(job->eio_objs, job->stderr_obj);
+		eio_new_initial_obj(job->eio, job->stderr_obj);
 	}
 }
 
diff --git a/src/srun/srun_job.c b/src/srun/srun_job.c
index e6a56efe3ca29e525e8ba1c9f16710924ef23fa5..19260fc92f3016aac5dd8d05c759c38bdd5a29c8 100644
--- a/src/srun/srun_job.c
+++ b/src/srun/srun_job.c
@@ -489,8 +489,7 @@ _job_create_internal(allocation_info_t *info)
 	job->listensock = (int *) xmalloc(job->num_listen * sizeof(int));
 	job->listenport = (int *) xmalloc(job->num_listen * sizeof(int));
 
-	job->eio_objs = list_create(NULL); /* FIXME - needs destructor */
-	job->eio = eio_handle_create(job->eio_objs);
+	job->eio = eio_handle_create();
 	job->ioservers_ready = 0;
 	/* "nhosts" number of IO protocol sockets */
 	job->ioserver = (eio_obj_t **)xmalloc(job->nhosts*sizeof(eio_obj_t *));
diff --git a/src/srun/srun_job.h b/src/srun/srun_job.h
index c6f2a584cfbb017622e720fadd69e7a6045492c2..802476d7d81de7b886b3ae429baaa19fda1fbea9 100644
--- a/src/srun/srun_job.h
+++ b/src/srun/srun_job.h
@@ -124,7 +124,6 @@ typedef struct srun_job {
 	int *listensock;        /* Array of stdio listen sockets */
 	int *listenport;        /* Array of stdio listen ports */
 	eio_handle_t *eio;      /* Event IO handle */
-	List eio_objs;          /* List of eio_obj_t pointers */
 	int ioservers_ready;    /* Number of servers that established contact */
 	eio_obj_t **ioserver;   /* Array of nhosts pointers to eio_obj_t */
 	eio_obj_t *stdin_obj;   /* stdin eio_obj_t */
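
How the reworked registration API is meant to be used, based on the changes above: eio_handle_create() now creates and owns its own object list, eio_new_initial_obj() may only be called before eio_handle_mainloop() starts, and eio_new_obj() is the path intended for other threads once the loop is running (it parks the object on new_objs and wakes the loop with eio_signal_wakeup(), after which _eio_wakeup_handler() moves it onto obj_list). The sketch below is illustrative only and not part of the patch: the names _example_io_thread() and example_two_phase_registration() are hypothetical, and the eio_obj_t arguments are assumed to have been built elsewhere with eio_obj_create().

#include <pthread.h>

#include "src/common/eio.h"

static void *_example_io_thread(void *arg)
{
	eio_handle_t *eio = (eio_handle_t *) arg;

	/* Poll loop; services obj_list until the engine is shut down. */
	eio_handle_mainloop(eio);
	return NULL;
}

static void example_two_phase_registration(eio_obj_t *listener,
					   eio_obj_t *late_client)
{
	pthread_t tid;
	eio_handle_t *eio = eio_handle_create();

	/* Phase 1: before the mainloop starts.  This appends straight to
	 * eio->obj_list, so it must not race with a running loop. */
	eio_new_initial_obj(eio, listener);

	pthread_create(&tid, NULL, _example_io_thread, eio);

	/* Phase 2: the loop may already be running.  The object is parked
	 * on eio->new_objs and eio_signal_wakeup() interrupts poll(); the
	 * loop migrates it to obj_list in _eio_wakeup_handler(). */
	eio_new_obj(eio, late_client);

	/* Shutdown and cleanup of the loop are omitted from this sketch. */
	pthread_join(tid, NULL);
}

This is the same split the slurmd and srun call sites above follow: stdio, listener, and the initial client objects are registered with eio_new_initial_obj() before io_thread_start() runs the mainloop, while io_client_connect(), which can be reached after the IO thread is up, switches to eio_new_obj().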
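Why eio_new_obj() can nudge a loop that is blocked in poll(): _poll_loop_internal() always appends the read end of the handle's internal pipe (eio->fds[0]) to the pollfd array, and _eio_wakeup_handler() both drains that pipe and migrates new_objs onto obj_list. The write side, eio_signal_wakeup(), already exists in eio.c and is not shown in this patch; the sketch below is only a plausible shape for it (the byte value 2 and the EINTR retry are assumptions, not taken from the source), written as if it lived inside eio.c where struct eio_handle_components and the needed headers are already available.

/* Hypothetical sketch of the wakeup write side; the real
 * eio_signal_wakeup() may differ.  Any byte other than 1 simply wakes
 * the loop; a 1 additionally marks every object for shutdown in
 * _eio_wakeup_handler(). */
static void example_signal_wakeup(eio_handle_t *eio)
{
	char c = 2;	/* assumed non-shutdown wakeup byte */

	xassert(eio->magic == EIO_MAGIC);
	while ((write(eio->fds[1], &c, 1) < 0) && (errno == EINTR))
		;	/* retry if interrupted by a signal */
}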