diff --git a/NEWS b/NEWS index 5c7489409efcaa68c9f107db60a47576b4b840e7..de9a580f350bbd4058c0b46a003c0af3dde4deb9 100644 --- a/NEWS +++ b/NEWS @@ -10,6 +10,9 @@ documents those changes that are of interest to users and administrators. -- burst_buffer/cray: If teardown operations fails, sleep and retry. -- Clean up the external pids when using the PrologFlags=Contain feature and the job finishes. + -- burst_buffer/cray: Support file staging when job lacks job-specific buffer + (i.e. only persistent burst buffers). + -- Added srun option of --bcast to copy executable file to compute nodes. * Changes in Slurm 15.08.4 ========================== diff --git a/doc/man/man1/srun.1 b/doc/man/man1/srun.1 index 2e4921e4dbddfd647396b362b3b817785c6867cf..6024880062cbef5fb595cacdc76924880f0878c0 100644 --- a/doc/man/man1/srun.1 +++ b/doc/man/man1/srun.1 @@ -1,4 +1,4 @@ -.TH srun "1" "Slurm Commands" "April 2015" "Slurm Commands" +.TH srun "1" "Slurm Commands" "November 2015" "Slurm Commands" .SH "NAME" srun \- Run parallel jobs @@ -139,6 +139,16 @@ Path of file containing burst buffer specification. The form of the specification is system dependent. Also see \fB\-\-bb\fR. +.TP +\fB\-\-bcast\fR[=<\fIdest_path\fR>] +Copy executable file to allocated compute nodes. +If a file name is specified, copy the executable to the specified destination +file path. If no path is specified, copy the file to a file named +"slurm_bcast_<job_id>.<step_id>" in the current working directory. +For example, "srun \-\-bcast=/tmp/mine \-N3 a.out" will copy the file "a.out" +from your current directory to the file "/tmp/mine" on each of the three +allocated compute nodes and execute that file. + .TP \fB\-\-begin\fR=<\fItime\fR> Defer initiation of this job until the specified time. 
@@ -2126,6 +2136,9 @@ Same as \fB\-A, \-\-account\fR \fBSLURM_ACCTG_FREQ\fR Same as \fB\-\-acctg\-freq\fR .TP +\fBSLURM_BCAST\fR +Same as \fB\-\-bcast\fR +.TP \fBSLURM_BLRTS_IMAGE\fR Same as \fB\-\-blrts\-image\fR .TP diff --git a/src/common/Makefile.am b/src/common/Makefile.am index 9b78fe5994406253a4d9d9526f53b035649af1b0..fc8d3c3296766662909adb6fc071307e865f812d 100644 --- a/src/common/Makefile.am +++ b/src/common/Makefile.am @@ -33,6 +33,7 @@ noinst_LTLIBRARIES = \ libcommon_la_SOURCES = \ cpu_frequency.c cpu_frequency.h \ assoc_mgr.c assoc_mgr.h \ + file_bcast.c file_bcast.h \ xmalloc.c xmalloc.h \ xassert.c xassert.h \ xstring.c xstring.h \ diff --git a/src/common/Makefile.in b/src/common/Makefile.in index 092d1d6fac508850f7fffc06cc9c185a6aa2a0cf..e266196d7560229e04f343da5bae54d5d92ea66b 100644 --- a/src/common/Makefile.in +++ b/src/common/Makefile.in @@ -149,15 +149,15 @@ LTLIBRARIES = $(noinst_LTLIBRARIES) am__DEPENDENCIES_1 = libcommon_la_DEPENDENCIES = $(am__DEPENDENCIES_1) am__libcommon_la_SOURCES_DIST = cpu_frequency.c cpu_frequency.h \ - assoc_mgr.c assoc_mgr.h xmalloc.c xmalloc.h xassert.c \ - xassert.h xstring.c xstring.h xsignal.c xsignal.h strnatcmp.c \ - strnatcmp.h forward.c forward.h msg_aggr.c msg_aggr.h \ - strlcpy.c strlcpy.h list.c list.h xtree.c xtree.h xhash.c \ - xhash.h net.c net.h log.c log.h cbuf.c cbuf.h safeopen.c \ - safeopen.h bitstring.c bitstring.h mpi.c slurm_mpi.h pack.c \ - pack.h parse_config.c parse_config.h parse_value.c \ - parse_value.h parse_spec.c parse_spec.h plugin.c plugin.h \ - plugrack.c plugrack.h power.c power.h print_fields.c \ + assoc_mgr.c assoc_mgr.h file_bcast.c file_bcast.h xmalloc.c \ + xmalloc.h xassert.c xassert.h xstring.c xstring.h xsignal.c \ + xsignal.h strnatcmp.c strnatcmp.h forward.c forward.h \ + msg_aggr.c msg_aggr.h strlcpy.c strlcpy.h list.c list.h \ + xtree.c xtree.h xhash.c xhash.h net.c net.h log.c log.h cbuf.c \ + cbuf.h safeopen.c safeopen.h bitstring.c bitstring.h mpi.c \ + slurm_mpi.h 
pack.c pack.h parse_config.c parse_config.h \ + parse_value.c parse_value.h parse_spec.c parse_spec.h plugin.c \ + plugin.h plugrack.c plugrack.h power.c power.h print_fields.c \ print_fields.h read_config.c read_config.h node_select.c \ node_select.h env.c env.h fd.c fd.h slurm_cred.h slurm_cred.c \ slurm_errno.c slurm_ext_sensors.c slurm_ext_sensors.h \ @@ -196,10 +196,10 @@ am__libcommon_la_SOURCES_DIST = cpu_frequency.c cpu_frequency.h \ xcgroup_read_config.c xcgroup_read_config.h callerid.c \ callerid.h @HAVE_UNSETENV_FALSE@am__objects_1 = unsetenv.lo -am_libcommon_la_OBJECTS = cpu_frequency.lo assoc_mgr.lo xmalloc.lo \ - xassert.lo xstring.lo xsignal.lo strnatcmp.lo forward.lo \ - msg_aggr.lo strlcpy.lo list.lo xtree.lo xhash.lo net.lo log.lo \ - cbuf.lo safeopen.lo bitstring.lo mpi.lo pack.lo \ +am_libcommon_la_OBJECTS = cpu_frequency.lo assoc_mgr.lo file_bcast.lo \ + xmalloc.lo xassert.lo xstring.lo xsignal.lo strnatcmp.lo \ + forward.lo msg_aggr.lo strlcpy.lo list.lo xtree.lo xhash.lo \ + net.lo log.lo cbuf.lo safeopen.lo bitstring.lo mpi.lo pack.lo \ parse_config.lo parse_value.lo parse_spec.lo plugin.lo \ plugrack.lo power.lo print_fields.lo read_config.lo \ node_select.lo env.lo fd.lo slurm_cred.lo slurm_errno.lo \ @@ -581,6 +581,7 @@ noinst_LTLIBRARIES = \ libcommon_la_SOURCES = \ cpu_frequency.c cpu_frequency.h \ assoc_mgr.c assoc_mgr.h \ + file_bcast.c file_bcast.h \ xmalloc.c xmalloc.h \ xassert.c xassert.h \ xstring.c xstring.h \ @@ -798,6 +799,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/entity.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/env.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/fd.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/file_bcast.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/forward.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/getopt.Plo@am__quote@ @AMDEP_TRUE@@am__include@ 
@am__quote@./$(DEPDIR)/getopt1.Plo@am__quote@ diff --git a/src/sbcast/agent.c b/src/common/file_bcast.c similarity index 52% rename from src/sbcast/agent.c rename to src/common/file_bcast.c index 07d48287acfbe634230dd32cae7229b4a2a431d9..b4dcff0a117b2ea18297291f06d70cb6ae321ec3 100644 --- a/src/sbcast/agent.c +++ b/src/common/file_bcast.c @@ -1,8 +1,9 @@ /*****************************************************************************\ - * agent.c - File transfer agent (handles message traffic) + * file_bcast.c - File transfer agent (handles message traffic) ***************************************************************************** - * Copyright (C) 2006-2007 The Regents of the University of California. + * Copyright (C) 2015 SchedMD LLC. * Copyright (C) 2008-2009 Lawrence Livermore National Security. + * Copyright (C) 2006-2007 The Regents of the University of California. * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). * Written by Morris Jette <jette1@llnl.gov> * CODE-OCEC-09-009. All rights reserved. 
@@ -46,9 +47,13 @@ #include <pthread.h> #include <stdio.h> #include <stdlib.h> +#include <sys/stat.h> +#include <sys/types.h> #include <unistd.h> #include "slurm/slurm_errno.h" +#include "src/common/file_bcast.h" +#include "src/common/forward.h" #include "src/common/hostlist.h" #include "src/common/log.h" #include "src/common/macros.h" @@ -56,10 +61,10 @@ #include "src/common/slurm_protocol_api.h" #include "src/common/slurm_protocol_defs.h" #include "src/common/slurm_protocol_interface.h" +#include "src/common/slurm_time.h" +#include "src/common/uid.h" #include "src/common/xmalloc.h" #include "src/common/xstring.h" -#include "src/common/forward.h" -#include "src/sbcast/sbcast.h" #define MAX_RETRIES 10 #define MAX_THREADS 8 /* These can be huge messages, so @@ -74,8 +79,76 @@ typedef struct thd { static pthread_mutex_t agent_cnt_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t agent_cnt_cond = PTHREAD_COND_INITIALIZER; static int agent_cnt = 0; +static int msg_timeout = 0; + +static int fd; /* source file descriptor */ +static struct stat f_stat; /* source file stats */ +job_sbcast_cred_msg_t *sbcast_cred; /* job alloc info and sbcast cred */ static void *_agent_thread(void *args); +static int _bcast_file(struct bcast_parameters *params); +static int _file_bcast(struct bcast_parameters *params, + file_bcast_msg_t *bcast_msg, + job_sbcast_cred_msg_t *sbcast_cred); +static int _file_state(struct bcast_parameters *params); +static ssize_t _get_block(struct bcast_parameters *params, + char *buffer, size_t buf_size); +static int _get_job_info(struct bcast_parameters *params); + + +static int _file_state(struct bcast_parameters *params) +{ + /* validate the source file */ + if ((fd = open(params->src_fname, O_RDONLY)) < 0) { + error("Can't open `%s`: %s", params->src_fname, + strerror(errno)); + return SLURM_ERROR; + } + if (fstat(fd, &f_stat)) { + error("Can't stat `%s`: %s", params->src_fname, + strerror(errno)); + return SLURM_ERROR; + } + verbose("modes = %o", 
(unsigned int) f_stat.st_mode); + verbose("uid = %d", (int) f_stat.st_uid); + verbose("gid = %d", (int) f_stat.st_gid); + verbose("atime = %s", slurm_ctime2(&f_stat.st_atime)); + verbose("mtime = %s", slurm_ctime2(&f_stat.st_mtime)); + verbose("ctime = %s", slurm_ctime2(&f_stat.st_ctime)); + verbose("size = %ld", (long) f_stat.st_size); + + return SLURM_SUCCESS; +} + +/* get details about this slurm job: jobid and allocated node */ +static int _get_job_info(struct bcast_parameters *params) +{ + int rc; + + xassert(params->job_id != NO_VAL); + + rc = slurm_sbcast_lookup(params->job_id, params->step_id, &sbcast_cred); + if (rc != SLURM_SUCCESS) { + error("Slurm step ID %u.%u lookup error: %s", + params->job_id, params->step_id, + slurm_strerror(slurm_get_errno())); + return rc; + } + + verbose("jobid = %u.%u", params->job_id, params->step_id); + verbose("node_cnt = %u", sbcast_cred->node_cnt); + verbose("node_list = %s", sbcast_cred->node_list); + /* also see sbcast_cred->node_addr (array) */ + + if (params->verbose) + print_sbcast_cred(sbcast_cred->sbcast_cred); + + /* do not bother to release the return message, + * we need to preserve and use most of the information later */ + + return rc; +} + static void *_agent_thread(void *args) { @@ -87,7 +160,7 @@ static void *_agent_thread(void *args) ret_list = slurm_send_recv_msgs(thread_ptr->nodelist, &thread_ptr->msg, - params.timeout, false); + msg_timeout, false); if (ret_list == NULL) { error("slurm_send_recv_msgs: %m"); exit(1); @@ -117,8 +190,9 @@ static void *_agent_thread(void *args) } /* Issue the RPC to transfer the file's data */ -extern void send_rpc(file_bcast_msg_t *bcast_msg, - job_sbcast_cred_msg_t *sbcast_cred) +static int _file_bcast(struct bcast_parameters *params, + file_bcast_msg_t *bcast_msg, + job_sbcast_cred_msg_t *sbcast_cred) { /* Preserve some data structures across calls for better performance */ static int threads_used = 0; @@ -134,10 +208,11 @@ extern void send_rpc(file_bcast_msg_t 
*bcast_msg, int *span = NULL; char *name = NULL; - if (params.fanout) - fanout = MIN(MAX_THREADS, params.fanout); + if (params->fanout) + fanout = MIN(MAX_THREADS, params->fanout); else fanout = MAX_THREADS; + msg_timeout = params->timeout; span = set_span(sbcast_cred->node_cnt, fanout); @@ -154,7 +229,7 @@ extern void send_rpc(file_bcast_msg_t *bcast_msg, new_hl = hostlist_create(name); free(name); i++; - for(j = 0; j < span[threads_used]; j++) { + for (j = 0; j < span[threads_used]; j++) { name = hostlist_shift(hl); if (!name) break; @@ -176,13 +251,13 @@ extern void send_rpc(file_bcast_msg_t *bcast_msg, } slurm_attr_init(&attr); - if (pthread_attr_setstacksize(&attr, 3 * 1024*1024)) + if (pthread_attr_setstacksize(&attr, 3 * 1024 * 1024)) error("pthread_attr_setstacksize: %m"); if (pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED)) error("pthread_attr_setdetachstate error %m"); - for (i=0; i<threads_used; i++) { + for (i = 0; i < threads_used; i++) { thread_info[i].msg.data = bcast_msg; slurm_mutex_lock(&agent_cnt_mutex); agent_cnt++; @@ -205,9 +280,121 @@ extern void send_rpc(file_bcast_msg_t *bcast_msg, slurm_mutex_unlock(&agent_cnt_mutex); pthread_attr_destroy(&attr); - for (i=0; i<threads_used; i++) + for (i = 0; i < threads_used; i++) rc = MAX(rc, thread_info[i].rc); - if (rc) - exit(1); + return rc; +} + +/* load a buffer with data from the file to broadcast, + * return number of bytes read, zero on end of file */ +static ssize_t _get_block(struct bcast_parameters *params, + char *buffer, size_t buf_size) +{ + static int fd = 0; + ssize_t buf_used = 0, rc; + + if (!fd) { + fd = open(params->src_fname, O_RDONLY); + if (!fd) { + error("Can't open `%s`: %s", + params->src_fname, strerror(errno)); + return SLURM_ERROR; + } + } + + while (buf_size) { + rc = read(fd, buffer, buf_size); + if (rc == -1) { + if ((errno == EINTR) || (errno == EAGAIN)) + continue; + error("Can't read `%s`: %s", + params->src_fname, strerror(errno)); + return SLURM_ERROR; + 
} else if (rc == 0) { + debug("end of file reached"); + break; + } + + buffer += rc; + buf_size -= rc; + buf_used += rc; + } + return buf_used; +} + +/* read and broadcast the file */ +static int _bcast_file(struct bcast_parameters *params) +{ + int buf_size, rc = SLURM_SUCCESS; + ssize_t size_read = 0; + file_bcast_msg_t bcast_msg; + char *buffer; + int32_t block_len; + + if (params->block_size) + buf_size = MIN(params->block_size, f_stat.st_size); + else + buf_size = MIN((512 * 1024), f_stat.st_size); + + bcast_msg.fname = params->dst_fname; + bcast_msg.block_no = 1; + bcast_msg.last_block = 0; + bcast_msg.force = params->force; + bcast_msg.modes = f_stat.st_mode; + bcast_msg.uid = f_stat.st_uid; + bcast_msg.user_name = uid_to_string(f_stat.st_uid); + bcast_msg.gid = f_stat.st_gid; + buffer = xmalloc(buf_size); + bcast_msg.block = buffer; + bcast_msg.block_len = 0; + bcast_msg.cred = sbcast_cred->sbcast_cred; + + if (params->preserve) { + bcast_msg.atime = f_stat.st_atime; + bcast_msg.mtime = f_stat.st_mtime; + } else { + bcast_msg.atime = 0; + bcast_msg.mtime = 0; + } + + while (1) { + block_len = _get_block(params, buffer, buf_size); + if (block_len < 0) + rc = SLURM_ERROR; + if (block_len <= 0) + break; + bcast_msg.block_len = block_len; + debug("block %d, size %u", bcast_msg.block_no, + bcast_msg.block_len); + size_read += bcast_msg.block_len; + if (size_read >= f_stat.st_size) + bcast_msg.last_block = 1; + + rc = _file_bcast(params, &bcast_msg, sbcast_cred); + if (rc != SLURM_SUCCESS) + break; + if (bcast_msg.last_block) + break; /* end of file */ + bcast_msg.block_no++; + } + xfree(bcast_msg.user_name); + xfree(buffer); + + return rc; +} + +extern int bcast_file(struct bcast_parameters *params) +{ + int rc; + + if ((rc = _file_state(params)) != SLURM_SUCCESS) + return rc; + if ((rc = _get_job_info(params)) != SLURM_SUCCESS) + return rc; + if ((rc = _bcast_file(params)) != SLURM_SUCCESS) + return rc; + +/* slurm_free_sbcast_cred_msg(sbcast_cred); */ + return 
rc; } diff --git a/src/common/file_bcast.h b/src/common/file_bcast.h new file mode 100644 index 0000000000000000000000000000000000000000..2c0bd5045cfc7bb0efb20921bd94e3cc01905109 --- /dev/null +++ b/src/common/file_bcast.h @@ -0,0 +1,63 @@ +/****************************************************************************\ + * file_bcast.h - definitions used for file broadcast functions + ***************************************************************************** + * Copyright (C) 2015 SchedMD LLC. + * + * This file is part of SLURM, a resource management program. + * For details, see <http://slurm.schedmd.com/>. + * Please also read the included file: DISCLAIMER. + * + * SLURM is free software; you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * In addition, as a special exception, the copyright holders give permission + * to link the code of portions of this program with the OpenSSL library under + * certain conditions as described in each individual source file, and + * distribute linked combinations including the two. You must obey the GNU + * General Public License in all respects for all of the code used other than + * OpenSSL. If you modify file(s) with this exception, you may extend this + * exception to your version of the file(s), but you are not obligated to do + * so. If you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files in + * the program, then also delete it here. + * + * SLURM is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. 
+ * + * You should have received a copy of the GNU General Public License along + * with SLURM; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +\****************************************************************************/ + +#ifndef _FILE_BCAST_H +#define _FILE_BCAST_H + +#if HAVE_CONFIG_H +# include "config.h" +#endif + +#include "slurm/slurm.h" +#include "src/common/macros.h" +#include "src/common/slurm_protocol_defs.h" + +struct bcast_parameters { + uint32_t block_size; + bool compress; + int fanout; + bool force; + uint32_t job_id; + uint32_t step_id; + bool preserve; + int timeout; + int verbose; + char *src_fname; + char *dst_fname; +}; + +extern int bcast_file(struct bcast_parameters *params); + +#endif diff --git a/src/common/gres.c b/src/common/gres.c index 6d04fef8683ca9ea06dd590e88611a52dc2b089a..0f5d9158911066673936c78688b99e7dfa46d6c0 100644 --- a/src/common/gres.c +++ b/src/common/gres.c @@ -814,11 +814,12 @@ static int _parse_gres_config(void **dest, slurm_parser_enum_t type, else if ((last[0] == 'm') || (last[0] == 'M')) tmp_uint64 *= (1024 * 1024); else if ((last[0] == 'g') || (last[0] == 'G')) - tmp_uint64 *= (1024 * 1024 * 1024); + tmp_uint64 *= ((uint64_t)1024 * 1024 * 1024); else if ((last[0] == 't') || (last[0] == 'T')) - tmp_uint64 *= (uint64_t)pow(1024, 4); + tmp_uint64 *= ((uint64_t)1024 * 1024 * 1024 * 1024); else if ((last[0] == 'p') || (last[0] == 'P')) - tmp_uint64 *= (uint64_t)pow(1024, 5); + tmp_uint64 *= ((uint64_t)1024 * 1024 * 1024 * 1024 * + 1024); else if (last[0] != '\0') { fatal("Invalid gres data for %s, Count=%s", p->name, tmp_str); @@ -1505,11 +1506,13 @@ static void _get_gres_cnt(gres_node_state_t *gres_data, char *orig_config, else if ((last_num[0] == 'm') || (last_num[0] == 'M')) tmp_gres_cnt *= (1024 * 1024); else if ((last_num[0] == 'g') || (last_num[0] == 'G')) - tmp_gres_cnt *= (1024 * 1024 * 1024); + tmp_gres_cnt *= ((uint64_t)1024 * 1024 * 1024); 
else if ((last_num[0] == 't') || (last_num[0] == 'T')) - tmp_gres_cnt *= (uint64_t)pow(1024, 4); + tmp_gres_cnt *= ((uint64_t)1024 * 1024 * 1024 * + 1024); else if ((last_num[0] == 'p') || (last_num[0] == 'P')) - tmp_gres_cnt *= (uint64_t)pow(1024, 5); + tmp_gres_cnt *= ((uint64_t)1024 * 1024 * 1024 * + 1024 * 1024); else { error("Bad GRES configuration: %s", tok); break; diff --git a/src/plugins/burst_buffer/common/burst_buffer_common.c b/src/plugins/burst_buffer/common/burst_buffer_common.c index 888a9524b304b8beed2d1bd455bbde02893a0368..7e5413749619037cbdf0cd6b3f2171376c909926 100644 --- a/src/plugins/burst_buffer/common/burst_buffer_common.c +++ b/src/plugins/burst_buffer/common/burst_buffer_common.c @@ -1322,8 +1322,8 @@ extern void bb_job_log(bb_state_t *state_ptr, bb_job_t *bb_job) int i; if (bb_job) { - xstrfmtcat(out_buf, "%s: Job:%u ", - state_ptr->name, bb_job->job_id); + xstrfmtcat(out_buf, "%s: Job:%u UserID:%u ", + state_ptr->name, bb_job->job_id, bb_job->user_id); for (i = 0; i < bb_job->gres_cnt; i++) { xstrfmtcat(out_buf, "Gres[%d]:%s:%"PRIu64" ", i, bb_job->gres_ptr[i].name, diff --git a/src/plugins/burst_buffer/common/burst_buffer_common.h b/src/plugins/burst_buffer/common/burst_buffer_common.h index ca0dc4c450c7b0f09ba6a2ec527221bb6b7c8a0e..b4062cef05c04de39d2eacc17895162ec44cbd42 100644 --- a/src/plugins/burst_buffer/common/burst_buffer_common.h +++ b/src/plugins/burst_buffer/common/burst_buffer_common.h @@ -151,7 +151,7 @@ typedef struct { * burst_buffer string field */ #define BB_JOB_MAGIC 0xDEAD3412 typedef struct bb_job { - char *account; /* Associated account (for limits) */ + char *account; /* Associated account (for limits) */ uint32_t buf_cnt; /* Number of records in buf_ptr */ bb_buf_t *buf_ptr; /* Buffer creation records */ uint32_t gres_cnt; /* number of records in gres_ptr */ @@ -168,6 +168,7 @@ typedef struct bb_job { uint32_t swap_nodes; /* Number of nodes needed */ uint64_t total_size; /* Total bytes required for job (excludes * 
persistent buffers) */ + uint32_t user_id; /* user the job runs as */ } bb_job_t; /* Persistent buffer requests which are pending */ diff --git a/src/plugins/burst_buffer/cray/burst_buffer_cray.c b/src/plugins/burst_buffer/cray/burst_buffer_cray.c index 561e896ce61e8d5fa68ebec1a090ca63883a5622..8bce3b5265601c4b1bbf7ee4aa7362e4b990e51d 100644 --- a/src/plugins/burst_buffer/cray/burst_buffer_cray.c +++ b/src/plugins/burst_buffer/cray/burst_buffer_cray.c @@ -245,16 +245,14 @@ static void _purge_bb_files(uint32_t job_id, struct job_record *job_ptr); static void _purge_vestigial_bufs(void); static void _python2json(char *buf); static void _recover_bb_state(void); -static int _queue_setup(struct job_record *job_ptr, bb_job_t *bb_job); static int _queue_stage_in(struct job_record *job_ptr, bb_job_t *bb_job); -static int _queue_stage_out(struct job_record *job_ptr); +static int _queue_stage_out(bb_job_t *bb_job); static void _queue_teardown(uint32_t job_id, uint32_t user_id, bool hurry); static void _reset_buf_state(uint32_t user_id, uint32_t job_id, char *name, int new_state, uint64_t buf_size); static void _save_bb_state(void); static void _set_assoc_mgr_ptrs(bb_alloc_t *bb_alloc); static void * _start_pre_run(void *x); -static void * _start_setup(void *x); static void * _start_stage_in(void *x); static void * _start_stage_out(void *x); static void * _start_teardown(void *x); @@ -389,21 +387,13 @@ static int _alloc_job_bb(struct job_record *job_ptr, bb_job_t *bb_job, (_create_bufs(job_ptr, bb_job, job_ready) > 0)) return EAGAIN; - if (bb_job->total_size || bb_job->swap_size) { - if (bb_job->state < BB_STATE_STAGING_IN) { - bb_job->state = BB_STATE_STAGING_IN; - rc = _queue_stage_in(job_ptr, bb_job); - if (rc != SLURM_SUCCESS) { - bb_job->state = BB_STATE_TEARDOWN; - _queue_teardown(job_ptr->job_id, - job_ptr->user_id, true); - } - } - } else { - /* Job uses persistent burst buffer, just run setup */ - if (bb_job->state < BB_STATE_STAGING_IN) { - bb_job->state = 
BB_STATE_STAGING_IN; - rc = _queue_setup(job_ptr, bb_job); + if (bb_job->state < BB_STATE_STAGING_IN) { + bb_job->state = BB_STATE_STAGING_IN; + rc = _queue_stage_in(job_ptr, bb_job); + if (rc != SLURM_SUCCESS) { + bb_job->state = BB_STATE_TEARDOWN; + _queue_teardown(job_ptr->job_id, job_ptr->user_id, + true); } } @@ -462,6 +452,7 @@ static bb_job_t *_get_bb_job(struct job_record *job_ptr) bb_job->qos = xstrdup(qos_ptr->name); } bb_job->state = BB_STATE_PENDING; + bb_job->user_id = job_ptr->user_id; bb_specs = xstrdup(job_ptr->burst_buffer); tok = strtok_r(bb_specs, "\n", &save_ptr); while (tok) { @@ -1248,140 +1239,6 @@ static int _write_file(char *file_name, char *buf) return SLURM_SUCCESS; } -static int _queue_setup(struct job_record *job_ptr, bb_job_t *bb_job) -{ - char *hash_dir = NULL, *job_dir = NULL; - char *client_nodes_file_nid = NULL; - char **setup_argv; - stage_args_t *stage_args; - int hash_inx = job_ptr->job_id % 10; - pthread_attr_t stage_attr; - pthread_t stage_tid = 0; - int rc = SLURM_SUCCESS; - - xstrfmtcat(hash_dir, "%s/hash.%d", state_save_loc, hash_inx); - (void) mkdir(hash_dir, 0700); - xstrfmtcat(job_dir, "%s/job.%u", hash_dir, job_ptr->job_id); - if (job_ptr->sched_nodes) { - xstrfmtcat(client_nodes_file_nid, "%s/client_nids", job_dir); - if (_write_nid_file(client_nodes_file_nid, - job_ptr->sched_nodes, job_ptr->job_id)) - xfree(client_nodes_file_nid); - } - setup_argv = xmalloc(sizeof(char *) * 20); /* NULL terminated */ - setup_argv[0] = xstrdup("dw_wlm_cli"); - setup_argv[1] = xstrdup("--function"); - setup_argv[2] = xstrdup("setup"); - setup_argv[3] = xstrdup("--token"); - xstrfmtcat(setup_argv[4], "%u", job_ptr->job_id); - setup_argv[5] = xstrdup("--caller"); - setup_argv[6] = xstrdup("SLURM"); - setup_argv[7] = xstrdup("--user"); - xstrfmtcat(setup_argv[8], "%d", job_ptr->user_id); - setup_argv[9] = xstrdup("--capacity"); - xstrfmtcat(setup_argv[10], "%s:%s", - bb_state.bb_config.default_pool, - bb_get_size_str(bb_job->total_size)); 
- setup_argv[11] = xstrdup("--job"); - xstrfmtcat(setup_argv[12], "%s/script", job_dir); - if (client_nodes_file_nid) { -#if defined(HAVE_NATIVE_CRAY) - setup_argv[13] = xstrdup("--nidlistfile"); -#else - setup_argv[13] = xstrdup("--nodehostnamefile"); -#endif - setup_argv[14] = xstrdup(client_nodes_file_nid); - } - - stage_args = xmalloc(sizeof(stage_args_t)); - stage_args->job_id = job_ptr->job_id; - stage_args->timeout = bb_state.bb_config.stage_in_timeout; - stage_args->args1 = setup_argv; -/* stage_args->args2 = NULL; Nothing to stage-in */ - - slurm_attr_init(&stage_attr); - if (pthread_attr_setdetachstate(&stage_attr, PTHREAD_CREATE_DETACHED)) - error("pthread_attr_setdetachstate error %m"); - while (pthread_create(&stage_tid, &stage_attr, _start_setup, - stage_args)) { - if (errno != EAGAIN) { - error("%s: pthread_create: %m", __func__); - _start_setup(stage_args); /* Do in-line */ - break; - } - usleep(100000); - } - slurm_attr_destroy(&stage_attr); - - xfree(hash_dir); - xfree(job_dir); - xfree(client_nodes_file_nid); - return rc; -} - -static void *_start_setup(void *x) -{ - stage_args_t *stage_args; - char **setup_argv, *resp_msg = NULL, *op = NULL; - int rc = SLURM_SUCCESS, status = 0, timeout; - slurmctld_lock_t job_write_lock = - { NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK }; - struct job_record *job_ptr; - bb_job_t *bb_job; - DEF_TIMERS; - - stage_args = (stage_args_t *) x; - setup_argv = stage_args->args1; - - if (stage_args->timeout) - timeout = stage_args->timeout * 1000; - else - timeout = DEFAULT_OTHER_TIMEOUT * 1000; - op = "setup"; - START_TIMER; - resp_msg = bb_run_script("setup", - bb_state.bb_config.get_sys_state, - setup_argv, timeout, &status); - END_TIMER; - info("%s: setup for job %u ran for %s", - __func__, stage_args->job_id, TIME_STR); - _log_script_argv(setup_argv, resp_msg); - if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) { - error("%s: setup for job %u status:%u response:%s", - __func__, stage_args->job_id, status, resp_msg); 
- rc = SLURM_ERROR; - } - lock_slurmctld(job_write_lock); - job_ptr = find_job_record(stage_args->job_id); - if (!job_ptr) { - error("%s: unable to find job record for job %u", - __func__, stage_args->job_id); - } else if (rc == SLURM_SUCCESS) { - pthread_mutex_lock(&bb_state.bb_mutex); - bb_job = bb_job_find(&bb_state, stage_args->job_id); - if (bb_job) - bb_job->state = BB_STATE_STAGED_IN; - pthread_mutex_unlock(&bb_state.bb_mutex); - } else { - xfree(job_ptr->state_desc); - job_ptr->state_reason = FAIL_BURST_BUFFER_OP; - xstrfmtcat(job_ptr->state_desc, "%s: %s: %s", - plugin_type, op, resp_msg); - job_ptr->priority = 0; /* Hold job */ - pthread_mutex_lock(&bb_state.bb_mutex); - bb_job = bb_job_find(&bb_state, stage_args->job_id); - if (bb_job) - bb_job->state = BB_STATE_COMPLETE; - pthread_mutex_unlock(&bb_state.bb_mutex); - } - unlock_slurmctld(job_write_lock); - - xfree(resp_msg); - _free_script_argv(setup_argv); - xfree(stage_args); - return NULL; -} - static int _queue_stage_in(struct job_record *job_ptr, bb_job_t *bb_job) { char *hash_dir = NULL, *job_dir = NULL; @@ -1509,10 +1366,12 @@ static void *_start_stage_in(void *x) } else if (!bb_job) { error("%s: unable to find bb_job record for job %u", __func__, stage_args->job_id); - } else { + } else if (bb_job->total_size) { bb_job->state = BB_STATE_STAGING_IN; bb_alloc = bb_alloc_job(&bb_state, job_ptr, bb_job); bb_alloc->create_time = time(NULL); + } else { + bb_job->state = BB_STATE_STAGING_IN; } pthread_mutex_unlock(&bb_state.bb_mutex); unlock_slurmctld(job_read_lock); @@ -1557,7 +1416,8 @@ static void *_start_stage_in(void *x) bb_alloc->state = BB_STATE_STAGED_IN; bb_alloc->state_time = time(NULL); if (bb_state.bb_config.debug_flag) { - info("%s: Stage-in complete for job %u", + info("%s: Setup/stage-in complete for " + "job %u", __func__, stage_args->job_id); } queue_job_scheduler(); @@ -1600,24 +1460,24 @@ static void *_start_stage_in(void *x) return NULL; } -static int _queue_stage_out(struct 
job_record *job_ptr) +static int _queue_stage_out(bb_job_t *bb_job) { char *hash_dir = NULL, *job_dir = NULL; char **post_run_argv, **data_out_argv; stage_args_t *stage_args; - int hash_inx = job_ptr->job_id % 10, rc = SLURM_SUCCESS; + int hash_inx = bb_job->job_id % 10, rc = SLURM_SUCCESS; pthread_attr_t stage_attr; pthread_t stage_tid = 0; xstrfmtcat(hash_dir, "%s/hash.%d", state_save_loc, hash_inx); - xstrfmtcat(job_dir, "%s/job.%u", hash_dir, job_ptr->job_id); + xstrfmtcat(job_dir, "%s/job.%u", hash_dir, bb_job->job_id); data_out_argv = xmalloc(sizeof(char *) * 10); /* NULL terminated */ data_out_argv[0] = xstrdup("dw_wlm_cli"); data_out_argv[1] = xstrdup("--function"); data_out_argv[2] = xstrdup("data_out"); data_out_argv[3] = xstrdup("--token"); - xstrfmtcat(data_out_argv[4], "%u", job_ptr->job_id); + xstrfmtcat(data_out_argv[4], "%u", bb_job->job_id); data_out_argv[5] = xstrdup("--job"); xstrfmtcat(data_out_argv[6], "%s/script", job_dir); @@ -1626,16 +1486,16 @@ static int _queue_stage_out(struct job_record *job_ptr) post_run_argv[1] = xstrdup("--function"); post_run_argv[2] = xstrdup("post_run"); post_run_argv[3] = xstrdup("--token"); - xstrfmtcat(post_run_argv[4], "%u", job_ptr->job_id); + xstrfmtcat(post_run_argv[4], "%u", bb_job->job_id); post_run_argv[5] = xstrdup("--job"); xstrfmtcat(post_run_argv[6], "%s/script", job_dir); stage_args = xmalloc(sizeof(stage_args_t)); stage_args->args1 = data_out_argv; stage_args->args2 = post_run_argv; - stage_args->job_id = job_ptr->job_id; + stage_args->job_id = bb_job->job_id; stage_args->timeout = bb_state.bb_config.stage_out_timeout; - stage_args->user_id = job_ptr->user_id; + stage_args->user_id = bb_job->user_id; slurm_attr_init(&stage_attr); if (pthread_attr_setdetachstate(&stage_attr, PTHREAD_CREATE_DETACHED)) @@ -1737,8 +1597,8 @@ static void *_start_stage_out(void *x) if (bb_alloc) { if (rc == SLURM_SUCCESS) { if (bb_state.bb_config.debug_flag) { - info("%s: Stage-out complete for " - "job %u", + info("%s: 
Stage-out/post-run complete " + "for job %u", __func__, stage_args->job_id); } /* bb_alloc->state = BB_STATE_STAGED_OUT; */ @@ -1759,7 +1619,7 @@ static void *_start_stage_out(void *x) } } bb_state.last_update_time = time(NULL); - } else { + } else if (bb_job && bb_job->total_size) { error("%s: unable to find bb record for job %u", __func__, stage_args->job_id); } @@ -3411,12 +3271,9 @@ extern int bb_p_job_start_stage_out(struct job_record *job_ptr) /* No job buffers. Assuming use of persistent buffers only */ verbose("%s: %s bb job record not found", __func__, jobid2fmt(job_ptr, jobid_buf, sizeof(jobid_buf))); - } else if (bb_job->total_size == 0) { - bb_job->state = BB_STATE_TEARDOWN; - _queue_teardown(job_ptr->job_id, job_ptr->user_id, false); } else if (bb_job->state < BB_STATE_STAGING_OUT) { bb_job->state = BB_STATE_STAGING_OUT; - _queue_stage_out(job_ptr); + _queue_stage_out(bb_job); } pthread_mutex_unlock(&bb_state.bb_mutex); @@ -3806,8 +3663,8 @@ static void *_create_persistent(void *x) _log_script_argv(script_argv, resp_msg); _free_script_argv(script_argv); END_TIMER; - if (bb_state.bb_config.debug_flag) - debug("%s: ran for %s", __func__, TIME_STR); + info("create_persistent of %s ran for %s", + create_args->name, TIME_STR); if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) { error("%s: For JobID=%u Name=%s status:%u response:%s", __func__, create_args->job_id, create_args->name, @@ -3948,8 +3805,8 @@ static void *_destroy_persistent(void *x) _log_script_argv(script_argv, resp_msg); _free_script_argv(script_argv); END_TIMER; - if (bb_state.bb_config.debug_flag) - debug("%s: destroy_persistent ran for %s", __func__, TIME_STR); + info("destroy_persistent of %s ran for %s", + destroy_args->name, TIME_STR); if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) { error("%s: destroy_persistent for JobID=%u Name=%s " "status:%u response:%s", diff --git a/src/sbcast/Makefile.am b/src/sbcast/Makefile.am index 
2dea1fb8a884f467ccd8752037bf491d47e18223..0c4c4a73540ee249d345ce84b5e04153ff64362a 100644 --- a/src/sbcast/Makefile.am +++ b/src/sbcast/Makefile.am @@ -9,7 +9,7 @@ bin_PROGRAMS = sbcast sbcast_LDADD = $(top_builddir)/src/api/libslurm.o $(DL_LIBS) -lm noinst_HEADERS = sbcast.h -sbcast_SOURCES = agent.c sbcast.c opts.c +sbcast_SOURCES = sbcast.c opts.c force: $(sbcast_LDADD) : force diff --git a/src/sbcast/Makefile.in b/src/sbcast/Makefile.in index 0681e79d2e8f272e026928ff4d1c2e3b2361cba2..803cae06eb146807f2ac9615811521b4680492ae 100644 --- a/src/sbcast/Makefile.in +++ b/src/sbcast/Makefile.in @@ -139,7 +139,7 @@ CONFIG_CLEAN_FILES = CONFIG_CLEAN_VPATH_FILES = am__installdirs = "$(DESTDIR)$(bindir)" PROGRAMS = $(bin_PROGRAMS) -am_sbcast_OBJECTS = agent.$(OBJEXT) sbcast.$(OBJEXT) opts.$(OBJEXT) +am_sbcast_OBJECTS = sbcast.$(OBJEXT) opts.$(OBJEXT) sbcast_OBJECTS = $(am_sbcast_OBJECTS) am__DEPENDENCIES_1 = sbcast_DEPENDENCIES = $(top_builddir)/src/api/libslurm.o \ @@ -465,7 +465,7 @@ AUTOMAKE_OPTIONS = foreign AM_CPPFLAGS = -I$(top_srcdir) $(BG_INCLUDES) sbcast_LDADD = $(top_builddir)/src/api/libslurm.o $(DL_LIBS) -lm noinst_HEADERS = sbcast.h -sbcast_SOURCES = agent.c sbcast.c opts.c +sbcast_SOURCES = sbcast.c opts.c sbcast_LDFLAGS = -export-dynamic $(CMD_LDFLAGS) all: all-am @@ -561,7 +561,6 @@ mostlyclean-compile: distclean-compile: -rm -f *.tab.c -@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/agent.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/opts.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/sbcast.Po@am__quote@ diff --git a/src/sbcast/opts.c b/src/sbcast/opts.c index d6354e5a421b01172c69b75242fa3d34eaa8c949..8c176432adbbf314a8e81f78813b0413b641484d 100644 --- a/src/sbcast/opts.c +++ b/src/sbcast/opts.c @@ -55,8 +55,9 @@ #include <stdlib.h> #include <unistd.h> -#include "src/common/xstring.h" +#include "src/common/file_bcast.h" #include "src/common/proc_args.h" +#include "src/common/xstring.h" #include "src/sbcast/sbcast.h" 
diff --git a/src/sbcast/sbcast.c b/src/sbcast/sbcast.c index 49997e650ee3a1fa533396335abb7b8ae8d1bf61..fba1427819460525df6403bbf06c7c1ed4c309f1 100644 --- a/src/sbcast/sbcast.c +++ b/src/sbcast/sbcast.c @@ -51,6 +51,7 @@ #include <sys/stat.h> #include "slurm/slurm_errno.h" +#include "src/common/file_bcast.h" #include "src/common/forward.h" #include "src/common/hostlist.h" #include "src/common/log.h" @@ -65,17 +66,11 @@ #include "src/common/xstring.h" /* global variables */ -int fd; /* source file descriptor */ -struct sbcast_parameters params; /* program parameters */ -struct stat f_stat; /* source file stats */ -job_sbcast_cred_msg_t *sbcast_cred; /* job alloc info and sbcast cred */ - -static void _bcast_file(void); -static void _get_job_info(void); - +struct bcast_parameters params; /* program parameters */ int main(int argc, char *argv[]) { + int rc; log_options_t opts = LOG_OPTS_STDERR_ONLY; log_init("sbcast", opts, SYSLOG_FACILITY_DAEMON, NULL); @@ -94,152 +89,6 @@ int main(int argc, char *argv[]) log_alter(opts, SYSLOG_FACILITY_DAEMON, NULL); } - /* validate the source file */ - if ((fd = open(params.src_fname, O_RDONLY)) < 0) { - error("Can't open `%s`: %s", params.src_fname, - strerror(errno)); - exit(1); - } - if (fstat(fd, &f_stat)) { - error("Can't stat `%s`: %s", params.src_fname, - strerror(errno)); - exit(1); - } - verbose("modes = %o", (unsigned int) f_stat.st_mode); - verbose("uid = %d", (int) f_stat.st_uid); - verbose("gid = %d", (int) f_stat.st_gid); - verbose("atime = %s", slurm_ctime2(&f_stat.st_atime)); - verbose("mtime = %s", slurm_ctime2(&f_stat.st_mtime)); - verbose("ctime = %s", slurm_ctime2(&f_stat.st_ctime)); - verbose("size = %ld", (long) f_stat.st_size); - verbose("-----------------------------"); - - /* identify the nodes allocated to the job */ - _get_job_info(); - - /* transmit the file */ - _bcast_file(); -/* slurm_free_sbcast_cred_msg(sbcast_cred); */ - - exit(0); -} - -/* get details about this slurm job: jobid and allocated node 
*/ -static void _get_job_info(void) -{ - xassert(params.job_id != NO_VAL); - - if (slurm_sbcast_lookup(params.job_id, params.step_id, &sbcast_cred) - != SLURM_SUCCESS) { - if (params.step_id == NO_VAL) { - error("Slurm job ID %u lookup error: %s", - params.job_id, slurm_strerror(slurm_get_errno())); - } else { - error("Slurm step ID %u.%u lookup error: %s", - params.job_id, params.step_id, - slurm_strerror(slurm_get_errno())); - } - exit(1); - } - - if (params.step_id == NO_VAL) - verbose("jobid = %u", params.job_id); - else - verbose("jobid = %u.%u", params.job_id, params.step_id); - verbose("node_cnt = %u", sbcast_cred->node_cnt); - verbose("node_list = %s", sbcast_cred->node_list); - /* also see sbcast_cred->node_addr (array) */ - - if (params.verbose) - print_sbcast_cred(sbcast_cred->sbcast_cred); - - /* do not bother to release the return message, - * we need to preserve and use most of the information later */ -} - -/* load a buffer with data from the file to broadcast, - * return number of bytes read, zero on end of file */ -static ssize_t _get_block(char *buffer, size_t buf_size) -{ - static int fd = 0; - ssize_t buf_used = 0, rc; - - if (!fd) { - fd = open(params.src_fname, O_RDONLY); - if (!fd) { - error("Can't open `%s`: %s", - params.src_fname, strerror(errno)); - exit(1); - } - } - - while (buf_size) { - rc = read(fd, buffer, buf_size); - if (rc == -1) { - if ((errno == EINTR) || (errno == EAGAIN)) - continue; - error("Can't read `%s`: %s", - params.src_fname, strerror(errno)); - exit(1); - } else if (rc == 0) { - debug("end of file reached"); - break; - } - - buffer += rc; - buf_size -= rc; - buf_used += rc; - } - return buf_used; -} - -/* read and broadcast the file */ -static void _bcast_file(void) -{ - int buf_size; - ssize_t size_read = 0; - file_bcast_msg_t bcast_msg; - char *buffer; - - if (params.block_size) - buf_size = MIN(params.block_size, f_stat.st_size); - else - buf_size = MIN((512 * 1024), f_stat.st_size); - - bcast_msg.fname = 
params.dst_fname; - bcast_msg.block_no = 1; - bcast_msg.last_block = 0; - bcast_msg.force = params.force; - bcast_msg.modes = f_stat.st_mode; - bcast_msg.uid = f_stat.st_uid; - bcast_msg.user_name = uid_to_string(f_stat.st_uid); - bcast_msg.gid = f_stat.st_gid; - buffer = xmalloc(buf_size); - bcast_msg.block = buffer; - bcast_msg.block_len = 0; - bcast_msg.cred = sbcast_cred->sbcast_cred; - - if (params.preserve) { - bcast_msg.atime = f_stat.st_atime; - bcast_msg.mtime = f_stat.st_mtime; - } else { - bcast_msg.atime = 0; - bcast_msg.mtime = 0; - } - - while (1) { - bcast_msg.block_len = _get_block(buffer, buf_size); - debug("block %d, size %u", bcast_msg.block_no, - bcast_msg.block_len); - size_read += bcast_msg.block_len; - if (size_read >= f_stat.st_size) - bcast_msg.last_block = 1; - - send_rpc(&bcast_msg, sbcast_cred); - if (bcast_msg.last_block) - break; /* end of file */ - bcast_msg.block_no++; - } - xfree(bcast_msg.user_name); - xfree(buffer); + rc = bcast_file(&params); + return rc; } diff --git a/src/sbcast/sbcast.h b/src/sbcast/sbcast.h index 216fa66315b0383e908b7bf989482956630e9ad4..c201e6853c83de4a6a8a0526b66c8c06edfe1546 100644 --- a/src/sbcast/sbcast.h +++ b/src/sbcast/sbcast.h @@ -45,27 +45,12 @@ #endif #include "slurm/slurm.h" +#include "src/common/file_bcast.h" #include "src/common/macros.h" #include "src/common/slurm_protocol_defs.h" -struct sbcast_parameters { - uint32_t block_size; - bool compress; - int fanout; - bool force; - uint32_t job_id; - uint32_t step_id; - bool preserve; - int timeout; - int verbose; - char *src_fname; - char *dst_fname; -}; - -extern struct sbcast_parameters params; +extern struct bcast_parameters params; extern void parse_command_line(int argc, char *argv[]); -extern void send_rpc(file_bcast_msg_t *bcast_msg, - job_sbcast_cred_msg_t *sbcast_cred); #endif diff --git a/src/scancel/opt.c b/src/scancel/opt.c index c27a366c3d0c1e4d5e6e1198f62ae84d848d4c3b..b1d7f679a91e92607a19b79ee65faef51508218d 100644 ---
a/src/scancel/opt.c +++ b/src/scancel/opt.c @@ -193,7 +193,7 @@ _xlate_state_name(const char *state_name, bool env_var) { uint32_t i = job_state_num(state_name); - if (i >= 0) + if (i != NO_VAL) return i; if (env_var) { diff --git a/src/slurmd/slurmd/req.c b/src/slurmd/slurmd/req.c index 7f5988ccac427e4b0af504e875ea6cce6b199fa4..9a4d03d81692d4b47d01a3a3f948da8255c59c25 100644 --- a/src/slurmd/slurmd/req.c +++ b/src/slurmd/slurmd/req.c @@ -3573,9 +3573,9 @@ _rpc_file_bcast(slurm_msg_t *msg) error("sbcast: uid:%u can't chmod `%s`: %s", req_uid, req->fname, strerror(errno)); } - if (req->last_block && fchown(fd, req->uid, req->gid)) { - error("sbcast: uid:%u can't chown `%s`: %s", - req_uid, req->fname, strerror(errno)); + if (req->last_block && fchown(fd, req_uid, req_gid)) { + error("sbcast: uid:%u gid:%u can't chown `%s`: %s", + req_uid, req_gid, req->fname, strerror(errno)); } close(fd); if (req->last_block && req->atime) { diff --git a/src/squeue/opts.c b/src/squeue/opts.c index 3e45216de4223e4a84f280b88cd840cfcbd0ac5b..7d85dcce75190066d6952fc65733d8350c7d0337 100644 --- a/src/squeue/opts.c +++ b/src/squeue/opts.c @@ -481,7 +481,7 @@ _parse_state( char* str, uint32_t* states ) uint32_t i; char *state_names; - if ((i = job_state_num(str)) >= 0) { + if ((i = job_state_num(str)) != NO_VAL) { *states = i; return SLURM_SUCCESS; } diff --git a/src/srun/libsrun/opt.c b/src/srun/libsrun/opt.c index 581e59b5aa23bb06de59cec24d0d7c66d06477ef..113f64bed4e3b927dc4c358c7c74f885619829a5 100644 --- a/src/srun/libsrun/opt.c +++ b/src/srun/libsrun/opt.c @@ -128,6 +128,7 @@ #define OPT_SICP 0x1b #define OPT_POWER 0x1c #define OPT_THREAD_SPEC 0x1d +#define OPT_BCAST 0x1e #define OPT_PROFILE 0x20 #define OPT_EXPORT 0x21 #define OPT_HINT 0x22 @@ -136,6 +137,7 @@ #define LONG_OPT_HELP 0x100 #define LONG_OPT_USAGE 0x101 #define LONG_OPT_XTO 0x102 +#define LONG_OPT_BCAST 0x103 #define LONG_OPT_TIMEO 0x104 #define LONG_OPT_JOBID 0x105 #define LONG_OPT_TMP 0x106 @@ -461,6 +463,8 @@ static 
void _opt_default(void) opt.shared = (uint16_t)NO_VAL; opt.exclusive = false; opt.export_env = NULL; + opt.bcast_flag = false; + opt.bcast_file = NULL; opt.no_kill = false; opt.kill_bad_exit = NO_VAL; @@ -571,6 +575,7 @@ env_vars_t env_vars[] = { {"SLURMD_DEBUG", OPT_INT, &opt.slurmd_debug, NULL }, {"SLURM_ACCOUNT", OPT_STRING, &opt.account, NULL }, {"SLURM_ACCTG_FREQ", OPT_STRING, &opt.acctg_freq, NULL }, +{"SLURM_BCAST", OPT_BCAST, NULL, NULL }, {"SLURM_BLRTS_IMAGE", OPT_STRING, &opt.blrtsimage, NULL }, {"SLURM_BURST_BUFFER", OPT_STRING, &opt.burst_buffer, NULL }, {"SLURM_CHECKPOINT", OPT_STRING, &opt.ckpt_interval_str, NULL }, @@ -771,6 +776,14 @@ _process_env_var(env_vars_t *e, const char *val) opt.export_env = xstrdup(val); break; + case OPT_BCAST: + if (val) { + xfree(opt.bcast_file); + opt.bcast_file = xstrdup(val); + } + opt.bcast_flag = true; + break; + case OPT_RESV_PORTS: if (val) opt.resv_port_cnt = strtol(val, NULL, 10); @@ -913,6 +926,7 @@ static void _set_options(const int argc, char **argv) {"acctg-freq", required_argument, 0, LONG_OPT_ACCTG_FREQ}, {"bb", required_argument, 0, LONG_OPT_BURST_BUFFER_SPEC}, {"bbf", required_argument, 0, LONG_OPT_BURST_BUFFER_FILE}, + {"bcast", optional_argument, 0, LONG_OPT_BCAST}, {"begin", required_argument, 0, LONG_OPT_BEGIN}, {"blrts-image", required_argument, 0, LONG_OPT_BLRTS_IMAGE}, {"checkpoint", required_argument, 0, LONG_OPT_CHECKPOINT}, @@ -1261,6 +1275,13 @@ static void _set_options(const int argc, char **argv) xfree(opt.export_env); opt.export_env = xstrdup(optarg); break; + case LONG_OPT_BCAST: + if (optarg) { + xfree(opt.bcast_file); + opt.bcast_file = xstrdup(optarg); + } + opt.bcast_flag = true; + break; case LONG_OPT_CPU_BIND: if (slurm_verify_cpu_bind(optarg, &opt.cpu_bind, &opt.cpu_bind_type)) @@ -1859,9 +1880,9 @@ static void _opt_args(int argc, char **argv) } #else (void) launch_g_handle_multi_prog_verify(command_pos); - if (test_exec) { + if (test_exec || opt.bcast_flag) { if ((fullpath = 
search_path(opt.cwd, opt.argv[command_pos], - false, X_OK, test_exec))) { + false, X_OK, true))) { xfree(opt.argv[command_pos]); opt.argv[command_pos] = fullpath; } else { @@ -2484,6 +2505,10 @@ static void _opt_list(void) if (opt.gres) info("gres : %s", opt.gres); info("exclusive : %s", tf_(opt.exclusive)); + if (opt.bcast_file) + info("bcast : %s", opt.bcast_file); + else + info("bcast : %s", tf_(opt.bcast_flag)); info("qos : %s", opt.qos); if (opt.shared != (uint16_t) NO_VAL) info("shared : %u", opt.shared); @@ -2649,7 +2674,7 @@ static void _usage(void) " [--switches=max-switches{@max-time-to-wait}] [--reboot]\n" " [--core-spec=cores] [--thread-spec=threads]\n" " [--bb=burst_buffer_spec] [--bbf=burst_buffer_file]\n" -" [--acctg-freq=<datatype>=<interval>\n" +" [--acctg-freq=<datatype>=<interval>] [--bcast=<dest_path>]\n" " [-w hosts...] [-x hosts...] executable [args...]\n"); } @@ -2669,6 +2694,7 @@ static void _help(void) " network=<interval> filesystem=<interval>\n" " --bb=<spec> burst buffer specifications\n" " --bbf=<file_name> burst buffer specification file\n" +" --bcast=<dest_path> Copy executable file to compute nodes\n" " --begin=time defer job until HH:MM MM/DD/YY\n" " -c, --cpus-per-task=ncpus number of cpus required per task\n" " --checkpoint=time job step checkpoint interval\n" diff --git a/src/srun/libsrun/opt.h b/src/srun/libsrun/opt.h index 906891fe6208d535a7719687dc9e4699e3c9d7d3..4dfba016d668ae08759412d8fc5eb779e42227e2 100644 --- a/src/srun/libsrun/opt.h +++ b/src/srun/libsrun/opt.h @@ -113,6 +113,8 @@ typedef struct srun_options { char *ckpt_interval_str;/* --checkpoint (string) */ char *ckpt_dir; /* --checkpoint-dir (string) */ bool exclusive; /* --exclusive */ + char *bcast_file; /* --bcast, copy executable to compute nodes */ + bool bcast_flag; /* --bcast, copy executable to compute nodes */ int resv_port_cnt; /* --resv_ports */ char *partition; /* --partition=n, -p n */ enum task_dist_states diff --git a/src/srun/srun.c b/src/srun/srun.c
index 93bd99723e16bd89ea673ecb548ae88fa9db940c..7bdef618f3366fdd19ae7abf9ff877d3ed2b4b34 100644 --- a/src/srun/srun.c +++ b/src/srun/srun.c @@ -72,6 +72,7 @@ #include <grp.h> #include "src/common/fd.h" +#include "src/common/file_bcast.h" #include "src/common/hostlist.h" #include "src/common/log.h" #include "src/common/net.h" @@ -131,6 +132,7 @@ int sig_array[] = { /* * forward declaration of static funcs */ +static int _file_bcast(void); static void _pty_restore(void); static void _set_exit_code(void); static void _set_node_alias(void); @@ -192,6 +194,8 @@ int srun(int ac, char **av) /* * Enhance environment for job */ + if (opt.bcast_flag) + _file_bcast(); if (opt.cpus_set) env->cpus_per_task = opt.cpus_per_task; if (opt.ntasks_per_node != NO_VAL) @@ -288,6 +292,45 @@ relaunch: return (int)global_rc; } +static int _file_bcast(void) +{ + struct bcast_parameters *params; + int rc; + + if ((opt.argc == 0) || (opt.argv[0] == NULL)) { + error("No command name to broadcast"); + return SLURM_ERROR; + } + params = xmalloc(sizeof(struct bcast_parameters)); + params->block_size = 8 * 1024 * 1024; + params->compress = 0; + if (opt.bcast_file) { + params->dst_fname = xstrdup(opt.bcast_file); + } else { + xstrfmtcat(params->dst_fname, "%s/slurm_bcast_%u.%u", + opt.cwd, job->jobid, job->stepid); + } + params->fanout = 0; + params->job_id = job->jobid; + params->force = true; + params->preserve = true; + params->src_fname = opt.argv[0]; + params->step_id = job->stepid; + params->timeout = 0; + params->verbose = 0; + + rc = bcast_file(params); + if (rc == SLURM_SUCCESS) { + xfree(opt.argv[0]); + opt.argv[0] = params->dst_fname; + } else { + xfree(params->dst_fname); + } + xfree(params); + + return rc; +} + static int _slurm_debug_env_val (void) { long int level = 0; diff --git a/testsuite/expect/Makefile.am b/testsuite/expect/Makefile.am index b249257e0d39015b63ee6868ac48739cc5546d5d..52f35e8a0f314e7f20ffee1d5e80c307af67615f 100644 --- a/testsuite/expect/Makefile.am +++ 
b/testsuite/expect/Makefile.am @@ -125,6 +125,7 @@ EXTRA_DIST = \ test1.97 \ test1.99 \ test1.100 \ + test1.101 \ test2.1 \ test2.2 \ test2.3 \ diff --git a/testsuite/expect/Makefile.in b/testsuite/expect/Makefile.in index 46ab8841fb7594077a953b777185f4f489125d9e..efc77f2ab1cd1576f9fc6f65f1bf5d2ba70c7033 100644 --- a/testsuite/expect/Makefile.in +++ b/testsuite/expect/Makefile.in @@ -524,6 +524,7 @@ EXTRA_DIST = \ test1.97 \ test1.99 \ test1.100 \ + test1.101 \ test2.1 \ test2.2 \ test2.3 \ diff --git a/testsuite/expect/README b/testsuite/expect/README index 903479d1a996b81b031b582ea16317a9156c09fb..d56a0e1618d4717d04ddd60821aa536dc20d515b 100644 --- a/testsuite/expect/README +++ b/testsuite/expect/README @@ -212,6 +212,7 @@ test1.97 Test that --ntask-per-node and -c options are enforced test1.98 AVAILBLE... test1.99 Validate that SrunPortRange is enforced when using srun test1.100 Test of pack/nopack task distribution. +test1.101 Test of --bcast option. **NOTE** The above tests for multiple processor/partition systems only test2.# Testing of scontrol options (to be run as unprivileged user). diff --git a/testsuite/expect/test1.101 b/testsuite/expect/test1.101 new file mode 100755 index 0000000000000000000000000000000000000000..6fa90a45e791c5737625f755db5bb8e1ff79306a --- /dev/null +++ b/testsuite/expect/test1.101 @@ -0,0 +1,150 @@ +#!/usr/bin/env expect +############################################################################ +# Purpose: Test of SLURM functionality +# Test of --bcast option. +# +# Output: "TEST: #.#" followed by "SUCCESS" if test was successful, OR +# "WARNING: ..." with an explanation of why the test can't be made, OR +# "FAILURE: ..." otherwise with an explanation of the failure, OR +# anything else indicates a failure mode that must be investigated. +############################################################################ +# Copyright (C) 2015 SchedMD LLC +# +# This file is part of SLURM, a resource management program.
+# For details, see <http://slurm.schedmd.com/>. +# Please also read the included file: DISCLAIMER. +# +# SLURM is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation; either version 2 of the License, or (at your option) +# any later version. +# +# SLURM is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along +# with SLURM; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +############################################################################ +source ./globals + +set test_id "1.101" +set exit_code 0 +set file_in "test$test_id.input" +set job_id 0 +set prompt "PROMPT: " +set timeout $max_job_delay + +print_header $test_id + +if {[test_alps]} { + send_user "\nWARNING: This test is incompatible with Cray systems\n" + exit 0 +} elseif {[test_bluegene]} { + send_user "\nWARNING: This test is incompatible with Bluegene systems\n" + exit 0 +} elseif {[test_serial]} { + send_user "\nWARNING: This test is incompatible with serial systems\n" + exit 0 +} elseif { [test_xcpu] } { + send_user "\nWARNING: This test is incompatible with XCPU systems\n" + exit 0 +} + +make_bash_script $file_in "echo \$0" +set salloc_pid [spawn $salloc -N1-5 -t1 bash] +expect { + -re "Granted job allocation ($number)" { + set job_id $expect_out(1,string) + send "export PS1=\"$prompt\"\r" + exp_continue + } + -re "\"$prompt" { + # skip this, just echo of setting prompt" + exp_continue + } + -re "$prompt" { + #send_user "Job initiated\n" + } + timeout { + send_user "\nFAILURE: salloc not responding\n" + if {$job_id != 0} { + cancel_job $job_id + } + slow_kill [expr 0 - 
$salloc_pid] + exit 1 + } +} +if {$job_id == 0} { + send_user "\nFAILURE: did not get job_id\n" + exit 1 +} + +send "$srun rm -f /tmp/test$test_id\r" +expect { + -re "$prompt" { + #break + } + timeout { + send_user "\nFAILURE: srun not responding\n" + set exit_code 1 + } +} + +set node_cnt 99 +send "echo \$SLURM_NNODES\r" +expect { + -re "($number)" { + set node_cnt $expect_out(1,string) + exp_continue + } + -re "$prompt" { + #break + } + timeout { + send_user "\nFAILURE: srun not responding\n" + set exit_code 1 + } +} + +set exec_cnt 0 +send "$srun -l --bcast=/tmp/test$test_id ./$file_in\r" +expect { + -re "($number): /tmp/test$test_id" { + incr exec_cnt + exp_continue + } + -re "$prompt" { + #break + } + timeout { + send_user "\nFAILURE: srun not responding\n" + set exit_code 1 + } +} + +send "$srun -l rm -v /tmp/test$test_id\r" +expect { + -re "$prompt" { + #break + } + timeout { + send_user "\nFAILURE: srun not responding\n" + set exit_code 1 + } +} + +send "exit\r" + +if {$exec_cnt != $node_cnt} { + send_user "\nFAILURE: executable count != node count ($exec_cnt != $node_cnt)\n" + set exit_code 1 +} + +if {$exit_code == 0} { + exec rm -f $file_in + send_user "\nSUCCESS\n" +} +exit $exit_code diff --git a/testsuite/expect/test1.5 b/testsuite/expect/test1.5 index 74545e89c7425821782d1a9fbf0fd12c140f4978..40f40bde23fe49710617dc479fc4f4d938d33e29 100755 --- a/testsuite/expect/test1.5 +++ b/testsuite/expect/test1.5 @@ -43,7 +43,7 @@ print_header $test_id # set srun_pid [spawn $srun --help] expect { - -re "srun .* executable" { + -re "Usage:" { incr matches exp_continue }