From c5f542d9f9f0da2b191b63007407a79ef1702da8 Mon Sep 17 00:00:00 2001 From: Danny Auble <da@llnl.gov> Date: Fri, 4 Nov 2005 00:16:31 +0000 Subject: [PATCH] cleaned up things and got started on rewriting srun to handle api instead of itself. --- src/api/allocate.c | 118 ++++++- src/api/spawn.c | 365 +++++++++++++++++---- src/common/dist_tasks.c | 3 - src/common/dist_tasks.h | 10 +- src/common/env.c | 4 +- src/common/env.h | 8 +- src/plugins/switch/federation/federation.c | 5 +- src/slurmctld/node_scheduler.c | 33 +- src/slurmctld/proc_req.c | 6 +- src/slurmctld/step_mgr.c | 22 +- src/srun/Makefile.am | 2 +- src/srun/allocate.c | 16 +- src/srun/launch.c | 3 +- src/srun/launch.h | 1 + src/srun/msg.c | 5 +- src/srun/opt.c | 24 +- src/srun/opt.h | 7 +- src/srun/srun.c | 38 ++- src/srun/srun_job.c | 257 +++++++++++++-- src/srun/srun_job.h | 11 +- 20 files changed, 755 insertions(+), 183 deletions(-) diff --git a/src/api/allocate.c b/src/api/allocate.c index 6224b78cf23..a1fb82cf5d3 100644 --- a/src/api/allocate.c +++ b/src/api/allocate.c @@ -42,8 +42,13 @@ extern pid_t getsid(pid_t pid); /* missing from <unistd.h> */ #include "src/common/read_config.h" #include "src/common/slurm_protocol_api.h" +#include "src/common/hostlist.h" +#include "src/common/xmalloc.h" + +#define BUF_SIZE 1024 static int _handle_rc_msg(slurm_msg_t *msg); +static int _nodelist_from_hostfile(job_step_create_request_msg_t *req); /* * slurm_allocate_resources - allocate resources for a job request @@ -61,7 +66,6 @@ slurm_allocate_resources (job_desc_msg_t *req, slurm_msg_t resp_msg; bool host_set = false; char host[64]; - /* * set Node and session id for this request */ @@ -202,9 +206,18 @@ slurm_job_step_create (job_step_create_request_msg_t *req, { slurm_msg_t req_msg; slurm_msg_t resp_msg; + char *temp = NULL; + int count=0; req_msg.msg_type = REQUEST_JOB_STEP_CREATE; req_msg.data = req; + + if(temp = (char*)getenv("MP_PROCS")) { + if(strlen(temp)>0) { + if((count = _nodelist_from_hostfile(req)) 
== 0) + debug("nodelist was NULL"); + } + } if (slurm_send_recv_controller_msg(&req_msg, &resp_msg) < 0) return SLURM_ERROR; @@ -282,3 +295,106 @@ _handle_rc_msg(slurm_msg_t *msg) else return SLURM_SUCCESS; } + +static int _nodelist_from_hostfile(job_step_create_request_msg_t *req) +{ + char *hostfile = NULL; + char *hostname = NULL; + FILE *hostfilep = NULL; + char in_line[BUF_SIZE]; /* input line */ + int i, j; + int line_size; + hostlist_t hostlist = NULL; + int count; + int len = 0; + int ret = 0; + int line_num = 0; + char *nodelist = NULL; + + if (hostfile = (char *)getenv("MP_HOSTFILE")) { + if(strlen(hostfile)<1) + goto no_hostfile; + if((hostfilep = fopen(hostfile, "r")) == NULL) { + error("slurm_allocate_resources " + "error opening file %s, %m", + hostfile); + goto no_hostfile; + } + hostlist = hostlist_create(NULL); + + while (fgets (in_line, BUF_SIZE, hostfilep) != NULL) { + line_num++; + line_size = strlen(in_line); + if (line_size >= (BUF_SIZE - 1)) { + error ("Line %d, of hostfile %s too long", + line_num, hostfile); + fclose (hostfilep); + goto no_hostfile; + } + for (i = 0; i < line_size; i++) { + if (in_line[i] == '\n') { + in_line[i] = '\0'; + break; + } + if (in_line[i] == '\0') + break; + if (in_line[i] != '#') + continue; + if ((i > 0) && (in_line[i - 1] == '\\')) { + for (j = i; j < line_size; j++) { + in_line[j - 1] = in_line[j]; + } + line_size--; + continue; + } + in_line[i] = '\0'; + break; + } + + len += strlen(in_line)+1; + hostlist_push(hostlist,in_line); + if(req->num_tasks && (line_num+1)>req->num_tasks) + break; + } + fclose (hostfilep); + + nodelist = (char *)xmalloc(sizeof(char)*len); + memset(nodelist, 0, len); + + count = hostlist_count(hostlist); + if (count <= 0) { + error("Hostlist is empty!\n"); + xfree(*nodelist); + goto cleanup_hostfile; + } + + len = 0; + while (hostname = hostlist_shift(hostlist)) { + line_num = strlen(hostname)+1; + ret = sprintf(nodelist+len, + "%s,", hostname); + if (ret < 0 || ret > line_num) { + 
error("bad snprintf only %d printed",ret); + xfree(*nodelist); + goto cleanup_hostfile; + } + len += ret; + } + nodelist[--len] = '\0'; + debug2("Hostlist from MP_HOSTFILE = %s\n", + nodelist); + + cleanup_hostfile: + hostlist_destroy(hostlist); + + } +no_hostfile: + if(nodelist) { + if(req->node_list) + xfree(req->node_list); + req->node_list = nodelist; + req->num_tasks = count; + req->task_dist = SLURM_DIST_HOSTFILE; + } + return count; +} diff --git a/src/api/spawn.c b/src/api/spawn.c index 9dd4e80e385..0ce60fe2b0a 100644 --- a/src/api/spawn.c +++ b/src/api/spawn.c @@ -80,6 +80,16 @@ struct slurm_step_ctx_struct { uint32_t **tids; /* host id => task id mapping */ hostlist_t hl; /* hostlist of assigned nodes */ uint32_t nhosts; /* node count */ + + /*launch specific */ + uint32_t gid; /* group the job runs as */ + char *task_epilog; /* task-epilog */ + char *task_prolog; /* task-prolog */ + bool unbuffered; /* unbuffered */ + char *ofname; /* output filename */ + char *ifname; /* input filename */ + char *efname; /* error filename */ + bool parallel_debug; /* srun controlled by debugger */ }; typedef enum {DSH_NEW, DSH_ACTIVE, DSH_DONE, DSH_FAILED} state_t; @@ -103,6 +113,7 @@ static void _free_char_array(char ***argv_p, int cnt); static int _p_launch(slurm_msg_t *req, slurm_step_ctx ctx); static int _sock_bind_wild(int sockfd); static int _task_layout(slurm_step_ctx ctx); +static int _task_layout_hostfile(slurm_step_ctx ctx); static int _task_layout_block(slurm_step_ctx ctx); static int _task_layout_cyclic(slurm_step_ctx ctx); static void * _thread_per_node_rpc(void *args); @@ -123,18 +134,22 @@ slurm_step_ctx_create (job_step_create_request_msg_t *step_req) old_job_alloc_msg_t old_job_req; job_step_create_response_msg_t *step_resp = NULL; resource_allocation_response_msg_t *alloc_resp; - + char *temp = NULL; old_job_req.job_id = step_req->job_id; old_job_req.uid = getuid(); if (slurm_confirm_allocation(&old_job_req, &alloc_resp) < 0) return NULL; - + if 
((slurm_job_step_create(step_req, &step_resp) < 0) || (step_resp == NULL)) { slurm_free_resource_allocation_response_msg(alloc_resp); return NULL; /* slurm errno already set */ } - + + temp = step_req->node_list; + step_req->node_list = step_resp->node_list; + step_resp->node_list = temp; + rc = xmalloc(sizeof(struct slurm_step_ctx_struct)); rc->magic = STEP_CTX_MAGIC; rc->job_id = step_req->job_id; @@ -143,8 +158,7 @@ slurm_step_ctx_create (job_step_create_request_msg_t *step_req) rc->task_dist = step_req->task_dist; rc->step_resp = step_resp; rc->alloc_resp = alloc_resp; - - rc->hl = hostlist_create(rc->step_resp->node_list); + rc->hl = hostlist_create(step_req->node_list); rc->nhosts = hostlist_count(rc->hl); (void) _task_layout(rc); @@ -174,35 +188,35 @@ slurm_step_ctx_get (slurm_step_ctx ctx, int ctx_key, ...) va_start(ap, ctx_key); switch (ctx_key) { - case SLURM_STEP_CTX_STEPID: - step_id_ptr = (uint32_t *) va_arg(ap, void *); - *step_id_ptr = ctx->step_resp->job_step_id; - break; - case SLURM_STEP_CTX_TASKS: - array_pptr = (uint32_t **) va_arg(ap, void *); - *array_pptr = ctx->tasks; - break; - - case SLURM_STEP_CTX_TID: - node_inx = va_arg(ap, uint32_t); - if ((node_inx < 0) || (node_inx > ctx->nhosts)) { - slurm_seterrno(EINVAL); - rc = SLURM_ERROR; - break; - } - array_pptr = (uint32_t **) va_arg(ap, void *); - *array_pptr = ctx->tids[node_inx]; - break; - - case SLURM_STEP_CTX_RESP: - step_resp_pptr = (job_step_create_response_msg_t **) - va_arg(ap, void *); - *step_resp_pptr = ctx->step_resp; - break; - - default: + case SLURM_STEP_CTX_STEPID: + step_id_ptr = (uint32_t *) va_arg(ap, void *); + *step_id_ptr = ctx->step_resp->job_step_id; + break; + case SLURM_STEP_CTX_TASKS: + array_pptr = (uint32_t **) va_arg(ap, void *); + *array_pptr = ctx->tasks; + break; + + case SLURM_STEP_CTX_TID: + node_inx = va_arg(ap, uint32_t); + if ((node_inx < 0) || (node_inx > ctx->nhosts)) { slurm_seterrno(EINVAL); rc = SLURM_ERROR; + break; + } + array_pptr = (uint32_t 
**) va_arg(ap, void *); + *array_pptr = ctx->tids[node_inx]; + break; + + case SLURM_STEP_CTX_RESP: + step_resp_pptr = (job_step_create_response_msg_t **) + va_arg(ap, void *); + *step_resp_pptr = ctx->step_resp; + break; + + default: + slurm_seterrno(EINVAL); + rc = SLURM_ERROR; } va_end(ap); @@ -246,40 +260,73 @@ slurm_step_ctx_set (slurm_step_ctx ctx, int ctx_key, ...) va_start(ap, ctx_key); switch (ctx_key) { - case SLURM_STEP_CTX_ARGS: - if (ctx->argv) - _xfree_char_array(&ctx->argv, ctx->argc); - ctx->argc = va_arg(ap, int); - if ((ctx->argc < 1) || (ctx->argc > 1024)) { - slurm_seterrno(EINVAL); - break; - } - _xcopy_char_array(&ctx->argv, va_arg(ap, char **), - ctx->argc); - break; - - case SLURM_STEP_CTX_CHDIR: - if (ctx->cwd) - xfree(ctx->cwd); - ctx->cwd = xstrdup(va_arg(ap, char *)); - break; - - case SLURM_STEP_CTX_ENV: - ctx->env_set = 1; - if (ctx->env) - _xfree_char_array(&ctx->env, ctx->envc); - ctx->envc = va_arg(ap, int); - if ((ctx->envc < 1) || (ctx->envc > 1024)) { - slurm_seterrno(EINVAL); - break; - } - _xcopy_char_array(&ctx->env, va_arg(ap, char **), - ctx->envc); + case SLURM_STEP_CTX_ARGS: + if (ctx->argv) + _xfree_char_array(&ctx->argv, ctx->argc); + ctx->argc = va_arg(ap, int); + if ((ctx->argc < 1) || (ctx->argc > 1024)) { + slurm_seterrno(EINVAL); break; - - default: + } + _xcopy_char_array(&ctx->argv, va_arg(ap, char **), + ctx->argc); + break; + + case SLURM_STEP_CTX_CHDIR: + if (ctx->cwd) + xfree(ctx->cwd); + ctx->cwd = xstrdup(va_arg(ap, char *)); + break; + + case SLURM_STEP_CTX_ENV: + ctx->env_set = 1; + if (ctx->env) + _xfree_char_array(&ctx->env, ctx->envc); + ctx->envc = va_arg(ap, int); + if ((ctx->envc < 1) || (ctx->envc > 1024)) { slurm_seterrno(EINVAL); - rc = SLURM_ERROR; + break; + } + _xcopy_char_array(&ctx->env, va_arg(ap, char **), + ctx->envc); + break; + case SLURM_STEP_CTX_GID: + ctx->gid = va_arg(ap, int); + break; + case SLURM_STEP_CTX_UNBUFFERED: + ctx->unbuffered = va_arg(ap, int); + break; + case 
SLURM_STEP_CTX_PDEBUG: + ctx->parallel_debug = va_arg(ap, int); + break; + case SLURM_STEP_CTX_TASK_EPILOG: + if (ctx->task_epilog) + xfree(ctx->task_epilog); + ctx->task_epilog = xstrdup(va_arg(ap, char *)); + break; + case SLURM_STEP_CTX_TASK_PROLOG: + if (ctx->task_prolog) + xfree(ctx->task_prolog); + ctx->task_prolog = xstrdup(va_arg(ap, char *)); + break; + case SLURM_STEP_CTX_OFNAME: + if (ctx->ofname) + xfree(ctx->ofname); + ctx->ofname = xstrdup(va_arg(ap, char *)); + break; + case SLURM_STEP_CTX_IFNAME: + if (ctx->ifname) + xfree(ctx->ifname); + ctx->ifname = xstrdup(va_arg(ap, char *)); + break; + case SLURM_STEP_CTX_EFNAME: + if (ctx->efname) + xfree(ctx->efname); + ctx->efname = xstrdup(va_arg(ap, char *)); + break; + default: + slurm_seterrno(EINVAL); + rc = SLURM_ERROR; } va_end(ap); @@ -348,9 +395,12 @@ extern int slurm_spawn (slurm_step_ctx ctx, int *fd_array) spawn_task_request_msg_t *msg_array_ptr; int *sock_array; slurm_msg_t *req_array_ptr; - int i, rc = SLURM_SUCCESS; + int i, j, rc = SLURM_SUCCESS; uint16_t slurmd_debug = 0; char *env_var; + hostlist_t hostlist = NULL; + hostlist_iterator_t itr = NULL; + char *host = NULL; if ((ctx == NULL) || (ctx->magic != STEP_CTX_MAGIC) || @@ -390,6 +440,10 @@ extern int slurm_spawn (slurm_step_ctx ctx, int *fd_array) msg_array_ptr = xmalloc(sizeof(spawn_task_request_msg_t) * ctx->nhosts); req_array_ptr = xmalloc(sizeof(slurm_msg_t) * ctx->nhosts); + + hostlist = hostlist_create(ctx->alloc_resp->node_list); + itr = hostlist_iterator_create(hostlist); + for (i=0; i<ctx->nhosts; i++) { spawn_task_request_msg_t *r = &msg_array_ptr[i]; slurm_msg_t *m = &req_array_ptr[i]; @@ -408,7 +462,6 @@ extern int slurm_spawn (slurm_step_ctx ctx, int *fd_array) r->nprocs = ctx->num_tasks; r->switch_job = ctx->step_resp->switch_job; r->slurmd_debug = slurmd_debug; - /* Task specific message contents */ r->global_task_id = ctx->tids[i][0]; r->cpus_allocated = ctx->cpus[i]; @@ -416,14 +469,28 @@ extern int slurm_spawn 
(slurm_step_ctx ctx, int *fd_array) r->io_port = ntohs(sock_array[i]); m->msg_type = REQUEST_SPAWN_TASK; m->data = r; - memcpy(&m->address, &ctx->alloc_resp->node_addr[i], + + j=0; + while(host = hostlist_next(itr)) { + if(!strcmp(host,ctx->host[i])) { + free(host); + break; + } + j++; + free(host); + } + debug2("using %d %s with %d tasks\n", j, ctx->host[i], + r->nprocs); + hostlist_iterator_reset(itr); + memcpy(&m->address, &ctx->alloc_resp->node_addr[j], sizeof(slurm_addr)); #if _DEBUG printf("tid=%d, fd=%d, port=%u, node_id=%u\n", ctx->tids[i][0], fd_array[i], r->io_port, i); #endif } - + hostlist_iterator_destroy(itr); + hostlist_destroy(hostlist); rc = _p_launch(req_array_ptr, ctx); xfree(msg_array_ptr); @@ -433,6 +500,114 @@ extern int slurm_spawn (slurm_step_ctx ctx, int *fd_array) return rc; } +extern int slurm_launch(slurm_step_ctx ctx, + pthread_mutex_t task_mutex, + forked_msg_t *forked_msg, + int *listenport, + slurm_addr *jaddr, + int njfds, + int num_listen) +{ + slurm_msg_t *req_array_ptr; + launch_tasks_request_msg_t *msg_array_ptr; + int i, j; + uint16_t slurmd_debug = 0; + char *env_var = NULL; + char *host = NULL; + hostlist_t hostlist = NULL; + hostlist_iterator_t itr = NULL; + int rc = SLURM_SUCCESS; + + if (_validate_ctx(ctx)) + return SLURM_ERROR; + + /* get slurmd_debug level from SLURMD_DEBUG env var */ + env_var = getenv("SLURMD_DEBUG"); + if (env_var) { + i = atoi(env_var); + if (i >= 0) + slurmd_debug = i; + } + + debug("going to launch %d tasks on %d hosts", + ctx->num_tasks, ctx->nhosts); + debug("sending to slurmd port %d", slurm_get_slurmd_port()); + + msg_array_ptr = xmalloc(sizeof(launch_tasks_request_msg_t) * + ctx->nhosts); + req_array_ptr = xmalloc(sizeof(slurm_msg_t) * ctx->nhosts); + + hostlist = hostlist_create(ctx->alloc_resp->node_list); + itr = hostlist_iterator_create(hostlist); + + for (i = 0; i < ctx->nhosts; i++) { + launch_tasks_request_msg_t *r = &msg_array_ptr[i]; + slurm_msg_t *m = &req_array_ptr[i]; + + /* Common 
message contents */ + r->job_id = ctx->job_id; + r->uid = ctx->user_id; + r->gid = ctx->gid; + r->argc = ctx->argc; + r->argv = ctx->argv; + r->cred = ctx->step_resp->cred; + r->job_step_id = ctx->step_resp->job_step_id; + r->envc = ctx->envc; + r->env = ctx->env; + r->cwd = ctx->cwd; + r->nnodes = ctx->nhosts; + r->nprocs = ctx->num_tasks; + r->slurmd_debug = slurmd_debug; + r->switch_job = ctx->step_resp->switch_job; + r->task_prolog = ctx->task_prolog; + r->task_epilog = ctx->task_epilog; + + r->ofname = (char *)fname_remote_string (ctx->ofname); + r->efname = (char *)fname_remote_string (ctx->efname); + r->ifname = (char *)fname_remote_string (ctx->ifname); + r->buffered_stdio = !ctx->unbuffered; + + if (ctx->parallel_debug) + r->task_flags |= TASK_PARALLEL_DEBUG; + + /* Node specific message contents */ + if (slurm_mpi_single_task_per_node ()) + r->tasks_to_launch = 1; + else + r->tasks_to_launch = ctx->tasks[i]; + r->global_task_ids = ctx->tids[i]; + r->cpus_allocated = ctx->cpus[i]; + r->srun_node_id = (uint32_t)i; + r->io_port = ntohs(listenport[i%num_listen]); + r->resp_port = ntohs(jaddr[i%njfds].sin_port); + m->msg_type = REQUEST_LAUNCH_TASKS; + m->data = r; + j=0; + while(host = hostlist_next(itr)) { + if(!strcmp(host,ctx->host[i])) { + free(host); + break; + } + j++; + free(host); + } + debug2("using %d %s with %d tasks\n", j, ctx->host[i], + r->nprocs); + hostlist_iterator_reset(itr); + + memcpy(&m->address, &ctx->alloc_resp->node_addr[j], + sizeof(slurm_addr)); + } + hostlist_iterator_destroy(itr); + hostlist_destroy(hostlist); + + rc = _p_launch(req_array_ptr, ctx); + + xfree(msg_array_ptr); + xfree(req_array_ptr); + + return rc; +} /* * slurm_spawn_kill - send the specified signal to an existing job step @@ -535,10 +710,64 @@ static int _task_layout(slurm_step_ctx ctx) if (ctx->task_dist == SLURM_DIST_CYCLIC) return _task_layout_cyclic(ctx); + else if(ctx->task_dist == SLURM_DIST_HOSTFILE) + return _task_layout_hostfile(ctx); else return 
_task_layout_block(ctx); } +/* use specific set run tasks on each host listed in hostfile + */ +static int _task_layout_hostfile(slurm_step_ctx ctx) +{ + int i=0, j, taskid = 0; + bool over_subscribe = false; + hostlist_iterator_t itr = NULL, itr_task = NULL; + char *host = NULL; + char *host_task = NULL; + hostlist_t job_alloc_hosts = NULL; + hostlist_t step_alloc_hosts = NULL; + + job_alloc_hosts = hostlist_create(ctx->alloc_resp->node_list); + itr = hostlist_iterator_create(job_alloc_hosts); + step_alloc_hosts = hostlist_create(ctx->step_resp->node_list); + itr_task = hostlist_iterator_create(step_alloc_hosts); + while(host = hostlist_next(itr)) { + + ctx->tasks[i] = 0; + while(host_task = hostlist_next(itr_task)) { + if(!strcmp(host, host_task)) + ctx->tasks[i]++; + } + debug2("%s got %d tasks\n", + host, + ctx->tasks[i]); + if(ctx->tasks[i] == 0) + goto reset_hosts; + ctx->tids[i] = xmalloc(sizeof(uint32_t) * ctx->tasks[i]); + hostlist_iterator_reset(itr_task); + taskid = 0; + j = 0; + while(host_task = hostlist_next(itr_task)) { + if(!strcmp(host, host_task)) { + ctx->tids[i][j] = taskid; + j++; + } + taskid++; + free(host_task); + } + i++; + reset_hosts: + hostlist_iterator_reset(itr_task); + free(host); + } + + hostlist_iterator_destroy(itr); + hostlist_iterator_destroy(itr_task); + hostlist_destroy(job_alloc_hosts); + + return SLURM_SUCCESS; +} /* to effectively deal with heterogeneous nodes, we fake a cyclic * distribution to figure out how many tasks go on each node and diff --git a/src/common/dist_tasks.c b/src/common/dist_tasks.c index d1706bf46dd..1df7deb265d 100644 --- a/src/common/dist_tasks.c +++ b/src/common/dist_tasks.c @@ -47,7 +47,6 @@ #include "src/common/log.h" #include "src/common/xmalloc.h" - /* * distribute_tasks - determine how many tasks of a job will be run on each. * node. 
Distribution is influenced by number of cpus on @@ -92,7 +91,6 @@ int *distribute_tasks(const char *mlist, uint16_t num_cpu_groups, i = 0; ncpus = 0; while ((this_node_name = hostlist_shift(master_hl))) { - if (hostlist_find(task_hl, this_node_name) >= 0) { if (i >= nnodes) { fatal("Internal error: duplicate nodes? " @@ -110,7 +108,6 @@ int *distribute_tasks(const char *mlist, uint16_t num_cpu_groups, } hostlist_destroy(master_hl); hostlist_destroy(task_hl); - if (num_tasks >= ncpus) { /* * Evenly overcommit tasks over the hosts diff --git a/src/common/dist_tasks.h b/src/common/dist_tasks.h index b347aebbea7..565a388bfb5 100644 --- a/src/common/dist_tasks.h +++ b/src/common/dist_tasks.h @@ -58,10 +58,10 @@ * NOTE: allocates memory that should be xfreed by caller */ int * distribute_tasks(const char *mlist, - uint16_t num_cpu_groups, - uint32_t *cpus_per_node, - uint32_t *cpu_count_reps, - const char *tlist, - uint32_t num_tasks); + uint16_t num_cpu_groups, + uint32_t *cpus_per_node, + uint32_t *cpu_count_reps, + const char *tlist, + uint32_t num_tasks); #endif /* !_DIST_TASKS_H */ diff --git a/src/common/env.c b/src/common/env.c index 409448daa35..14e85e70423 100644 --- a/src/common/env.c +++ b/src/common/env.c @@ -250,8 +250,8 @@ int setup_env(env_t *env) } if (env->distribution - && env->distribution != SRUN_DIST_UNKNOWN) { - dist = (env->distribution == SRUN_DIST_BLOCK) ? + && env->distribution != SLURM_DIST_UNKNOWN) { + dist = (env->distribution == SLURM_DIST_BLOCK) ? 
"block" : "cyclic"; if (setenvf(&env->env, "SLURM_DISTRIBUTION", "%s", dist)) { diff --git a/src/common/env.h b/src/common/env.h index 1fa99a1763b..8e993d2c5c2 100644 --- a/src/common/env.h +++ b/src/common/env.h @@ -33,18 +33,12 @@ #include "src/common/macros.h" -enum distribution_t { - SRUN_DIST_BLOCK = 0, - SRUN_DIST_CYCLIC = 1, - SRUN_DIST_UNKNOWN = 2 -}; - typedef struct env_options { int nprocs; /* --nprocs=n, -n n */ char *task_count; bool nprocs_set; /* true if nprocs explicitly set */ bool cpus_set; /* true if cpus_per_task explicitly set */ - enum distribution_t + enum task_dist_states distribution; /* --distribution=, -m dist */ bool overcommit; /* --overcommit, -O */ int slurmd_debug; /* --slurmd-debug, -D */ diff --git a/src/plugins/switch/federation/federation.c b/src/plugins/switch/federation/federation.c index c16d3e7dada..48b19b4f6aa 100644 --- a/src/plugins/switch/federation/federation.c +++ b/src/plugins/switch/federation/federation.c @@ -1773,12 +1773,11 @@ fed_build_jobinfo(fed_jobinfo_t *jp, hostlist_t hl, int nprocs, int min_procs_per_node; int max_procs_per_node; - debug("Allocating windows in block mode"); + debug("Allocating windows in non-cyclic mode"); nnodes = hostlist_count(hl); full_node_cnt = nprocs % nnodes; min_procs_per_node = nprocs / nnodes; max_procs_per_node = (nprocs + nnodes - 1) / nnodes; - proc_cnt = 0; _lock(); for (i = 0; i < nnodes; i++) { @@ -1790,7 +1789,7 @@ fed_build_jobinfo(fed_jobinfo_t *jp, hostlist_t hl, int nprocs, task_cnt = max_procs_per_node; else task_cnt = min_procs_per_node; - + for (j = 0; j < task_cnt; j++) { rc = _allocate_windows(jp->tables_per_task, jp->tableinfo, diff --git a/src/slurmctld/node_scheduler.c b/src/slurmctld/node_scheduler.c index 237479dc699..7650655666f 100644 --- a/src/slurmctld/node_scheduler.c +++ b/src/slurmctld/node_scheduler.c @@ -1146,16 +1146,20 @@ extern void build_node_details(struct job_record *job_ptr) } job_ptr->num_cpu_groups = 0; - job_ptr->node_cnt = 
bit_set_count(job_ptr->node_bitmap); + + /* Use hostlist here to insure ordering of info matches that of srun */ + if ((host_list = hostlist_create(job_ptr->nodes)) == NULL) + fatal("hostlist_create error for %s: %m", this_node_name); + + job_ptr->node_cnt = hostlist_count(host_list); + xrealloc(job_ptr->cpus_per_node, (sizeof(uint32_t) * job_ptr->node_cnt)); xrealloc(job_ptr->cpu_count_reps, (sizeof(uint32_t) * job_ptr->node_cnt)); xrealloc(job_ptr->node_addr, (sizeof(slurm_addr) * job_ptr->node_cnt)); - /* Use hostlist here to insure ordering of info matches that of srun */ - if ((host_list = hostlist_create(job_ptr->nodes)) == NULL) - fatal("hostlist_create error for %s: %m", job_ptr->nodes); + job_ptr->ntask_cnt = 0; xfree(job_ptr->ntask); @@ -1176,17 +1180,16 @@ extern void build_node_details(struct job_record *job_ptr) job_ptr->ntask[cr_count++] = usable_cpus; if(error_code != SLURM_SUCCESS) { xfree(job_ptr->ntask); - free(this_node_name); - error("Invalid node %s in JobId=%u", - this_node_name, - job_ptr->job_id); + error("unable to get extra_jobinfo " + "from JobId=%u", + job_ptr->job_id); } } else if (slurmctld_conf.fast_schedule) { usable_cpus = node_ptr->config_ptr->cpus; } else { usable_cpus = node_ptr->cpus; } - + if (usable_cpus <= 0) continue; memcpy(&job_ptr->node_addr[node_inx++], @@ -1196,11 +1199,12 @@ extern void build_node_details(struct job_record *job_ptr) usable_cpus)) { cpu_inx++; job_ptr->cpus_per_node[cpu_inx] = - usable_cpus; + usable_cpus; + job_ptr->cpu_count_reps[cpu_inx] = 1; } else job_ptr->cpu_count_reps[cpu_inx]++; - + } else { error("Invalid node %s in JobId=%u", this_node_name, job_ptr->job_id); @@ -1215,10 +1219,11 @@ extern void build_node_details(struct job_record *job_ptr) } job_ptr->num_cpu_groups = cpu_inx + 1; if ((cr_enabled) && (error_code == SLURM_SUCCESS)) { - error_code = select_g_update_nodeinfo(job_ptr, SELECT_CR_USED_CPUS); + error_code = select_g_update_nodeinfo(job_ptr, + SELECT_CR_USED_CPUS); if(error_code != 
SLURM_SUCCESS) - error("Invalid node %s in JobId=%u", - this_node_name, job_ptr->job_id); + error("Unable to update nodeinfo for JobId=%u", + job_ptr->job_id); } } diff --git a/src/slurmctld/proc_req.c b/src/slurmctld/proc_req.c index 135bb940daf..71083e81675 100644 --- a/src/slurmctld/proc_req.c +++ b/src/slurmctld/proc_req.c @@ -1045,7 +1045,7 @@ static void _slurm_rpc_job_step_create(slurm_msg_t * msg) /* return result */ if (error_code) { unlock_slurmctld(job_write_lock); - info("_slurm_rpc_job_step_create: %s", + error("_slurm_rpc_job_step_create: %s", slurm_strerror(error_code)); slurm_send_rc_msg(msg, error_code); } else { @@ -1053,7 +1053,7 @@ static void _slurm_rpc_job_step_create(slurm_msg_t * msg) step_rec->job_ptr->job_id, step_rec->step_id, TIME_STR); job_step_resp.job_step_id = step_rec->step_id; - job_step_resp.node_list = xstrdup(step_rec->step_node_list); + job_step_resp.node_list = xstrdup(req_step_msg->node_list); job_step_resp.cred = slurm_cred; job_step_resp.switch_job = switch_copy_jobinfo( step_rec->switch_job); @@ -1279,7 +1279,7 @@ static void _slurm_rpc_old_job_alloc(slurm_msg_t * msg) slurm_strerror(error_code)); slurm_send_rc_msg(msg, error_code); } else { - debug2("_slurm_rpc_old_job_alloc JobId=%u NodeList=%s %s", + info("_slurm_rpc_old_job_alloc JobId=%u NodeList=%s %s", job_desc_msg->job_id, job_ptr->nodes, TIME_STR); /* send job_ID and node_name_ptr */ diff --git a/src/slurmctld/step_mgr.c b/src/slurmctld/step_mgr.c index 0b2c68d40f4..9b1b9646b35 100644 --- a/src/slurmctld/step_mgr.c +++ b/src/slurmctld/step_mgr.c @@ -405,8 +405,11 @@ _pick_step_nodes (struct job_record *job_ptr, step_spec->node_list, job_ptr->job_id); goto cleanup; } - } - else if (step_spec->relative) { + if(step_spec->task_dist == SLURM_DIST_HOSTFILE) { + FREE_NULL_BITMAP(nodes_avail); + return nodes_picked; + } + } else if (step_spec->relative) { /* Remove first (step_spec->relative) nodes from * available list */ bitstr_t *relative_nodes = NULL; @@ -420,14 
+423,13 @@ _pick_step_nodes (struct job_record *job_ptr, bit_not (relative_nodes); bit_and (nodes_avail, relative_nodes); bit_free (relative_nodes); - } - else { + } else { nodes_picked = bit_alloc (bit_size (nodes_avail) ); if (nodes_picked == NULL) fatal("bit_alloc malloc failure"); } - /* if user specifies step needs a specific processor count and */ + /* if user specifies step needs a specific processor count and */ /* all nodes have the same processor count, just translate this to */ /* a node count */ if (step_spec->cpu_count && (job_ptr->num_cpu_groups == 1)) { @@ -524,7 +526,8 @@ step_create ( job_step_create_request_msg_t *step_specs, return ESLURM_ALREADY_DONE; if ((step_specs->task_dist != SLURM_DIST_CYCLIC) && - (step_specs->task_dist != SLURM_DIST_BLOCK)) + (step_specs->task_dist != SLURM_DIST_BLOCK) && + (step_specs->task_dist != SLURM_DIST_HOSTFILE)) return ESLURM_BAD_DIST; if (job_ptr->kill_on_step_done) @@ -537,7 +540,7 @@ step_create ( job_step_create_request_msg_t *step_specs, if (nodeset == NULL) return ESLURM_REQUESTED_NODE_CONFIG_UNAVAILABLE ; node_count = bit_set_count(nodeset); - + if (step_specs->num_tasks == NO_VAL) { if (step_specs->cpu_count != NO_VAL) step_specs->num_tasks = step_specs->cpu_count; @@ -553,7 +556,10 @@ step_create ( job_step_create_request_msg_t *step_specs, fatal ("create_step_record failed with no memory"); /* set the step_record values */ - step_ptr->step_node_list = bitmap2node_name(nodeset); + /* Here is where the node list is set for the job */ + step_ptr->step_node_list = xstrdup(step_specs->node_list); + xfree(step_specs->node_list); + step_specs->node_list = bitmap2node_name(nodeset); step_ptr->step_node_bitmap = nodeset; step_ptr->cyclic_alloc = (uint16_t) (step_specs->task_dist == SLURM_DIST_CYCLIC); diff --git a/src/srun/Makefile.am b/src/srun/Makefile.am index af334630a0d..8dfd2c5422b 100644 --- a/src/srun/Makefile.am +++ b/src/srun/Makefile.am @@ -14,7 +14,7 @@ srun_SOURCES = \ msg.c 
msg.h \ signals.c signals.h \ io.c io.h \ - launch.c \ + launch2.c \ launch.h \ attach.h \ attach.c \ diff --git a/src/srun/allocate.c b/src/srun/allocate.c index 96824b85f88..23eb568b6fd 100644 --- a/src/srun/allocate.c +++ b/src/srun/allocate.c @@ -472,19 +472,22 @@ _step_req_create(srun_job_t *j) r->cpu_count = opt.overcommit ? j->nhosts : (opt.nprocs*opt.cpus_per_task); r->num_tasks = opt.nprocs; - r->node_list = j->nodelist; + r->node_list = xstrdup(j->nodelist); r->network = opt.network; r->name = opt.job_name; r->relative = false; /* XXX fix this oneday */ switch (opt.distribution) { - case SRUN_DIST_UNKNOWN: + case SLURM_DIST_UNKNOWN: r->task_dist = (opt.nprocs <= j->nhosts) ? SLURM_DIST_CYCLIC : SLURM_DIST_BLOCK; break; - case SRUN_DIST_CYCLIC: + case SLURM_DIST_CYCLIC: r->task_dist = SLURM_DIST_CYCLIC; break; + case SLURM_DIST_HOSTFILE: + r->task_dist = SLURM_DIST_HOSTFILE; + break; default: /* (opt.distribution == SRUN_DIST_BLOCK) */ r->task_dist = SLURM_DIST_BLOCK; break; @@ -503,6 +506,7 @@ _step_req_destroy(job_step_create_request_msg_t *r) { if (r) { xfree(r->host); + xfree(r->node_list); xfree(r); } } @@ -512,6 +516,7 @@ create_job_step(srun_job_t *job) { job_step_create_request_msg_t *req = NULL; job_step_create_response_msg_t *resp = NULL; + char *temp = NULL; if (!(req = _step_req_create(job))) { error ("Unable to allocate step request message"); @@ -521,7 +526,10 @@ create_job_step(srun_job_t *job) error ("Unable to create job step: %m"); return -1; } - + temp = req->node_list; + req->node_list = resp->node_list; + resp->node_list = temp; + job->stepid = resp->job_step_id; job->cred = resp->cred; job->switch_job = resp->switch_job; diff --git a/src/srun/launch.c b/src/srun/launch.c index edfe7b6ef13..cebe098551d 100644 --- a/src/srun/launch.c +++ b/src/srun/launch.c @@ -313,7 +313,6 @@ static void _p_launch(slurm_msg_t *req, srun_job_t *job) thd = xmalloc (job->nhosts * sizeof (thd_t)); for (i = 0; i < job->nhosts; i++) { - if (job->ntask[i] == 
0) { /* No tasks for this node */ debug("Node %s is unused",job->host[i]); job->host_state[i] = SRUN_HOST_REPLIED; @@ -494,7 +493,7 @@ _print_launch_msg(launch_tasks_request_msg_t *msg, char * hostname) int i; char tmp_str[10], task_list[4096]; - if (opt.distribution == SRUN_DIST_BLOCK) { + if (opt.distribution == SLURM_DIST_BLOCK) { sprintf(task_list, "%u-%u", msg->global_task_ids[0], msg->global_task_ids[(msg->tasks_to_launch-1)]); diff --git a/src/srun/launch.h b/src/srun/launch.h index c7d99b5d466..fc920f7fbb4 100644 --- a/src/srun/launch.h +++ b/src/srun/launch.h @@ -50,6 +50,7 @@ typedef struct launch_thr { int i; /* temporary index into array */ } launch_thr_t; +//int launch_thr_create(srun_job_t *job); int launch_thr_create(srun_job_t *job); void * launch(void *arg); diff --git a/src/srun/msg.c b/src/srun/msg.c index 14edd64d73e..7ba221d764f 100644 --- a/src/srun/msg.c +++ b/src/srun/msg.c @@ -345,8 +345,9 @@ _launch_handler(srun_job_t *job, slurm_msg_t *resp) launch_tasks_response_msg_t *msg = resp->data; pipe_enum_t pipe_enum = PIPE_HOST_STATE; - debug2("received launch resp from %s nodeid=%d", msg->node_name, - msg->srun_node_id); + debug3("received launch resp from %s nodeid=%d", + msg->node_name, + msg->srun_node_id); if (msg->return_code != 0) { diff --git a/src/srun/opt.c b/src/srun/opt.c index 58b31c91d8a..9c3b38112f4 100644 --- a/src/srun/opt.c +++ b/src/srun/opt.c @@ -156,7 +156,7 @@ static bool _under_parallel_debugger(void); static void _usage(void); static bool _valid_node_list(char **node_list_pptr); -static enum distribution_t _verify_dist_type(const char *arg); +static enum task_dist_states _verify_dist_type(const char *arg); static bool _verify_node_count(const char *arg, int *min, int *max); static int _verify_geometry(const char *arg, int *geometry); static int _verify_conn_type(const char *arg); @@ -228,17 +228,19 @@ static bool _valid_node_list(char **node_list_pptr) /* * verify that a distribution type in arg is of a known form - * 
returns the distribution_t or SRUN_DIST_UNKNOWN + * returns the task_dist_states or SLURM_DIST_UNKNOWN */ -static enum distribution_t _verify_dist_type(const char *arg) +static enum task_dist_states _verify_dist_type(const char *arg) { int len = strlen(arg); - enum distribution_t result = SRUN_DIST_UNKNOWN; + enum task_dist_states result = SLURM_DIST_UNKNOWN; if (strncasecmp(arg, "cyclic", len) == 0) - result = SRUN_DIST_CYCLIC; + result = SLURM_DIST_CYCLIC; else if (strncasecmp(arg, "block", len) == 0) - result = SRUN_DIST_BLOCK; + result = SLURM_DIST_BLOCK; + else if (strncasecmp(arg, "hostfile", len) == 0) + result = SLURM_DIST_HOSTFILE; return result; } @@ -461,7 +463,7 @@ static void _opt_default() opt.dependency = NO_VAL; opt.account = NULL; - opt.distribution = SRUN_DIST_UNKNOWN; + opt.distribution = SLURM_DIST_UNKNOWN; opt.ofname = NULL; opt.ifname = NULL; @@ -607,7 +609,7 @@ static void _process_env_var(env_vars_t *e, const char *val) { char *end = NULL; - enum distribution_t dt; + enum task_dist_states dt; debug2("now processing env var %s=%s", e->var, val); @@ -637,7 +639,7 @@ _process_env_var(env_vars_t *e, const char *val) case OPT_DISTRIB: dt = _verify_dist_type(val); - if (dt == SRUN_DIST_UNKNOWN) { + if (dt == SLURM_DIST_UNKNOWN) { error("\"%s=%s\" -- invalid distribution type. " "ignoring...", e->var, val); } else @@ -933,7 +935,7 @@ void set_options(const int argc, char **argv, int first) break; opt.distribution = _verify_dist_type(optarg); - if (opt.distribution == SRUN_DIST_UNKNOWN) { + if (opt.distribution == SLURM_DIST_UNKNOWN) { error("distribution type `%s' " "is not recognized", optarg); exit(1); @@ -1605,7 +1607,7 @@ static void _opt_list() info("partition : %s", opt.partition == NULL ? 
"default" : opt.partition); info("job name : `%s'", opt.job_name); - info("distribution : %s", format_distribution_t(opt.distribution)); + info("distribution : %s", format_task_dist_states(opt.distribution)); info("core format : %s", core_format_name (opt.core_type)); info("verbose : %d", _verbose); info("slurmd_debug : %d", opt.slurmd_debug); diff --git a/src/srun/opt.h b/src/srun/opt.h index 6d748cd4986..2a641a0cc9b 100644 --- a/src/srun/opt.h +++ b/src/srun/opt.h @@ -62,8 +62,9 @@ enum modes { enum modes mode; -#define format_distribution_t(t) (t == SRUN_DIST_BLOCK) ? "block" : \ - (t == SRUN_DIST_CYCLIC) ? "cyclic" : \ +#define format_task_dist_states(t) (t == SLURM_DIST_BLOCK) ? "block" : \ + (t == SLURM_DIST_CYCLIC) ? "cyclic" : \ + (t == SLURM_DIST_HOSTFILE) ? "hostfile" : \ "unknown" enum io_t { @@ -97,7 +98,7 @@ typedef struct srun_options { bool nodes_set; /* true if nodes explicitly set */ int time_limit; /* --time, -t */ char *partition; /* --partition=n, -p n */ - enum distribution_t + enum task_dist_states distribution; /* --distribution=, -m dist */ char *job_name; /* --job-name=, -J name */ unsigned int jobid; /* --jobid=jobid */ diff --git a/src/srun/srun.c b/src/srun/srun.c index cf70753bbe2..2951b530bf5 100644 --- a/src/srun/srun.c +++ b/src/srun/srun.c @@ -115,7 +115,7 @@ static int _run_srun_script (srun_job_t *job, char *script); int srun(int ac, char **av) { allocation_resp *resp; - srun_job_t *job; + srun_job_t *job = NULL; char *task_cnt, *bgl_part_id = NULL; int exitcode = 0; env_t *env = xmalloc(sizeof(env_t)); @@ -191,6 +191,7 @@ int srun(int ac, char **av) info ("Warning: unable to assume uid=%lu\n", opt.uid); if (_verbose) _print_job_information(resp); + job = job_create_allocation(resp); if (msg_thr_create(job) < 0) job_fatal(job, "Unable to create msg thread"); @@ -222,18 +223,24 @@ int srun(int ac, char **av) exit (0); } else { + printf("hey\n"); sig_setup_sigmask(); if ( !(resp = allocate_nodes()) ) exit(1); if (_verbose) 
_print_job_information(resp); - - job = job_create_allocation(resp); - if (create_job_step(job) < 0) { - srun_job_destroy(job, 0); - exit(1); - } - slurm_free_resource_allocation_response_msg(resp); + job = job_create_structure(resp); + + job->alloc_resp = resp; + build_step_ctx(job); + + /* job = job_create_allocation(resp); */ + +/* if (create_job_step(job) < 0) { */ +/* srun_job_destroy(job, 0); */ +/* exit(1); */ +/* } */ +/* slurm_free_resource_allocation_response_msg(resp); */ } /* @@ -279,9 +286,16 @@ int srun(int ac, char **av) if (sig_thr_create(job) < 0) job_fatal(job, "Unable to create signals thread: %m"); - if (launch_thr_create(job) < 0) - job_fatal(job, "Unable to create launch thread: %m"); - + /* if (launch_thr_create(job) < 0) */ +/* job_fatal(job, "Unable to create launch thread: %m"); */ + update_job_state(job, SRUN_JOB_LAUNCHING); + if (slurm_launch(job->step_ctx, job->task_mutex, job->forked_msg, + job->listenport, job->jaddr, job->njfds, + job->num_listen)) { + fatal("slurm_launch: %s", + slurm_strerror(slurm_get_errno())); + } + update_job_state(job, SRUN_JOB_STARTING); /* wait for job to terminate */ slurm_mutex_lock(&job->state_mutex); @@ -368,7 +382,7 @@ _task_count_string (srun_job_t *job) static void _switch_standalone(srun_job_t *job) { - int cyclic = (opt.distribution == SRUN_DIST_CYCLIC); + int cyclic = (opt.distribution == SLURM_DIST_CYCLIC); if (switch_alloc_jobinfo(&job->switch_job) < 0) fatal("switch_alloc_jobinfo: %m"); diff --git a/src/srun/srun_job.c b/src/srun/srun_job.c index 5c9727e33fd..ff88a161cc9 100644 --- a/src/srun/srun_job.c +++ b/src/srun/srun_job.c @@ -52,6 +52,7 @@ #include "src/srun/fname.h" #include "src/srun/attach.h" #include "src/srun/io.h" +#include "src/srun/msg.h" /* @@ -135,34 +136,6 @@ _dist_cyclic(srun_job_t *job) } } -/* - * Create an srun job structure from a resource allocation response msg - */ -srun_job_t * -job_create_allocation(resource_allocation_response_msg_t *resp) -{ - srun_job_t *job; - 
allocation_info_t *i = xmalloc(sizeof(*i)); - - i->nodelist = _normalize_hostlist(resp->node_list); - i->nnodes = resp->node_cnt; - i->jobid = resp->job_id; - i->stepid = NO_VAL; - i->num_cpu_groups = resp->num_cpu_groups; - i->cpus_per_node = resp->cpus_per_node; - i->cpu_count_reps = resp->cpu_count_reps; - i->addrs = resp->node_addr; - i->select_jobinfo = select_g_copy_jobinfo(resp->select_jobinfo); - - job = _job_create_internal(i); - - xfree(i->nodelist); - xfree(i); - - return (job); -} - - /* * Create an srun job structure w/out an allocation response msg. * (i.e. use the command line options) @@ -220,6 +193,225 @@ job_create_noalloc(void) } +/* + * Create an srun job structure from a resource allocation response msg + */ +extern srun_job_t * +job_create_allocation(resource_allocation_response_msg_t *resp) +{ + srun_job_t *job; + allocation_info_t *i = xmalloc(sizeof(*i)); + + i->nodelist = _normalize_hostlist(resp->node_list); + i->nnodes = resp->node_cnt; + i->jobid = resp->job_id; + i->stepid = NO_VAL; + i->num_cpu_groups = resp->num_cpu_groups; + i->cpus_per_node = resp->cpus_per_node; + i->cpu_count_reps = resp->cpu_count_reps; + i->addrs = resp->node_addr; + i->select_jobinfo = select_g_copy_jobinfo(resp->select_jobinfo); + + job = _job_create_internal(i); + + xfree(i->nodelist); + xfree(i); + + return (job); +} + +/* + * Create an srun job structure from a resource allocation response msg + */ +extern srun_job_t * +job_create_structure(resource_allocation_response_msg_t *resp) +{ + srun_job_t *job = xmalloc(sizeof(srun_job_t)); + hostlist_t hl; + int i, cpu_inx, cpu_cnt; + + slurm_mutex_init(&job->state_mutex); + pthread_cond_init(&job->state_cond, NULL); + job->state = SRUN_JOB_INIT; + + job->nhosts = resp->node_cnt; + job->signaled = false; + job->rc = -1; + job->nodelist = xstrdup(resp->node_list); + hl = hostlist_create(job->nodelist); +#ifdef HAVE_FRONT_END /* Limited job step support */ + /* All jobs execute through front-end on Blue Gene/L. 
+ * Normally we would not permit execution of job steps, + * but can fake it by just allocating all tasks to + * one of the allocated nodes. */ + job->nhosts = 1; + opt.overcommit = true; +#else + job->nhosts = hostlist_count(hl); +#endif + + job->select_jobinfo = select_g_copy_jobinfo(resp->select_jobinfo); + job->jobid = resp->job_id; + job->stepid = NO_VAL; + job->old_job = false; + job->removed = false; + + /* + * Initialize Launch and Exit timeout values + */ + job->ltimeout = 0; + job->etimeout = 0; + + job->slurmd_addr = xmalloc(job->nhosts * sizeof(slurm_addr)); + if (resp->node_addr) + memcpy( job->slurmd_addr, resp->node_addr, + sizeof(slurm_addr)*job->nhosts); + + job->host = (char **) xmalloc(job->nhosts * sizeof(char *)); + job->cpus = (int *) xmalloc(job->nhosts * sizeof(int) ); + + /* Compute number of file descriptors / Ports needed for Job + * control info server + */ + job->njfds = _estimate_nports(opt.nprocs, 48); + job->jfd = (slurm_fd *) xmalloc(job->njfds * sizeof(slurm_fd)); + job->jaddr = (slurm_addr *) xmalloc(job->njfds * sizeof(slurm_addr)); + + debug3("njfds = %d", job->njfds); + + /* Compute number of listening sockets needed to allow + * all of the slurmds to establish IO streams with srun, without + * overstressing the TCP/IP backoff/retry algorithm + */ + job->num_listen = _estimate_nports(opt.nprocs, 64); + job->listensock = (int *) xmalloc(job->num_listen * sizeof(int)); + job->listenport = (int *) xmalloc(job->num_listen * sizeof(int)); + + job->eio = eio_handle_create(); + job->ioservers_ready = 0; + /* "nhosts" number of IO protocol sockets */ + job->ioserver = (eio_obj_t **)xmalloc(job->nhosts*sizeof(eio_obj_t *)); + job->free_incoming = list_create(NULL); /* FIXME! Needs destructor */ + for (i = 0; i < STDIO_MAX_FREE_BUF; i++) { + list_enqueue(job->free_incoming, alloc_io_buf()); + } + job->free_outgoing = list_create(NULL); /* FIXME! 
Needs destructor */ + for (i = 0; i < STDIO_MAX_FREE_BUF; i++) { + list_enqueue(job->free_outgoing, alloc_io_buf()); + } + + /* nhost host states */ + job->host_state = xmalloc(job->nhosts * sizeof(srun_host_state_t)); + + /* ntask task states and statii*/ + job->task_state = xmalloc(opt.nprocs * sizeof(srun_task_state_t)); + job->tstatus = xmalloc(opt.nprocs * sizeof(int)); + + slurm_mutex_init(&job->task_mutex); + + for(i = 0; i < job->nhosts; i++) { + job->host[i] = hostlist_shift(hl); + + job->cpus[i] = resp->cpus_per_node[cpu_inx]; + if ((++cpu_cnt) >= resp->cpu_count_reps[cpu_inx]) { + /* move to next record */ + cpu_inx++; + cpu_cnt = 0; + } + } + +#ifdef HAVE_FRONT_END + job->ntask = (int *) xmalloc(sizeof(int *)); + job->ntask[0] = opt.nprocs; +#else + job->ntask = distribute_tasks(job->nodelist, + resp->num_cpu_groups, + resp->cpus_per_node, + resp->cpu_count_reps, + job->nodelist, + opt.nprocs); +#endif + + job->ntasks = 0; + for (i = 0; i < job->nhosts; i++) { + debug3("distribute_tasks placed %d tasks on host %d", + job->ntask[i], i); + job->ntasks += job->ntask[i]; + } + + /* Build task id list for each host */ + job->tids = xmalloc(job->nhosts * sizeof(uint32_t *)); + job->hostid = xmalloc(opt.nprocs * sizeof(uint32_t)); + for (i = 0; i < job->nhosts; i++) + job->tids[i] = xmalloc(job->ntask[i] * sizeof(uint32_t)); + + if (opt.distribution == SLURM_DIST_UNKNOWN) { + if (opt.nprocs <= job->nhosts) + opt.distribution = SLURM_DIST_CYCLIC; + else + opt.distribution = SLURM_DIST_BLOCK; + } + + if (opt.distribution == SLURM_DIST_BLOCK) + _dist_block(job); + else + _dist_cyclic(job); + + job_update_io_fnames(job); + + hostlist_destroy(hl); + + return (job); +} + +extern int build_step_ctx(srun_job_t *job) +{ + resource_allocation_response_msg_t *resp = job->alloc_resp; + job_step_create_request_msg_t *r = NULL; + r = xmalloc(sizeof(job_step_create_request_msg_t)); + if (r == NULL) { + error("calloc error"); + return -1; + } + r->job_id = resp->job_id; + 
r->user_id = opt.uid; + r->node_count = resp->node_cnt; + /* Processor count not relevant to poe */ + r->cpu_count = resp->node_cnt; + r->num_tasks = opt.nprocs; + r->node_list = resp->node_list; + switch (opt.distribution) { + case SLURM_DIST_UNKNOWN: + r->task_dist = (opt.nprocs <= resp->node_cnt) + ? SLURM_DIST_CYCLIC : SLURM_DIST_BLOCK; + break; + case SLURM_DIST_CYCLIC: + r->task_dist = SLURM_DIST_CYCLIC; + break; + case SLURM_DIST_HOSTFILE: + r->task_dist = SLURM_DIST_HOSTFILE; + break; + default: /* (opt.distribution == SLURM_DIST_BLOCK) */ + r->task_dist = SLURM_DIST_BLOCK; + break; + } + + r->network = opt.network; + if (slurmctld_comm_addr.port) { + r->host = strdup(slurmctld_comm_addr.hostname); + r->port = slurmctld_comm_addr.port; + } + job->step_ctx = slurm_step_ctx_create(r); + if (job->step_ctx == NULL) { + error("slurm_step_ctx_create: %s", + slurm_strerror(slurm_get_errno())); + return -1; + } + + xfree(job->alloc_resp->node_list); + job->alloc_resp->node_list = xstrdup(r->node_list); + slurm_free_job_step_create_request_msg(r); +} void update_job_state(srun_job_t *job, srun_job_state_t state) @@ -544,14 +736,14 @@ _job_create_internal(allocation_info_t *info) for (i = 0; i < job->nhosts; i++) job->tids[i] = xmalloc(job->ntask[i] * sizeof(uint32_t)); - if (opt.distribution == SRUN_DIST_UNKNOWN) { + if (opt.distribution == SLURM_DIST_UNKNOWN) { if (opt.nprocs <= job->nhosts) - opt.distribution = SRUN_DIST_CYCLIC; + opt.distribution = SLURM_DIST_CYCLIC; else - opt.distribution = SRUN_DIST_BLOCK; + opt.distribution = SLURM_DIST_BLOCK; } - if (opt.distribution == SRUN_DIST_BLOCK) + if (opt.distribution == SLURM_DIST_BLOCK) _dist_block(job); else _dist_cyclic(job); @@ -1025,3 +1217,4 @@ _normalize_hostlist(const char *hostlist) return xstrdup(buf); } + diff --git a/src/srun/srun_job.h b/src/srun/srun_job.h index bd0ee56e866..01168d70614 100644 --- a/src/srun/srun_job.h +++ b/src/srun/srun_job.h @@ -158,6 +158,9 @@ typedef struct srun_job { /* Output 
streams and stdin fileno */ forked_msg_t *forked_msg; select_jobinfo_t select_jobinfo; + resource_allocation_response_msg_t *alloc_resp; + struct slurm_step_ctx_struct *step_ctx; + } srun_job_t; extern int message_thread; @@ -167,8 +170,12 @@ void job_force_termination(srun_job_t *job); srun_job_state_t job_state(srun_job_t *job); -srun_job_t * job_create_noalloc(void); -srun_job_t * job_create_allocation(resource_allocation_response_msg_t *resp); +extern srun_job_t * job_create_noalloc(void); +extern srun_job_t * job_create_allocation( + resource_allocation_response_msg_t *resp); +extern srun_job_t * job_create_structure( + resource_allocation_response_msg_t *resp); +extern int build_step_ctx(srun_job_t *job); /* * Update job filenames and modes for stderr, stdout, and stdin. -- GitLab