diff --git a/src/common/env.c b/src/common/env.c index 690b0f719864a3f1966ad5492c4b094ddd7131bd..f13ecf91fc6e919d93e3876e339884a2a98f7486 100644 --- a/src/common/env.c +++ b/src/common/env.c @@ -631,8 +631,24 @@ int setup_env(env_t *env) setenvf(&env->env, "LOADL_ACTIVE", "3.2.0"); } #endif - - return SLURM_SUCCESS; + + if (env->pty_port + && setenvf(&env->env, "SLURM_PTY_PORT", "%hu", env->pty_port)) { + error("Can't set SLURM_PTY_PORT env variable"); + rc = SLURM_FAILURE; + } + if (env->ws_col + && setenvf(&env->env, "SLURM_PTY_WIN_COL", "%hu", env->ws_col)) { + error("Can't set SLURM_PTY_WIN_COL env variable"); + rc = SLURM_FAILURE; + } + if (env->ws_row + && setenvf(&env->env, "SLURM_PTY_WIN_ROW", "%hu", env->ws_row)) { + error("Can't set SLURM_PTY_WIN_ROW env variable"); + rc = SLURM_FAILURE; + } + + return rc; } /********************************************************************** diff --git a/src/common/env.h b/src/common/env.h index 5b829a895ddca9ba60b9444438d38b102355f27b..4b535380c4b2645354613efeee80b83a214ab309 100644 --- a/src/common/env.h +++ b/src/common/env.h @@ -70,6 +70,9 @@ typedef struct env_options { int cpus_on_node; pid_t task_pid; char *sgtids; /* global ranks array of integers */ + uint16_t pty_port; /* used to communicate window size changes */ + uint8_t ws_col; /* window size, columns */ + uint8_t ws_row; /* window size, row count */ } env_t; diff --git a/src/common/slurm_protocol_defs.h b/src/common/slurm_protocol_defs.h index 0e107bc12e32d6d90eaf18d289cc4330908896e8..146f97b049632be0a3159c44828fd6c1b9a8c2cd 100644 --- a/src/common/slurm_protocol_defs.h +++ b/src/common/slurm_protocol_defs.h @@ -648,6 +648,11 @@ typedef struct multi_core_data { uint16_t plane_size; /* plane size when task_dist = SLURM_DIST_PLANE */ } multi_core_data_t; +typedef struct pty_winsz { + uint16_t cols; + uint16_t rows; +} pty_winsz_t; + /*****************************************************************************\ * Slurm API Message Types \*****************************************************************************/ diff --git a/src/slurmd/slurmstepd/io.c b/src/slurmd/slurmstepd/io.c index b3d6fe832d3bc9cce9ad8a8ebee38d1a2892e82e..4b92a1a29e162bd9726f684aef318800df5e4518 100644 --- a/src/slurmd/slurmstepd/io.c +++ b/src/slurmd/slurmstepd/io.c @@ -60,6 +60,7 @@ # include <utmp.h> #endif +#include <sys/poll.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/stat.h> @@ -169,6 +170,17 @@ struct task_read_info { bool eof_msg_sent; }; +/********************************************************************** + * Pseudo terminal declarations + **********************************************************************/ +struct window_info { + slurmd_task_info_t *task; + slurmd_job_t *job; + slurm_fd pty_fd; +}; +static void _spawn_window_manager(slurmd_task_info_t *task, slurmd_job_t *job); +static void *_window_manager(void *arg); + /********************************************************************** * General declarations **********************************************************************/ @@ -412,7 +424,8 @@ _client_write(eio_obj_t *obj, List objs) /* * Write message to socket. */ - buf = client->out_msg->data + (client->out_msg->length - client->out_remaining); + buf = client->out_msg->data + + (client->out_msg->length - client->out_remaining); again: if ((n = write(obj->fd, buf, client->out_remaining)) < 0) { if (errno == EINTR) { @@ -658,6 +671,115 @@ again: return SLURM_SUCCESS; } +/********************************************************************** + * Pseudo terminal functions + **********************************************************************/ +static void *_window_manager(void *arg) +{ + struct window_info *win_info = (struct window_info *) arg; + pty_winsz_t winsz; + size_t len; + struct winsize ws; + struct pollfd ufds; + char buf[4]; + + info("in _window_manager"); + ufds.fd = win_info->pty_fd; + ufds.events = POLLIN; + + while (1) { + if (poll(&ufds, 1, -1) <= 0) { + if (errno == EINTR) + continue; + error("poll(pty): %m"); + break; + } + if (!(ufds.revents & POLLIN)) { + /* ((ufds.revents & POLLHUP) || + * (ufds.revents & POLLERR)) */ + break; + } + len = slurm_read_stream(win_info->pty_fd, buf, 4); + if ((len == -1) && ((errno == EINTR) || (errno == EAGAIN))) + continue; + if (len < 4) { + error("read window size error: %m"); + return NULL; + } + memcpy(&winsz.cols, buf, 2); + memcpy(&winsz.rows, buf+2, 2); + ws.ws_col = ntohs(winsz.cols); + ws.ws_row = ntohs(winsz.rows); + debug("new pty size %u:%u", ws.ws_row, ws.ws_col); + if (ioctl(win_info->task->to_stdin, TIOCSWINSZ, &ws)) + error("ioctl(TIOCSWINSZ): %s"); + if (kill(win_info->task->pid, SIGWINCH)) { + error("kill(%d, SIGWINCH): %m", + (int)win_info->task->pid); + } + } + return NULL; +} + +static void +_spawn_window_manager(slurmd_task_info_t *task, slurmd_job_t *job) +{ + char *host, *port, *rows, *cols; + slurm_fd pty_fd; + slurm_addr pty_addr; + uint16_t port_u; + struct window_info *win_info; + pthread_attr_t attr; + pthread_t win_id; + +#if 0 + /* NOTE: SLURM_LAUNCH_NODE_IPADDR is not available at this point */ + if (!(ip_addr = getenvp(job->env, "SLURM_LAUNCH_NODE_IPADDR"))) { + error("SLURM_LAUNCH_NODE_IPADDR env var not set"); + return; + } +#endif + if (!(host = getenvp(job->env, "SLURM_SRUN_COMM_HOST"))) { + error("SLURM_SRUN_COMM_HOST env var not set"); + return; + } + if (!(port = getenvp(job->env, "SLURM_PTY_PORT"))) { + error("SLURM_PTY_PORT env var not set"); + return; + } + if (!(cols = getenvp(job->env, "SLURM_PTY_WIN_COL"))) + error("SLURM_PTY_WIN_COL env var not set"); + if (!(rows = getenvp(job->env, "SLURM_PTY_WIN_ROW"))) + error("SLURM_PTY_WIN_ROW env var not set"); + + if (rows && cols) { + struct winsize ws; + ws.ws_col = atoi(cols); + ws.ws_row = atoi(rows); + debug("init pty size %u:%u", ws.ws_row, ws.ws_col); + if (ioctl(task->to_stdin, TIOCSWINSZ, &ws)) + error("ioctl(TIOCSWINSZ): %s"); + } + + port_u = atoi(port); + slurm_set_addr(&pty_addr, port_u, host); + pty_fd = slurm_open_msg_conn(&pty_addr); + if (pty_fd < 0) { + error("slurm_open_msg_conn(pty_conn) %s,%u: %m", + host, port_u); + return; + } + + win_info = xmalloc(sizeof(struct window_info)); + win_info->task = task; + win_info->job = job; + win_info->pty_fd = pty_fd; + slurm_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + if (pthread_create(&win_id, &attr, &_window_manager, (void *) win_info)) + error("pthread_create(pty_conn): %m"); +} + /********************************************************************** * General fuctions **********************************************************************/ @@ -700,6 +822,7 @@ _init_task_stdio_fds(slurmd_task_info_t *task, slurmd_job_t *job) task->to_stdin = amaster; fd_set_close_on_exec(task->to_stdin); fd_set_nonblocking(task->to_stdin); + _spawn_window_manager(task, job); task->in = _create_task_in_eio(task->to_stdin, job); eio_new_initial_obj(job->eio, (void *)task->in); } else { diff --git a/src/slurmd/slurmstepd/mgr.c b/src/slurmd/slurmstepd/mgr.c index 3a84e6652383a5ec3a7b32ba7b7ceab821ab9a2d..25dc8f110985f7d38dd1eadfd6502bc496263a70 100644 --- a/src/slurmd/slurmstepd/mgr.c +++ b/src/slurmd/slurmstepd/mgr.c @@ -960,8 +960,9 @@ _fork_all_tasks(slurmd_job_t *job) if (setpgid (job->task[i]->pid, job->pgid) < 0) { if (job->pty) { /* login_tty() must put task zero in its own - * session, causing setpgid() to fail */ - info("Unable to put task %d (pid %ld) into " + * session, causing setpgid() to fail, setsid() + * has already set its process group as desired */ + debug("Unable to put task %d (pid %ld) into " "pgrp %ld: %m", i, job->task[i]->pid, job->pgid); } else { diff --git a/src/slurmd/slurmstepd/task.c b/src/slurmd/slurmstepd/task.c index a58140570bd7fde205614dac38674a18e1085c37..d04f11870c73b8664380106d94ac59790ad1e30a 100644 --- a/src/slurmd/slurmstepd/task.c +++ b/src/slurmd/slurmstepd/task.c @@ -305,7 +305,7 @@ exec_task(slurmd_job_t *job, int i, int waitfd) if (login_tty(task->stdin_fd)) error("login_tty: %m"); else - info("login_tty good"); + debug3("login_tty good"); } #endif diff --git a/src/srun/signals.c b/src/srun/signals.c index df308c1e82f4c003f6dd36f87aa9de05501e720f..f9de4cb4428defe9dc5c77f7274c048cdba9e48c 100644 --- a/src/srun/signals.c +++ b/src/srun/signals.c @@ -46,6 +46,8 @@ #include <signal.h> #include <string.h> +#include <sys/ioctl.h> +#include <sys/poll.h> #include <slurm/slurm_errno.h> @@ -67,15 +69,32 @@ * Static list of signals to block in srun: */ static int srun_sigarray[] = { + SIGINT, SIGQUIT, /*SIGTSTP,*/ SIGCONT, SIGTERM, + SIGALRM, SIGUSR1, SIGUSR2, SIGPIPE, SIGWINCH, 0 +}; + +/* + * Static list of signals to process by _sig_thr(). + * NOTE: sigwait() does not work with SIGWINCH on + * on some operating systems (lots of references + * to bug this on the web). + */ +static int srun_sigarray2[] = { SIGINT, SIGQUIT, /*SIGTSTP,*/ SIGCONT, SIGTERM, SIGALRM, SIGUSR1, SIGUSR2, SIGPIPE, 0 }; +/* Processed by pty_thr() */ +static int pty_sigarray[] = { SIGWINCH, 0 }; +static int winch; + /* * Static prototypes */ static void _sigterm_handler(int); static void _handle_intr(srun_job_t *, time_t *, time_t *); +static void _handle_sigwinch(int sig); +static void * _pty_thread(void *arg); static void * _sig_thr(void *); static inline bool @@ -176,7 +195,7 @@ _sig_thr(void *arg) while (!_sig_thr_done(job)) { - xsignal_sigset_create(srun_sigarray, &set); + xsignal_sigset_create(srun_sigarray2, &set); if ((err = sigwait(&set, &signo)) != 0) { if (err != EINTR) @@ -209,5 +228,99 @@ _sig_thr(void *arg) return NULL; } +void set_winsize(srun_job_t *job) +{ + struct winsize ws; + + if (ioctl(STDOUT_FILENO, TIOCGWINSZ, &ws)) + error("ioctl(TIOCGWINSZ): %m"); + else { + job->ws_row = ws.ws_row; + job->ws_col = ws.ws_col; + debug2("winsize %u:%u", job->ws_row, job->ws_col); + } + return; +} + +/* SIGWINCH should already be blocked by srun/signal.c */ +void block_sigwinch(void) +{ + xsignal_block(pty_sigarray); +} + +void pty_thread_create(srun_job_t *job) +{ + slurm_addr pty_addr; + pthread_attr_t attr; + + if ((job->pty_fd = slurm_init_msg_engine_port(0)) < 0) { + error("init_msg_engine_port: %m"); + return; + } + if (slurm_get_stream_addr(job->pty_fd, &pty_addr) < 0) { + error("slurm_get_stream_addr: %m"); + return; + } + job->pty_port = ntohs(((struct sockaddr_in) pty_addr).sin_port); + debug2("initialized job control port %hu\n", job->pty_port); + + slurm_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + if ((pthread_create(&job->pty_id, &attr, &_pty_thread, (void *) job))) + error("pthread_create(pty_thread): %m"); + slurm_attr_destroy(&attr); +} + +static void _handle_sigwinch(int sig) +{ + winch = 1; + xsignal(SIGWINCH, _handle_sigwinch); +} + +static void _notify_winsize_change(int fd, srun_job_t *job) +{ + pty_winsz_t winsz; + int len; + char buf[4]; + + if (fd < 0) { + error("pty: no file to write window size changes to"); + return; + } + + winsz.cols = htons(job->ws_col); + winsz.rows = htons(job->ws_row); + memcpy(buf, &winsz.cols, 2); + memcpy(buf+2, &winsz.rows, 2); + len = slurm_write_stream(fd, buf, 4); + if (len < sizeof(winsz)) + error("pty: window size change notification error: %m"); +} + +static void *_pty_thread(void *arg) +{ + int fd = -1; + srun_job_t *job = (srun_job_t *) arg; + slurm_addr client_addr; + + xsignal_unblock(pty_sigarray); + xsignal(SIGWINCH, _handle_sigwinch); + + if ((fd = slurm_accept_msg_conn(job->pty_fd, &client_addr)) < 0) { + error("pty: accept failure: %m"); + return NULL; + } + + while (job->state <= SRUN_JOB_RUNNING) { + debug2("waiting for SIGWINCH"); + poll(NULL, 0, -1); + if (winch) { + set_winsize(job); + _notify_winsize_change(fd, job); + } + winch = 0; + } + return NULL; +} diff --git a/src/srun/signals.h b/src/srun/signals.h index 0fb53db828f32e8e8e0ccdd832111b4338488619..2019e840b0c54fb263fb904ef490be1054ed1935 100644 --- a/src/srun/signals.h +++ b/src/srun/signals.h @@ -38,9 +38,14 @@ #ifndef _SIGNALS_H #define _SIGNALS_H +#include "src/srun/srun_job.h" + typedef struct srun_job signal_job_t; -int sig_setup_sigmask(void); +void block_sigwinch(void); +void pty_thread_create(srun_job_t *job); +void set_winsize(srun_job_t *job); +int sig_setup_sigmask(void); int sig_unblock_signals(void); int sig_thr_create(signal_job_t *job); diff --git a/src/srun/srun.c b/src/srun/srun.c index ba8f3349773ba47f8c6000e73a908bb141c73cc0..66b78cf0020139701e367df109fd33473d547b5d 100644 --- a/src/srun/srun.c +++ b/src/srun/srun.c @@ -289,6 +289,25 @@ int srun(int ac, char **av) env->jobid = job->jobid; env->stepid = job->stepid; } + if (opt.pty) { + struct termios term; + int fd = STDIN_FILENO; + + /* Save terminal settings for restore */ + tcgetattr(fd, &termdefaults); + tcgetattr(fd, &term); + /* Set raw mode on local tty */ + cfmakeraw(&term); + tcsetattr(fd, TCSANOW, &term); + atexit(&_pty_restore); +}{ + set_winsize(job); + block_sigwinch(); + pty_thread_create(job); + env->pty_port = job->pty_port; + env->ws_col = job->ws_col; + env->ws_row = job->ws_row; + } setup_env(env); xfree(env->task_count); xfree(env); @@ -312,20 +331,7 @@ int srun(int ac, char **av) job->step_layout->task_cnt, job->step_layout->node_cnt, job->cred, opt.labelio); - if (opt.pty) { - struct termios term; - int fd = STDIN_FILENO; - - /* Save terminal settings for restore */ - tcgetattr(fd, &termdefaults); - - atexit(&_pty_restore); - - tcgetattr(fd, &term); - /* Set raw mode on local tty */ - cfmakeraw(&term); - tcsetattr(fd, TCSANOW, &term); - } +/*******************************************************************************/ if (!job->client_io || (client_io_handler_start(job->client_io) != SLURM_SUCCESS)) diff --git a/src/srun/srun_job.h b/src/srun/srun_job.h index 44a3d6016a7ccbe1bd30e7a4cdeab1042ea4b8c2..e9e7abf3c53b128fed842bdc0d7a4023e2068e81 100644 --- a/src/srun/srun_job.h +++ b/src/srun/srun_job.h @@ -137,7 +137,13 @@ typedef struct srun_job { /* Output streams and stdin fileno */ select_jobinfo_t select_jobinfo; - + + /* Pseudo terminial support */ + pthread_t pty_id; /* pthread to communicate window size changes */ + int pty_fd; /* file to communicate window size changes */ + uint16_t pty_port; /* used to communicate window size changes */ + uint8_t ws_col; /* window size, columns */ + uint8_t ws_row; /* window size, row count */ } srun_job_t; extern int message_thread;