Skip to content
Snippets Groups Projects
Commit 47e259fe authored by Morris Jette's avatar Morris Jette
Browse files

This was relocated in previous work, merge of v2.4 left vestigial file

parent e18e3ee0
No related branches found
No related tags found
No related merge requests found
/****************************************************************************\
* srun_job.c - job data structure creation functions
*****************************************************************************
* Copyright (C) 2002-2007 The Regents of the University of California.
* Copyright (C) 2008 Lawrence Livermore National Security.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Mark Grondona <grondona@llnl.gov>.
* CODE-OCEC-09-009. All rights reserved.
*
* This file is part of SLURM, a resource management program.
* For details, see <http://www.schedmd.com/slurmdocs/>.
* Please also read the included file: DISCLAIMER.
*
* SLURM is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* In addition, as a special exception, the copyright holders give permission
* to link the code of portions of this program with the OpenSSL library under
* certain conditions as described in each individual source file, and
* distribute linked combinations including the two. You must obey the GNU
* General Public License in all respects for all of the code used other than
* OpenSSL. If you modify file(s) with this exception, you may extend this
* exception to your version of the file(s), but you are not obligated to do
* so. If you do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source files in
* the program, then also delete it here.
*
* SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with SLURM; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <netdb.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <signal.h>
#include "src/common/bitstring.h"
#include "src/common/cbuf.h"
#include "src/common/hostlist.h"
#include "src/common/log.h"
#include "src/common/read_config.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/common/io_hdr.h"
#include "src/common/forward.h"
#include "src/common/fd.h"
#include "src/srun/srun_job.h"
#include "src/srun/opt.h"
#include "src/srun/fname.h"
#include "src/srun/debugger.h"
/*
 * allocation information structure used to store general information
 * about node allocation to be passed to _job_create_structure()
 */
typedef struct allocation_info {
	char *alias_list;		/* node name/address aliases, may be NULL */
	uint16_t *cpus_per_node;	/* CPUs per node for each group; parallel
					 * array to cpu_count_reps */
	uint32_t *cpu_count_reps;	/* how many consecutive nodes share the
					 * corresponding cpus_per_node entry */
	uint32_t jobid;			/* SLURM job id */
	uint32_t nnodes;		/* number of allocated nodes */
	char *nodelist;			/* ranged node list string */
	uint32_t num_cpu_groups;	/* entry count in cpus_per_node and
					 * cpu_count_reps */
	dynamic_plugin_data_t *select_jobinfo;	/* opaque select-plugin data */
	uint32_t stepid;		/* SLURM step id, NO_VAL if not created */
} allocation_info_t;
/*
* Prototypes:
*/
static inline int _estimate_nports(int nclients, int cli_per_port);
static int _compute_task_count(allocation_info_t *info);
static void _set_ntasks(allocation_info_t *info);
static srun_job_t *_job_create_structure(allocation_info_t *info);
static char * _normalize_hostlist(const char *hostlist);
/*
 * Create an srun job structure w/out an allocation response msg.
 * (i.e. use the command line options)
 *
 * Fabricates a random job/step id in the NOALLOC range and spreads
 * opt.ntasks evenly across the nodes in opt.nodelist.
 * Returns NULL (after logging) if the node list is invalid or empty.
 */
srun_job_t *
job_create_noalloc(void)
{
	srun_job_t *job = NULL;
	allocation_info_t *ai = xmalloc(sizeof(allocation_info_t));
	uint16_t cpn = 1;
	hostlist_t hl = hostlist_create(opt.nodelist);

	if (!hl) {
		error("Invalid node list `%s' specified", opt.nodelist);
		goto error;
	}
	srand48(getpid());
	ai->jobid = MIN_NOALLOC_JOBID +
		((uint32_t) lrand48() %
		 (MAX_NOALLOC_JOBID - MIN_NOALLOC_JOBID + 1));
	ai->stepid = (uint32_t) (lrand48());
	ai->nodelist = opt.nodelist;
	ai->nnodes = hostlist_count(hl);
	hostlist_destroy(hl);
	if (ai->nnodes == 0) {
		/* Guard the division below: a parseable but empty node
		 * list would otherwise divide by zero. */
		error("No nodes in node list `%s'", opt.nodelist);
		goto error;
	}

	/* ceil(ntasks / nnodes) CPUs per node.  cpn is a local, but
	 * _job_create_structure() only reads through the pointer before
	 * returning, so it does not outlive this frame. */
	cpn = (opt.ntasks + ai->nnodes - 1) / ai->nnodes;
	ai->cpus_per_node = &cpn;
	ai->cpu_count_reps = &ai->nnodes;

	/*
	 * Create job, then fill in host addresses
	 */
	job = _job_create_structure(ai);
	if (job)	/* _job_create_structure() can return NULL */
		job_update_io_fnames(job);
error:
	xfree(ai);
	return (job);
}
/*
 * Create an srun job structure for a step w/out an allocation response msg.
 * (i.e. inside an allocation)
 *
 * Builds an allocation_info_t from the existing allocation (resp) plus the
 * command line options: honors --exclude/--nodelist, clamps the node count
 * to --nodes limits, then hands the result to _job_create_structure().
 * Returns NULL (after logging) on any error.
 */
srun_job_t *
job_step_create_allocation(resource_allocation_response_msg_t *resp)
{
	uint32_t job_id = resp->job_id;
	srun_job_t *job = NULL;
	allocation_info_t *ai = xmalloc(sizeof(allocation_info_t));
	hostlist_t hl = NULL;
	char *buf = NULL;
	int count = 0;
	uint32_t alloc_count = 0;
	char *step_nodelist = NULL;

	ai->jobid = job_id;
	ai->stepid = NO_VAL;
	ai->alias_list = resp->alias_list;
	ai->nodelist = opt.alloc_nodelist;

	/* Count the unique nodes in the allocation */
	hl = hostlist_create(ai->nodelist);
	hostlist_uniq(hl);
	alloc_count = hostlist_count(hl);
	ai->nnodes = alloc_count;
	hostlist_destroy(hl);

	if (opt.exc_nodes) {
		hostlist_t exc_hl = hostlist_create(opt.exc_nodes);
		hostlist_t inc_hl = NULL;
		char *node_name = NULL;

		hl = hostlist_create(ai->nodelist);
		if (opt.nodelist) {
			inc_hl = hostlist_create(opt.nodelist);
		}
		hostlist_uniq(hl);
		//info("using %s or %s", opt.nodelist, ai->nodelist);
		/* Drop each excluded node from the allocation list; fail
		 * if an excluded node was also explicitly requested. */
		while ((node_name = hostlist_shift(exc_hl))) {
			int inx = hostlist_find(hl, node_name);
			if (inx >= 0) {
				debug("excluding node %s", node_name);
				hostlist_delete_nth(hl, inx);
				ai->nnodes--;	/* decrement node count */
			}
			if (inc_hl) {
				inx = hostlist_find(inc_hl, node_name);
				if (inx >= 0) {
					error("Requested node %s is also "
					      "in the excluded list.",
					      node_name);
					error("Job not submitted.");
					/* free the shifted name and the
					 * working hostlist (previously
					 * leaked on this path) */
					free(node_name);
					hostlist_destroy(exc_hl);
					hostlist_destroy(inc_hl);
					hostlist_destroy(hl);
					goto error;
				}
			}
			free(node_name);
		}
		hostlist_destroy(exc_hl);

		/* we need to set this here so if there are more nodes
		 * available than we requested we can set it
		 * straight. If there is no exclude list then we set
		 * the vars then.
		 */
		if (!opt.nodes_set) {
			/* we don't want to set the number of nodes =
			 * to the number of requested processes unless we
			 * know it is less than the number of nodes
			 * in the allocation
			 */
			if (opt.ntasks_set && (opt.ntasks < ai->nnodes))
				opt.min_nodes = opt.ntasks;
			else
				opt.min_nodes = ai->nnodes;
			opt.nodes_set = true;
		}
		if (!opt.max_nodes)
			opt.max_nodes = opt.min_nodes;
		if ((opt.max_nodes > 0) && (opt.max_nodes < ai->nnodes))
			ai->nnodes = opt.max_nodes;

		count = hostlist_count(hl);
		if (!count) {
			error("Hostlist is now nothing! Can't run job.");
			hostlist_destroy(hl);
			goto error;
		}
		if (inc_hl) {
			count = hostlist_count(inc_hl);
			if (count < ai->nnodes) {
				/* add more nodes to get correct number for
				   allocation */
				hostlist_t tmp_hl = hostlist_copy(hl);
				int i = 0;
				int diff = ai->nnodes - count;
				buf = hostlist_ranged_string_xmalloc(inc_hl);
				hostlist_delete(tmp_hl, buf);
				xfree(buf);
				/* Pull nodes not already included until the
				 * requested count is reached.  Test i first
				 * so no extra name is shifted, and free each
				 * shifted name (hostlist_push() copies it);
				 * both were previously leaked. */
				while ((i < diff) &&
				       (node_name = hostlist_shift(tmp_hl))) {
					hostlist_push(inc_hl, node_name);
					free(node_name);
					i++;
				}
				hostlist_destroy(tmp_hl);
			}
			buf = hostlist_ranged_string_xmalloc(inc_hl);
			hostlist_destroy(inc_hl);
			xfree(opt.nodelist);
			opt.nodelist = buf;
		} else {
			if (count > ai->nnodes) {
				/* remove more nodes than needed for
				 * allocation.  NOTE: list indexes are
				 * zero-origin (valid range 0..count-1);
				 * starting at "count", as this code
				 * previously did, referenced one past
				 * the end of the list. */
				int i;
				for (i = count - 1; i >= (int) ai->nnodes; i--)
					hostlist_delete_nth(hl, i);
			}
			xfree(opt.nodelist);
			opt.nodelist = hostlist_ranged_string_xmalloc(hl);
		}

		hostlist_destroy(hl);
	} else {
		if (!opt.nodes_set) {
			/* we don't want to set the number of nodes =
			 * to the number of requested processes unless we
			 * know it is less than the number of nodes
			 * in the allocation
			 */
			if (opt.ntasks_set && (opt.ntasks < ai->nnodes))
				opt.min_nodes = opt.ntasks;
			else
				opt.min_nodes = ai->nnodes;
			opt.nodes_set = true;
		}
		if (!opt.max_nodes)
			opt.max_nodes = opt.min_nodes;
		if ((opt.max_nodes > 0) && (opt.max_nodes < ai->nnodes))
			ai->nnodes = opt.max_nodes;
		/* Don't reset the ai->nodelist because that is the
		 * nodelist we want to say the allocation is under
		 * opt.nodelist is what is used for the allocation.
		 */
		/* xfree(ai->nodelist); */
		/* ai->nodelist = xstrdup(buf); */
	}

	/* get the correct number of hosts to run tasks on */
	if (opt.nodelist)
		step_nodelist = opt.nodelist;
	else if ((opt.distribution == SLURM_DIST_ARBITRARY) && (count == 0))
		step_nodelist = getenv("SLURM_ARBITRARY_NODELIST");
	if (step_nodelist) {
		hl = hostlist_create(step_nodelist);
		if (opt.distribution != SLURM_DIST_ARBITRARY)
			hostlist_uniq(hl);
		if (!hostlist_count(hl)) {
			error("Hostlist is now nothing! Can not run job.");
			hostlist_destroy(hl);
			goto error;
		}
		buf = hostlist_ranged_string_xmalloc(hl);
		count = hostlist_count(hl);
		hostlist_destroy(hl);
		/* Don't reset the ai->nodelist because that is the
		 * nodelist we want to say the allocation is under
		 * opt.nodelist is what is used for the allocation.
		 */
		/* xfree(ai->nodelist); */
		/* ai->nodelist = xstrdup(buf); */
		xfree(opt.nodelist);
		opt.nodelist = buf;
	}

	/* With an arbitrary distribution each task is pinned to a listed
	 * node, so the node count must match the task count exactly. */
	if ((opt.distribution == SLURM_DIST_ARBITRARY) &&
	    (count != opt.ntasks)) {
		error("You asked for %d tasks but specified %d nodes",
		      opt.ntasks, count);
		goto error;
	}

	if (ai->nnodes == 0) {
		error("No nodes in allocation, can't run job");
		goto error;
	}

	ai->num_cpu_groups = resp->num_cpu_groups;
	ai->cpus_per_node = resp->cpus_per_node;
	ai->cpu_count_reps = resp->cpu_count_reps;

	/* info("looking for %d nodes out of %s with a must list of %s", */
	/*      ai->nnodes, ai->nodelist, opt.nodelist); */

	/*
	 * Create job
	 */
	job = _job_create_structure(ai);
error:
	xfree(ai);
	return (job);
}
/*
 * Create an srun job structure from a resource allocation response msg
 */
extern srun_job_t *
job_create_allocation(resource_allocation_response_msg_t *resp)
{
	allocation_info_t *ai = xmalloc(sizeof(allocation_info_t));
	srun_job_t *job;

	/* Copy the allocation response into the intermediate description */
	ai->alias_list     = resp->alias_list;
	ai->nodelist       = _normalize_hostlist(resp->node_list);
	ai->nnodes         = resp->node_cnt;
	ai->jobid          = resp->job_id;
	ai->stepid         = NO_VAL;
	ai->num_cpu_groups = resp->num_cpu_groups;
	ai->cpus_per_node  = resp->cpus_per_node;
	ai->cpu_count_reps = resp->cpu_count_reps;
	ai->select_jobinfo = select_g_select_jobinfo_copy(resp->select_jobinfo);

	job = _job_create_structure(ai);

	/* _job_create_structure() duplicated what it needed */
	xfree(ai->nodelist);
	xfree(ai);
	return (job);
}
/*
 * Advance the job's state to "state" and wake any waiter blocked on
 * job->state_cond.  States only move forward (job->state < state);
 * a request to regress is silently ignored.
 *
 * Uses the slurm_mutex_* wrappers for consistency with job_state()
 * (the previous raw pthread_mutex_lock/unlock calls bypassed the
 * project's error-checked wrappers used elsewhere in this file).
 */
void
update_job_state(srun_job_t *job, srun_job_state_t state)
{
	slurm_mutex_lock(&job->state_mutex);
	if (job->state < state) {
		job->state = state;
		pthread_cond_signal(&job->state_cond);
	}
	slurm_mutex_unlock(&job->state_mutex);
}
/* Return a snapshot of the job's current state under the state mutex. */
srun_job_state_t
job_state(srun_job_t *job)
{
	srun_job_state_t current;

	slurm_mutex_lock(&job->state_mutex);
	current = job->state;
	slurm_mutex_unlock(&job->state_mutex);

	return current;
}
/*
 * Escalate termination of the job.  The first call flips the job state
 * to SRUN_JOB_FORCETERM (which kills tasks directly); the second call
 * additionally asks slurmctld to SIGKILL the step; later calls only
 * print a throttled progress message.
 */
void
job_force_termination(srun_job_t *job)
{
	static int kill_sent = 0;	/* number of prior invocations */
	static time_t last_msg = 0;	/* throttles the progress message */

	if (!kill_sent) {
		info("forcing job termination");
		/* Sends SIGKILL to tasks directly */
		update_job_state(job, SRUN_JOB_FORCETERM);
	} else {
		time_t now = time(NULL);

		if (now != last_msg) {	/* at most one message per second */
			info("job abort in progress");
			last_msg = now;
		}
		if (kill_sent == 1) {
			/* Try sending SIGKILL through slurmctld */
			slurm_kill_job_step(job->jobid, job->stepid, SIGKILL);
		}
	}
	kill_sent++;
}
/* Ports needed to serve nclients at cli_per_port clients per port:
 * the quotient, rounded up when there is a positive remainder. */
static inline int
_estimate_nports(int nclients, int cli_per_port)
{
	div_t q = div(nclients, cli_per_port);

	if (q.rem > 0)
		return q.quot + 1;
	return q.quot;
}
/*
 * Derive a task count from the allocation: with --cpus-per-task set,
 * sum cpus_per_node / cpus_per_task over every node group; otherwise
 * (and as a floor in all cases) use the node count.
 */
static int
_compute_task_count(allocation_info_t *ainfo)
{
	int cnt = 0;
#if defined HAVE_BGQ
//#if defined HAVE_BGQ && HAVE_BG_FILES
	/* always return the ntasks here for Q */
	info("returning %d", opt.ntasks);
	return opt.ntasks;
#endif
	if (opt.cpus_set) {
		int grp;
		for (grp = 0; grp < ainfo->num_cpu_groups; grp++) {
			int per_node = ainfo->cpus_per_node[grp] /
				       opt.cpus_per_task;
			cnt += ainfo->cpu_count_reps[grp] * per_node;
		}
	}
	/* never fewer tasks than allocated nodes */
	return (cnt < ainfo->nnodes) ? ainfo->nnodes : cnt;
}
/*
 * Fill in opt.ntasks from the allocation when the user did not give -n.
 * The count becomes "set" (implicit) only when it was derived from an
 * explicit --cpus-per-task value.
 */
static void
_set_ntasks(allocation_info_t *ai)
{
	if (opt.ntasks_set)
		return;		/* user supplied -n; nothing to derive */

	opt.ntasks = _compute_task_count(ai);
	if (opt.cpus_set)
		opt.ntasks_set = true;	/* implicit */
}
/*
 * Create an srun job structure from a resource allocation response msg
 *
 * Allocates and initializes an srun_job_t from the intermediate
 * allocation description.  Returns NULL (after logging) when the
 * allocation cannot satisfy opt.min_nodes or the CPU arrays are unset.
 * Ownership of ainfo->select_jobinfo transfers to the returned job;
 * nodelist/alias_list are duplicated, so the caller keeps ainfo's copies.
 */
static srun_job_t *
_job_create_structure(allocation_info_t *ainfo)
{
	srun_job_t *job = xmalloc(sizeof(srun_job_t));
	int i;
	/* derive opt.ntasks from the CPU counts if the user did not set it */
	_set_ntasks(ainfo);
	debug2("creating job with %d tasks", opt.ntasks);
	slurm_mutex_init(&job->state_mutex);
	pthread_cond_init(&job->state_cond, NULL);
	job->state = SRUN_JOB_INIT;
	job->alias_list = xstrdup(ainfo->alias_list);
	job->nodelist = xstrdup(ainfo->nodelist);
	job->stepid = ainfo->stepid;
#if defined HAVE_BGQ
//#if defined HAVE_BGQ && defined HAVE_BG_FILES
	job->nhosts = ainfo->nnodes;
	select_g_alter_node_cnt(SELECT_APPLY_NODE_MAX_OFFSET, &job->nhosts);
#elif defined HAVE_FRONT_END /* Limited job step support */
	/* all tasks are run from the single front-end node */
	opt.overcommit = true;
	job->nhosts = 1;
#else
	job->nhosts = ainfo->nnodes;
#endif
#if !defined HAVE_FRONT_END || (defined HAVE_BGQ)
//#if !defined HAVE_FRONT_END || (defined HAVE_BGQ && defined HAVE_BG_FILES)
	if(opt.min_nodes > job->nhosts) {
		error("Only allocated %d nodes asked for %d",
		      job->nhosts, opt.min_nodes);
		if (opt.exc_nodes) {
			/* When resources are pre-allocated and some nodes
			 * are explicitly excluded, this error can occur. */
			error("Are required nodes explicitly excluded?");
		}
		/* NOTE(review): "job" and the strings xstrdup'ed above leak
		 * on this return path — confirm whether that is acceptable
		 * for this one-shot error case. */
		return NULL;
	}
	if ((ainfo->cpus_per_node == NULL) ||
	    (ainfo->cpu_count_reps == NULL)) {
		error("cpus_per_node array is not set");
		return NULL;
	}
#endif
	job->select_jobinfo = ainfo->select_jobinfo;
	job->jobid = ainfo->jobid;
	job->ntasks = opt.ntasks;
	/* total CPU count = sum over groups of (CPUs per node * node reps) */
	for (i=0; i<ainfo->num_cpu_groups; i++) {
		job->cpu_count += ainfo->cpus_per_node[i] *
			ainfo->cpu_count_reps[i];
	}
	job->rc = -1;	/* exit code not yet known */
	job_update_io_fnames(job);
	return (job);
}
/* (Re)build the job's stdin/stdout/stderr file name structures from the
 * current command line options. */
void
job_update_io_fnames(srun_job_t *job)
{
	job->ifname = fname_create(job, opt.ifname);
	job->ofname = fname_create(job, opt.ofname);
	if (opt.efname)
		job->efname = fname_create(job, opt.efname);
	else
		job->efname = job->ofname;	/* stderr follows stdout */
}
/* Return a canonical ranged form of "hostlist" (caller must xfree());
 * falls back to a plain copy when the string cannot be parsed. */
static char *
_normalize_hostlist(const char *hostlist)
{
	hostlist_t hl = hostlist_create(hostlist);
	char *ranged = NULL;

	if (hl) {
		ranged = hostlist_ranged_string_xmalloc(hl);
		hostlist_destroy(hl);
	}
	if (ranged)
		return ranged;

	return xstrdup(hostlist);
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment