Skip to content
Snippets Groups Projects
Commit 6387b10e authored by Moe Jette's avatar Moe Jette
Browse files

Increase sbcast functionality: add support for slurm logs and add source

file read. Data broadcast and write still need to be added.
parent d0861d09
No related branches found
No related tags found
No related merge requests found
......@@ -15,7 +15,9 @@ allocation.
\fBSOURCE\fR is the name of a file on the current node.
\fBDEST\fR should be the fully qualified pathname for the
file copy to be created on each node.
This should be on a file system local to that node.
\fBDEST\fR should be on a file system local to that node.
Note that a parallel file system may provide better performance
than \fBsbcast\fR.
.SH "OPTIONS"
.TP
......@@ -60,7 +62,7 @@ Using a batch script, transmit local file \fBmy.prog\fR to
> cat my.job
#!/bin/bash
sbcast my.prog /tmp/my.prog
srun my.prog
srun /tmp/my.prog
> srun --nodes=8 --batch my.job
srun: jobid 12345 submitted
......
......@@ -937,3 +937,12 @@ static void _slurm_free_partition_info_members(partition_info_t * part)
}
}
/*
 * slurm_free_file_bcast_msg - release a file broadcast message
 * IN msg - message to release; a NULL pointer is accepted and ignored
 *
 * Frees the fname and data members, then the message structure itself.
 */
extern void slurm_free_file_bcast_msg(file_bcast_msg_t *msg)
{
	if (msg == NULL)
		return;

	xfree(msg->fname);
	xfree(msg->data);
	xfree(msg);
}
......@@ -154,6 +154,7 @@ typedef enum {
REQUEST_TERMINATE_JOB,
MESSAGE_EPILOG_COMPLETE,
REQUEST_SPAWN_TASK,
REQUEST_FILE_BCAST,
SRUN_PING = 7001,
SRUN_TIMEOUT,
......@@ -541,6 +542,20 @@ typedef struct kvs_get_msg {
char * hostname; /* hostname to be sent the kvs data */
} kvs_get_msg_t;
/* One block of a file being broadcast, together with the attributes
 * to be applied to the destination file */
typedef struct file_bcast_msg {
	char     *fname;	/* destination file name */
	uint16_t  block_no;	/* number of the block carried here */
	uint16_t  force;	/* non-zero: replace an existing file */
	uint16_t  modes;	/* destination file access rights */
	uint32_t  uid;		/* destination file owner */
	uint32_t  gid;		/* destination file group */
	time_t    atime;	/* destination file last access time */
	time_t    mtime;	/* destination file last modification time */
	uint32_t  block_len;	/* number of bytes in this block */
	char     *data;		/* the block's bytes */
} file_bcast_msg_t;
/*****************************************************************************\
* Slurm API Message Types
\*****************************************************************************/
......@@ -645,6 +660,7 @@ void slurm_free_job_step_info_response_msg(
void slurm_free_node_info_msg(node_info_msg_t * msg);
void slurm_free_partition_info_msg(partition_info_msg_t * msg);
void slurm_free_get_kvs_msg(kvs_get_msg_t *msg);
/* free a file_bcast_msg_t, including its fname and data members */
void slurm_free_file_bcast_msg(file_bcast_msg_t *msg);
extern char *job_reason_string(enum job_wait_reason inx);
extern char *job_state_string(enum job_states inx);
......
......@@ -40,6 +40,9 @@
#include <sys/types.h>
#include <sys/stat.h>
#include "src/common/hostlist.h"
#include "src/common/log.h"
#include "src/common/slurm_protocol_defs.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/sbcast/sbcast.h"
......@@ -56,29 +59,34 @@ static void _get_job_info(void);
int main(int argc, char *argv[])
{
log_options_t opts = LOG_OPTS_STDERR_ONLY;
log_init("sbcast", opts, SYSLOG_FACILITY_DAEMON, NULL);
parse_command_line(argc, argv);
if (params.verbose) {
opts.stderr_level += params.verbose;
log_alter(opts, SYSLOG_FACILITY_DAEMON, NULL);
}
/* validate the source file */
if ((fd = open(params.src_fname, O_RDONLY)) < 0) {
fprintf(stderr, "Can't open `%s`: %s\n", params.src_fname,
error("Can't open `%s`: %s", params.src_fname,
strerror(errno));
exit(1);
}
if (fstat(fd, &f_stat)) {
fprintf(stderr, "Can't stat `%s`: %s\n", params.src_fname,
error("Can't stat `%s`: %s", params.src_fname,
strerror(errno));
exit(1);
}
if (params.verbose) {
printf("modes = %o\n", (unsigned int) f_stat.st_mode);
printf("uid = %d\n", (int) f_stat.st_uid);
printf("gid = %d\n", (int) f_stat.st_gid);
printf("atime = %s", ctime(&f_stat.st_atime));
printf("mtime = %s", ctime(&f_stat.st_mtime));
printf("ctime = %s", ctime(&f_stat.st_ctime));
printf("size = %ld\n", (long) f_stat.st_size);
printf("-----------------------------\n");
}
verbose("modes = %o", (unsigned int) f_stat.st_mode);
verbose("uid = %d", (int) f_stat.st_uid);
verbose("gid = %d", (int) f_stat.st_gid);
verbose("atime = %s", ctime(&f_stat.st_atime));
verbose("mtime = %s", ctime(&f_stat.st_mtime));
verbose("ctime = %s", ctime(&f_stat.st_ctime));
verbose("size = %ld", (long) f_stat.st_size);
verbose("-----------------------------");
/* identify the nodes allocated to the job */
_get_job_info();
......@@ -97,36 +105,99 @@ static void _get_job_info(void)
jobid_str = getenv("SLURM_JOBID");
if (!jobid_str) {
fprintf(stderr, "Command only valid from within SLURM job\n");
error("Command only valid from within SLURM job");
exit(1);
}
jobid = (uint32_t) atol(jobid_str);
if (slurm_allocation_lookup(jobid, &alloc_resp) != SLURM_SUCCESS) {
fprintf(stderr, "SLURM jobid %u lookup error: %s\n",
error("SLURM jobid %u lookup error: %s",
jobid, slurm_strerror(slurm_get_errno()));
exit(1);
}
if (params.verbose) {
printf("node_list = %s\n", alloc_resp->node_list);
printf("node_cnt = %u\n", alloc_resp->node_cnt);
/* also see alloc_resp->node_addr (array) */
}
verbose("node_list = %s\n", alloc_resp->node_list);
verbose("node_cnt = %u\n", alloc_resp->node_cnt);
/* also see alloc_resp->node_addr (array) */
/* do not bother to release the return message,
* we need to preserve and use most of the information later */
}
/* broadcast the file */
/*
 * _get_block - load a buffer with data from the file to broadcast
 * IN/OUT buffer - where the data is stored, must hold buf_size bytes
 * IN buf_size - maximum number of bytes to load
 * RET number of bytes read, zero on end of file
 *
 * The source file (params.src_fname) is opened on the first call and the
 * descriptor is retained across calls, so each call continues where the
 * previous one stopped. A failure to open or read the file is logged and
 * terminates the program with exit(1).
 */
static int _get_block(char *buffer, size_t buf_size)
{
	/* -1 means "not opened yet"; zero is a valid descriptor and so
	 * can not be used as the sentinel */
	static int src_fd = -1;
	int buf_used = 0;
	ssize_t rc;

	if (src_fd < 0) {
		src_fd = open(params.src_fname, O_RDONLY);
		if (src_fd < 0) {	/* open() reports failure as -1 */
			error("Can't open `%s`: %s\n",
			      params.src_fname, strerror(errno));
			exit(1);
		}
	}

	/* read() may return less than requested, keep going until the
	 * buffer is full or the end of the file is reached */
	while (buf_size) {
		rc = read(src_fd, buffer, buf_size);
		if (rc == -1) {
			if ((errno == EINTR) || (errno == EAGAIN))
				continue;	/* transient, retry */
			error("Can't read `%s`: %s\n",
			      params.src_fname, strerror(errno));
			exit(1);
		} else if (rc == 0) {
			debug("end of file reached");
			break;
		}

		buffer   += rc;
		buf_size -= (size_t) rc;
		buf_used += (int) rc;
	}
	return buf_used;
}
/*
 * _send_rpc - issue the RPC to ship one block of the file's data
 * IN bcast_msg - the block to send
 *
 * TODO: the REQUEST_FILE_BCAST transmission is not implemented yet,
 * the block is only reported through the verbose log.
 */
static void _send_rpc(file_bcast_msg_t *bcast_msg)
{
	unsigned int block = bcast_msg->block_no;
	unsigned int bytes = bcast_msg->block_len;

	verbose("sending block %u with %u bytes", block, bytes);
}
/*
 * _bcast_file - read the source file and broadcast it block by block
 *
 * The file attributes recorded in f_stat and the options in params are
 * copied into a single message which is re-used for every block; only
 * block_no and block_len change between calls to _send_rpc(). The first
 * block sent is numbered 1.
 */
static void _bcast_file(void)
{
	int buf_size;
	file_bcast_msg_t bcast_msg;
	char *buffer;

	/* a block holds at most 64KB and never more than the whole file */
	buf_size = MIN(SSIZE_MAX, (64 * 1024));
	buf_size = MIN(buf_size, f_stat.st_size);
	buffer = xmalloc(buf_size);

	bcast_msg.fname    = params.src_fname;
	bcast_msg.block_no = 0;
	bcast_msg.force    = params.force;
	bcast_msg.modes    = f_stat.st_mode;
	bcast_msg.uid      = f_stat.st_uid;
	bcast_msg.gid      = f_stat.st_gid;
	bcast_msg.data     = buffer;
	if (params.preserve) {
		bcast_msg.atime = f_stat.st_atime;
		bcast_msg.mtime = f_stat.st_mtime;
	} else {
		/* zero times are sent when the source file's times
		 * are not to be preserved */
		bcast_msg.atime = 0;
		bcast_msg.mtime = 0;
	}

	/* _get_block() does all of the file reading, nothing is read here */
	while ((bcast_msg.block_len = _get_block(buffer, buf_size))) {
		bcast_msg.block_no++;
		_send_rpc(&bcast_msg);
		if (bcast_msg.block_len < (uint32_t) buf_size)
			break;	/* end of file */
	}

	/* the message only borrowed the buffer, release it here */
	xfree(buffer);
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment