Skip to content
Snippets Groups Projects
Commit 30c71994 authored by Christopher J. Morrone's avatar Christopher J. Morrone
Browse files

Add multi-adapter support so each task gets a window from each adapter on a

node when using sn_all.
sn_single is not yet supported.
parent 35e66539
No related branches found
No related tags found
No related merge requests found
......@@ -51,7 +51,7 @@
#define FED_JOBINFO_MAGIC 0xc00cc00e
#define FED_LIBSTATE_MAGIC 0xc00cc00f
#define FED_ADAPTERLEN 5
#define FED_ADAPTERNAME_LEN 5
#define FED_HOSTLEN 20
#define FED_VERBOSE_PRINT 0
#define FED_NODECOUNT 128
......@@ -82,7 +82,7 @@ typedef struct fed_window {
} fed_window_t;
typedef struct fed_adapter {
char name[FED_ADAPTERLEN];
char name[FED_ADAPTERNAME_LEN];
uint16_t lid;
uint16_t network_id;
uint32_t max_winmem;
......@@ -97,8 +97,6 @@ struct fed_nodeinfo {
char name[FED_HOSTLEN];
uint32_t adapter_count;
fed_adapter_t *adapter_list;
uint16_t next_window;
uint16_t next_adapter;
struct fed_nodeinfo *next;
};
......@@ -122,9 +120,8 @@ struct fed_jobinfo {
uint16_t job_key;
char job_desc[DESCLEN];
uint32_t window_memory;
uint32_t table_size;
NTBL **table;
char *lid_index;
uint16_t tables_per_task;
fed_tableinfo_t *tableinfo;
};
typedef struct {
......@@ -133,7 +130,7 @@ typedef struct {
} fed_status_t;
typedef struct {
char name[FED_ADAPTERLEN];
char name[FED_ADAPTERNAME_LEN];
uint16_t lid;
uint16_t network_id;
} fed_cache_entry_t;
......@@ -243,7 +240,7 @@ char *fed_sprint_jobinfo(fed_jobinfo_t *j, char *buf,
j->job_key,
j->job_desc,
j->window_memory,
j->table_size);
j->tables_per_task);
if(count < 0)
return buf;
remaining -= count;
......@@ -287,7 +284,7 @@ _cache_lid(fed_adapter_t *ap)
lid_cache[adapter_num].lid = ap->lid;
lid_cache[adapter_num].network_id = ap->network_id;
strncpy(lid_cache[adapter_num].name, ap->name, FED_ADAPTERLEN);
strncpy(lid_cache[adapter_num].name, ap->name, FED_ADAPTERNAME_LEN);
}
/* Check lid cache for a given lid and return the associated adapter
......@@ -327,6 +324,20 @@ _get_network_id_from_lid(int lid)
return -1;
}
static uint16_t
_get_network_id_from_adapter(char *adapter_name)
{
int i;
for (i = 0; i < FED_MAXADAPTERS; i++) {
if (!strncmp(adapter_name, lid_cache[i].name,
FED_ADAPTERNAME_LEN))
return lid_cache[i].network_id;
}
return (uint16_t) -1;
}
/* Explicitly strip out carriage-return and new-line */
static void _strip_cr_nl(char *line)
{
......@@ -386,7 +397,7 @@ static int _set_up_adapter(fed_adapter_t *fed_adapter, char *adapter_name)
return SLURM_ERROR;
strncpy(fed_adapter->name,
adapter_name,
FED_ADAPTERLEN);
FED_ADAPTERNAME_LEN);
fed_adapter->lid = res.lid;
fed_adapter->network_id = res.network_id;
/* FUTURE: check that we don't lose information when converting
......@@ -553,9 +564,8 @@ fed_alloc_jobinfo(fed_jobinfo_t **j)
new->magic = FED_JOBINFO_MAGIC;
new->job_key = -1;
new->window_memory = 0;
new->table_size = 0;
new->table = NULL;
new->lid_index = NULL;
new->tables_per_task = 0;
new->tableinfo = NULL;
*j = new;
return 0;
......@@ -772,12 +782,8 @@ fed_print_nodeinfo(fed_nodeinfo_t *n, char *buf, size_t size)
assert(n->magic == FED_NODEINFO_MAGIC);
count = snprintf(tmp, remaining,
"Node: %s\n"
" next_window: %u\n"
" next_adapter: %u\n",
n->name,
n->next_window,
n->next_adapter);
"Node: %s\n",
n->name);
if(count < 0)
return buf;
remaining -= count;
......@@ -865,7 +871,7 @@ fed_pack_nodeinfo(fed_nodeinfo_t *n, Buf buf)
pack32(n->adapter_count, buf);
for(i = 0; i < n->adapter_count; i++) {
a = n->adapter_list + i;
packmem(a->name, FED_ADAPTERLEN, buf);
packmem(a->name, FED_ADAPTERNAME_LEN, buf);
pack16(a->lid, buf);
pack16(a->network_id, buf);
pack32(a->max_winmem, buf);
......@@ -899,7 +905,7 @@ _copy_node(fed_nodeinfo_t *dest, fed_nodeinfo_t *src)
for(i = 0; i < dest->adapter_count; i++) {
sa = src->adapter_list + i;
da = dest->adapter_list +i;
strncpy(da->name, sa->name, FED_ADAPTERLEN);
strncpy(da->name, sa->name, FED_ADAPTERNAME_LEN);
da->lid = sa->lid;
da->network_id = sa->network_id;
da->max_winmem = sa->max_winmem;
......@@ -914,8 +920,6 @@ _copy_node(fed_nodeinfo_t *dest, fed_nodeinfo_t *src)
for(j = 0; j < da->window_count; j++)
da->window_list[j] = sa->window_list[j];
}
dest->next_window = src->next_window;
dest->next_adapter = src->next_adapter;
return SLURM_SUCCESS;
}
......@@ -1058,8 +1062,6 @@ _alloc_node(fed_libstate_t *lp, char *name)
n->name[0] = '\0';
n->adapter_list = (fed_adapter_t *)malloc(FED_MAXADAPTERS *
sizeof(fed_adapter_t));
n->next_adapter = 0;
n->next_window = 0;
if(name != NULL) {
strncpy(n->name, name, FED_HOSTLEN);
......@@ -1132,7 +1134,7 @@ _unpack_nodeinfo(fed_nodeinfo_t *n, Buf buf)
for(i = 0; i < tmp_n->adapter_count; i++) {
tmp_a = tmp_n->adapter_list + i;
unpackmem(tmp_a->name, &size, buf);
if(size != FED_ADAPTERLEN)
if(size != FED_ADAPTERNAME_LEN)
goto unpack_error;
safe_unpack16(&tmp_a->lid, buf);
safe_unpack16(&tmp_a->network_id, buf);
......@@ -1256,7 +1258,23 @@ _get_lid(fed_nodeinfo_t *np, int index)
return ap[index].lid;
}
/* For a given node, fill out an NTBL
/* FIXME - this could be a little smarter than walking the whole list each time */
static fed_window_t *
_find_free_window(fed_adapter_t *adapter) {
int i;
fed_window_t *window;
for (i = FED_MIN_WIN; i < adapter->window_count; i++) {
window = &adapter->window_list[i];
if (window->status == NTBL_UNLOADED_STATE)
return window;
}
return (fed_window_t *) NULL;
}
/* For a given process, fill out an NTBL
* struct (an array of these makes up the network table loaded
* for each job). Assign adapters, lids and switch windows to
* each task in a job. Update lid_index for quick mapping
......@@ -1265,50 +1283,47 @@ _get_lid(fed_nodeinfo_t *np, int index)
* Used by: slurmctld
*/
static int
_setup_table_entry(NTBL *table_entry, char *lid_index, char *host, int id)
_allocate_windows(fed_tableinfo_t *tableinfo, char *hostname, int task_id)
{
fed_nodeinfo_t *n;
int count = 0;
int max_windows;
fed_nodeinfo_t *node;
fed_adapter_t *adapter;
fed_window_t *window;
NTBL *table;
int i;
assert(host);
assert(table_entry);
assert(tableinfo);
assert(hostname);
n = _find_node(fed_state, host);
if(n == NULL) {
error("Failed to find node in node_list: %s", host);
node = _find_node(fed_state, hostname);
if(node == NULL) {
error("Failed to find node in node_list: %s", hostname);
return SLURM_ERROR;
}
table_entry->task_id = id;
table_entry->lid = _get_lid(n, n->next_adapter);
table_entry->window_id =
n->adapter_list[n->next_adapter].window_list[n->next_window].id;
strncpy(lid_index, n->adapter_list[n->next_adapter].name,
FED_ADAPTERLEN);
max_windows = n->adapter_list[n->next_adapter].window_count
* n->adapter_count;
do {
if(count++ > max_windows)
/* Allocate a window on each adapter for this task */
for (i = 0; i < node->adapter_count; i++) {
adapter = &node->adapter_list[i];
window = _find_free_window(adapter);
if (window == NULL) {
/* FIXME need to clean up */
error("No free windows");
return SLURM_ERROR;
n->next_window++;
if(n->next_window >=
n->adapter_list[n->next_adapter].window_count) {
n->next_window = FED_MIN_WIN;
n->next_adapter++;
if(n->next_adapter >= n->adapter_count)
n->next_adapter = 0;
}
} while(n->adapter_list->window_list[n->next_window].status
!= NTBL_UNLOADED_STATE);
window->status = NTBL_LOADED_STATE;
n->adapter_list->window_list[n->next_window].status
= NTBL_LOADED_STATE;
table = tableinfo[i].table[task_id];
table->task_id = task_id;
table->lid = adapter->lid;
table->window_id = window->id;
strncpy(tableinfo[i].adapter_name, adapter->name,
FED_ADAPTERNAME_LEN);
}
return SLURM_SUCCESS;
}
#if FED_DEBUG
/* Used by: all */
static void
......@@ -1340,12 +1355,13 @@ _print_index(char *index, int size)
printf("--Begin lid index--\n");
for(i = 0; i < size; i++) {
printf(" task_id: %u\n", i);
printf(" name: %s\n", index + (i * FED_ADAPTERLEN));
printf(" name: %s\n", index + (i * FED_ADAPTERNAME_LEN));
}
printf("--End lid index--\n");
}
#endif
/* Setup everything for the job. Assign tasks across
* nodes in a block or cyclic fashion and create the network table used
* on all nodes of the job.
......@@ -1358,14 +1374,10 @@ fed_build_jobinfo(fed_jobinfo_t *jp, hostlist_t hl, int nprocs, int cyclic)
int nnodes;
hostlist_iterator_t hi;
char *host;
int full_node_cnt;
int min_procs_per_node;
int max_procs_per_node;
int proc_cnt = 0;
int task_cnt;
NTBL **tmp_table;
char *cur_idx;
int i, j;
fed_nodeinfo_t *node;
assert(jp);
assert(jp->magic == FED_JOBINFO_MAGIC);
......@@ -1374,82 +1386,84 @@ fed_build_jobinfo(fed_jobinfo_t *jp, hostlist_t hl, int nprocs, int cyclic)
if((nprocs <= 0) || (nprocs > FED_MAX_PROCS))
slurm_seterrno_ret(EINVAL);
tmp_table = (NTBL **)malloc(sizeof(NTBL *) * nprocs);
if(tmp_table == NULL)
slurm_seterrno_ret(ENOMEM);
jp->lid_index = (char *)malloc(FED_ADAPTERLEN * nprocs);
if(jp->lid_index == NULL) {
free(tmp_table);
slurm_seterrno_ret(ENOMEM);
}
jp->job_key = _next_key();
/* FIX ME! skip setting job_desc for now, will default to
* "no_job_description_given". Also, let the adapter
* determine our window memory size.
*/
jp->window_memory = FED_AUTO_WINMEM;
nnodes = hostlist_count(hl);
full_node_cnt = nprocs % nnodes;
min_procs_per_node = nprocs / nnodes;
max_procs_per_node = (nprocs + nnodes - 1) / nnodes;
jp->window_memory = FED_AUTO_WINMEM;
hi = hostlist_iterator_create(hl);
if(cyclic) {
// allocate 1 window per node
/*
* Peek at the first host to figure out tables_per_task.
* This driver assumes that all nodes have the same number of adapters
* per node. Bad Things will happen if this assumption is incorrect.
*/
host = hostlist_next(hi);
node = _find_node(fed_state, host);
jp->tables_per_task = node->adapter_count;
hostlist_iterator_reset(hi);
/* Allocate memory for each fed_tableinfo_t */
jp->tableinfo = (fed_tableinfo_t *) xmalloc(jp->tables_per_task
* sizeof(fed_tableinfo_t));
for (i = 0; i < jp->tables_per_task; i++) {
jp->tableinfo[i].table_length = nprocs;
jp->tableinfo[i].table = (NTBL **) xmalloc(nprocs
* sizeof(NTBL *));
for (j = 0; j < nprocs; j++) {
jp->tableinfo[i].table[j] =
(NTBL *) xmalloc(sizeof(NTBL));
}
}
if (cyclic) {
/* Allocate tables for one process per node at a time */
debug("Allocating windows in cyclic mode");
hostlist_iterator_reset(hi);
while(proc_cnt < nprocs) {
for (proc_cnt = 0; proc_cnt < nprocs; proc_cnt++) {
host = hostlist_next(hi);
if(!host) {
hostlist_iterator_reset(hi);
host = hostlist_next(hi);
}
tmp_table[proc_cnt] = (NTBL *)malloc(sizeof(NTBL));
cur_idx = jp->lid_index + (proc_cnt * FED_ADAPTERLEN);
if(_setup_table_entry(tmp_table[proc_cnt], cur_idx,
host, proc_cnt)) {
free(tmp_table);
free(jp->lid_index);
slurm_seterrno_ret(EWINDOW);
}
proc_cnt++;
/* FIXME check return code */
_allocate_windows(jp->tableinfo, host, proc_cnt);
free(host);
}
} else {
// allocate windows up to max_procs_per_node
int task_cnt;
int full_node_cnt;
int min_procs_per_node;
int max_procs_per_node;
debug("Allocating windows in block mode");
hostlist_iterator_reset(hi);
for(i = 0; i < nnodes; i++) {
nnodes = hostlist_count(hl);
full_node_cnt = nprocs % nnodes;
min_procs_per_node = nprocs / nnodes;
max_procs_per_node = (nprocs + nnodes - 1) / nnodes;
proc_cnt = 0;
for (i = 0; i < nnodes; i++) {
host = hostlist_next(hi);
if(!host)
error("Failed to get next host");
if(i < full_node_cnt)
task_cnt = max_procs_per_node;
else
task_cnt = min_procs_per_node;
for (j = 0; j < task_cnt; j++) {
tmp_table[proc_cnt] =
(NTBL *)malloc(sizeof(NTBL));
cur_idx = jp->lid_index +
(proc_cnt * FED_ADAPTERLEN);
if(_setup_table_entry(tmp_table[proc_cnt],
cur_idx, host, proc_cnt)) {
free(tmp_table);
free(jp->lid_index);
slurm_seterrno_ret(EWINDOW);
}
/* FIXME check return code */
_allocate_windows(jp->tableinfo, host, proc_cnt);
proc_cnt++;
}
free(host);
}
}
}
jp->table_size = nprocs;
jp->table = tmp_table;
#if FED_DEBUG
_print_table(jp->table, jp->table_size);
#endif
......@@ -1457,11 +1471,25 @@ fed_build_jobinfo(fed_jobinfo_t *jp, hostlist_t hl, int nprocs, int cyclic)
return SLURM_SUCCESS;
}
void
_pack_tableinfo(fed_tableinfo_t *tableinfo, Buf buf)
{
int i, j;
pack32(tableinfo->table_length, buf);
for (i = 0; i < tableinfo->table_length; i++) {
pack16(tableinfo->table[i]->task_id, buf);
pack16(tableinfo->table[i]->lid, buf);
pack16(tableinfo->table[i]->window_id, buf);
}
packmem(tableinfo->adapter_name, FED_ADAPTERNAME_LEN, buf);
}
/* Used by: all */
int
fed_pack_jobinfo(fed_jobinfo_t *j, Buf buf)
{
int i;
int i, k;
assert(j);
assert(j->magic == FED_JOBINFO_MAGIC);
......@@ -1471,23 +1499,46 @@ fed_pack_jobinfo(fed_jobinfo_t *j, Buf buf)
pack16(j->job_key, buf);
packmem(j->job_desc, DESCLEN, buf);
pack32(j->window_memory, buf);
pack32(j->table_size, buf);
for(i = 0; i < j->table_size; i++) {
pack16(j->table[i]->task_id, buf);
pack16(j->table[i]->lid, buf);
pack16(j->table[i]->window_id, buf);
pack16(j->tables_per_task, buf);
for (i = 0; i < j->tables_per_task; i++) {
_pack_tableinfo(&j->tableinfo[i], buf);
}
packmem(j->lid_index, j->table_size * FED_ADAPTERLEN, buf);
return SLURM_SUCCESS;
}
void
_unpack_tableinfo(fed_tableinfo_t *tableinfo, Buf buf)
{
uint16_t size;
int i;
safe_unpack32(&tableinfo->table_length, buf);
tableinfo->table = (NTBL **) xmalloc(tableinfo->table_length
* sizeof(NTBL *));
for (i = 0; i < tableinfo->table_length; i++) {
tableinfo->table[i] = (NTBL *) xmalloc(sizeof(NTBL));
safe_unpack16(&tableinfo->table[i]->task_id, buf);
safe_unpack16(&tableinfo->table[i]->lid, buf);
safe_unpack16(&tableinfo->table[i]->window_id, buf);
}
unpackmem(tableinfo->adapter_name, &size, buf);
if (size != FED_ADAPTERNAME_LEN)
error("adapter_name unpack error");
return;
unpack_error: /* safe_unpackXX are macros which jump to unpack_error */
error("unpack error in _unpack_tableinfo");
return;
}
/* Used by: all */
int
fed_unpack_jobinfo(fed_jobinfo_t *j, Buf buf)
{
uint16_t size;
NTBL **tmp_table = NULL;
char *tmp_index = NULL;
int i;
assert(j);
......@@ -1501,119 +1552,124 @@ fed_unpack_jobinfo(fed_jobinfo_t *j, Buf buf)
if(size != DESCLEN)
goto unpack_error;
safe_unpack32(&j->window_memory, buf);
safe_unpack32(&j->table_size, buf);
tmp_table = (NTBL **)malloc(sizeof(NTBL *) * j->table_size);
if(!tmp_table)
safe_unpack16(&j->tables_per_task, buf);
j->tableinfo = (fed_tableinfo_t *) xmalloc(j->tables_per_task
* sizeof(fed_tableinfo_t));
if(!j->tableinfo)
slurm_seterrno_ret(ENOMEM);
for(i = 0; i < j->table_size; i++) {
tmp_table[i] = (NTBL *)malloc(sizeof(NTBL));
if(!tmp_table[i])
slurm_seterrno_ret(ENOMEM);
safe_unpack16(&tmp_table[i]->task_id, buf);
safe_unpack16(&tmp_table[i]->lid, buf);
safe_unpack16(&tmp_table[i]->window_id, buf);
for (i = 0; i < j->tables_per_task; i++) {
_unpack_tableinfo(&j->tableinfo[i], buf);
}
j->table = tmp_table;
tmp_index = (char *)malloc(j->table_size * FED_ADAPTERLEN);
if(!tmp_index)
slurm_seterrno_ret(ENOMEM);
unpackmem(tmp_index, &size, buf);
if(size != (j->table_size * FED_ADAPTERLEN))
goto unpack_error;
j->lid_index = tmp_index;
return SLURM_SUCCESS;
unpack_error:
/* FIX ME! Potential memory leak if we don't free
* tmp_table's elements.
*/
if(tmp_table)
free(tmp_table);
if(tmp_index)
free(tmp_index);
/* if(tmp_table) */
/* free(tmp_table); */
/* if(tmp_index) */
/* free(tmp_index); */
slurm_seterrno_ret(EUNPACK);
return SLURM_ERROR;
}
/* Used by: all */
fed_jobinfo_t *
fed_copy_jobinfo(fed_jobinfo_t *j)
fed_copy_jobinfo(fed_jobinfo_t *job)
{
fed_jobinfo_t *new;
int i;
assert(j);
assert(j->magic == FED_JOBINFO_MAGIC);
int i, k;
assert(job);
assert(job->magic == FED_JOBINFO_MAGIC);
if(fed_alloc_jobinfo(&new)) {
debug("fed_alloc_jobinfo failed");
goto cleanup1;
}
memcpy(new, j, sizeof(fed_jobinfo_t));
memcpy(new, job, sizeof(fed_jobinfo_t));
/* table will be empty (and table_size == 0) when the network string
* from poe does not contain "us".
* (See man poe: -euilib or MP_EUILIB)
*/
if (new->table_size > 0) {
int size;
size = new->table_size * FED_ADAPTERLEN;
new->lid_index = (char *)malloc(size);
if (new->lid_index == NULL) {
debug("fed_copy_jobinfo new->lid_index malloc failed");
if (job->tables_per_task > 0) {
/* Allocate memory for each fed_tableinfo_t */
new->tableinfo = (fed_tableinfo_t *)xmalloc(
job->tables_per_task * sizeof(fed_tableinfo_t));
if (new->tableinfo == NULL)
goto cleanup2;
}
memcpy(new->lid_index, j->lid_index, size);
memcpy(new->tableinfo, job->tableinfo,
sizeof(fed_tableinfo_t) * job->tables_per_task);
for (i = 0; i < job->tables_per_task; i++) {
new->tableinfo[i].table =
(NTBL **) xmalloc(job->tableinfo[i].table_length
* sizeof(NTBL *));
if (new->tableinfo[i].table == NULL)
goto cleanup3;
for (k = 0; k < new->tableinfo[i].table_length; k++) {
new->tableinfo[i].table[k] =
(NTBL *) xmalloc(sizeof(NTBL));
if (new->tableinfo[i].table[k] == NULL)
goto cleanup4;
memcpy(new->tableinfo[i].table[k],
job->tableinfo[i].table[k],
sizeof(fed_tableinfo_t));
}
size = sizeof(NTBL *) * new->table_size;
new->table = (NTBL **)malloc(size);
if (new->table == NULL) {
debug("fed_copy_jobinfo: new->table malloc failed");
goto cleanup3;
}
memset(new->table, 0, size);
for(i = 0; i < new->table_size; i++) {
new->table[i] = (NTBL *)malloc(sizeof(NTBL));
if (new->table[i] == NULL)
goto cleanup4;
memcpy(new->table[i], j->table[i], sizeof(NTBL));
}
}
return new;
cleanup4:
for (i = 0; i < new->table_size; i++)
if (new->table[i])
free(new->table[i]);
k--;
for ( ; k >= 0; k--)
xfree(new->tableinfo[i].table[k]);
cleanup3:
free(new->lid_index);
i--;
for ( ; i >= 0; i--) {
for (k = 0; k < new->tableinfo[i].table_length; k++)
xfree(new->tableinfo[i].table[k]);
xfree(new->tableinfo[i].table);
}
xfree(new->tableinfo);
cleanup2:
fed_free_jobinfo(new);
cleanup1:
error("Allocating new jobinfo");
slurm_seterrno(ENOMEM);
return NULL;
}
/* Used by: all */
void
fed_free_jobinfo(fed_jobinfo_t *jp)
{
int i;
int i, j;
fed_tableinfo_t *tableinfo;
if(!jp)
if(!jp) {
return;
}
if(jp->table) {
for(i = 0; i < jp->table_size; i++) {
if(!jp->table[i])
free(jp->table[i]);
if (jp->tables_per_task > 0 && jp->tableinfo != NULL) {
for (i = 0; i < jp->tables_per_task; i++) {
tableinfo = &jp->tableinfo[i];
if (tableinfo == NULL)
continue;
for (j = 0; j < tableinfo->table_length; j++)
xfree(tableinfo->table[j]);
xfree(tableinfo->table);
}
free(jp->table);
}
if(jp->lid_index)
free(jp->lid_index);
xfree(jp->tableinfo);
}
free(jp);
jp = NULL;
......@@ -1627,23 +1683,23 @@ fed_free_jobinfo(fed_jobinfo_t *jp)
int
fed_get_jobinfo(fed_jobinfo_t *jp, int key, void *data)
{
NTBL ***table = (NTBL ***)data;
fed_tableinfo_t **tableinfo = (fed_tableinfo_t **)data;
int *tables_per = (int *)data;
int *job_key = (int *)data;
char **index = (char **)data;
assert(jp);
assert(jp->magic == FED_JOBINFO_MAGIC);
switch(key) {
case FED_JOBINFO_TABLE:
*table = jp->table;
case FED_JOBINFO_TABLEINFO:
*tableinfo = jp->tableinfo;
break;
case FED_JOBINFO_TABLESPERTASK:
*tables_per = jp->tables_per_task;
break;
case FED_JOBINFO_KEY:
*job_key = jp->job_key;
break;
case FED_JOBINFO_LIDIDX:
*index = jp->lid_index;
break;
default:
slurm_seterrno_ret(EINVAL);
}
......@@ -1662,26 +1718,24 @@ fed_load_table(fed_jobinfo_t *jp, int uid, int pid)
{
int i;
int err;
char *adapter, *old_adapter = NULL;
unsigned long long winmem;
char *adapter;
uint16_t network_id;
#if FED_DEBUG
char buf[2000];
#endif
assert(jp);
assert(jp->magic == FED_JOBINFO_MAGIC);
for(i = 0; i < jp->tables_per_task; i++) {
#if FED_DEBUG
_print_table(jp->table, jp->table_size);
printf("%s", fed_sprint_jobinfo(jp, buf, 2000));
_print_table(jp->tableinfo[i].table, jp->tableinfo[i].table_length);
printf("%s", fed_sprint_jobinfo(jp, buf, 2000));
#endif
for(i = 0; i < jp->table_size; i++) {
adapter = _get_adapter_from_lid(jp->table[i]->lid);
network_id = _get_network_id_from_lid(jp->table[i]->lid);
/* FIX ME! This is a crude check to see if we have already
* loaded a table for this adapter (and therefore don't need
* to do it again). Make this better.
*/
if((adapter == NULL) || (adapter == old_adapter))
adapter = jp->tableinfo[i].adapter_name;
network_id = _get_network_id_from_adapter(jp->tableinfo[i].adapter_name);
if(adapter == NULL)
continue;
winmem = jp->window_memory;
err = ntbl_load_table(
......@@ -1693,14 +1747,13 @@ fed_load_table(fed_jobinfo_t *jp, int uid, int pid)
jp->job_key,
jp->job_desc,
&winmem,
jp->table_size,
jp->table);
jp->tableinfo[i].table_length,
jp->tableinfo[i].table);
if(err != NTBL_SUCCESS) {
error("unable to load table %s\n",
_lookup_fed_status_tab(err));
return SLURM_ERROR;
}
old_adapter = adapter;
}
return SLURM_SUCCESS;
......@@ -1714,25 +1767,32 @@ fed_load_table(fed_jobinfo_t *jp, int uid, int pid)
int
fed_unload_table(fed_jobinfo_t *jp)
{
int i;
int i, j;
int err;
char *adapter;
assert(jp);
assert(jp->magic == FED_JOBINFO_MAGIC);
for(i = 0; i < jp->table_size; i++) {
adapter = _get_adapter_from_lid(jp->table[i]->lid);
if(adapter == NULL)
continue;
err = ntbl_unload_window(NTBL_VERSION, adapter, jp->job_key,
jp->table[i]->window_id);
if(err != NTBL_SUCCESS) {
error("unloading window: %s\n",
_lookup_fed_status_tab(err));
slurm_seterrno_ret(EUNLOAD);
NTBL **table;
uint32_t table_length;
assert(jp);
assert(jp->magic == FED_JOBINFO_MAGIC);
for (i = 0; i < jp->tables_per_task; i++) {
table = jp->tableinfo[i].table;
table_length = jp->tableinfo[i].table_length;
for(j = 0; j < table_length; j++) {
adapter = _get_adapter_from_lid(table[j]->lid);
if(adapter == NULL)
continue;
err = ntbl_unload_window(NTBL_VERSION, adapter,
jp->job_key,
table[j]->window_id);
if(err != NTBL_SUCCESS) {
error("unloading window: %s\n",
_lookup_fed_status_tab(err));
slurm_seterrno_ret(EUNLOAD);
}
}
}
}
return SLURM_SUCCESS;
}
......@@ -1873,7 +1933,7 @@ _unpack_libstate(fed_libstate_t *lp, Buf buffer)
return SLURM_SUCCESS;
unpack_error:
slurm_seterrno_ret(EBADMAGIC_FEDLIBSTATE); /* corrupted libstate */
slurm_seterrno_ret(EBADMAGIC_FEDLIBSTATE);
return SLURM_ERROR;
}
......
......@@ -28,13 +28,22 @@
#ifndef _FEDERATION_KEYS_INCLUDED
#define _FEDERATION_KEYS_INCLUDED
#define FED_ADAPTERNAME_LEN 5
enum {
/* Federation specific get_jobinfo keys */
FED_JOBINFO_TABLE,
FED_JOBINFO_PROTOCOL,
FED_JOBINFO_MODE,
FED_JOBINFO_TABLEINFO,
FED_JOBINFO_TABLESPERTASK,
FED_JOBINFO_KEY,
FED_JOBINFO_LIDIDX
FED_JOBINFO_PROTOCOL,
FED_JOBINFO_MODE
};
/* Information shared between slurm_ll_api and the slurm federation driver */
typedef struct fed_tableinfo {
uint32_t table_length;
NTBL **table;
char adapter_name[FED_ADAPTERNAME_LEN];
} fed_tableinfo_t;
#endif /* _FEDERATION_KEYS_INCLUDED */
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment