diff --git a/NEWS b/NEWS index 8a38dea5ca4529e2ba3956e6bd294597b276c86d..db0d6157f81c9fb197beb3491a431b748a371bee 100644 --- a/NEWS +++ b/NEWS @@ -1,6 +1,10 @@ This file describes changes in recent versions of SLURM. It primarily documents those changes that are of interest to users and admins. +* Changes in SLURM 1.1.0-pre2 +============================= + -- Added basic "sbcast" support, still needs message fanout logic. + * Changes in SLURM 1.1.0-pre1 ============================= -- New --enable-multiple-slurmd configure parameter to allow running diff --git a/doc/man/man1/sbcast.1 b/doc/man/man1/sbcast.1 index 50f8da40e3bb16f0d08c571a75b01c0587fcf04a..cf7810efc711e9b2d542d5598a1b8f5cf5fa513f 100644 --- a/doc/man/man1/sbcast.1 +++ b/doc/man/man1/sbcast.1 @@ -1,4 +1,4 @@ -.TH SBCAST "1" "January 2006" "sbcast 1.1" "Slurm components" +.TH SBCAST "1" "February 2006" "sbcast 1.1" "Slurm components" .SH "NAME" sbcast \- transmit a file to the nodes allocated to a SLURM job. @@ -17,7 +17,8 @@ allocation. file copy to be created on each node. \fBDEST\fR should be on a file system local to that node. Note that parallel file systems may provide better performance -than \fBsbcast\fR can provide. +than \fBsbcast\fR can provide, particularly for larger files +(over one megabyte). .SH "OPTIONS" .TP diff --git a/src/common/slurm_protocol_defs.h b/src/common/slurm_protocol_defs.h index 408993bf77789e0daf2b6d1ad3d08b405a4a821f..6dc52fab2a66a9c6e57760b3326fd77795d80bbc 100644 --- a/src/common/slurm_protocol_defs.h +++ b/src/common/slurm_protocol_defs.h @@ -545,6 +545,7 @@ typedef struct kvs_get_msg { typedef struct file_bcast_msg { char *fname; /* name of the destination file */ uint16_t block_no; /* block number of this data */ + uint16_t last_block; /* last block of bcast if set */ uint16_t force; /* replace existing file if set */ uint16_t modes; /* access rights for destination file */ uint32_t uid; /* owner for destination file */ diff --git a/src/common/slurm_protocol_pack.c b/src/common/slurm_protocol_pack.c index c0fe832cbe4639feed5dd167292a3af87f074c16..e79703d9e0ffcbe197698d1c0da4e2da8b304d27 100644 --- a/src/common/slurm_protocol_pack.c +++ b/src/common/slurm_protocol_pack.c @@ -279,6 +279,10 @@ static int _unpack_kvs_data(struct kvs_comm_set **msg_ptr, Buf buffer); static void _pack_kvs_get(kvs_get_msg_t *msg_ptr, Buf buffer); static int _unpack_kvs_get(kvs_get_msg_t **msg_ptr, Buf buffer); +static void _pack_file_bcast(file_bcast_msg_t * msg , Buf buffer ); + +static int _unpack_file_bcast(file_bcast_msg_t ** msg_ptr , Buf buffer ); + /* pack_header * packs a slurm protocol header that proceeds every slurm message * IN header - the header structure to pack @@ -618,6 +622,9 @@ pack_msg(slurm_msg_t const *msg, Buf buffer) case MESSAGE_JOBACCT_DATA: _pack_jobacct_data((jobacct_msg_t *) msg->data, buffer); break; + case REQUEST_FILE_BCAST: + _pack_file_bcast((file_bcast_msg_t *) msg->data, buffer); + break; case PMI_KVS_PUT_REQ: case PMI_KVS_GET_RESP: _pack_kvs_data((struct kvs_comm_set *) msg->data, buffer); @@ -906,6 +913,10 @@ unpack_msg(slurm_msg_t * msg, Buf buffer) rc = _unpack_jobacct_data( (jobacct_msg_t **) & msg->data, buffer); break; + case REQUEST_FILE_BCAST: + rc = _unpack_file_bcast( (file_bcast_msg_t **) + & msg->data, buffer); + break; case PMI_KVS_PUT_REQ: case PMI_KVS_GET_RESP: rc = _unpack_kvs_data((struct kvs_comm_set **) &msg->data, @@ -3536,6 +3547,62 @@ static int _unpack_jobacct_data(jobacct_msg_t ** msg_ptr , Buf buffer ) return SLURM_ERROR; } +static void _pack_file_bcast(file_bcast_msg_t * msg , Buf buffer ) +{ + xassert ( msg != NULL ); + + pack16 ( msg->block_no, buffer ); + pack16 ( msg->last_block, buffer ); + pack16 ( msg->force, buffer ); + pack16 ( msg->modes, buffer ); + + pack32 ( msg->uid, buffer ); + pack32 ( msg->gid, buffer ); + pack32 ( msg->block_len, buffer ); + + pack_time ( msg->atime, buffer ); + pack_time ( msg->mtime, buffer ); + + packstr( msg->fname, buffer ); + packmem ( msg->data, msg->block_len, buffer ) ; +} + +static int _unpack_file_bcast(file_bcast_msg_t ** msg_ptr , Buf buffer ) +{ + uint16_t uint16_tmp; + file_bcast_msg_t *msg ; + + xassert ( msg_ptr != NULL ); + + msg = xmalloc ( sizeof (file_bcast_msg_t) ) ; + *msg_ptr = msg; + + safe_unpack16 ( & msg->block_no, buffer ); + safe_unpack16 ( & msg->last_block, buffer ); + safe_unpack16 ( & msg->force, buffer ); + safe_unpack16 ( & msg->modes, buffer ); + + safe_unpack32 ( & msg->uid, buffer ); + safe_unpack32 ( & msg->gid, buffer ); + safe_unpack32 ( & msg->block_len, buffer ); + + safe_unpack_time ( & msg->atime, buffer ); + safe_unpack_time ( & msg->mtime, buffer ); + + safe_unpackstr_xmalloc ( & msg->fname, &uint16_tmp, buffer ); + safe_unpackmem_xmalloc ( & msg->data, &uint16_tmp , buffer ) ; + if ( uint16_tmp != msg->block_len ) + goto unpack_error; + return SLURM_SUCCESS; + + unpack_error: + xfree( msg -> data ); + xfree( msg -> fname ); + xfree( msg ); + *msg_ptr = NULL; + return SLURM_ERROR; +} + static void _pack_kvs_rec(struct kvs_comm *msg_ptr, Buf buffer) { int i; diff --git a/src/sbcast/sbcast.c b/src/sbcast/sbcast.c index f7a2d5669919c8c483f39c44b92aa68867d0ad51..d1197d3127076a31d01cb0b2fcc824b33453408d 100644 --- a/src/sbcast/sbcast.c +++ b/src/sbcast/sbcast.c @@ -42,6 +42,7 @@ #include "src/common/hostlist.h" #include "src/common/log.h" +#include "src/common/slurm_protocol_api.h" #include "src/common/slurm_protocol_defs.h" #include "src/common/xmalloc.h" #include "src/common/xstring.h" @@ -116,8 +117,8 @@ static void _get_job_info(void) exit(1); } - verbose("node_list = %s\n", alloc_resp->node_list); - verbose("node_cnt = %u\n", alloc_resp->node_cnt); + verbose("node_list = %s", alloc_resp->node_list); + verbose("node_cnt = %u", alloc_resp->node_cnt); /* also see alloc_resp->node_addr (array) */ /* do not bother to release the return message, @@ -134,7 +135,7 @@ static int _get_block(char *buffer, size_t buf_size) if (!fd) { fd = open(params.src_fname, O_RDONLY); if (!fd) { - error("Can't open `%s`: %s\n", + error("Can't open `%s`: %s", params.src_fname, strerror(errno)); exit(1); } @@ -145,7 +146,7 @@ static int _get_block(char *buffer, size_t buf_size) if (rc == -1) { if ((errno == EINTR) || (errno == EAGAIN)) continue; - error("Can't read `%s`: %s\n", + error("Can't read `%s`: %s", params.src_fname, strerror(errno)); exit(1); } else if (rc == 0) { @@ -163,29 +164,81 @@ static int _get_block(char *buffer, size_t buf_size) /* issue the RPC to ship the file's data */ static void _send_rpc(file_bcast_msg_t *bcast_msg) { - //REQUEST_FILE_BCAST; - verbose("sending block %u with %u bytes", bcast_msg->block_no, - bcast_msg->block_len); +#if 1 + slurm_msg_t msg; + int rc; + + msg.msg_type = REQUEST_FILE_BCAST; + msg.address = alloc_resp->node_addr[0]; + msg.data = bcast_msg; + + if (slurm_send_recv_rc_msg_only_one(&msg, &rc, 0)) { + error("slurm_send_recv_rc_msg_only_one: %m"); + exit(1); + } +#else +// This code will handle message fanout to multiple slurmd, not implemented yet + int i, rc; + + /* use static structures for persistent communcations data */ + static slurm_msg_t *msg = NULL; /* array of message structs, one per node */ + static int *rc_array; + static int node_cnt; + + if (!msg) { + node_cnt = alloc_resp->node_cnt; + msg = xmalloc(sizeof(slurm_msg_t) * node_cnt); + rc_array = xmalloc(sizeof(int) * node_cnt); + for (i = 0; i < node_cnt; i++) { + msg[i].msg_type = REQUEST_FILE_BCAST; + msg[i].address = alloc_resp->node_addr[i]; + } + slurm_free_resource_allocation_response_msg(alloc_resp); + } + for (i = 0; i < node_cnt; i++) + msg[i].data = bcast_msg; + + verbose("sending block %u with %u bytes to %d nodes", + bcast_msg->block_no, bcast_msg->block_len, node_cnt); + //_p_send_recv_rc_msg(alloc_resp->node_cnt, msg, rc_array, 10); + + rc = 0; + for (i = 0; i < node_cnt; i++) { + if (rc_array[i]) { + rc = rc_array[i]; + break; + } + } +#endif + + if (rc) { + error("REQUEST_FILE_BCAST: %s", slurm_strerror(rc)); + exit(1); + } } /* read and broadcast the file */ static void _bcast_file(void) { - int buf_size, size_read; + int buf_size; + off_t size_read = 0; file_bcast_msg_t bcast_msg; char *buffer; - buf_size = MIN(SSIZE_MAX, (64 * 1024)); + /* NOTE: packmem() uses 16 bits to express a block size, + * buf_size must be no larger than 64k - 1 */ + buf_size = MIN(SSIZE_MAX, (63 * 1024)); buf_size = MIN(buf_size, f_stat.st_size); buffer = xmalloc(buf_size); - bcast_msg.fname = params.src_fname; - bcast_msg.block_no = 0; - bcast_msg.force = params.force; - bcast_msg.modes = f_stat.st_mode; - bcast_msg.uid = f_stat.st_uid; - bcast_msg.gid = f_stat.st_gid; - bcast_msg.data = buffer; + bcast_msg.fname = params.dst_fname; + bcast_msg.block_no = 0; + bcast_msg.last_block = 0; + bcast_msg.force = params.force; + bcast_msg.modes = f_stat.st_mode; + bcast_msg.uid = f_stat.st_uid; + bcast_msg.gid = f_stat.st_gid; + bcast_msg.data = buffer; if (params.preserve) { bcast_msg.atime = f_stat.st_atime; bcast_msg.mtime = f_stat.st_mtime; @@ -196,8 +249,11 @@ static void _bcast_file(void) while ((bcast_msg.block_len = _get_block(buffer, buf_size))) { bcast_msg.block_no++; + size_read += bcast_msg.block_len; + if (size_read >= f_stat.st_size) + bcast_msg.last_block = 1; _send_rpc(&bcast_msg); - if (bcast_msg.block_len < buf_size) + if (bcast_msg.last_block) break; /* end of file */ } } diff --git a/src/slurmd/slurmd/req.c b/src/slurmd/slurmd/req.c index 9b9d9344c528993281e98ca978b0174e6b80f858..973ab0245290d466a098265cd0b129369f97ce94 100644 --- a/src/slurmd/slurmd/req.c +++ b/src/slurmd/slurmd/req.c @@ -41,6 +41,7 @@ #include <sys/stat.h> #include <sys/types.h> #include <sys/wait.h> +#include <utime.h> #include "src/common/hostlist.h" #include "src/common/log.h" @@ -91,6 +92,7 @@ static void _rpc_update_time(slurm_msg_t *, slurm_addr *); static void _rpc_shutdown(slurm_msg_t *msg, slurm_addr *cli_addr); static void _rpc_reconfig(slurm_msg_t *msg, slurm_addr *cli_addr); static void _rpc_pid2jid(slurm_msg_t *msg, slurm_addr *); +static int _rpc_file_bcast(slurm_msg_t *msg, slurm_addr *); static int _rpc_ping(slurm_msg_t *, slurm_addr *); static int _run_prolog(uint32_t jobid, uid_t uid, char *bg_part_id); static int _run_epilog(uint32_t jobid, uid_t uid, char *bg_part_id); @@ -213,6 +215,11 @@ slurmd_req(slurm_msg_t *msg, slurm_addr *cli) slurm_free_jobacct_msg(msg->data); } break; + case REQUEST_FILE_BCAST: + rc = _rpc_file_bcast(msg, cli); + slurm_send_rc_msg(msg, rc); + slurm_free_file_bcast_msg(msg->data); + break; default: error("slurmd_req: invalid request msg type %d\n", msg->msg_type); @@ -1083,6 +1090,99 @@ static void _rpc_pid2jid(slurm_msg_t *msg, slurm_addr *cli) } } +static int +_rpc_file_bcast(slurm_msg_t *msg, slurm_addr *cli) +{ + file_bcast_msg_t *req = msg->data; + int fd, flags, offset, inx, rc; + uid_t req_uid = g_slurm_auth_get_uid(msg->cred); + uid_t req_gid = g_slurm_auth_get_gid(msg->cred); + pid_t child; + +#if 0 + info("last_block=%u force=%u modes=%o", + req->last_block, req->force, req->modes); + info("uid=%u gid=%u atime=%lu mtime=%lu block_len=%u", + req->uid, req->gid, req->atime, req->mtime, req->block_len); + /* when the file being transferred is binary, the following line + * can break the terminal output for slurmd */ + /* info("req->data=%s, @ %lu", req->data, (unsigned long) &req->data); */ +#endif + + info("sbcast req_uid=%u fname=%s block_no=%u", + req_uid, req->fname, req->block_no); + child = fork(); + if (child == -1) { + error("sbcast: fork failure"); + return errno; + } else if (child > 0) { + waitpid(child, &rc, 0); + return WEXITSTATUS(rc); + } + + /* The child actually performs the I/O and exits with + * a return code, do not return! */ + if (setgid(req_gid) < 0) { + error("sbcast: uid:%u setgid(%u): %s", req_uid, req_gid, + strerror(errno)); + exit(errno); + } + if (setuid(req_uid) < 0) { + error("sbcast: getuid(%u): %s", req_uid, strerror(errno)); + exit(errno); + } + + flags = O_WRONLY; + if (req->block_no == 1) { + flags |= O_CREAT; + if (req->force) + flags |= O_TRUNC; + else + flags |= O_EXCL; + } else + flags |= O_APPEND; + + fd = open(req->fname, flags, 0700); + if (fd == -1) { + error("sbcast: uid:%u can't open `%s`: %s", + req_uid, req->fname, strerror(errno)); + exit(errno); + } + + offset = 0; + while (req->block_len - offset) { + inx = write(fd, &req->data[offset], (req->block_len - offset)); + if (inx == -1) { + if ((errno == EINTR) || (errno == EAGAIN)) + continue; + error("sbcast: uid:%u can't write `%s`: %s", + req_uid, req->fname, strerror(errno)); + close(fd); + exit(errno); + } + offset += inx; + } + if (req->last_block && fchmod(fd, (req->modes & 0777))) { + error("sbcast: uid:%u can't chmod `%s`: %s", + req_uid, req->fname, strerror(errno)); + } + if (req->last_block && fchown(fd, req->uid, req->gid)) { + error("sbcast: uid:%u can't chown `%s`: %s", + req_uid, req->fname, strerror(errno)); + } + close(fd); + fd = 0; + if (req->last_block && req->atime) { + struct utimbuf time_buf; + time_buf.actime = req->atime; + time_buf.modtime = req->mtime; + if (utime(req->fname, &time_buf)) { + error("sbcast: uid:%u can't utime `%s`: %s", + req_uid, req->fname, strerror(errno)); + } + } + exit(SLURM_SUCCESS); +} static void _rpc_reattach_tasks(slurm_msg_t *msg, slurm_addr *cli)