diff --git a/doc/man/man1/sbcast.1 b/doc/man/man1/sbcast.1
index e8022810afb92bcec3ad35c216a0f4bf6f1c0ff2..50f8da40e3bb16f0d08c571a75b01c0587fcf04a 100644
--- a/doc/man/man1/sbcast.1
+++ b/doc/man/man1/sbcast.1
@@ -15,7 +15,9 @@ allocation.
 \fBSOURCE\fR is the name of a file on the current node.
 \fBDEST\fR should be the fully qualified pathname for the file copy to be
 created on each node.
-This should be on a file system local to that node.
+\fBDEST\fR should be on a file system local to that node.
+Note that parallel file systems may provide better performance
+than \fBsbcast\fR can provide.
 
 .SH "OPTIONS"
 .TP
@@ -60,7 +62,7 @@ Using a batch script, transmit local file \fBmy.prog\fR to
 > cat my.job
 #!/bin/bash
 sbcast my.prog /tmp/my.prog
-srun my.prog
+srun /tmp/my.prog
 
 > srun --nodes=8 --batch my.job
 srun: jobid 12345 submitted
diff --git a/src/common/slurm_protocol_defs.c b/src/common/slurm_protocol_defs.c
index de03dfd8770fff5bc07d9d1f5046117a46333e68..4654a6ee1d3a9a24a2240129b3a46ec0ba6a6efd 100644
--- a/src/common/slurm_protocol_defs.c
+++ b/src/common/slurm_protocol_defs.c
@@ -937,3 +937,13 @@ static void _slurm_free_partition_info_members(partition_info_t * part)
 	}
 }
 
+/* free a file_bcast_msg_t plus its fname and data buffers, NULL is a no-op */
+extern void slurm_free_file_bcast_msg(file_bcast_msg_t *msg)
+{
+	if (msg) {
+		xfree(msg->fname);
+		xfree(msg->data);
+		xfree(msg);
+	}
+}
+
diff --git a/src/common/slurm_protocol_defs.h b/src/common/slurm_protocol_defs.h
index 14f246c81a178f59d791a85a8f16af0026f03fe6..213a8eb210a9c11b679a8ad1a2bdda2a2cafce3f 100644
--- a/src/common/slurm_protocol_defs.h
+++ b/src/common/slurm_protocol_defs.h
@@ -154,6 +154,7 @@ typedef enum {
 	REQUEST_TERMINATE_JOB,
 	MESSAGE_EPILOG_COMPLETE,
 	REQUEST_SPAWN_TASK,
+	REQUEST_FILE_BCAST,
 
 	SRUN_PING = 7001,
 	SRUN_TIMEOUT,
@@ -541,6 +542,20 @@ typedef struct kvs_get_msg {
 	char * hostname; /* hostname to be sent the kvs data */
 } kvs_get_msg_t;
 
+typedef struct file_bcast_msg {
+	char *fname;		/* name of the destination file */
+	uint16_t block_no;	/* block number of this data */
+	uint16_t force;		/* replace existing file if set */
+	uint16_t modes;		/* access rights for destination file */
+	uint32_t uid;		/* owner for destination file */
+	uint32_t gid;		/* group for destination file */
+	time_t atime;		/* last access time for destination file */
+	time_t mtime;		/* last modification time for dest file */
+	uint32_t block_len;	/* length of this data block */
+	char *data;		/* data for this block */
+} file_bcast_msg_t;
+
+
 /*****************************************************************************\
  * Slurm API Message Types
 \*****************************************************************************/
@@ -645,6 +660,7 @@ void slurm_free_job_step_info_response_msg(
 void slurm_free_node_info_msg(node_info_msg_t * msg);
 void slurm_free_partition_info_msg(partition_info_msg_t * msg);
 void slurm_free_get_kvs_msg(kvs_get_msg_t *msg);
+void slurm_free_file_bcast_msg(file_bcast_msg_t *msg);
 
 extern char *job_reason_string(enum job_wait_reason inx);
 extern char *job_state_string(enum job_states inx);
diff --git a/src/sbcast/sbcast.c b/src/sbcast/sbcast.c
index c2d7aaa11f5dd329da90db054a7e3605db3cccdd..f7a2d5669919c8c483f39c44b92aa68867d0ad51 100644
--- a/src/sbcast/sbcast.c
+++ b/src/sbcast/sbcast.c
@@ -40,6 +40,9 @@
 #include <sys/types.h>
 #include <sys/stat.h>
 
+#include "src/common/hostlist.h"
+#include "src/common/log.h"
+#include "src/common/slurm_protocol_defs.h"
 #include "src/common/xmalloc.h"
 #include "src/common/xstring.h"
 #include "src/sbcast/sbcast.h"
@@ -56,29 +59,34 @@ static void _get_job_info(void);
 
 int main(int argc, char *argv[])
 {
+	log_options_t opts = LOG_OPTS_STDERR_ONLY;
+	log_init("sbcast", opts, SYSLOG_FACILITY_DAEMON, NULL);
+
 	parse_command_line(argc, argv);
+	if (params.verbose) {
+		opts.stderr_level += params.verbose;
+		log_alter(opts, SYSLOG_FACILITY_DAEMON, NULL);
+	}
 
 	/* validate the source file */
 	if ((fd = open(params.src_fname, O_RDONLY)) < 0) {
-		fprintf(stderr, "Can't open `%s`: %s\n", params.src_fname, 
+		error("Can't open `%s`: %s", params.src_fname,
 			strerror(errno));
 		exit(1);
 	}
 	if (fstat(fd, &f_stat)) {
-		fprintf(stderr, "Can't stat `%s`: %s\n", params.src_fname,
+		error("Can't stat `%s`: %s", params.src_fname,
 			strerror(errno));
 		exit(1);
 	}
-	if (params.verbose) {
-		printf("modes = %o\n", (unsigned int) f_stat.st_mode);
-		printf("uid = %d\n", (int) f_stat.st_uid);
-		printf("gid = %d\n", (int) f_stat.st_gid);
-		printf("atime = %s", ctime(&f_stat.st_atime));
-		printf("mtime = %s", ctime(&f_stat.st_mtime));
-		printf("ctime = %s", ctime(&f_stat.st_ctime));
-		printf("size = %ld\n", (long) f_stat.st_size);
-		printf("-----------------------------\n");
-	}
+	verbose("modes = %o", (unsigned int) f_stat.st_mode);
+	verbose("uid = %d", (int) f_stat.st_uid);
+	verbose("gid = %d", (int) f_stat.st_gid);
+	verbose("atime = %s", ctime(&f_stat.st_atime));
+	verbose("mtime = %s", ctime(&f_stat.st_mtime));
+	verbose("ctime = %s", ctime(&f_stat.st_ctime));
+	verbose("size = %ld", (long) f_stat.st_size);
+	verbose("-----------------------------");
 
 	/* identify the nodes allocated to the job */
 	_get_job_info();
@@ -97,36 +105,102 @@ static void _get_job_info(void)
 
 	jobid_str = getenv("SLURM_JOBID");
 	if (!jobid_str) {
-		fprintf(stderr, "Command only valid from within SLURM job\n");
+		error("Command only valid from within SLURM job");
 		exit(1);
 	}
 	jobid = (uint32_t) atol(jobid_str);
 
 	if (slurm_allocation_lookup(jobid, &alloc_resp) != SLURM_SUCCESS) {
-		fprintf(stderr, "SLURM jobid %u lookup error: %s\n",
+		error("SLURM jobid %u lookup error: %s",
 			jobid, slurm_strerror(slurm_get_errno()));
 		exit(1);
 	}
 
-	if (params.verbose) {
-		printf("node_list = %s\n", alloc_resp->node_list);
-		printf("node_cnt = %u\n", alloc_resp->node_cnt);
-		/* also see alloc_resp->node_addr (array) */
-	}
+	verbose("node_list = %s", alloc_resp->node_list);
+	verbose("node_cnt = %u", alloc_resp->node_cnt);
+	/* also see alloc_resp->node_addr (array) */
 
 	/* do not bother to release the return message,
 	 * we need to preserve and use most of the information later */
 }
 
-/* broadcast the file */
+/* load a buffer with data from the file to broadcast,
+ * return number of bytes read, zero on end of file */
+static int _get_block(char *buffer, size_t buf_size)
+{
+	static int fd = -1;	/* -1 until opened, 0 is a valid descriptor */
+	int buf_used = 0, rc;
+
+	if (fd < 0) {
+		fd = open(params.src_fname, O_RDONLY);
+		if (fd < 0) {	/* open() reports failure as -1, not 0 */
+			error("Can't open `%s`: %s",
+				params.src_fname, strerror(errno));
+			exit(1);
+		}
+	}
+
+	while (buf_size) {
+		rc = read(fd, buffer, buf_size);
+		if (rc == -1) {
+			if ((errno == EINTR) || (errno == EAGAIN))
+				continue;
+			error("Can't read `%s`: %s",
+				params.src_fname, strerror(errno));
+			exit(1);
+		} else if (rc == 0) {
+			debug("end of file reached");
+			break;
+		}
+
+		buffer += rc;
+		buf_size -= rc;
+		buf_used += rc;
+	}
+	return buf_used;
+}
+
+/* issue the RPC to ship the file's data */
+static void _send_rpc(file_bcast_msg_t *bcast_msg)
+{
+	/* TODO: send REQUEST_FILE_BCAST, for now the block is only logged */
+	verbose("sending block %u with %u bytes", bcast_msg->block_no,
+		bcast_msg->block_len);
+}
+
+/* read and broadcast the file */
 static void _bcast_file(void)
 {
-	int buf_size, size_read;
+	int buf_size;
+	file_bcast_msg_t bcast_msg;
 	char *buffer;
 
-	buf_size = MIN(SSIZE_MAX, (16 * 1024));
+	buf_size = MIN(SSIZE_MAX, (64 * 1024));
+	buf_size = MIN(buf_size, f_stat.st_size);
 	buffer = xmalloc(buf_size);
 
-	size_read = read(fd, buffer, buf_size);
-	/* get source for compress and re-use */
+	/* NOTE(review): fname is documented as the destination file name,
+	 * but the source name is stored here -- confirm this is intended */
+	bcast_msg.fname = params.src_fname;
+	bcast_msg.block_no = 0;
+	bcast_msg.force = params.force;
+	bcast_msg.modes = f_stat.st_mode;
+	bcast_msg.uid = f_stat.st_uid;
+	bcast_msg.gid = f_stat.st_gid;
+	bcast_msg.data = buffer;
+	if (params.preserve) {
+		bcast_msg.atime = f_stat.st_atime;
+		bcast_msg.mtime = f_stat.st_mtime;
+	} else {
+		bcast_msg.atime = 0;
+		bcast_msg.mtime = 0;
+	}
+
+	while ((bcast_msg.block_len = _get_block(buffer, buf_size))) {
+		bcast_msg.block_no++;
+		_send_rpc(&bcast_msg);
+		if (bcast_msg.block_len < buf_size)
+			break;	/* end of file */
+	}
+	xfree(buffer);
 }