From d287700585cac1d56005ea31467adca646b2ee3a Mon Sep 17 00:00:00 2001
From: Mark Grondona <mgrondona@llnl.gov>
Date: Fri, 14 Feb 2003 00:32:43 +0000
Subject: [PATCH]  o common/{credential,signature}_utils.c : code cleanup  o
 srun/ : place temporary fix for stdout nonblocking I/O problem

---
 src/common/credential_utils.c                 | 93 +++++++++----------
 src/common/fd.c                               | 13 +++
 src/common/fd.h                               |  5 +
 src/common/signature_utils.c                  | 47 +++++-----
 .../slurm_protocol_socket_implementation.c    |  3 +-
 src/srun/io.c                                 | 48 +++++-----
 src/srun/srun.c                               |  5 +
 7 files changed, 117 insertions(+), 97 deletions(-)

diff --git a/src/common/credential_utils.c b/src/common/credential_utils.c
index 7c4ebe9f82f..e247c85b63e 100644
--- a/src/common/credential_utils.c
+++ b/src/common/credential_utils.c
@@ -51,20 +51,15 @@
 /* prototypes */
 
 static int  _clear_expired_revoked_credentials(List list);
-static int  _is_credential_still_valid(slurm_job_credential_t * credential,
-				       List list);
+static int  _is_credential_still_valid(slurm_job_credential_t *, List);
 static void _free_credential_state(void *credential_state);
-static int  _init_credential_state(credential_state_t * state,
-				   slurm_job_credential_t * cred);
-static int  _insert_credential_state(slurm_job_credential_t * credential,
-				     List list);
-static int  _insert_revoked_credential_state(revoke_credential_msg_t *
-					     revoke_msg, List list);
-static void _pack_one_cred(credential_state_t *credential_state_ptr, 
-			   Buf buffer);
-static int  _unpack_one_cred(credential_state_t *credential_state_ptr, 
-			     Buf buffer);
+static int  _insert_credential_state(slurm_job_credential_t *l, List);
+static int  _insert_revoked_credential_state(revoke_credential_msg_t *, List);
+static void _pack_one_cred(credential_state_t *, Buf);
+static int  _unpack_one_cred(credential_state_t *, Buf);
 
+static int  _init_credential_state(credential_state_t *, 
+		                   slurm_job_credential_t *);
 
 int
 sign_credential(slurm_ssl_key_ctx_t * ctx, slurm_job_credential_t * cred)
@@ -277,18 +272,17 @@ int _insert_credential_state(slurm_job_credential_t * credential, List list)
 }
 
 int
-_insert_revoked_credential_state(revoke_credential_msg_t * revoke_msg,
-				List list)
+_insert_revoked_credential_state(revoke_credential_msg_t *msg, List list)
 {
 	time_t now = time(NULL);
-	credential_state_t *credential_state;
-
-	credential_state = xmalloc(sizeof(slurm_job_credential_t));
-	credential_state->job_id = revoke_msg->job_id;
-	credential_state->expiration = revoke_msg->expiration_time;
-	credential_state->revoked = true;
-	credential_state->revoke_time = now;
-	list_append(list, credential_state);
+	credential_state_t *s = xmalloc(sizeof(*s));
+
+	s->job_id      = msg->job_id;
+	s->expiration  = msg->expiration_time;
+	s->revoked     = true;
+	s->revoke_time = now;
+
+	list_append(list, s);
 	return SLURM_SUCCESS;
 }
 
@@ -300,13 +294,13 @@ _insert_revoked_credential_state(revoke_credential_msg_t * revoke_msg,
  */ 
 void pack_credential_list(List list, Buf buffer)
 {
-	ListIterator iterator;
-	credential_state_t *credential_state_ptr;
+	ListIterator        i = NULL;
+	credential_state_t *s = NULL;
 
-	iterator = list_iterator_create(list);
-	while ((credential_state_ptr = list_next(iterator)))
-		_pack_one_cred(credential_state_ptr, buffer);
-	list_iterator_destroy(iterator);
+	i = list_iterator_create(list);
+	while ((s = list_next(i)))
+		_pack_one_cred(s, buffer);
+	list_iterator_destroy(i);
 }
 
 /* unpack_credential_list
@@ -319,39 +313,40 @@ void pack_credential_list(List list, Buf buffer)
  */ 
 int unpack_credential_list(List list, Buf buffer)
 {
-	credential_state_t *credential_state_ptr;
+	credential_state_t *s = NULL;
 
-	while (1) {
-		credential_state_ptr = xmalloc(sizeof(slurm_job_credential_t));
-		if (_unpack_one_cred(credential_state_ptr, buffer)) {
-			xfree(credential_state_ptr);
+	do {
+		s = xmalloc(sizeof(slurm_job_credential_t));
+		if (_unpack_one_cred(s, buffer)) {
+			xfree(s);
 			return SLURM_ERROR;
 		} else	
-			list_append(list, credential_state_ptr);
-	}
+			list_append(list, s);
+	} while (remaining_buf(buffer));
+
 	return SLURM_SUCCESS;
 }
 
 static void 
-_pack_one_cred(credential_state_t *credential_state_ptr, Buf buffer)
+_pack_one_cred(credential_state_t *state, Buf buffer)
 {
-	pack32(credential_state_ptr->job_id,		buffer);
-	pack16(credential_state_ptr->revoked,		buffer);
-	pack16(credential_state_ptr->procs_allocated,	buffer);
-	pack16(credential_state_ptr->total_procs,	buffer);
-	pack_time(credential_state_ptr->revoke_time,	buffer);
-	pack_time(credential_state_ptr->expiration,	buffer);
+	pack32(state->job_id,		buffer);
+	pack16(state->revoked,		buffer);
+	pack16(state->procs_allocated,	buffer);
+	pack16(state->total_procs,	buffer);
+	pack_time(state->revoke_time,	buffer);
+	pack_time(state->expiration,	buffer);
 }
 
 static int 
-_unpack_one_cred(credential_state_t *credential_state_ptr, Buf buffer)
+_unpack_one_cred(credential_state_t *state, Buf buffer)
 {
-	safe_unpack32(&credential_state_ptr->job_id,		buffer);
-	safe_unpack16(&credential_state_ptr->revoked,		buffer);
-	safe_unpack16(&credential_state_ptr->procs_allocated,	buffer);
-	safe_unpack16(&credential_state_ptr->total_procs,	buffer);
-	unpack_time(&credential_state_ptr->revoke_time,		buffer);
-	unpack_time(&credential_state_ptr->expiration,		buffer);
+	safe_unpack32(&state->job_id,		buffer);
+	safe_unpack16(&state->revoked,		buffer);
+	safe_unpack16(&state->procs_allocated,	buffer);
+	safe_unpack16(&state->total_procs,	buffer);
+	unpack_time(&state->revoke_time,		buffer);
+	unpack_time(&state->expiration,		buffer);
 	return SLURM_SUCCESS;
 
       unpack_error:
diff --git a/src/common/fd.c b/src/common/fd.c
index c45736e0ec1..13b9e93c80b 100644
--- a/src/common/fd.c
+++ b/src/common/fd.c
@@ -66,6 +66,19 @@ void fd_set_nonblocking(int fd)
     return;
 }
 
+void fd_set_blocking(int fd)
+{
+    int fval;
+
+    assert(fd >= 0);
+
+    if ((fval = fcntl(fd, F_GETFL, 0)) < 0)
+	    error("fcntl(F_GETFL) failed: %m");
+    if (fcntl(fd, F_SETFL, fval & ~O_NONBLOCK) < 0)
+	    error("fcntl(F_SETFL) failed: %m");
+    return;
+}
+
 
 int fd_get_read_lock(int fd)
 {
diff --git a/src/common/fd.h b/src/common/fd.h
index 17e24010660..24f66ac69c5 100644
--- a/src/common/fd.h
+++ b/src/common/fd.h
@@ -47,6 +47,11 @@ void fd_set_nonblocking(int fd);
  *  Sets the file descriptor (fd) for non-blocking I/O.
  */
 
+void fd_set_blocking(int fd);
+/*
+ * Sets the file descriptor (fd) for blocking I/O.
+ */
+
 int fd_get_read_lock(int fd);
 /*
  *  Obtain a read lock on the file specified by (fd).
diff --git a/src/common/signature_utils.c b/src/common/signature_utils.c
index fecbf84d18e..3e6eeec1539 100644
--- a/src/common/signature_utils.c
+++ b/src/common/signature_utils.c
@@ -100,48 +100,43 @@ int slurm_init_verifier(slurm_ssl_key_ctx_t * ctx, char *path)
 
 int slurm_destroy_ssl_key_ctx(slurm_ssl_key_ctx_t * ctx)
 {
-	if (ctx) 
-		EVP_PKEY_free(ctx->key.private);
+	if (ctx) EVP_PKEY_free(ctx->key.private);
 	return SLURM_SUCCESS;
 }
 
 
 int
-slurm_ssl_sign(slurm_ssl_key_ctx_t *ctx, char *data_buffer,
-	       int data_length, char *signature_buffer,
-	       int *signature_length)
+slurm_ssl_sign(slurm_ssl_key_ctx_t *ctx, 
+               char *data, int datalen, char *sig,  int *siglen )
 {
-	int rc;
-	EVP_MD_CTX md_ctx;
+	EVP_MD_CTX ectx;
+
 	if (EVP_PKEY_size(ctx->key.private) > SLURM_SSL_SIGNATURE_LENGTH) 
 		slurm_seterrno_ret(ESLURMD_SIGNATURE_FIELD_TOO_SMALL);
 
-	EVP_SignInit(&md_ctx, EVP_sha1());
-	EVP_SignUpdate(&md_ctx, data_buffer, data_length);
-	if ((rc = 
-	    EVP_SignFinal(&md_ctx, signature_buffer, signature_length,
-			   ctx->key.private)) != SLURM_OPENSSL_SIGNED) {
+	EVP_SignInit(&ectx, EVP_sha1());
+
+	EVP_SignUpdate(&ectx, data, datalen);
+
+	if (!EVP_SignFinal(&ectx, sig, siglen, ctx->key.private)) {
 		ERR_print_errors_fp(log_fp()); 
-		slurm_seterrno(ESLURMD_OPENSSL_ERROR);
-		return SLURM_ERROR;
+		slurm_seterrno_ret(ESLURMD_OPENSSL_ERROR);
 	}
-	info("signature length = %d", *signature_length);
+
 	return SLURM_SUCCESS;
 }
 
 int
-slurm_ssl_verify(slurm_ssl_key_ctx_t * ctx, char *data_buffer,
-		 int data_length, char *signature_buffer,
-		 int signature_length)
+slurm_ssl_verify(slurm_ssl_key_ctx_t * ctx, 
+                 char *data, int datalen, char *sig, int siglen)
 {
-	int rc;
-	EVP_MD_CTX md_ctx;
-
-	EVP_VerifyInit(&md_ctx, EVP_sha1());
-	EVP_VerifyUpdate(&md_ctx, data_buffer, data_length);
-	if ((rc =
-	     EVP_VerifyFinal(&md_ctx, signature_buffer, signature_length,
-			     ctx->key.public)) != SLURM_OPENSSL_VERIFIED) {
+	EVP_MD_CTX ectx;
+
+	EVP_VerifyInit(&ectx, EVP_sha1());
+
+	EVP_VerifyUpdate(&ectx, data, datalen);
+
+	if (!EVP_VerifyFinal(&ectx, sig, siglen, ctx->key.public)) {
 		error("EVP_VerifyFinal: %s", 
 		      ERR_error_string(ERR_get_error(), NULL));
 		slurm_seterrno_ret(ESLURMD_OPENSSL_ERROR);
diff --git a/src/common/slurm_protocol_socket_implementation.c b/src/common/slurm_protocol_socket_implementation.c
index 6ab8a464ee5..043d89fb347 100644
--- a/src/common/slurm_protocol_socket_implementation.c
+++ b/src/common/slurm_protocol_socket_implementation.c
@@ -580,6 +580,7 @@ int _slurm_set_stream_non_blocking ( slurm_fd open_fd )
 		return SLURM_SOCKET_ERROR ;
 	}
 	flags |= O_NONBLOCK ;
+
 	return _slurm_fcntl ( open_fd , F_SETFL , flags )  ;
 }
 
@@ -820,7 +821,7 @@ void _slurm_set_addr_char ( slurm_addr * slurm_address , uint16_t port ,
 		host_info = get_host_by_name( host, (void *) &hostent_buf, 
 				sizeof(hostent_buf), NULL );
 		if (host_info == NULL) {
-			fatal ("get_host_by_name failure on %s", host);
+			error ("get_host_by_name failure on %s", host);
 			slurm_address->sin_family = 0;
 			slurm_address->sin_port = 0;
 			return;
diff --git a/src/srun/io.c b/src/srun/io.c
index 0952f647df1..75bb553baa0 100644
--- a/src/srun/io.c
+++ b/src/srun/io.c
@@ -163,6 +163,7 @@ _update_task_state(job_t *job, int taskid)
 	slurm_mutex_unlock(&job->task_mutex);
 }
 
+
 static void
 _do_output(cbuf_t buf, FILE *out, int tasknum)
 {
@@ -170,29 +171,35 @@ _do_output(cbuf_t buf, FILE *out, int tasknum)
 	int  tot     = 0;
 	char line[4096];
 
+	/* 
+	 * This here is a hack to ensure that output streams are
+	 * set blocking. Until I can figure out where stdout/err
+	 * are getting set to nonblocking I/O, this test should
+	 * remain here.
+	 */
+	if ((fcntl(fileno(out), F_GETFL, 0) & O_NONBLOCK)) 
+		fd_set_blocking(fileno(out));
+
 	while ((len = cbuf_read_line(buf, line, sizeof(line), 1))) {
-		int n = len;
+		int n = 0;
 
 		if (opt.labelio)
 			fprintf(out, "%0*d: ", fmt_width, tasknum);
 
-		if ((n -= fprintf(out, "%s", line)) != 0) { 
-			error("Need to rewind %d bytes", n);
-			cbuf_rewind(buf, n);
-		} 
-
-		tot += (len - n);
+		if ((n = fprintf(out, "%s", line)) < 0) { 
+			error("Need to rewind %d bytes: %m", len);
+			goto done;
+		} else
+			tot += n;
 	}
 
-	/* if (fflush(out) == EOF)
-		error ("fflush: %m");
-	*/
+    done:
+	fflush(NULL);
 
-	debug3("Wrote %d bytes output. %d still buffered", 
-	       tot, cbuf_used(buf));
+	debug3("do_output: [%d %d %d]", tot, cbuf_used(buf), cbuf_size(buf));
 
 	nwritten += tot;
-
+	return;
 }
 
 static void
@@ -219,7 +226,6 @@ _flush_io(job_t *job)
 			_close_stream(&job->err[i], stderr, i);
 	}
 
-	fclose(job->outstream);
 	debug3("Read %dB from tasks, wrote %dB", nbytes, nwritten);
 }
 
@@ -271,7 +277,8 @@ _io_thr_poll(void *job_arg)
 		_poll_set_rd(fds[i], job->iofd[i]);
 
 	while (!_io_thr_done(job)) {
-		int eofcnt = 0;
+		int  eofcnt = 0;
+
 		nfds = job->niofds; /* already have n ioport fds + stdin */
 
 		if ((job->stdinfd >= 0) && stdin_open) {
@@ -279,6 +286,7 @@ _io_thr_poll(void *job_arg)
 			nfds++;
 		}
 
+
 		for (i = 0; i < opt.nprocs; i++) {
 			if (job->out[i] >= 0) {
 				_poll_set_rd(fds[nfds], job->out[i]);
@@ -308,6 +316,7 @@ _io_thr_poll(void *job_arg)
 				eofcnt++;
 				_update_task_state(job, i);
 			}
+
 		}
 
 		/* exit if we have received EOF on all streams */
@@ -330,10 +339,12 @@ _io_thr_poll(void *job_arg)
 
 		while ((!_io_thr_done(job)) && 
 		       ((rc = poll(fds, nfds, POLL_TIMEOUT_MSEC)) <= 0)) {
+
 			if (rc == 0) {	/* timeout */
 				_do_poll_timeout(job);
 				continue;
 			}
+
 			switch(errno) {
 				case EINTR:
 				case EAGAIN:
@@ -349,7 +360,6 @@ _io_thr_poll(void *job_arg)
 			}
 		}
 
-		debug3("poll returned with rc = %d", rc);
 
 		for (i = 0; i < job->niofds; i++) {
 			if (fds[i].revents) {
@@ -565,7 +575,7 @@ open_streams(job_t *job)
 	 * with our own buffers. (Also, stdio buffering seems to
 	 * causing some problems with loss of output)
 	 */
-	setvbuf(job->outstream, NULL, _IONBF, 0);
+	 /* setvbuf(job->outstream, NULL, _IONBF, 0); */
 
 	return 0;
 }
@@ -732,10 +742,6 @@ _do_task_output(int *fd, FILE *out, cbuf_t buf, int tasknum)
 		return len;
 	}
 
-	debug3("Wrote %d bytes into buffer of size %d", len, cbuf_used(buf));
-	if (dropped)
-		debug3("dropped %d bytes", dropped);
-
 	nbytes += len;
 
 	_do_output(buf, out, tasknum);
diff --git a/src/srun/srun.c b/src/srun/srun.c
index ffb2da72f27..d707c46a4a4 100644
--- a/src/srun/srun.c
+++ b/src/srun/srun.c
@@ -47,6 +47,7 @@
 #include <string.h>
 #include <signal.h>
 #include <unistd.h>
+#include <fcntl.h>
 
 #include "src/common/log.h"
 #include "src/common/slurm_protocol_api.h"
@@ -105,6 +106,7 @@ srun(int ac, char **av)
 
 	log_options_t logopt = LOG_OPTS_STDERR_ONLY;
 
+
 	log_init(xbasename(av[0]), logopt, 0, NULL);
 
 	/* set default options, process commandline arguments, and
@@ -184,6 +186,9 @@ srun(int ac, char **av)
 
 	/* job structure should now be filled in */
 
+	fd_set_blocking(STDOUT_FILENO);
+	fd_set_blocking(STDERR_FILENO);
+
 	if (msg_thr_create(job) < 0)
 		job_fatal(job, "Unable to create msg thread");
 
-- 
GitLab