Skip to content
Snippets Groups Projects
Commit c3df43e4 authored by tewk's avatar tewk
Browse files

moved job submit and job allocate to new comm layer

parent acd89190
No related branches found
No related tags found
No related merge requests found
......@@ -195,29 +195,10 @@ void slurm_free_node_table ( node_table_t * node )
}
#define SLURM_JOB_DESC_NONCONTIGUOUS 0
#define SLURM_JOB_DESC_CONTIGUOUS 1
#define SLURM_JOB_DESC_DEFAULT_FEATURES "NONE"
#define SLURM_JOB_DESC_DEFAULT_GROUPS "SET_BY_API"
#define SLURM_JOB_DESC_DEFAULT_JOB_ID 0
#define SLURM_JOB_DESC_DEFAULT_JOB_NAME NULL
#define SLURM_JOB_DESC_DEFAULT_PARITION_KEY NULL
#define SLURM_JOB_DESC_DEFAULT_MIN_PROCS 0
#define SLURM_JOB_DESC_DEFAULT_MIN_MEMORY 0
#define SLURM_JOB_DESC_DEFAULT_MIN_TMP_DISK 0
#define SLURM_JOB_DESC_DEFAULT_PARTITION NULL
#define SLURM_JOB_DESC_DEFAULT_PRIORITY 0xfffffffe
#define SLURM_JOB_DESC_DEFAULT_REQ_NODES NULL
#define SLURM_JOB_DESC_DEFAULT_JOB_SCRIPT NULL
#define SLURM_JOB_DESC_DEFAULT_SHARED 0
#define SLURM_JOB_DESC_DEFAULT_TIME_LIMIT 0xfffffffe
#define SLURM_JOB_DESC_DEFAULT_NUM_PROCS 0
#define SLURM_JOB_DESC_DEFAULT_NUM_NODES 0
#define SLURM_JOB_DESC_DEFAULT_USER_ID 0
void slurm_init_job_desc_msg ( job_desc_msg_t * job_desc_msg )
{
job_desc_msg -> contiguous = SLURM_JOB_DESC_NONCONTIGUOUS ;
job_desc_msg -> contiguous = SLURM_JOB_DESC_DEFAULT_CONTIGUOUS ;
job_desc_msg -> features = SLURM_JOB_DESC_DEFAULT_FEATURES ;
job_desc_msg -> groups = SLURM_JOB_DESC_DEFAULT_GROUPS ; /* will be set by api */
job_desc_msg -> job_id = SLURM_JOB_DESC_DEFAULT_JOB_ID ; /* will be set by api */
......
......@@ -147,6 +147,8 @@ typedef struct job_desc_msg { /* Job descriptor for submit, allocate, and upd
* * value is #fffffffe */
uint32_t num_procs; /* number of processors required by job, default=0 */
uint32_t num_nodes; /* number of nodes required by job, default=0 */
uint32_t dist;
uint32_t procs_per_task;
uint32_t user_id; /* set only if different from current UID, default set
* to UID by API, can only be set if user is root */
} job_desc_msg_t ;
......@@ -279,6 +281,29 @@ void inline slurm_free_node_table ( node_table_t * node ) ;
void inline slurm_free_node_table_msg ( node_table_t * node ) ;
/* stuct init functions */
#define SLURM_JOB_DESC_NONCONTIGUOUS 0
#define SLURM_JOB_DESC_CONTIGUOUS 1
#define SLURM_JOB_DESC_DEFAULT_CONTIGUOUS SLURM_JOB_DESC_NONCONTIGUOUS
#define SLURM_JOB_DESC_DEFAULT_FEATURES NULL
#define SLURM_JOB_DESC_DEFAULT_GROUPS NULL
#define SLURM_JOB_DESC_DEFAULT_JOB_ID NO_VAL
#define SLURM_JOB_DESC_DEFAULT_JOB_NAME NULL
#define SLURM_JOB_DESC_DEFAULT_PARITION_KEY NULL
#define SLURM_JOB_DESC_DEFAULT_MIN_PROCS NO_VAL
#define SLURM_JOB_DESC_DEFAULT_MIN_MEMORY NO_VAL
#define SLURM_JOB_DESC_DEFAULT_MIN_TMP_DISK NO_VAL
#define SLURM_JOB_DESC_DEFAULT_PARTITION NULL
#define SLURM_JOB_DESC_DEFAULT_PRIORITY NO_VAL
#define SLURM_JOB_DESC_DEFAULT_REQ_NODES NULL
#define SLURM_JOB_DESC_DEFAULT_JOB_SCRIPT NULL
#define SLURM_JOB_DESC_SHARED 1
#define SLURM_JOB_DESC_NOT_SHARED 0
#define SLURM_JOB_DESC_FORCED_SHARED 2
#define SLURM_JOB_DESC_DEFAULT_SHARED SLURM_JOB_DESC_NOT_SHARED
#define SLURM_JOB_DESC_DEFAULT_TIME_LIMIT NO_VAL
#define SLURM_JOB_DESC_DEFAULT_NUM_PROCS NO_VAL
#define SLURM_JOB_DESC_DEFAULT_NUM_NODES NO_VAL
#define SLURM_JOB_DESC_DEFAULT_USER_ID NO_VAL
void slurm_init_job_desc_msg ( job_desc_msg_t * job_desc_msg ) ;
#endif
......@@ -13,6 +13,7 @@
/* general return codes */
#define SLURM_SUCCESS 0
#define SLURM_ERROR -1
#define SLURM_FAILURE -1
......
......@@ -594,6 +594,8 @@ void pack_job_desc ( job_desc_msg_t * job_desc_ptr, void ** buf_ptr , int * buff
pack32 (job_desc_ptr->num_procs, buf_ptr, buffer_size);
pack32 (job_desc_ptr->num_nodes, buf_ptr, buffer_size);
pack32 (job_desc_ptr->dist, buf_ptr, buffer_size);
pack32 (job_desc_ptr->procs_per_task, buf_ptr, buffer_size);
pack32 (job_desc_ptr->user_id, buf_ptr, buffer_size);
}
......@@ -642,6 +644,8 @@ int unpack_job_desc ( job_desc_msg_t **job_desc_buffer_ptr, void ** buf_ptr , in
unpack32 (&job_desc_ptr->num_procs, buf_ptr, buffer_size);
unpack32 (&job_desc_ptr->num_nodes, buf_ptr, buffer_size);
unpack32 (&job_desc_ptr->dist, buf_ptr, buffer_size);
unpack32 (&job_desc_ptr->procs_per_task, buf_ptr, buffer_size);
unpack32 (&job_desc_ptr->user_id, buf_ptr, buffer_size);
*job_desc_buffer_ptr = job_desc_ptr ;
......
......@@ -475,7 +475,7 @@ void slurm_rpc_allocate_resources ( slurm_msg_t * msg )
/* do RPC call */
error_code = job_allocate(job_desc_msg, /* skip "Allocate" */
&job_id, &node_name_ptr);
&job_id, &node_name_ptr, false , false );
/* return result */
if (error_code)
......
......@@ -18,16 +18,20 @@
#include "list.h"
#include "pack.h"
#include "slurmctld.h"
#include "xstring.h"
#include <src/common/slurm_errno.h>
#define BUF_SIZE 1024
#define MAX_STR_PACK 128
#define SLURM_CREATE_JOB_FLAG_NO_ALLOCATE_0 0
int job_count; /* job's in the system */
List job_list = NULL; /* job_record list */
time_t last_job_update; /* time of last update to job records */
static pthread_mutex_t job_mutex = PTHREAD_MUTEX_INITIALIZER; /* lock for job info */
char *job_state_string[] =
{ "PENDING", "STAGE_IN", "RUNNING", "STAGE_OUT", "COMPLETED", "FAILED", "TIME_OUT", "END" };
{ "PENDING", "STAGE_IN", "RUNNING", "STAGE_OUT", "COMPLETED", "FAILED", "TIME_OUT", "END" };
static struct job_record *job_hash[MAX_JOB_COUNT];
static struct job_record *job_hash_over[MAX_JOB_COUNT];
static int max_hash_over = 0;
......@@ -38,10 +42,12 @@ int list_find_job_old (void *job_entry, void *key);
void set_job_id (struct job_record *job_ptr);
void set_job_prio (struct job_record *job_ptr);
int top_priority (struct job_record *job_ptr);
int copy_job_desc_to_job_record ( job_desc_msg_t * job_desc , struct job_record ** job_ptr , struct part_record *part_ptr, bitstr_t *req_bitmap) ;
int validate_job_desc ( job_desc_msg_t * job_desc_msg , int allocate ) ;
#if DEBUG_MODULE
/* main is used here for module testing purposes only */
int
int
main (int argc, char *argv[])
{
int dump_size, error_code, error_count = 0, i;
......@@ -114,7 +120,7 @@ main (int argc, char *argv[])
}
else
printf ("found job %u, script=%s\n",
job_rec->job_id, job_rec->details->job_script);
job_rec->job_id, job_rec->details->job_script);
error_code = delete_job_record (tmp_id);
if (error_code) {
......@@ -145,7 +151,7 @@ main (int argc, char *argv[])
* NOTE: allocates memory that should be xfreed with either
* delete_job_record or list_delete_job
*/
struct job_record *
struct job_record *
create_job_record (int *error_code)
{
struct job_record *job_record_point;
......@@ -183,7 +189,7 @@ create_job_record (int *error_code)
* delete_job_details - delete a job's detail record and clear it's pointer
* input: job_entry - pointer to job_record to clear the record of
*/
void
void
delete_job_details (struct job_record *job_entry)
{
if (job_entry->details == NULL)
......@@ -215,7 +221,7 @@ delete_job_details (struct job_record *job_entry)
* This would be faster with hash table and doubly linked list. We intend to
* purge entries through purge_old_job() anyway.
*/
int
int
delete_job_record (uint32_t job_id)
{
ListIterator job_record_iterator;
......@@ -225,7 +231,7 @@ delete_job_record (uint32_t job_id)
job_record_iterator = list_iterator_create (job_list);
while ((job_record_point =
(struct job_record *) list_next (job_record_iterator))) {
(struct job_record *) list_next (job_record_iterator))) {
if (job_record_point->job_id != job_id)
continue;
......@@ -239,7 +245,7 @@ delete_job_record (uint32_t job_id)
if (job_record_point == NULL) {
error ("delete_job_record: attempt to delete non-existent job %u",
job_id);
job_id);
return ENOENT;
}
return 0;
......@@ -253,20 +259,20 @@ delete_job_record (uint32_t job_id)
* global: job_list - global job list pointer
* job_hash, job_hash_over, max_hash_over - hash table into job records
*/
struct job_record *
struct job_record *
find_job_record(uint32_t job_id)
{
int i;
/* First try to find via hash table */
if (job_hash[job_id % MAX_JOB_COUNT] &&
job_hash[job_id % MAX_JOB_COUNT]->job_id == job_id)
job_hash[job_id % MAX_JOB_COUNT]->job_id == job_id)
return job_hash[job_id % MAX_JOB_COUNT];
/* linear search of overflow hash table overflow */
for (i=0; i<max_hash_over; i++) {
if (job_hash_over[i] != NULL &&
job_hash_over[i]->job_id == job_id)
job_hash_over[i]->job_id == job_id)
return job_hash_over[i];
}
......@@ -282,7 +288,7 @@ find_job_record(uint32_t job_id)
* global: last_job_update - time of last job table update
* job_list - pointer to global job list
*/
int
int
init_job_conf ()
{
if (job_list == NULL) {
......@@ -313,24 +319,31 @@ init_job_conf ()
* last_job_update - time of last job table update
* NOTE: the calling program must xfree the memory pointed to by node_list
*/
/*
 * immediate_job_launch - create a job and allocate resources for it right now,
 *	failing rather than queuing if resources are unavailable.
 * input: job_specs - job specification message
 *        new_job_id - location to store the new job's id
 *        node_list - location to store allocated node names (caller must xfree)
 *        immediate, will_run - IGNORED; this wrapper always forces
 *		immediate=true, will_run=false (kept for a uniform comm-layer
 *		function signature)
 * output: returns job_allocate's error code, 0 on success
 */
int immediate_job_launch (job_desc_msg_t * job_specs, uint32_t *new_job_id, char **node_list, int immediate , int will_run )
{
	(void) immediate;	/* unused - forced to true below */
	(void) will_run;	/* unused - forced to false below */
	return job_allocate (job_specs, new_job_id, node_list, true , false );
}
/*
 * will_job_run - validate a job request and report whether it could run,
 *	without actually creating an allocation.
 * input: job_specs - job specification message
 *        new_job_id - location to store the prospective job's id
 *        node_list - location to store node names (caller must xfree)
 *        immediate, will_run - IGNORED; this wrapper always forces
 *		immediate=false, will_run=true (kept for a uniform comm-layer
 *		function signature)
 * output: returns job_allocate's error code, 0 on success
 */
int will_job_run (job_desc_msg_t * job_specs, uint32_t *new_job_id, char **node_list, int immediate , int will_run )
{
	(void) immediate;	/* unused - forced to false below */
	(void) will_run;	/* unused - forced to true below */
	return job_allocate (job_specs, new_job_id, node_list, false , true );
}
int
job_allocate (char *job_specs, uint32_t *new_job_id, char **node_list)
job_allocate (job_desc_msg_t * job_specs, uint32_t *new_job_id, char **node_list, int immediate , int will_run )
{
int error_code, immediate, will_run;
int error_code;
struct job_record *job_ptr;
node_list[0] = NULL;
immediate = will_run = 0;
(void) load_integer (&immediate, "Immediate", job_specs);
(void) load_integer (&will_run, "WillRun", job_specs);
error_code = job_create (job_specs, new_job_id, 1, will_run, &job_ptr);
if (error_code || will_run)
return error_code;
if (job_ptr == NULL)
fatal ("job_allocate: allocated job %u lacks record",
new_job_id);
new_job_id);
last_job_update = time (NULL);
if (immediate == 0) {
......@@ -370,7 +383,7 @@ job_allocate (char *job_specs, uint32_t *new_job_id, char **node_list)
* global: job_list - pointer global job list
* last_job_update - time of last job table update
*/
int
int
job_cancel (uint32_t job_id)
{
struct job_record *job_ptr;
......@@ -399,12 +412,11 @@ job_cancel (uint32_t job_id)
}
info ("job_cancel: job %u can't be cancelled from state=%s",
job_id, job_state_string[job_ptr->job_state]);
job_id, job_state_string[job_ptr->job_state]);
return EAGAIN;
}
/*
 * job_create - parse the supplied job specification and create job_records for it
* input: job_specs - job specifications
......@@ -420,201 +432,140 @@ job_cancel (uint32_t job_id)
* default_part_loc - pointer to default partition
* job_hash, job_hash_over, max_hash_over - hash table into job records
*/
int
job_create (char *job_specs, uint32_t *new_job_id, int allocate,
int will_run, struct job_record **job_rec_ptr)
job_create ( job_desc_msg_t *job_desc, uint32_t *new_job_id, int allocate,
int will_run, struct job_record **job_rec_ptr)
{
char *req_features, *req_node_list, *job_name, *req_group;
char *req_partition, *script;
int contiguous, req_cpus, req_nodes, min_cpus, min_memory;
int i, min_tmp_disk, time_limit, procs_per_task, user_id;
int error_code, dist, key, shared;
long job_id;
int i;
int error_code;
uint32_t key;
int shared;
struct part_record *part_ptr;
struct job_record *job_ptr;
struct job_details *detail_ptr;
int priority;
bitstr_t *req_bitmap;
req_features = req_node_list = job_name = req_group = NULL;
req_partition = script = NULL;
req_bitmap = NULL;
contiguous = dist = req_cpus = req_nodes = min_cpus = NO_VAL;
min_memory = min_tmp_disk = time_limit = procs_per_task = NO_VAL;
key = shared = user_id = NO_VAL;
job_id = (long) NO_VAL;
priority = NO_VAL;
/* setup and basic parsing */
error_code =
parse_job_specs (job_specs, &req_features, &req_node_list,
&job_name, &req_group, &req_partition,
&contiguous, &req_cpus, &req_nodes,
&min_cpus, &min_memory, &min_tmp_disk, &key,
&shared, &dist, &script, &time_limit,
&procs_per_task, &job_id, &priority, &user_id);
if (error_code != 0) {
error_code = EINVAL; /* permanent error, invalid parsing */
error ("job_create: parsing failure on %s", job_specs);
goto cleanup;
}
if ((req_cpus == NO_VAL) && (req_nodes == NO_VAL) &&
(req_node_list == NULL)) {
info ("job_create: job failed to specify ReqNodes, TotalNodes or TotalProcs");
error_code = EINVAL;
goto cleanup;
}
if (allocate == 0 && script == NULL) {
info ("job_create: job failed to specify Script");
error_code = EINVAL;
goto cleanup;
}
if (user_id == NO_VAL) {
info ("job_create: job failed to specify User");
error_code = EINVAL;
goto cleanup;
}
if (job_name && strlen(job_name) > MAX_NAME_LEN) {
info ("job_create: job name %s too long", job_name);
error_code = EINVAL;
goto cleanup;
}
if (contiguous == NO_VAL)
contiguous = 0; /* default not contiguous */
if (job_id != NO_VAL &&
find_job_record ((uint32_t) job_id)) {
info ("job_create: Duplicate job id %d", job_id);
error_code = EINVAL;
goto cleanup;
}
if (req_cpus == NO_VAL)
req_cpus = 1; /* default cpu count of 1 */
if (req_nodes == NO_VAL)
req_nodes = 1; /* default node count of 1 */
if (min_memory == NO_VAL)
min_memory = 1; /* default is 1 MB memory per node */
if (min_tmp_disk == NO_VAL)
min_tmp_disk = 1; /* default is 1 MB disk per node */
if (shared == NO_VAL)
shared = 0; /* default is not shared nodes */
if (dist == NO_VAL)
dist = DIST_BLOCK; /* default is block distribution */
if (procs_per_task == NO_VAL)
procs_per_task = 1; /* default is 1 processor per task */
else if (procs_per_task <= 0) {
info ("job_create: Invalid procs_per_task");
error_code = EINVAL;
goto cleanup;
}
if (min_cpus == NO_VAL)
min_cpus = 1; /* default is 1 processor per node */
if (min_cpus < procs_per_task) {
info ("job_create: min_cpus < procs_per_task, reset to equal");
min_cpus = procs_per_task;
}
validate_job_desc ( job_desc , allocate ) ;
/* find selected partition */
if (req_partition) {
if (job_desc->partition) {
part_ptr = list_find_first (part_list, &list_find_part,
req_partition);
job_desc->partition);
if (part_ptr == NULL) {
info ("job_create: invalid partition specified: %s",
req_partition);
job_desc->partition);
error_code = EINVAL;
goto cleanup;
return error_code ;
}
xfree (req_partition);
}
else {
if (default_part_loc == NULL) {
error ("job_create: default partition not set.");
error_code = EINVAL;
goto cleanup;
return error_code ;
}
part_ptr = default_part_loc;
}
if (time_limit == NO_VAL) /* Default time_limit is partition maximum */
time_limit = part_ptr->max_time;
if (job_desc->time_limit == NO_VAL) /* Default time_limit is partition maximum */
job_desc->time_limit = part_ptr->max_time;
/* can this user access this partition */
if (part_ptr->key && (is_key_valid (key) == 0)) {
info ("job_create: job lacks key required of partition %s",
part_ptr->name);
part_ptr->name);
error_code = EINVAL;
goto cleanup;
return error_code;
}
if (match_group (part_ptr->allow_groups, req_group) == 0) {
if (match_group (part_ptr->allow_groups, job_desc->groups) == 0) {
info ("job_create: job lacks group required of partition %s",
part_ptr->name);
part_ptr->name);
error_code = EINVAL;
goto cleanup;
return error_code;
}
if (req_group)
xfree(req_group);
/* check if select partition has sufficient resources to satisfy request */
if (req_node_list) { /* insure that selected nodes are in this partition */
error_code = node_name2bitmap (req_node_list, &req_bitmap);
if (job_desc->req_nodes) { /* insure that selected nodes are in this partition */
error_code = node_name2bitmap (job_desc->req_nodes, &req_bitmap);
if (error_code == EINVAL)
goto cleanup;
{
bit_free (req_bitmap);
return error_code ;
}
if (error_code != 0) {
error_code = EAGAIN; /* no memory */
goto cleanup;
bit_free (req_bitmap);
return error_code ;
}
if (contiguous == 1)
if (job_desc->contiguous == SLURM_JOB_DESC_CONTIGUOUS )
bit_fill_gaps (req_bitmap);
if (bit_super_set (req_bitmap, part_ptr->node_bitmap) != 1) {
info ("job_create: requested nodes %s not in partition %s",
req_node_list, part_ptr->name);
job_desc->req_nodes, part_ptr->name);
error_code = EINVAL;
goto cleanup;
bit_free (req_bitmap);
return error_code ;
}
i = count_cpus (req_bitmap);
if (i > req_cpus)
req_cpus = i;
if (i > job_desc->num_procs)
job_desc->num_procs = i;
i = bit_set_count (req_bitmap);
if (i > req_nodes)
req_nodes = i;
if (i > job_desc->num_nodes)
job_desc->num_nodes = i;
}
if (req_cpus > part_ptr->total_cpus) {
if (job_desc->num_procs > part_ptr->total_cpus) {
info ("job_create: too many cpus (%d) requested of partition %s(%d)",
req_cpus, part_ptr->name, part_ptr->total_cpus);
job_desc->num_procs, part_ptr->name, part_ptr->total_cpus);
error_code = EINVAL;
goto cleanup;
bit_free (req_bitmap);
return error_code ;
}
if ((req_nodes > part_ptr->total_nodes) ||
(req_nodes > part_ptr->max_nodes)) {
if ((job_desc->num_nodes > part_ptr->total_nodes) ||
(job_desc->num_nodes > part_ptr->max_nodes)) {
if (part_ptr->total_nodes > part_ptr->max_nodes)
i = part_ptr->max_nodes;
else
i = part_ptr->total_nodes;
info ("job_create: too many nodes (%d) requested of partition %s(%d)",
req_nodes, part_ptr->name, i);
job_desc->req_nodes, part_ptr->name, i);
error_code = EINVAL;
goto cleanup;
bit_free (req_bitmap);
return error_code ;
}
if (will_run) {
error_code = 0;
goto cleanup;
bit_free (req_bitmap);
return error_code ;
}
if (part_ptr->shared == 2) /* shared=force */
shared = 1;
else if ((shared != 1) || (part_ptr->shared == 0)) /* user or partition want no sharing */
shared = 0;
if ( ( error_code = copy_job_desc_to_job_record ( job_desc , job_rec_ptr , part_ptr , req_bitmap ) ) == SLURM_ERROR )
bit_free (req_bitmap);
return error_code ;
*new_job_id = (*job_rec_ptr)->job_id;
return SLURM_SUCCESS ;
}
int copy_job_desc_to_job_record ( job_desc_msg_t * job_desc , struct job_record ** job_rec_ptr , struct part_record *part_ptr, bitstr_t *req_bitmap )
{
int error_code ;
struct job_details *detail_ptr;
struct job_record *job_ptr ;
int job_id ;
job_ptr = create_job_record (&error_code);
if ((job_ptr == NULL) || error_code)
goto cleanup;
return error_code ;
strncpy (job_ptr->partition, part_ptr->name, MAX_NAME_LEN);
job_ptr->part_ptr = part_ptr;
if (job_id != NO_VAL)
if (job_desc->job_id != NO_VAL)
job_ptr->job_id = (uint32_t) job_id;
else
set_job_id(job_ptr);
......@@ -623,83 +574,117 @@ job_create (char *job_specs, uint32_t *new_job_id, int allocate,
else
job_hash[job_ptr->job_id % MAX_JOB_COUNT] = job_ptr;
if (job_name) {
strcpy (job_ptr->name, job_name);
xfree (job_name);
if (job_desc->name) {
strncpy (job_ptr->name, job_desc->name , sizeof (job_ptr->name)) ;
}
job_ptr->user_id = (uid_t) user_id;
job_ptr->user_id = (uid_t) job_desc->user_id;
job_ptr->job_state = JOB_PENDING;
job_ptr->time_limit = time_limit;
if (key && is_key_valid (key) && (priority != NO_VAL))
job_ptr->priority = priority;
job_ptr->time_limit = job_desc->time_limit;
if (job_desc->partition_key && is_key_valid (!NO_VAL) && (job_desc->priority != NO_VAL))
job_ptr->priority = job_desc->priority;
else
set_job_prio (job_ptr);
detail_ptr = job_ptr->details;
detail_ptr->num_procs = req_cpus;
detail_ptr->num_nodes = req_nodes;
if (req_node_list) {
detail_ptr->req_nodes = req_node_list;
detail_ptr->num_procs = job_desc->num_procs;
detail_ptr->num_nodes = job_desc->num_nodes;
if (job_desc->req_nodes) {
detail_ptr->req_nodes = xstrdup ( job_desc->req_nodes );
detail_ptr->req_node_bitmap = req_bitmap;
}
if (req_features)
detail_ptr->features = req_features;
detail_ptr->shared = shared;
detail_ptr->contiguous = contiguous;
detail_ptr->min_procs = min_cpus;
detail_ptr->min_memory = min_memory;
detail_ptr->min_tmp_disk = min_tmp_disk;
detail_ptr->dist = (enum task_dist) dist;
detail_ptr->job_script = script;
detail_ptr->procs_per_task = procs_per_task;
if (job_desc->features)
detail_ptr->features = xstrdup ( job_desc->features );
detail_ptr->shared = job_desc->shared;
detail_ptr->contiguous = job_desc->contiguous;
detail_ptr->min_procs = job_desc->num_procs;
detail_ptr->min_memory = job_desc->min_memory;
detail_ptr->min_tmp_disk = job_desc->min_tmp_disk;
detail_ptr->dist = (enum task_dist) job_desc->dist;
detail_ptr->job_script = xstrdup ( job_desc->job_script);
detail_ptr->procs_per_task = job_desc->procs_per_task;
/* job_ptr->nodes *leave as NULL pointer for now */
/* job_ptr->start_time *leave as NULL pointer for now */
/* job_ptr->end_time *leave as NULL pointer for now */
/* detail_ptr->total_procs *leave as NULL pointer for now */
*new_job_id = job_ptr->job_id;
if (job_rec_ptr)
*job_rec_ptr = job_ptr;
*job_rec_ptr = job_ptr;
return 0;
cleanup:
if (job_name)
xfree (job_name);
if (req_bitmap)
bit_free (req_bitmap);
if (req_group)
xfree (req_group);
if (req_features)
xfree (req_features);
if (req_node_list)
xfree (req_node_list);
if (req_partition)
xfree (req_partition);
if (script)
xfree (script);
return error_code;
}
/*
 * validate_job_desc - validate a job descriptor and fill in defaults for any
 *	fields left at NO_VAL.
 * input: job_desc_msg - job descriptor to validate (modified in place:
 *		defaulted fields are overwritten)
 *        allocate - non-zero if this is a resource allocation request (no
 *		script required), SLURM_CREATE_JOB_FLAG_NO_ALLOCATE_0 for a
 *		batch submission (script required)
 * output: returns SLURM_SUCCESS if valid, EINVAL otherwise
 */
int validate_job_desc ( job_desc_msg_t * job_desc_msg , int allocate )
{
	/* at least one sizing field must be given */
	if ((job_desc_msg->num_procs == NO_VAL) && (job_desc_msg->num_nodes == NO_VAL) &&
		(job_desc_msg->req_nodes == NULL)) {
		info ("job_create: job failed to specify ReqNodes, TotalNodes or TotalProcs");
		return EINVAL;
	}
	if (allocate == SLURM_CREATE_JOB_FLAG_NO_ALLOCATE_0 && job_desc_msg->job_script == NULL) {
		info ("job_create: job failed to specify Script");
		return EINVAL;
	}
	if (job_desc_msg->user_id == NO_VAL) {
		info ("job_create: job failed to specify User");
		return EINVAL;
	}
	/* NOTE(review): ">" permits a name of exactly MAX_NAME_LEN chars, which
	 * strncpy into a MAX_NAME_LEN buffer would leave unterminated - confirm
	 * whether ">=" is intended */
	if (job_desc_msg->name && strlen(job_desc_msg->name) > MAX_NAME_LEN) {
		info ("job_create: job name %s too long", job_desc_msg->name);
		return EINVAL;
	}
	if (job_desc_msg->contiguous == NO_VAL)
		job_desc_msg->contiguous = SLURM_JOB_DESC_DEFAULT_CONTIGUOUS ; /* default not contiguous */
	if (job_desc_msg->job_id != NO_VAL && find_job_record ((uint32_t) job_desc_msg->job_id))
	{
		/* job_id is uint32_t, so print with %u, not %d */
		info ("job_create: Duplicate job id %u", job_desc_msg->job_id);
		return EINVAL;
	}
	if (job_desc_msg->num_procs == NO_VAL)
		job_desc_msg->num_procs = 1;	/* default cpu count of 1 */
	if (job_desc_msg->num_nodes == NO_VAL)
		job_desc_msg->num_nodes = 1;	/* default node count of 1 */
	if (job_desc_msg->min_memory == NO_VAL)
		job_desc_msg->min_memory = 1;	/* default is 1 MB memory per node */
	if (job_desc_msg->min_tmp_disk == NO_VAL)
		job_desc_msg->min_tmp_disk = 1;	/* default is 1 MB disk per node */
	if (job_desc_msg->shared == NO_VAL)
		job_desc_msg->shared = 0;	/* default is not shared nodes */
	if (job_desc_msg->dist == NO_VAL)
		job_desc_msg->dist = DIST_BLOCK;	/* default is block distribution */
	if (job_desc_msg->procs_per_task == NO_VAL)
		job_desc_msg->procs_per_task = 1;	/* default is 1 processor per task */
	else if (job_desc_msg->procs_per_task == 0) {
		/* procs_per_task is unsigned, so "<= 0" could only mean "== 0" */
		info ("job_create: Invalid procs_per_task");
		return EINVAL;
	}
	if (job_desc_msg->min_procs == NO_VAL)
		job_desc_msg->min_procs = 1;	/* default is 1 processor per node */
	if (job_desc_msg->min_procs < job_desc_msg->procs_per_task) {
		info ("job_create: min_cpus < procs_per_task, reset to equal");
		job_desc_msg->min_procs = job_desc_msg->procs_per_task;
	}
	return SLURM_SUCCESS ;
}
/* job_lock - lock the job information
* global: job_mutex - semaphore for the job table
*/
/* job_lock - acquire exclusive access to the job information;
 *	aborts via fatal() on any pthread error
 * global: job_mutex - semaphore for the job table
 */
void
job_lock ()
{
	int error_code;
	error_code = pthread_mutex_lock (&job_mutex);
	if (error_code)
		fatal ("job_lock: pthread_mutex_lock error %d", error_code);
}
/* job_unlock - unlock the job information
* global: part_mutex - semaphore for the job table
*/
void
void
job_unlock ()
{
int error_code;
......@@ -716,7 +701,7 @@ job_unlock ()
* job_count - count of job list entries
* job_hash, job_hash_over, max_hash_over - hash table into job records
*/
void
void
list_delete_job (void *job_entry)
{
struct job_record *job_record_point;
......@@ -758,7 +743,7 @@ list_delete_job (void *job_entry)
* see common/list.h for documentation, key is the job's id
* global- job_list - the global partition list
*/
int
int
list_find_job_id (void *job_entry, void *key)
{
if (((struct job_record *) job_entry)->job_id == *((uint32_t *) key))
......@@ -772,7 +757,7 @@ list_find_job_id (void *job_entry, void *key)
* see common/list.h for documentation, key is ignored
* global- job_list - the global partition list
*/
int
int
list_find_job_old (void *job_entry, void *key)
{
time_t min_age;
......@@ -804,7 +789,7 @@ list_find_job_old (void *job_entry, void *key)
* NOTE: change JOB_STRUCT_VERSION in common/slurmlib.h whenever the format changes
* NOTE: change slurm_load_job() in api/job_info.c whenever the data format changes
*/
void
void
pack_all_jobs (char **buffer_ptr, int *buffer_size, time_t * update_time)
{
ListIterator job_record_iterator;
......@@ -834,7 +819,7 @@ pack_all_jobs (char **buffer_ptr, int *buffer_size, time_t * update_time)
/* write individual job records */
while ((job_record_point =
(struct job_record *) list_next (job_record_iterator))) {
(struct job_record *) list_next (job_record_iterator))) {
if (job_record_point->magic != JOB_MAGIC)
fatal ("dump_all_job: job integrity is bad");
......@@ -880,7 +865,7 @@ pack_all_jobs (char **buffer_ptr, int *buffer_size, time_t * update_time)
* NOTE: the caller must insure that the buffer is sufficiently large to hold
* the data being written (space remaining at least BUF_SIZE)
*/
void
void
pack_job (struct job_record *dump_job_ptr, void **buf_ptr, int *buf_len)
{
char tmp_str[MAX_STR_PACK];
......@@ -900,7 +885,7 @@ pack_job (struct job_record *dump_job_ptr, void **buf_ptr, int *buf_len)
packstr (dump_job_ptr->name, buf_ptr, buf_len);
if (dump_job_ptr->node_bitmap) {
(void) bit_fmt(tmp_str, MAX_STR_PACK,
dump_job_ptr->node_bitmap);
dump_job_ptr->node_bitmap);
packstr (tmp_str, buf_ptr, buf_len);
}
else
......@@ -908,7 +893,7 @@ pack_job (struct job_record *dump_job_ptr, void **buf_ptr, int *buf_len)
detail_ptr = dump_job_ptr->details;
if (detail_ptr &&
dump_job_ptr->job_state == JOB_PENDING) {
dump_job_ptr->job_state == JOB_PENDING) {
if (detail_ptr->magic != DETAILS_MAGIC)
fatal ("dump_all_job: job detail integrity is bad");
pack32 ((uint32_t) detail_ptr->num_procs, buf_ptr, buf_len);
......@@ -921,7 +906,7 @@ pack_job (struct job_record *dump_job_ptr, void **buf_ptr, int *buf_len)
pack32 ((uint32_t) detail_ptr->min_tmp_disk, buf_ptr, buf_len);
if (detail_ptr->req_nodes == NULL ||
strlen (detail_ptr->req_nodes) < MAX_STR_PACK)
strlen (detail_ptr->req_nodes) < MAX_STR_PACK)
packstr (detail_ptr->req_nodes, buf_ptr, buf_len);
else {
strncpy(tmp_str, detail_ptr->req_nodes, MAX_STR_PACK);
......@@ -931,14 +916,14 @@ pack_job (struct job_record *dump_job_ptr, void **buf_ptr, int *buf_len)
if (detail_ptr->req_node_bitmap) {
(void) bit_fmt(tmp_str, MAX_STR_PACK,
detail_ptr->req_node_bitmap);
detail_ptr->req_node_bitmap);
packstr (tmp_str, buf_ptr, buf_len);
}
else
packstr (NULL, buf_ptr, buf_len);
if (detail_ptr->features == NULL ||
strlen (detail_ptr->features) < MAX_STR_PACK)
strlen (detail_ptr->features) < MAX_STR_PACK)
packstr (detail_ptr->features, buf_ptr, buf_len);
else {
strncpy(tmp_str, detail_ptr->features, MAX_STR_PACK);
......@@ -947,7 +932,7 @@ pack_job (struct job_record *dump_job_ptr, void **buf_ptr, int *buf_len)
}
if (detail_ptr->job_script == NULL ||
strlen (detail_ptr->job_script) < MAX_STR_PACK)
strlen (detail_ptr->job_script) < MAX_STR_PACK)
packstr (detail_ptr->job_script, buf_ptr, buf_len);
else {
strncpy(tmp_str, detail_ptr->job_script, MAX_STR_PACK);
......@@ -984,12 +969,12 @@ pack_job (struct job_record *dump_job_ptr, void **buf_ptr, int *buf_len)
*/
int
parse_job_specs (char *job_specs, char **req_features, char **req_node_list,
char **job_name, char **req_group, char **req_partition,
int *contiguous, int *req_cpus, int *req_nodes,
int *min_cpus, int *min_memory, int *min_tmp_disk, int *key,
int *shared, int *dist, char **script, int *time_limit,
int *procs_per_task, long *job_id, int *priority,
int *user_id) {
char **job_name, char **req_group, char **req_partition,
int *contiguous, int *req_cpus, int *req_nodes,
int *min_cpus, int *min_memory, int *min_tmp_disk, int *key,
int *shared, int *dist, char **script, int *time_limit,
int *procs_per_task, long *job_id, int *priority,
int *user_id) {
int bad_index, error_code, i;
char *temp_specs, *contiguous_str, *dist_str, *shared_str;
......@@ -1005,27 +990,27 @@ parse_job_specs (char *job_specs, char **req_features, char **req_node_list,
strcpy (temp_specs, job_specs);
error_code = slurm_parser(temp_specs,
"Contiguous=", 's', &contiguous_str,
"Distribution=", 's', &dist_str,
"Features=", 's', req_features,
"Groups=", 's', req_group,
"JobId=", 'l', job_id,
"JobName=", 's', job_name,
"Key=", 'd', key,
"MinProcs=", 'd', min_cpus,
"MinRealMemory=", 'd', min_memory,
"MinTmpDisk=", 'd', min_tmp_disk,
"Partition=", 's', req_partition,
"Priority=", 'd', priority,
"ProcsPerTask=", 'd', procs_per_task,
"ReqNodes=", 's', req_node_list,
"Script=", 's', script,
"Shared=", 's', shared_str,
"TimeLimit=", 'd', time_limit,
"TotalNodes=", 'd', req_nodes,
"TotalProcs=", 'd', req_cpus,
"User=", 'd', user_id,
"END");
"Contiguous=", 's', &contiguous_str,
"Distribution=", 's', &dist_str,
"Features=", 's', req_features,
"Groups=", 's', req_group,
"JobId=", 'l', job_id,
"JobName=", 's', job_name,
"Key=", 'd', key,
"MinProcs=", 'd', min_cpus,
"MinRealMemory=", 'd', min_memory,
"MinTmpDisk=", 'd', min_tmp_disk,
"Partition=", 's', req_partition,
"Priority=", 'd', priority,
"ProcsPerTask=", 'd', procs_per_task,
"ReqNodes=", 's', req_node_list,
"Script=", 's', script,
"Shared=", 's', shared_str,
"TimeLimit=", 'd', time_limit,
"TotalNodes=", 'd', req_nodes,
"TotalProcs=", 'd', req_cpus,
"User=", 'd', user_id,
"END");
if (error_code)
goto cleanup;
......@@ -1067,7 +1052,7 @@ parse_job_specs (char *job_specs, char **req_features, char **req_node_list,
if (bad_index != -1) {
error ("parse_job_specs: bad job specification input: %s",
&temp_specs[bad_index]);
&temp_specs[bad_index]);
error_code = EINVAL;
}
......@@ -1083,7 +1068,7 @@ parse_job_specs (char *job_specs, char **req_features, char **req_node_list,
xfree (shared_str);
return error_code;
cleanup:
cleanup:
xfree (temp_specs);
if (contiguous_str)
xfree (contiguous_str);
......@@ -1117,7 +1102,7 @@ parse_job_specs (char *job_specs, char **req_features, char **req_node_list,
* global: job_list - global job table
* last_job_update - time of last job table update
*/
void
void
purge_old_job (void)
{
int i;
......@@ -1137,7 +1122,7 @@ purge_old_job (void)
* global: last_job_update - time of last job table update
* job_list - pointer to global job list
*/
void
void
reset_job_bitmaps ()
{
ListIterator job_record_iterator;
......@@ -1148,14 +1133,14 @@ reset_job_bitmaps ()
job_record_iterator = list_iterator_create (job_list);
while ((job_record_point =
(struct job_record *) list_next (job_record_iterator))) {
(struct job_record *) list_next (job_record_iterator))) {
if (job_record_point->magic != JOB_MAGIC)
fatal ("dump_all_job: job integrity is bad");
if (job_record_point->node_bitmap)
bit_free(job_record_point->node_bitmap);
if (job_record_point->nodes)
node_name2bitmap (job_record_point->nodes,
&job_record_point->node_bitmap);
&job_record_point->node_bitmap);
if (job_record_point->details == NULL)
continue;
......@@ -1163,7 +1148,7 @@ reset_job_bitmaps ()
bit_free(job_record_point->details->req_node_bitmap);
if (job_record_point->details->req_nodes)
node_name2bitmap (job_record_point->details->req_nodes,
&job_record_point->details->req_node_bitmap);
&job_record_point->details->req_node_bitmap);
}
list_iterator_destroy (job_record_iterator);
......@@ -1176,7 +1161,7 @@ reset_job_bitmaps ()
* insure that the job_id is unique
* input: job_ptr - pointer to the job_record
*/
void
void
set_job_id (struct job_record *job_ptr)
{
static uint32_t id_sequence = (1 << 16);
......@@ -1185,12 +1170,12 @@ set_job_id (struct job_record *job_ptr)
#endif
if ((job_ptr == NULL) ||
(job_ptr->magic != JOB_MAGIC))
(job_ptr->magic != JOB_MAGIC))
fatal ("set_job_id: invalid job_ptr");
if ((job_ptr->partition == NULL) || (strlen(job_ptr->partition) == 0))
fatal ("set_job_id: partition not set");
/* Include below code only if fear of rolling over 32 bit job IDs */
/* Include below code only if fear of rolling over 32 bit job IDs */
#ifdef HUGE_JOB_ID
while (1) {
new_id = id_sequence++;
......@@ -1210,13 +1195,13 @@ set_job_id (struct job_record *job_ptr)
* input: job_ptr - pointer to the job_record
* NOTE: this is a simple prototype, we need to re-establish value on restart
*/
void
void
set_job_prio (struct job_record *job_ptr)
{
static int default_prio = 100000;
if ((job_ptr == NULL) ||
(job_ptr->magic != JOB_MAGIC))
(job_ptr->magic != JOB_MAGIC))
fatal ("set_job_prio: invalid job_ptr");
job_ptr->priority = default_prio--;
}
......@@ -1237,7 +1222,7 @@ top_priority (struct job_record *job_ptr) {
top = 1; /* assume top priority until found otherwise */
job_record_iterator = list_iterator_create (job_list);
while ((job_record_point =
(struct job_record *) list_next (job_record_iterator))) {
(struct job_record *) list_next (job_record_iterator))) {
if (job_record_point->magic != JOB_MAGIC)
fatal ("top_priority: job integrity is bad");
if (job_record_point == job_ptr)
......@@ -1245,7 +1230,7 @@ top_priority (struct job_record *job_ptr) {
if (job_record_point->job_state == JOB_PENDING)
continue;
if (job_record_point->priority > job_ptr->priority &&
job_record_point->part_ptr == job_ptr->part_ptr) {
job_record_point->part_ptr == job_ptr->part_ptr) {
top = 0;
break;
}
......@@ -1266,7 +1251,7 @@ top_priority (struct job_record *job_ptr) {
* NOTE: the contents of spec are overwritten by white space
* NOTE: only the job's priority and time_limt may be changed
*/
int
int
update_job (uint32_t job_id, char *spec)
{
int bad_index, error_code, i, time_limit;
......@@ -1301,20 +1286,20 @@ update_job (uint32_t job_id, char *spec)
if (bad_index != -1) {
error ("update_job: ignored job_id %u update specification: %s",
job_id, &spec[bad_index]);
job_id, &spec[bad_index]);
return EINVAL;
}
if (time_limit != NO_VAL) {
job_ptr->time_limit = time_limit;
info ("update_job: setting time_limit to %d for job_id %u",
time_limit, job_id);
time_limit, job_id);
}
if (prio != NO_VAL) {
job_ptr->priority = prio;
info ("update_job: setting priority to %f for job_id %u",
(double) prio, job_id);
(double) prio, job_id);
}
return 0;
......
......@@ -445,7 +445,7 @@ extern int is_key_valid (int key);
* default_part_loc - pointer to default partition
* NOTE: the calling program must xfree the memory pointed to by node_list
*/
extern int job_allocate (char *job_specs, uint32_t *new_job_id, char **node_list);
extern int job_allocate (job_desc_msg_t *job_specs, uint32_t *new_job_id, char **node_list, int immediate, int will_run);
/*
* job_cancel - cancel the specified job
......@@ -472,7 +472,7 @@ extern int job_cancel (uint32_t job_id);
* default_part_loc - pointer to default partition
* job_hash, job_hash_over, max_hash_over - hash table into job records
*/
extern int job_create (char *job_specs, uint32_t *new_job_id, int allocate,
extern int job_create (job_desc_msg_t * job_specs, uint32_t *new_job_id, int allocate,
int will_run, struct job_record **job_rec_ptr);
/* job_lock - lock the job information */
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment