diff --git a/src/common/credential_utils.c b/src/common/credential_utils.c index 7c4ebe9f82f98fa50e98cb8feb69b3aad1ec12cd..e247c85b63edca3a61685c309d8a9c63e50e600e 100644 --- a/src/common/credential_utils.c +++ b/src/common/credential_utils.c @@ -51,20 +51,15 @@ /* prototypes */ static int _clear_expired_revoked_credentials(List list); -static int _is_credential_still_valid(slurm_job_credential_t * credential, - List list); +static int _is_credential_still_valid(slurm_job_credential_t *, List); static void _free_credential_state(void *credential_state); -static int _init_credential_state(credential_state_t * state, - slurm_job_credential_t * cred); -static int _insert_credential_state(slurm_job_credential_t * credential, - List list); -static int _insert_revoked_credential_state(revoke_credential_msg_t * - revoke_msg, List list); -static void _pack_one_cred(credential_state_t *credential_state_ptr, - Buf buffer); -static int _unpack_one_cred(credential_state_t *credential_state_ptr, - Buf buffer); +static int _insert_credential_state(slurm_job_credential_t *l, List); +static int _insert_revoked_credential_state(revoke_credential_msg_t *, List); +static void _pack_one_cred(credential_state_t *, Buf); +static int _unpack_one_cred(credential_state_t *, Buf); +static int _init_credential_state(credential_state_t *, + slurm_job_credential_t *); int sign_credential(slurm_ssl_key_ctx_t * ctx, slurm_job_credential_t * cred) @@ -277,18 +272,17 @@ int _insert_credential_state(slurm_job_credential_t * credential, List list) } int -_insert_revoked_credential_state(revoke_credential_msg_t * revoke_msg, - List list) +_insert_revoked_credential_state(revoke_credential_msg_t *msg, List list) { time_t now = time(NULL); - credential_state_t *credential_state; - - credential_state = xmalloc(sizeof(slurm_job_credential_t)); - credential_state->job_id = revoke_msg->job_id; - credential_state->expiration = revoke_msg->expiration_time; - credential_state->revoked = true; - credential_state->revoke_time = now; - list_append(list, credential_state); + credential_state_t *s = xmalloc(sizeof(*s)); + + s->job_id = msg->job_id; + s->expiration = msg->expiration_time; + s->revoked = true; + s->revoke_time = now; + + list_append(list, s); return SLURM_SUCCESS; } @@ -300,13 +294,13 @@ _insert_revoked_credential_state(revoke_credential_msg_t * revoke_msg, */ void pack_credential_list(List list, Buf buffer) { - ListIterator iterator; - credential_state_t *credential_state_ptr; + ListIterator i = NULL; + credential_state_t *s = NULL; - iterator = list_iterator_create(list); - while ((credential_state_ptr = list_next(iterator))) - _pack_one_cred(credential_state_ptr, buffer); - list_iterator_destroy(iterator); + i = list_iterator_create(list); + while ((s = list_next(i))) + _pack_one_cred(s, buffer); + list_iterator_destroy(i); } /* unpack_credential_list @@ -319,39 +313,40 @@ void pack_credential_list(List list, Buf buffer) */ int unpack_credential_list(List list, Buf buffer) { - credential_state_t *credential_state_ptr; + credential_state_t *s = NULL; - while (1) { - credential_state_ptr = xmalloc(sizeof(slurm_job_credential_t)); - if (_unpack_one_cred(credential_state_ptr, buffer)) { - xfree(credential_state_ptr); + do { + s = xmalloc(sizeof(slurm_job_credential_t)); + if (_unpack_one_cred(s, buffer)) { + xfree(s); return SLURM_ERROR; } else - list_append(list, credential_state_ptr); - } + list_append(list, s); + } while (remaining_buf(buffer)); + return SLURM_SUCCESS; } static void -_pack_one_cred(credential_state_t *credential_state_ptr, Buf buffer) +_pack_one_cred(credential_state_t *state, Buf buffer) { - pack32(credential_state_ptr->job_id, buffer); - pack16(credential_state_ptr->revoked, buffer); - pack16(credential_state_ptr->procs_allocated, buffer); - pack16(credential_state_ptr->total_procs, buffer); - pack_time(credential_state_ptr->revoke_time, buffer); - pack_time(credential_state_ptr->expiration, buffer); + pack32(state->job_id, buffer); + pack16(state->revoked, buffer); + pack16(state->procs_allocated, buffer); + pack16(state->total_procs, buffer); + pack_time(state->revoke_time, buffer); + pack_time(state->expiration, buffer); } static int -_unpack_one_cred(credential_state_t *credential_state_ptr, Buf buffer) +_unpack_one_cred(credential_state_t *state, Buf buffer) { - safe_unpack32(&credential_state_ptr->job_id, buffer); - safe_unpack16(&credential_state_ptr->revoked, buffer); - safe_unpack16(&credential_state_ptr->procs_allocated, buffer); - safe_unpack16(&credential_state_ptr->total_procs, buffer); - unpack_time(&credential_state_ptr->revoke_time, buffer); - unpack_time(&credential_state_ptr->expiration, buffer); + safe_unpack32(&state->job_id, buffer); + safe_unpack16(&state->revoked, buffer); + safe_unpack16(&state->procs_allocated, buffer); + safe_unpack16(&state->total_procs, buffer); + unpack_time(&state->revoke_time, buffer); + unpack_time(&state->expiration, buffer); return SLURM_SUCCESS; unpack_error: diff --git a/src/common/fd.c b/src/common/fd.c index c45736e0ec1985876b7dd9a405dd877c515210a4..13b9e93c80b20488cccc52fb3fcd3819d8e0eeba 100644 --- a/src/common/fd.c +++ b/src/common/fd.c @@ -66,6 +66,19 @@ void fd_set_nonblocking(int fd) return; } +void fd_set_blocking(int fd) +{ + int fval; + + assert(fd >= 0); + + if ((fval = fcntl(fd, F_GETFL, 0)) < 0) + error("fcntl(F_GETFL) failed: %m"); + if (fcntl(fd, F_SETFL, fval & ~O_NONBLOCK) < 0) + error("fcntl(F_SETFL) failed: %m"); + return; +} + int fd_get_read_lock(int fd) { diff --git a/src/common/fd.h b/src/common/fd.h index 17e24010660abad43eecf5736da8668a467c427c..24f66ac69c53420a2f6e18178e11778860727163 100644 --- a/src/common/fd.h +++ b/src/common/fd.h @@ -47,6 +47,11 @@ void fd_set_nonblocking(int fd); * Sets the file descriptor (fd) for non-blocking I/O. */ +void fd_set_blocking(int fd); +/* + * Sets the file descriptor (fd) for blocking I/O. + */ + int fd_get_read_lock(int fd); /* * Obtain a read lock on the file specified by (fd). diff --git a/src/common/signature_utils.c b/src/common/signature_utils.c index fecbf84d18e8adc81c09af0c2d86df081c1408db..3e6eeec1539387ea7c332f09abdc3c9cf3e86b1f 100644 --- a/src/common/signature_utils.c +++ b/src/common/signature_utils.c @@ -100,48 +100,43 @@ int slurm_init_verifier(slurm_ssl_key_ctx_t * ctx, char *path) int slurm_destroy_ssl_key_ctx(slurm_ssl_key_ctx_t * ctx) { - if (ctx) - EVP_PKEY_free(ctx->key.private); + if (ctx) EVP_PKEY_free(ctx->key.private); return SLURM_SUCCESS; } int -slurm_ssl_sign(slurm_ssl_key_ctx_t *ctx, char *data_buffer, - int data_length, char *signature_buffer, - int *signature_length) +slurm_ssl_sign(slurm_ssl_key_ctx_t *ctx, + char *data, int datalen, char *sig, int *siglen ) { - int rc; - EVP_MD_CTX md_ctx; + EVP_MD_CTX ectx; + if (EVP_PKEY_size(ctx->key.private) > SLURM_SSL_SIGNATURE_LENGTH) slurm_seterrno_ret(ESLURMD_SIGNATURE_FIELD_TOO_SMALL); - EVP_SignInit(&md_ctx, EVP_sha1()); - EVP_SignUpdate(&md_ctx, data_buffer, data_length); - if ((rc = - EVP_SignFinal(&md_ctx, signature_buffer, signature_length, - ctx->key.private)) != SLURM_OPENSSL_SIGNED) { + EVP_SignInit(&ectx, EVP_sha1()); + + EVP_SignUpdate(&ectx, data, datalen); + + if (!EVP_SignFinal(&ectx, sig, siglen, ctx->key.private)) { ERR_print_errors_fp(log_fp()); - slurm_seterrno(ESLURMD_OPENSSL_ERROR); - return SLURM_ERROR; + slurm_seterrno_ret(ESLURMD_OPENSSL_ERROR); } - info("signature length = %d", *signature_length); + return SLURM_SUCCESS; } int -slurm_ssl_verify(slurm_ssl_key_ctx_t * ctx, char *data_buffer, - int data_length, char *signature_buffer, - int signature_length) +slurm_ssl_verify(slurm_ssl_key_ctx_t * ctx, + char *data, int datalen, char *sig, int siglen) { - int rc; - EVP_MD_CTX md_ctx; - - EVP_VerifyInit(&md_ctx, EVP_sha1()); - EVP_VerifyUpdate(&md_ctx, data_buffer, data_length); - if ((rc = - EVP_VerifyFinal(&md_ctx, signature_buffer, signature_length, - ctx->key.public)) != SLURM_OPENSSL_VERIFIED) { + EVP_MD_CTX ectx; + + EVP_VerifyInit(&ectx, EVP_sha1()); + + EVP_VerifyUpdate(&ectx, data, datalen); + + if (!EVP_VerifyFinal(&ectx, sig, siglen, ctx->key.public)) { error("EVP_VerifyFinal: %s", ERR_error_string(ERR_get_error(), NULL)); slurm_seterrno_ret(ESLURMD_OPENSSL_ERROR); diff --git a/src/common/slurm_protocol_socket_implementation.c b/src/common/slurm_protocol_socket_implementation.c index 6ab8a464ee526f5704a52dfd77c1747ab90dfd36..043d89fb3477c677142e4d99e1c7917081f75657 100644 --- a/src/common/slurm_protocol_socket_implementation.c +++ b/src/common/slurm_protocol_socket_implementation.c @@ -580,6 +580,7 @@ int _slurm_set_stream_non_blocking ( slurm_fd open_fd ) return SLURM_SOCKET_ERROR ; } flags |= O_NONBLOCK ; + return _slurm_fcntl ( open_fd , F_SETFL , flags ) ; } @@ -820,7 +821,7 @@ void _slurm_set_addr_char ( slurm_addr * slurm_address , uint16_t port , host_info = get_host_by_name( host, (void *) &hostent_buf, sizeof(hostent_buf), NULL ); if (host_info == NULL) { - fatal ("get_host_by_name failure on %s", host); + error ("get_host_by_name failure on %s", host); slurm_address->sin_family = 0; slurm_address->sin_port = 0; return; diff --git a/src/srun/io.c b/src/srun/io.c index 0952f647df18961b6d0f1b351dee77b437ca1a54..75bb553baa0b75a687897220239b2c78a411f0c5 100644 --- a/src/srun/io.c +++ b/src/srun/io.c @@ -163,6 +163,7 @@ _update_task_state(job_t *job, int taskid) slurm_mutex_unlock(&job->task_mutex); } + static void _do_output(cbuf_t buf, FILE *out, int tasknum) { @@ -170,29 +171,35 @@ _do_output(cbuf_t buf, FILE *out, int tasknum) int tot = 0; char line[4096]; + /* + * This here is a hack to ensure that output streams are + * set blocking. Until I can figure out where stdout/err + * are getting set to nonblocking I/O, this test should + * remain here. + */ + if ((fcntl(fileno(out), F_GETFL, 0) & O_NONBLOCK)) + fd_set_blocking(fileno(out)); + while ((len = cbuf_read_line(buf, line, sizeof(line), 1))) { - int n = len; + int n = 0; if (opt.labelio) fprintf(out, "%0*d: ", fmt_width, tasknum); - if ((n -= fprintf(out, "%s", line)) != 0) { - error("Need to rewind %d bytes", n); - cbuf_rewind(buf, n); - } - - tot += (len - n); + if ((n = fprintf(out, "%s", line)) < 0) { + error("Need to rewind %d bytes: %m", len); + goto done; + } else + tot += n; } - /* if (fflush(out) == EOF) - error ("fflush: %m"); - */ + done: + fflush(NULL); - debug3("Wrote %d bytes output. %d still buffered", - tot, cbuf_used(buf)); + debug3("do_output: [%d %d %d]", tot, cbuf_used(buf), cbuf_size(buf)); nwritten += tot; - + return; } static void @@ -219,7 +226,6 @@ _flush_io(job_t *job) _close_stream(&job->err[i], stderr, i); } - fclose(job->outstream); debug3("Read %dB from tasks, wrote %dB", nbytes, nwritten); } @@ -271,7 +277,8 @@ _io_thr_poll(void *job_arg) _poll_set_rd(fds[i], job->iofd[i]); while (!_io_thr_done(job)) { - int eofcnt = 0; + int eofcnt = 0; + nfds = job->niofds; /* already have n ioport fds + stdin */ if ((job->stdinfd >= 0) && stdin_open) { @@ -279,6 +286,7 @@ _io_thr_poll(void *job_arg) nfds++; } + for (i = 0; i < opt.nprocs; i++) { if (job->out[i] >= 0) { _poll_set_rd(fds[nfds], job->out[i]); @@ -308,6 +316,7 @@ _io_thr_poll(void *job_arg) eofcnt++; _update_task_state(job, i); } + } /* exit if we have received EOF on all streams */ @@ -330,10 +339,12 @@ _io_thr_poll(void *job_arg) while ((!_io_thr_done(job)) && ((rc = poll(fds, nfds, POLL_TIMEOUT_MSEC)) <= 0)) { + if (rc == 0) { /* timeout */ _do_poll_timeout(job); continue; } + switch(errno) { case EINTR: case EAGAIN: @@ -349,7 +360,6 @@ _io_thr_poll(void *job_arg) } } - debug3("poll returned with rc = %d", rc); for (i = 0; i < job->niofds; i++) { if (fds[i].revents) { @@ -565,7 +575,7 @@ open_streams(job_t *job) * with our own buffers. (Also, stdio buffering seems to * causing some problems with loss of output) */ - setvbuf(job->outstream, NULL, _IONBF, 0); + /* setvbuf(job->outstream, NULL, _IONBF, 0); */ return 0; } @@ -732,10 +742,6 @@ _do_task_output(int *fd, FILE *out, cbuf_t buf, int tasknum) return len; } - debug3("Wrote %d bytes into buffer of size %d", len, cbuf_used(buf)); - if (dropped) - debug3("dropped %d bytes", dropped); - nbytes += len; _do_output(buf, out, tasknum); diff --git a/src/srun/srun.c b/src/srun/srun.c index ffb2da72f270b9d7c36735372e024ccd77a2bbfc..d707c46a4a4ee9882ca16d349cab3dcbdeb4ce89 100644 --- a/src/srun/srun.c +++ b/src/srun/srun.c @@ -47,6 +47,7 @@ #include <string.h> #include <signal.h> #include <unistd.h> +#include <fcntl.h> #include "src/common/log.h" #include "src/common/slurm_protocol_api.h" @@ -105,6 +106,7 @@ srun(int ac, char **av) log_options_t logopt = LOG_OPTS_STDERR_ONLY; + log_init(xbasename(av[0]), logopt, 0, NULL); /* set default options, process commandline arguments, and @@ -184,6 +186,9 @@ srun(int ac, char **av) /* job structure should now be filled in */ + fd_set_blocking(STDOUT_FILENO); + fd_set_blocking(STDERR_FILENO); + if (msg_thr_create(job) < 0) job_fatal(job, "Unable to create msg thread");