Skip to content
Snippets Groups Projects
sbcast.c 7.08 KiB
/*****************************************************************************\
 *  sbcast.c - Broadcast a file to allocated nodes
 *
 *  $Id: sbcast.c 6965 2006-01-04 23:31:07Z jette $
 *****************************************************************************
 *  Copyright (C) 2006 The Regents of the University of California.
 *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
 *  Written by Morris Jette <jette1@llnl.gov>
 *  UCRL-CODE-217948.
 *  
 *  This file is part of SLURM, a resource management program.
 *  For details, see <http://www.llnl.gov/linux/slurm/>.
 *  
 *  SLURM is free software; you can redistribute it and/or modify it under
 *  the terms of the GNU General Public License as published by the Free
 *  Software Foundation; either version 2 of the License, or (at your option)
 *  any later version.
 *  
 *  SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
 *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 *  details.
 *  
 *  You should have received a copy of the GNU General Public License along
 *  with SLURM; if not, write to the Free Software Foundation, Inc.,
 *  59 Temple Place, Suite 330, Boston, MA  02111-1307  USA.
\*****************************************************************************/

#if HAVE_CONFIG_H
#  include "config.h"
#endif

#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <slurm/slurm_errno.h>
#include <sys/types.h>
#include <sys/stat.h>

#include "src/common/hostlist.h"
#include "src/common/log.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_protocol_defs.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/sbcast/sbcast.h"

/* global variables
 *   fd         - source file descriptor, opened and fstat()ed by main()
 *   params     - program parameters; main() consults them after calling
 *                parse_command_line() (defined elsewhere), which presumably
 *                fills them in
 *   f_stat     - fstat() results for the source file; supplies the size,
 *                modes, owner and times placed in each broadcast message
 *   alloc_resp - job allocation looked up by _get_job_info(); deliberately
 *                kept for the life of the program because _send_rpc()
 *                reads its node addresses */
int fd;						/* source file descriptor */
struct sbcast_parameters params;		/* program parameters */
struct stat f_stat;				/* source file stats */
resource_allocation_response_msg_t *alloc_resp;	/* job specification */

/* file-local helpers, defined below main() */
static void _bcast_file(void);
static void _get_job_info(void);


/*
 * Render *t with ctime() but without its trailing newline, so that
 * verbose() does not emit an empty line after every timestamp.
 * Returns "unknown" if ctime() fails (it returns NULL for times it cannot
 * represent). The result lives in a static buffer overwritten by the next
 * call, so use it immediately.
 */
static const char *_time_str(const time_t *t)
{
	static char buf[32];
	const char *s = ctime(t);
	int i;

	if (s == NULL)
		return "unknown";
	for (i = 0; (i < (int) sizeof(buf) - 1) && s[i] && (s[i] != '\n'); i++)
		buf[i] = s[i];
	buf[i] = '\0';
	return buf;
}

/*
 * sbcast entry point: parse options, open and stat the source file,
 * look up the current job's allocation, then broadcast the file.
 * Exits with status 1 on any error, 0 on success.
 */
int main(int argc, char *argv[])
{
	log_options_t opts = LOG_OPTS_STDERR_ONLY;
	log_init("sbcast", opts, SYSLOG_FACILITY_DAEMON, NULL);

	parse_command_line(argc, argv);
	if (params.verbose) {
		opts.stderr_level += params.verbose;
		log_alter(opts, SYSLOG_FACILITY_DAEMON, NULL);
	}

	/* validate the source file */
	if ((fd = open(params.src_fname, O_RDONLY)) < 0) {
		error("Can't open `%s`: %s", params.src_fname,
			strerror(errno));
		exit(1);
	}
	if (fstat(fd, &f_stat)) {
		error("Can't stat `%s`: %s", params.src_fname,
			strerror(errno));
		exit(1);
	}
	/* _bcast_file() sizes its transfer from st_size, which is only
	 * meaningful for regular files (FIFOs and devices typically report
	 * 0, so nothing would be sent while still exiting successfully). */
	if (!S_ISREG(f_stat.st_mode)) {
		error("`%s` is not a regular file", params.src_fname);
		exit(1);
	}
	verbose("modes    = %o", (unsigned int) f_stat.st_mode);
	verbose("uid      = %d", (int) f_stat.st_uid);
	verbose("gid      = %d", (int) f_stat.st_gid);
	verbose("atime    = %s", _time_str(&f_stat.st_atime));
	verbose("mtime    = %s", _time_str(&f_stat.st_mtime));
	verbose("ctime    = %s", _time_str(&f_stat.st_ctime));
	verbose("size     = %ld", (long) f_stat.st_size);
	verbose("-----------------------------");

	/* identify the nodes allocated to the job */
	_get_job_info();

	/* transmit the file */
	_bcast_file();

	exit(0);
}

/*
 * Get details about this SLURM job: read the job id from the SLURM_JOBID
 * environment variable and look up its allocation with
 * slurm_allocation_lookup(), storing the result in the global alloc_resp.
 * Exits with status 1 if SLURM_JOBID is unset, is not an unsigned decimal
 * number that fits in 32 bits, or if the lookup fails.
 */
static void _get_job_info(void)
{
	char *jobid_str, *end_ptr = NULL;
	unsigned long jobid_val;
	uint32_t jobid;

	jobid_str = getenv("SLURM_JOBID");
	if (!jobid_str) {
		error("Command only valid from within SLURM job");
		exit(1);
	}

	/* atol() maps garbage to 0 (and overflow is undefined), which would
	 * surface as a confusing lookup failure for an unrelated job id.
	 * Parse strictly instead: digits only, no sign, no trailing junk. */
	errno = 0;
	jobid_val = strtoul(jobid_str, &end_ptr, 10);
	if ((jobid_str[0] < '0') || (jobid_str[0] > '9') ||
	    (errno != 0) || (*end_ptr != '\0') ||
	    (jobid_val > 0xffffffffUL)) {
		error("Invalid SLURM_JOBID value `%s`", jobid_str);
		exit(1);
	}
	jobid = (uint32_t) jobid_val;
	verbose("jobid      = %u", jobid);

	if (slurm_allocation_lookup(jobid, &alloc_resp) != SLURM_SUCCESS) {
		error("SLURM jobid %u lookup error: %s",
			jobid, slurm_strerror(slurm_get_errno()));
		exit(1);
	}

	verbose("node_list  = %s", alloc_resp->node_list);
	verbose("node_cnt   = %u", alloc_resp->node_cnt);
	/* also see alloc_resp->node_addr (array) */

	/* do not bother to release the return message,
	 * we need to preserve and use most of the information later */
}

/* load a buffer with data from the file to broadcast, 
 * return number of bytes read, zero on end of file */
static int _get_block(char *buffer, size_t buf_size)
{
	static int fd = 0;
	int buf_used = 0, rc;

	if (!fd) {
		fd = open(params.src_fname, O_RDONLY);
		if (!fd) {
			error("Can't open `%s`: %s", 
				params.src_fname, strerror(errno));
			exit(1);
		}
	}

	while (buf_size) {
		rc = read(fd, buffer, buf_size);
		if (rc == -1) {
			if ((errno == EINTR) || (errno == EAGAIN))
				continue;
			error("Can't read `%s`: %s",
				params.src_fname, strerror(errno));
			exit(1);
		} else if (rc == 0) {
			debug("end of file reached");
			break;
		}

		buffer   += rc;
		buf_size -= rc;
		buf_used += rc;
	}
	return buf_used;
}

/*
 * Issue the REQUEST_FILE_BCAST RPC that ships one block of the file's data.
 * Exits with status 1 if the message cannot be sent or if a non-zero
 * return code comes back (reported via slurm_strerror()).
 *
 * NOTE: in the active (#if 1) implementation only the first allocated
 * node, alloc_resp->node_addr[0], is contacted; fanout to every allocated
 * node is the disabled #else branch below.
 */
static void _send_rpc(file_bcast_msg_t *bcast_msg)
{
#if 1
	/* NOTE(review): only msg_type, address and data are set; the other
	 * slurm_msg_t fields are left uninitialized. Confirm that
	 * slurm_send_recv_rc_msg_only_one() does not read them. */
	slurm_msg_t msg;
	int rc;

	msg.msg_type = REQUEST_FILE_BCAST;
	msg.address = alloc_resp->node_addr[0];
	msg.data = bcast_msg;

	/* a non-zero return means the exchange itself failed; rc presumably
	 * holds the return code reported by the remote side */
	if (slurm_send_recv_rc_msg_only_one(&msg, &rc, 0)) {
		error("slurm_send_recv_rc_msg_only_one: %m");
		exit(1);
	}
#else
/* This code will handle message fanout to multiple slurmd daemons; it is
 * not implemented yet.
 * NOTE(review): if enabled as-is, the send call below is commented out, so
 * rc_array is never filled by this code; alloc_resp is also released here
 * on the first call, and the commented-out call still dereferences it. */
	int i, rc;

	/* use static structures for persistent communications data */
	static slurm_msg_t *msg = NULL; /* array of message structs, one per node */
	static int *rc_array;
	static int node_cnt;

	if (!msg) {
		node_cnt = alloc_resp->node_cnt;
		msg = xmalloc(sizeof(slurm_msg_t) * node_cnt);
		rc_array = xmalloc(sizeof(int) * node_cnt);
		for (i = 0; i < node_cnt; i++) {
			msg[i].msg_type = REQUEST_FILE_BCAST;
			msg[i].address = alloc_resp->node_addr[i];
		}
		slurm_free_resource_allocation_response_msg(alloc_resp);
	}
	for (i = 0; i < node_cnt; i++)
		msg[i].data = bcast_msg;

	verbose("sending block %u with %u bytes to %d nodes", 
		bcast_msg->block_no, bcast_msg->block_len, node_cnt);
	//_p_send_recv_rc_msg(alloc_resp->node_cnt, msg, rc_array, 10);

	/* report the first non-zero per-node return code, if any */
	rc = 0;
	for (i = 0; i < node_cnt; i++) {
		if (rc_array[i]) {
			rc = rc_array[i];
			break;
		}
	}
#endif

	/* any non-zero return code aborts the whole broadcast */
	if (rc) {
		error("REQUEST_FILE_BCAST: %s", slurm_strerror(rc));
		exit(1);
	}
}

/* read and broadcast the file */
static void _bcast_file(void)
{
	int buf_size;
	off_t size_read = 0;
	file_bcast_msg_t bcast_msg;
	char *buffer;

	/* NOTE: packmem() uses 16 bits to express a block size, 
	 * buf_size must be no larger than 64k - 1 */
	buf_size = MIN(SSIZE_MAX, (63 * 1024));
	buf_size = MIN(buf_size, f_stat.st_size);
	buffer = xmalloc(buf_size);

	bcast_msg.fname		= params.dst_fname;
	bcast_msg.block_no	= 0;
	bcast_msg.last_block	= 0;
	bcast_msg.force		= params.force;
	bcast_msg.modes		= f_stat.st_mode;
	bcast_msg.uid		= f_stat.st_uid;
	bcast_msg.gid		= f_stat.st_gid;
	bcast_msg.data		= buffer;
	if (params.preserve) {
		bcast_msg.atime     = f_stat.st_atime;
		bcast_msg.mtime     = f_stat.st_mtime;
	} else {
		bcast_msg.atime     = 0;
		bcast_msg.mtime     = 0;
	}

	while ((bcast_msg.block_len = _get_block(buffer, buf_size))) {
		bcast_msg.block_no++;
		size_read += bcast_msg.block_len;
		if (size_read >= f_stat.st_size)
			bcast_msg.last_block = 1;
		_send_rpc(&bcast_msg);
		if (bcast_msg.last_block)
			break;	/* end of file */
	}
}