From a9cdb939c61cc0425bb61f22bbe74f37bc1f85c5 Mon Sep 17 00:00:00 2001
From: Danny Auble <da@llnl.gov>
Date: Thu, 21 Jul 2005 19:33:33 +0000
Subject: [PATCH] more files added for mpi.

---
 src/common/global_srun.c | 221 +++++++++++++++++++++++++++++++
 src/common/global_srun.h |  35 +++++
 src/common/mpi.c         | 272 +++++++++++++++++++++++++++++++++++++++
 src/common/mpi.h         |  48 +++++++
 src/common/net.c         | 147 +++++++++++++++++++++
 src/common/net.h         |  20 +++
 6 files changed, 743 insertions(+)
 create mode 100644 src/common/global_srun.c
 create mode 100644 src/common/global_srun.h
 create mode 100644 src/common/mpi.c
 create mode 100644 src/common/mpi.h
 create mode 100644 src/common/net.c
 create mode 100644 src/common/net.h

diff --git a/src/common/global_srun.c b/src/common/global_srun.c
new file mode 100644
index 00000000000..5af987e38e9
--- /dev/null
+++ b/src/common/global_srun.c
@@ -0,0 +1,221 @@
+/*****************************************************************************\
+ * src/common/global_srun.c - functions needed by more than just srun
+ *****************************************************************************
+ *  Copyright (C) 2002 The Regents of the University of California.
+ *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
+ *  Written by Mark Grondona <mgrondona@llnl.gov>, and
+ *             Moe Jette     <jette@llnl.gov>
+ *  UCRL-CODE-2002-040.
+ *  
+ *  This file is part of SLURM, a resource management program.
+ *  For details, see <http://www.llnl.gov/linux/slurm/>.
+ *  
+ *  SLURM is free software; you can redistribute it and/or modify it under
+ *  the terms of the GNU General Public License as published by the Free
+ *  Software Foundation; either version 2 of the License, or (at your option)
+ *  any later version.
+ *  
+ *  SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
+ *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
+ *  details.
+ *  
+ *  You should have received a copy of the GNU General Public License along
+ *  with SLURM; if not, write to the Free Software Foundation, Inc.,
+ *  59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.
+\*****************************************************************************/
+
+#ifdef HAVE_CONFIG_H
+#  include "config.h"
+#endif
+
+#if HAVE_PTHREAD
+#include <pthread.h>
+#endif
+
+#include <signal.h>
+#include <string.h>
+
+#include <slurm/slurm_errno.h>
+
+#include "src/common/log.h"
+#include "src/common/macros.h"
+#include "src/common/slurm_protocol_api.h"
+#include "src/common/slurm_protocol_defs.h"
+#include "src/common/xmalloc.h"
+#include "src/common/xsignal.h"
+#include "src/common/global_srun.h"
+
+/*
+ *  Static list of signals to block in srun:
+ */
+static int srun_sigarray[] = {
+	SIGINT,  SIGQUIT, SIGTSTP, SIGCONT, SIGTERM,
+	SIGALRM, SIGUSR1, SIGUSR2, SIGPIPE, 0
+};
+
+/* number of active threads */
+static pthread_mutex_t active_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t  active_cond  = PTHREAD_COND_INITIALIZER;
+static int             active = 0;
+
+typedef enum {DSH_NEW, DSH_ACTIVE, DSH_DONE, DSH_FAILED} state_t;
+
+typedef struct thd {
+        pthread_t	thread;			/* thread ID */
+        pthread_attr_t	attr;			/* thread attributes */
+        state_t		state;      		/* thread state */
+} thd_t;
+
+typedef struct task_info {
+	slurm_msg_t *req_ptr;
+	srun_job_t *job_ptr;
+	int host_inx;
+} task_info_t;
+
+
+/* 
+ * Static prototypes
+ */
+static void   _p_fwd_signal(slurm_msg_t *, srun_job_t *);
+static void * _p_signal_task(void *);
+
+void 
+fwd_signal(srun_job_t *job, int signo)
+{
+	int i;
+	slurm_msg_t *req;
+	kill_tasks_msg_t msg;
+	static pthread_mutex_t sig_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+	slurm_mutex_lock(&sig_mutex);
+
+	if (signo == SIGKILL || signo == SIGINT || signo == SIGTERM) {
+		slurm_mutex_lock(&job->state_mutex);
+		job->signaled = true;
+		slurm_mutex_unlock(&job->state_mutex);
+	}
+
+	debug2("forward signal %d to job", signo);
+
+	/* common to all tasks */
+	msg.job_id      = job->jobid;
+	msg.job_step_id = job->stepid;
+	msg.signal      = (uint32_t) signo;
+
+	req = xmalloc(sizeof(slurm_msg_t) * job->nhosts);
+
+	for (i = 0; i < job->nhosts; i++) {
+		if (job->host_state[i] != SRUN_HOST_REPLIED) {
+			debug2("%s has not yet replied\n", job->host[i]);
+			continue;
+		}
+
+		if (job_active_tasks_on_host(job, i) == 0)
+			continue;
+
+		req[i].msg_type = REQUEST_KILL_TASKS;
+		req[i].data     = &msg;
+		memcpy( &req[i].address, 
+		        &job->slurmd_addr[i], sizeof(slurm_addr));
+	}
+
+	_p_fwd_signal(req, job);
+
+	debug2("All tasks have been signalled");
+	xfree(req);
+	slurm_mutex_unlock(&sig_mutex);
+}
+
+int
+job_active_tasks_on_host(srun_job_t *job, int hostid)
+{
+	int i;
+	int retval = 0;
+
+	slurm_mutex_lock(&job->task_mutex);
+	for (i = 0; i < job->ntask[hostid]; i++) {
+		uint32_t tid = job->tids[hostid][i];
+		if (job->task_state[tid] == SRUN_TASK_RUNNING) 
+			retval++;
+	}
+	slurm_mutex_unlock(&job->task_mutex);
+	return retval;
+}
+
+/* _p_fwd_signal - parallel (multi-threaded) task signaller */
+static void _p_fwd_signal(slurm_msg_t *req, srun_job_t *job)
+{
+	int i;
+	task_info_t *tinfo;
+	thd_t *thd;
+
+	thd = xmalloc(job->nhosts * sizeof (thd_t));
+	for (i = 0; i < job->nhosts; i++) {
+		if (req[i].msg_type == 0)
+			continue;	/* inactive task */
+
+		slurm_mutex_lock(&active_mutex);
+		while (active >= opt.max_threads) {
+			pthread_cond_wait(&active_cond, &active_mutex);
+		}
+		active++;
+		slurm_mutex_unlock(&active_mutex);
+
+		tinfo = (task_info_t *)xmalloc(sizeof(task_info_t));
+		tinfo->req_ptr  = &req[i];
+		tinfo->job_ptr  = job;
+		tinfo->host_inx = i;
+
+		slurm_attr_init(&thd[i].attr);
+		if (pthread_attr_setdetachstate(&thd[i].attr, 
+		                                PTHREAD_CREATE_DETACHED))
+			error ("pthread_attr_setdetachstate failed");
+		if (pthread_create( &thd[i].thread, &thd[i].attr, 
+			            _p_signal_task, (void *) tinfo )) {
+			error ("pthread_create failed");
+			_p_signal_task((void *) tinfo);
+		}
+	}
+
+
+	slurm_mutex_lock(&active_mutex);
+	while (active > 0) {
+		pthread_cond_wait(&active_cond, &active_mutex);
+	}
+	slurm_mutex_unlock(&active_mutex);
+	xfree(thd);
+}
+
+/* _p_signal_task - parallelized signal of a specific task */
+static void * _p_signal_task(void *args)
+{
+	int          rc   = SLURM_SUCCESS;
+	task_info_t *info = (task_info_t *)args;
+	slurm_msg_t *req  = info->req_ptr;
+	srun_job_t  *job  = info->job_ptr;
+	char        *host = job->host[info->host_inx];
+
+	debug3("sending signal to host %s", host);
+	if (slurm_send_recv_rc_msg(req, &rc, 0) < 0) { 
+		error("%s: signal: %m", host);
+		goto done;
+	}
+
+	/*
+	 *  Report error unless it is "Invalid job id" which 
+	 *    probably just means the tasks exited in the meanwhile.
+	 */
+	if ((rc != 0) && (rc != ESLURM_INVALID_JOB_ID) && (rc != ESRCH)) 
+		error("%s: signal: %s", host, slurm_strerror(rc));
+
+    done:
+	slurm_mutex_lock(&active_mutex);
+	active--;
+	pthread_cond_signal(&active_cond);
+	slurm_mutex_unlock(&active_mutex);
+	xfree(args);
+	return NULL;
+}
+
+
diff --git a/src/common/global_srun.h b/src/common/global_srun.h
new file mode 100644
index 00000000000..3607d68444c
--- /dev/null
+++ b/src/common/global_srun.h
@@ -0,0 +1,35 @@
+/*****************************************************************************\
+ * src/common/global_srun.c - functions needed by more than just srun
+ *****************************************************************************
+ *  Copyright (C) 2002 The Regents of the University of California.
+ *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
+ *  Written by Mark Grodnona <mgrondona@llnl.gov>.
+ *  UCRL-CODE-2002-040.
+ *  
+ *  This file is part of SLURM, a resource management program.
+ *  For details, see <http://www.llnl.gov/linux/slurm/>.
+ *  
+ *  SLURM is free software; you can redistribute it and/or modify it under
+ *  the terms of the GNU General Public License as published by the Free
+ *  Software Foundation; either version 2 of the License, or (at your option)
+ *  any later version.
+ *  
+ *  SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
+ *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
+ *  details.
+ *  
+ *  You should have received a copy of the GNU General Public License along
+ *  with SLURM; if not, write to the Free Software Foundation, Inc.,
+ *  59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.
+\*****************************************************************************/
+
+#ifndef _GLOBAL_SRUN_H
+#define _GLOBAL_SRUN_H
+
+#include "src/srun/srun_job.h"
+
+void fwd_signal(srun_job_t *job, int signal);
+int job_active_tasks_on_host(srun_job_t *job, int hostid);
+
+#endif /* !_GLOBAL_SRUN_H */
diff --git a/src/common/mpi.c b/src/common/mpi.c
new file mode 100644
index 00000000000..178bc735ceb
--- /dev/null
+++ b/src/common/mpi.c
@@ -0,0 +1,272 @@
+/*****************************************************************************\
+ * src/common/mpi.c - Generic mpi selector for slurm
+ *****************************************************************************
+ *  Copyright (C) 2002 The Regents of the University of California.
+ *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
+ *  Written by Mark Grondona <grondo1@llnl.gov>.
+ *  UCRL-CODE-2002-040.
+ *  
+ *  This file is part of SLURM, a resource management program.
+ *  For details, see <http://www.llnl.gov/linux/slurm/>.
+ *  
+ *  SLURM is free software; you can redistribute it and/or modify it under
+ *  the terms of the GNU General Public License as published by the Free
+ *  Software Foundation; either version 2 of the License, or (at your option)
+ *  any later version.
+ *  
+ *  SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
+ *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
+ *  details.
+ *  
+ *  You should have received a copy of the GNU General Public License along
+ *  with SLURM; if not, write to the Free Software Foundation, Inc.,
+ *  59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.
+\*****************************************************************************/
+
+#if HAVE_CONFIG_H
+#  include "config.h"
+#endif
+
+#include "src/common/macros.h"
+#include "src/common/plugin.h"
+#include "src/common/plugrack.h"
+#include "src/common/env.h"
+#include "src/common/mpi.h"
+#include "src/common/xmalloc.h"
+#include "src/common/xstring.h"
+
+#define MPI_DEFAULT "mpich-gm"
+
+/*
+ * WARNING:  Do not change the order of these fields or add additional
+ * fields at the beginning of the structure.  If you do, job completion
+ * logging plugins will stop working.  If you need to add fields, add them 
+ * at the end of the structure.
+ */
+typedef struct slurm_mpi_ops {
+	int          (*init)          (slurmd_job_t *job, int rank);
+	int          (*create_thread) (srun_job_t *job);
+	int          (*single_task)   (void);
+	int          (*exit)          (void);
+} slurm_mpi_ops_t;
+
+struct slurm_mpi_context {
+	char *			mpi_type;
+	plugrack_t              plugin_list;
+	plugin_handle_t         cur_plugin;
+	int                     mpi_errno;
+	slurm_mpi_ops_t	        ops;
+};
+
+static slurm_mpi_context_t g_context = NULL;
+static pthread_mutex_t      context_lock = PTHREAD_MUTEX_INITIALIZER;
+
+static slurm_mpi_context_t
+_slurm_mpi_context_create(const char *mpi_type)
+{
+	slurm_mpi_context_t c;
+
+	if ( mpi_type == NULL ) {
+		debug3( "_slurm_mpi_context_create: no mpi type" );
+		return NULL;
+	}
+
+	c = xmalloc(sizeof(struct slurm_mpi_context));
+
+	c->mpi_errno = SLURM_SUCCESS;
+
+	/* Copy the job completion authentication type. */
+	c->mpi_type = xstrdup(mpi_type);
+	if (c->mpi_type == NULL ) {
+		debug3( "can't make local copy of mpi type" );
+		xfree(c);
+		return NULL;
+	}
+
+	/* Plugin rack is demand-loaded on first reference. */
+	c->plugin_list = NULL;
+	c->cur_plugin = PLUGIN_INVALID_HANDLE;
+
+	return c;
+}
+
+static int
+_slurm_mpi_context_destroy( slurm_mpi_context_t c )
+{
+	/*
+	 * Must check return code here because plugins might still
+	 * be loaded and active.
+	 */
+	if ( c->plugin_list ) {
+		if ( plugrack_destroy( c->plugin_list ) != SLURM_SUCCESS ) {
+			return SLURM_ERROR;
+		}
+	}
+
+	xfree(c->mpi_type);
+	xfree(c);
+
+	return SLURM_SUCCESS;
+}
+
+/*
+ * Resolve the operations from the plugin.
+ */
+static slurm_mpi_ops_t * 
+_slurm_mpi_get_ops( slurm_mpi_context_t c )
+{
+	/*
+	 * These strings must be kept in the same order as the fields
+	 * declared for slurm_mpi_ops_t.
+	 */
+	static const char *syms[] = {
+		"mpi_p_init",
+		"mpi_p_thr_create",
+		"mpi_p_single_task",
+		"mpi_p_exit"		
+	};
+	int n_syms = sizeof( syms ) / sizeof( char * );
+	char *plugin_dir = NULL;
+	
+	/* Get the plugin list, if needed. */
+	if ( c->plugin_list == NULL ) {
+		c->plugin_list = plugrack_create();
+		if ( c->plugin_list == NULL ) {
+			error("Unable to create a plugin manager");
+			return NULL; 
+		}
+
+		plugrack_set_major_type(c->plugin_list, "mpi");
+		plugrack_set_paranoia(c->plugin_list,
+				      PLUGRACK_PARANOIA_NONE,
+				      0);
+		plugin_dir = slurm_get_plugin_dir();
+		plugrack_read_dir(c->plugin_list, plugin_dir);
+		xfree(plugin_dir);
+	}
+	
+	if (strcasecmp (c->mpi_type, "mpi/list") == 0) { 
+		plugrack_print_all_plugin(c->plugin_list);
+		exit(0);
+	} else {
+		/* Find the correct plugin. */
+		c->cur_plugin = plugrack_use_by_type(c->plugin_list, 
+						     c->mpi_type);
+		if ( c->cur_plugin == PLUGIN_INVALID_HANDLE ) {
+			error("can't find a valid plugin for type %s", 
+				c->mpi_type);
+			return NULL;
+		}
+	}
+
+	/* Dereference the API. */
+	if ( plugin_get_syms( c->cur_plugin,
+				n_syms,
+				syms,
+				(void **) &c->ops ) < n_syms ) {
+		error( "incomplete mpi plugin detected" );
+		return NULL;
+	}
+
+	return &c->ops;
+}
+
+int _mpi_init (char *mpi_type)
+{
+	int retval = SLURM_SUCCESS;
+	char *full_type = NULL;
+	slurm_mutex_lock( &context_lock );
+
+	if ( g_context )
+		goto done;
+	
+	if (mpi_type == NULL)
+		mpi_type = MPI_DEFAULT;
+	
+	setenvf (NULL, "SLURM_MPI_TYPE", "%s", mpi_type);
+		
+	full_type = xmalloc(sizeof(char) * (strlen(mpi_type)+5));
+	sprintf(full_type,"mpi/%s\0",mpi_type);
+       
+	g_context = _slurm_mpi_context_create(full_type);
+	xfree(full_type);
+	if ( g_context == NULL ) {
+		error( "cannot create a context for %s", mpi_type);
+		retval = SLURM_ERROR;
+		goto done;
+	}
+
+	if ( _slurm_mpi_get_ops( g_context ) == NULL ) {
+		error( "cannot resolve plugin operations for %s", mpi_type);
+		_slurm_mpi_context_destroy( g_context );
+		g_context = NULL;
+		retval = SLURM_ERROR;
+	}
+	
+		
+done:
+	slurm_mutex_unlock( &context_lock );
+	return retval;
+}
+
+int srun_mpi_init (char *mpi_type)
+{
+	debug("mpi type = %s", mpi_type);
+	
+	if(_mpi_init(mpi_type) == SLURM_ERROR) 
+		return SLURM_ERROR;
+	
+	return SLURM_SUCCESS;
+}
+
+
+int slurmd_mpi_init (slurmd_job_t *job, int rank)
+{   
+	char *mpi_type = getenvp (job->env, "SLURM_MPI_TYPE");
+	
+	debug("mpi type = %s", mpi_type);
+
+	if(_mpi_init(mpi_type) == SLURM_ERROR) 
+		return SLURM_ERROR;
+	
+	unsetenvp (job->env, "SLURM_MPI_TYPE");	
+	return (*(g_context->ops.init))(job, rank);
+}
+
+int mpi_fini (void)
+{
+	int rc;
+
+	if (!g_context)
+		return SLURM_SUCCESS;
+
+	rc = _slurm_mpi_context_destroy(g_context);
+	return rc;
+}
+
+int slurm_mpi_thr_create(srun_job_t *job)
+{
+	if (_mpi_init(NULL) < 0)
+		return SLURM_ERROR;
+		
+	return (*(g_context->ops.create_thread))(job);
+}
+
+int slurm_mpi_single_task_per_node (void)
+{
+	if (_mpi_init(NULL) < 0)
+		return SLURM_ERROR;
+	
+	return (*(g_context->ops.single_task))();
+}
+
+int slurm_mpi_exit (void)
+{
+	if (_mpi_init(NULL) < 0)
+		return SLURM_ERROR;
+	
+	return (*(g_context->ops.exit))();
+}
+
+
diff --git a/src/common/mpi.h b/src/common/mpi.h
new file mode 100644
index 00000000000..cce49e86f0e
--- /dev/null
+++ b/src/common/mpi.h
@@ -0,0 +1,48 @@
+/*****************************************************************************\
+ * src/common/mpi.h - Generic mpi selector for slurm
+ *****************************************************************************
+ *  Copyright (C) 2002 The Regents of the University of California.
+ *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
+ *  Written by Mark Grondona <grondo1@llnl.gov>.
+ *  UCRL-CODE-2002-040.
+ *  
+ *  This file is part of SLURM, a resource management program.
+ *  For details, see <http://www.llnl.gov/linux/slurm/>.
+ *  
+ *  SLURM is free software; you can redistribute it and/or modify it under
+ *  the terms of the GNU General Public License as published by the Free
+ *  Software Foundation; either version 2 of the License, or (at your option)
+ *  any later version.
+ *  
+ *  SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
+ *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
+ *  details.
+ *  
+ *  You should have received a copy of the GNU General Public License along
+ *  with SLURM; if not, write to the Free Software Foundation, Inc.,
+ *  59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.
+\*****************************************************************************/
+
+#ifndef _SRUN_MPI_H
+#define _SRUN_MPI_H
+
+#if HAVE_CONFIG_H
+# include "config.h"
+#endif 
+
+#include "src/srun/opt.h"
+#include "src/srun/srun_job.h"
+#include "src/slurmd/slurmd_job.h"
+
+typedef struct slurm_mpi_context *slurm_mpi_context_t;
+
+int srun_mpi_init (char *mpi_type);
+int slurmd_mpi_init (slurmd_job_t *job, int rank);
+int mpi_fini (void);
+int slurm_mpi_thr_create(srun_job_t *job);
+int slurm_mpi_single_task_per_node (void);
+int slurm_mpi_exit (void);
+
+
+#endif /* !_SRUN_MPI_H */
diff --git a/src/common/net.c b/src/common/net.c
new file mode 100644
index 00000000000..32e8b1f1703
--- /dev/null
+++ b/src/common/net.c
@@ -0,0 +1,147 @@
+/*****************************************************************************\
+ *  net.c - basic network communications for user application I/O
+ *****************************************************************************
+ *  Copyright (C) 2002 The Regents of the University of California.
+ *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
+ *  Written by Mark Grondona <grondona1@llnl.gov>, Kevin Tew <tew1@llnl.gov>, 
+ *  et. al.
+ *  UCRL-CODE-2002-040.
+ *  
+ *  This file is part of SLURM, a resource management program.
+ *  For details, see <http://www.llnl.gov/linux/slurm/>.
+ *  
+ *  SLURM is free software; you can redistribute it and/or modify it under
+ *  the terms of the GNU General Public License as published by the Free
+ *  Software Foundation; either version 2 of the License, or (at your option)
+ *  any later version.
+ *  
+ *  SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
+ *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
+ *  details.
+ *  
+ *  You should have received a copy of the GNU General Public License along
+ *  with SLURM; if not, write to the Free Software Foundation, Inc.,
+ *  59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.
+\*****************************************************************************/
+
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <sys/select.h>
+#include <sys/time.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <string.h>
+#include <errno.h>
+
+#include "src/common/log.h"
+#include "src/common/net.h"
+
+#ifndef NET_DEFAULT_BACKLOG
+#  define NET_DEFAULT_BACKLOG	1024
+#endif 
+
+static int _sock_bind_wild(int sockfd)
+{
+	socklen_t len;
+	struct sockaddr_in sin;
+
+	memset(&sin, 0, sizeof(sin));
+	sin.sin_family = AF_INET;
+	sin.sin_addr.s_addr = htonl(INADDR_ANY);
+	sin.sin_port = htons(0);	/* bind ephemeral port */
+
+	if (bind(sockfd, (struct sockaddr *) &sin, sizeof(sin)) < 0)
+		return (-1);
+	len = sizeof(sin);
+	if (getsockname(sockfd, (struct sockaddr *) &sin, &len) < 0)
+		return (-1);
+	return (sin.sin_port);
+}
+
+
+
+int net_stream_listen(int *fd, int *port)
+{
+	int rc, val;
+
+	if ((*fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
+		return -1;
+
+	val = 1;
+	rc = setsockopt(*fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(int));
+	if (rc > 0) 
+		goto cleanup;
+
+	*port = _sock_bind_wild(*fd);
+	if (*port < 0)
+		goto cleanup;
+#undef SOMAXCONN
+#define SOMAXCONN	1024
+	rc = listen(*fd, NET_DEFAULT_BACKLOG);
+	if (rc < 0)
+		goto cleanup;
+
+	return 1;
+
+  cleanup:
+	close(*fd);
+	return -1;
+}
+
+
+int accept_stream(int fd)
+{
+	int sd;
+
+	while ((sd = accept(fd, NULL, NULL)) < 0) {
+		if (errno == EINTR)
+			continue;
+		if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
+			return -1;
+		if (errno == ECONNABORTED)
+			return -1;
+		error("Unable to accept new connection");
+	}
+
+	return sd;
+}
+
+
+int readn(int fd, void *buf, size_t nbytes)
+{
+	int n = 0;
+	char *pbuf = (char *)buf;
+	size_t nleft = nbytes;
+
+	while (nleft > 0) {
+		n = read(fd, (void *)pbuf, nleft);
+		if (n > 0) {
+			pbuf+=n;
+			nleft-=n;
+		} else if (n == 0) 	/* EOF */
+			break;
+		else if (errno == EINTR)
+			continue;
+		else {
+			debug("read error: %m");
+			break;
+		}
+	}
+	return(n);
+}
+
+int net_set_low_water(int sock, size_t size)
+{
+	if (setsockopt(sock, SOL_SOCKET, SO_RCVLOWAT, 
+	  (const void *) &size, sizeof(size)) < 0) {
+		error("Unable to set low water socket option: %m");
+		return -1;
+	}
+
+	return 0;
+}
diff --git a/src/common/net.h b/src/common/net.h
new file mode 100644
index 00000000000..9f023b3868d
--- /dev/null
+++ b/src/common/net.h
@@ -0,0 +1,20 @@
+
+#ifndef _NET_H
+#define _NET_H
+
+/* open a stream socket on an ephemereal port and put it into 
+ * the listen state. fd and port are filled in with the new
+ * socket's file descriptor and port #.
+ */
+int net_stream_listen(int *fd, int *port);
+
+/* accept the incoming connection on the stream socket fd
+ */
+int net_accept_stream(int fd);
+
+/* set low water mark on socket
+ */
+int net_set_low_water(int sock, size_t size);
+
+
+#endif /* !_NET_H */
-- 
GitLab