diff --git a/doc/man/man1/sbcast.1 b/doc/man/man1/sbcast.1 index f6da948797c0d7afdb62615bbb2850324ff8970f..6b60e5d46d88d111c809ca602a53ad1d283a7a47 100644 --- a/doc/man/man1/sbcast.1 +++ b/doc/man/man1/sbcast.1 @@ -17,8 +17,7 @@ allocation. file copy to be created on each node. \fBDEST\fR should be on a file system local to that node. Note that parallel file systems may provide better performance -than \fBsbcast\fR can provide, particularly for larger files -(over one megabyte). +than \fBsbcast\fR can provide. .SH "OPTIONS" .TP @@ -37,7 +36,8 @@ Specify the block size used for file broadcast. The size can have a suffix of \fIk\fR or \fIm\fR for kilobytes or megabytes respecitively (defaults to bytes). This size subject to rounding and range limits to maintain -good performance. +good performance. This value may need to be set on systems +with very limited memory. .TP \fB\-v\fR, \fB\-\-verbose\fR Provide detailed event logging through program execution. diff --git a/src/common/forward.c b/src/common/forward.c index 7d1aa94e78679298bec35bfd6cb2315eda96e3c2..f3fda37ce7ce5879f23785e5f8c72490caccfdb1 100644 --- a/src/common/forward.c +++ b/src/common/forward.c @@ -227,7 +227,7 @@ extern int forward_msg(forward_struct_t *forward_struct, int retries = 0; forward_msg_t *forward_msg; int thr_count = 0; - int *span = set_span(header->forward.cnt); + int *span = set_span(header->forward.cnt, 0); slurm_mutex_init(&forward_struct->forward_mutex); pthread_cond_init(&forward_struct->notify, NULL); diff --git a/src/common/forward.h b/src/common/forward.h index d4f8e8a1f259d6b674ff5e9c06a0131139779449..b08114c8e65675a4970e4fb3741118dfe59bec47 100644 --- a/src/common/forward.h +++ b/src/common/forward.h @@ -94,7 +94,7 @@ extern int forward_msg(forward_struct_t *forward_struct, // a message that could be forwarded. // Set the span with total count of hosts to send to -int *span = set_span(agent_arg_ptr->node_count); +int *span = set_span(agent_arg_ptr->node_count, 0); // Fill in a local forward structure with count of threads to created // by this program, an array of names and addrs of hosts and node_id @@ -154,7 +154,7 @@ Code taken from srun/launch.c This function should be used sending a launch message that could be forwarded. //set the span with total count of hosts to send to -int *span = set_span(job->step_layout->num_hosts); +int *span = set_span(job->step_layout->num_hosts, 0); //set up hostlist off the nodelist of the job hostlist = hostlist_create(job->nodelist); diff --git a/src/common/slurm_protocol_api.c b/src/common/slurm_protocol_api.c index 085001e49c5ccde41b3f1e183d5a55673bca0d96..8529937230d90910943adb0c0f98d76b48dc9675 100644 --- a/src/common/slurm_protocol_api.c +++ b/src/common/slurm_protocol_api.c @@ -1760,12 +1760,14 @@ int slurm_send_recv_controller_rc_msg(slurm_msg_t *req, int *rc) return ret_val; } -extern int *set_span(int total) +extern int *set_span(int total, uint16_t tree_width) { int *span; int left = total; int i = 0; - uint16_t tree_width = slurm_get_tree_width(); + + if (tree_width == 0) + tree_width = slurm_get_tree_width(); span = xmalloc(sizeof(int) * tree_width); //info("span count = %d", tree_width); diff --git a/src/common/slurm_protocol_api.h b/src/common/slurm_protocol_api.h index a27655faf780aaddf0d2aa266bfc4eb4a7081377..f5ecd346259da1fbab3ac1f53c0efea66cbe79ac 100644 --- a/src/common/slurm_protocol_api.h +++ b/src/common/slurm_protocol_api.h @@ -581,8 +581,10 @@ int slurm_send_only_node_msg(slurm_msg_t * request_msg); /* set_span * build an array indicating how message fanout should occur - * NODE: Returned array MUST be release by caller using xfree */ -extern int *set_span(int total); + * IN total - total number of nodes to communicate with + * IN tree_width - message fanout, use system default if zero + * NOTE: Returned array MUST be release by caller using xfree */ +extern int *set_span(int total, uint16_t tree_width); void slurm_free_msg(slurm_msg_t * msg); void slurm_free_cred(void *cred); diff --git a/src/sbcast/agent.c b/src/sbcast/agent.c index 981e4bcae75755a13a0490b8c6edc343e9d75c38..6aa9994fe96b24315e56645274dbf30adbb22c9e 100644 --- a/src/sbcast/agent.c +++ b/src/sbcast/agent.c @@ -50,20 +50,10 @@ #include "src/sbcast/sbcast.h" #define MAX_RETRIES 10 -#define MAX_THREADS 4 /* These are huge messages, so only - * run MAX_THREADS at one time */ - -typedef enum { - DSH_NEW, /* Request not yet started */ - DSH_ACTIVE, /* Request in progress */ - DSH_DONE, /* Request completed normally */ - DSH_NO_RESP, /* Request timed out */ - DSH_FAILED /* Request resulted in error */ -} state_t; - +#define MAX_THREADS 3 /* These can be huge messages, so + * only run MAX_THREADS at one time */ typedef struct thd { pthread_t thread; /* thread ID */ - pthread_attr_t attr; /* thread attributes */ slurm_msg_t *msg; /* message to send */ int rc; /* highest return codes from RPC */ } thd_t; @@ -74,87 +64,6 @@ static int agent_cnt = 0; static void *_agent_thread(void *args); -/* issue the RPC to ship the file's data */ -extern void send_rpc(file_bcast_msg_t *bcast_msg, - resource_allocation_response_msg_t *alloc_resp) -{ - /* This code will handle message fanout to multiple slurmd */ - forward_t from, *forward; - hostlist_t hl; - int i, rc = 0, retries = 0; - int thr_count = 0, *span = set_span(alloc_resp->node_cnt); - thd_t *thread_info; - - thread_info = xmalloc(slurm_get_tree_width() * sizeof(thd_t)); - forward = xmalloc(slurm_get_tree_width() * sizeof(forward_t)); - from.cnt = alloc_resp->node_cnt; - from.name = xmalloc(MAX_SLURM_NAME * alloc_resp->node_cnt); - hl = hostlist_create(alloc_resp->node_list); - for (i=0; i<alloc_resp->node_cnt; i++) { - char *host = hostlist_shift(hl); - strncpy(&from.name[MAX_SLURM_NAME*i], host, MAX_SLURM_NAME); - free(host); - } - hostlist_destroy(hl); - from.addr = alloc_resp->node_addr; - from.node_id = NULL; - from.timeout = SLURM_MESSAGE_TIMEOUT_MSEC_STATIC; - for (i=0; i<alloc_resp->node_cnt; i++) { - forward_set(&forward[i], span[thr_count], &i, &from); - thread_info[i].msg = xmalloc(sizeof(slurm_msg_t)); - thread_info[i].msg->msg_type = REQUEST_FILE_BCAST; - thread_info[i].msg->data = bcast_msg; - thread_info[i].msg->forward = forward[i]; - thread_info[i].msg->ret_list = NULL; - thread_info[i].msg->orig_addr.sin_addr.s_addr = 0; - thread_info[i].msg->srun_node_id= 0; - thread_info[i].rc = -1; - thread_info[i].msg->address = alloc_resp->node_addr[i]; - thr_count++; - } - xfree(span); - debug("spawning %d threads", thr_count); - - for (i=0; i<thr_count; i++) { - slurm_mutex_lock(&agent_cnt_mutex); - while (agent_cnt >= MAX_THREADS) - pthread_cond_wait(&agent_cnt_cond, &agent_cnt_mutex); - agent_cnt++; - slurm_mutex_unlock(&agent_cnt_mutex); - - slurm_attr_init(&thread_info[i].attr); - if (pthread_attr_setdetachstate (&thread_info[i].attr, - PTHREAD_CREATE_JOINABLE)) - error("pthread_attr_setdetachstate error %m"); - while (pthread_create(&thread_info[i].thread, - &thread_info[i].attr, - _agent_thread, - (void *) &thread_info[i])) { - error("pthread_create error %m"); - if (++retries > MAX_RETRIES) - fatal("Can't create pthread"); - sleep(1); /* sleep and again */ - } - } - - /* wait until pthreads complete */ - slurm_mutex_lock(&agent_cnt_mutex); - while (agent_cnt) - pthread_cond_wait(&agent_cnt_cond, &agent_cnt_mutex); - slurm_mutex_unlock(&agent_cnt_mutex); - - for (i=0; i<thr_count; i++) { - rc = MAX(rc, thread_info[i].rc); - xfree(thread_info[i].msg); - destroy_forward(&forward[i]); - } - xfree(from.name); - xfree(forward); - xfree(thread_info); - if (rc) - exit(1); -} - static void *_agent_thread(void *args) { List ret_list = NULL; @@ -205,3 +114,84 @@ static void *_agent_thread(void *args) return NULL; } +/* Issue the RPC to transfer the file's data */ +extern void send_rpc(file_bcast_msg_t *bcast_msg, + resource_allocation_response_msg_t *alloc_resp) +{ + /* Preserve some data structures across calls for better performance */ + static forward_t from, forward[MAX_THREADS]; + static int threads_used = 0; + static slurm_msg_t msg[MAX_THREADS]; + + int i, rc = SLURM_SUCCESS; + int retries = 0; + thd_t thread_info[MAX_THREADS]; + + if (threads_used == 0) { + hostlist_t hl; + int *span = set_span(alloc_resp->node_cnt, MAX_THREADS); + from.cnt = alloc_resp->node_cnt; + from.name = xmalloc(MAX_SLURM_NAME * alloc_resp->node_cnt); + hl = hostlist_create(alloc_resp->node_list); + for (i=0; i<alloc_resp->node_cnt; i++) { + char *host = hostlist_shift(hl); + strncpy(&from.name[MAX_SLURM_NAME*i], host, + MAX_SLURM_NAME); + free(host); + } + hostlist_destroy(hl); + from.addr = alloc_resp->node_addr; + from.node_id = NULL; + from.timeout = SLURM_MESSAGE_TIMEOUT_MSEC_STATIC; + + for (i=0; i<alloc_resp->node_cnt; i++) { + int j = i; + forward_set(&forward[threads_used], span[threads_used], + &i, &from); + msg[threads_used].msg_type = REQUEST_FILE_BCAST; + msg[threads_used].address = alloc_resp->node_addr[j]; + msg[threads_used].data = bcast_msg; + msg[threads_used].forward = forward[threads_used]; + msg[threads_used].ret_list = NULL; + msg[threads_used].orig_addr.sin_addr.s_addr = 0; + msg[threads_used].srun_node_id = 0; + threads_used++; + } + xfree(span); + debug("using %d threads", threads_used); + } + + for (i=0; i<threads_used; i++) { + pthread_attr_t attr; + slurm_mutex_lock(&agent_cnt_mutex); + agent_cnt++; + slurm_mutex_unlock(&agent_cnt_mutex); + + slurm_attr_init(&attr); + if (pthread_attr_setdetachstate (&attr, + PTHREAD_CREATE_JOINABLE)) + error("pthread_attr_setdetachstate error %m"); + thread_info[i].msg = &msg[i]; + while (pthread_create(&thread_info[i].thread, + &attr, _agent_thread, + (void *) &thread_info[i])) { + error("pthread_create error %m"); + if (++retries > MAX_RETRIES) + fatal("Can't create pthread"); + sleep(1); /* sleep and retry */ + } + pthread_attr_destroy(&attr); + } + + /* wait until pthreads complete */ + slurm_mutex_lock(&agent_cnt_mutex); + while (agent_cnt) + pthread_cond_wait(&agent_cnt_cond, &agent_cnt_mutex); + slurm_mutex_unlock(&agent_cnt_mutex); + + for (i=0; i<threads_used; i++) + rc = MAX(rc, thread_info[i].rc); + + if (rc) + exit(1); +} diff --git a/src/sbcast/opts.c b/src/sbcast/opts.c index 69b6c4f3ab32d980de23cd674ed3e414c8e21060..ac866c5911885c379b54774c08fa6876d64272d7 100644 --- a/src/sbcast/opts.c +++ b/src/sbcast/opts.c @@ -123,10 +123,12 @@ extern void parse_command_line(int argc, char *argv[]) } if ((argc - optind) != 2) { - fprintf(stderr, "Missing file arguments\n"); + fprintf(stderr, "Need two file names, have %d names\n", + (argc - optind)); fprintf(stderr, "Try \"sbcast --help\" for more information\n"); exit(1); } + params.src_fname = xstrdup(argv[optind]); params.dst_fname = xstrdup(argv[optind+1]); diff --git a/src/slurmctld/agent.c b/src/slurmctld/agent.c index 3425c8496dfb85d5d34026a9513e1d2841c8f39a..9b3934be81213a87a7cdb340107eb1ee5cb8c0b4 100644 --- a/src/slurmctld/agent.c +++ b/src/slurmctld/agent.c @@ -344,7 +344,7 @@ static agent_info_t *_make_agent_info(agent_arg_t *agent_arg_ptr) int i; agent_info_t *agent_info_ptr; thd_t *thread_ptr; - int *span = set_span(agent_arg_ptr->node_count); + int *span = set_span(agent_arg_ptr->node_count, 0); int thr_count = 0; forward_t forward; diff --git a/src/srun/launch.c b/src/srun/launch.c index 2e80209d1001d1e3e22c57a4e7326313833f2abf..96890b804c8f215c22a8af4e6eb1a22231f564b5 100644 --- a/src/srun/launch.c +++ b/src/srun/launch.c @@ -115,7 +115,7 @@ launch(void *arg) hostlist_t hostlist = NULL; hostlist_iterator_t itr = NULL; char *host = NULL; - int *span = set_span(job->step_layout->num_hosts); + int *span = set_span(job->step_layout->num_hosts, 0); Buf buffer = NULL; update_job_state(job, SRUN_JOB_LAUNCHING);