Skip to content
Snippets Groups Projects
Commit 2b462015 authored by Rod Schultz's avatar Rod Schultz Committed by Danny Auble
Browse files

Add an API to return the address of a collector node for aggregation.

parent 3d58e629
No related branches found
No related tags found
No related merge requests found
...@@ -99,13 +99,34 @@ Reset internal state during reconfigure. ...@@ -99,13 +99,34 @@ Reset internal state during reconfigure.
<span class="commandline">SLURM_SUCCESS</span> on success, or<br> <span class="commandline">SLURM_SUCCESS</span> on success, or<br>
<span class="commandline">SLURM_ERROR</span> on failure. <span class="commandline">SLURM_ERROR</span> on failure.
<p class="commandline">
extern slurm_addr_t* route_g_next_collector ( bool *is_collector );
<p style="margin-left:.2in"><b>Description</b>:<br>
get address of parent in message tree.
<p style="margin-left:.2in"><b>Arguments</b>: <br>
<span class="commandline">is_collector (out) flag indication this node
is a collector.
</span>
<p style="margin-left:.2in"><b>Returns</b>: <br>
<span class="commandline">slurm_addr_t*</span>
address of node to send messages to be aggregated <br>
<span class="commandline">NULL</span> if not set.
<p class="commandline">
extern slurm_addr_t* route_g_next_collector_backup ( void );
<p style="margin-left:.2in"><b>Description</b>:<br>
get address of parent's backup in message tree.
<p style="margin-left:.2in"><b>Returns</b>: <br>
<span class="commandline">slurm_addr_t*</span>
address of node to send messages to be aggregated when primary collector
is down. <br>
<span class="commandline">NULL</span> if not set.
<h2>Versioning</h2> <h2>Versioning</h2>
<p> This document describes version 101 of the Slurm Route API. <p> This document describes version 101 of the Slurm Route API.
Future releases of Slurm may revise this API.</p> Future releases of Slurm may revise this API.</p>
<p class="footer"><a href="#top">top</a></p> <p class="footer"><a href="#top">top</a></p>
<p style="text-align:center;">Last modified 19 May 2014</p> <p style="text-align:center;">Last modified 15 July 2014</p>
<!--#include virtual="footer.txt"--> <!--#include virtual="footer.txt"-->
...@@ -38,10 +38,16 @@ ...@@ -38,10 +38,16 @@
#include <pthread.h> #include <pthread.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h>
#include <sys/param.h> /* MAXPATHLEN */
#include "slurm/slurm.h"
#include "src/common/log.h" #include "src/common/log.h"
#include "src/common/forward.h" #include "src/common/forward.h"
#include "src/common/node_conf.h"
#include "src/common/plugrack.h" #include "src/common/plugrack.h"
#include "src/common/read_config.h"
#include "src/common/slurm_protocol_api.h" #include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_route.h" #include "src/common/slurm_route.h"
#include "src/common/timers.h" #include "src/common/timers.h"
...@@ -59,6 +65,8 @@ typedef struct slurm_route_ops { ...@@ -59,6 +65,8 @@ typedef struct slurm_route_ops {
hostlist_t** sp_hl, hostlist_t** sp_hl,
int* count); int* count);
int (*reconfigure) (void); int (*reconfigure) (void);
slurm_addr_t* (*next_collector) (bool* is_collector);
slurm_addr_t* (*next_collector_backup) (void);
} slurm_route_ops_t; } slurm_route_ops_t;
/* /*
...@@ -66,7 +74,9 @@ typedef struct slurm_route_ops { ...@@ -66,7 +74,9 @@ typedef struct slurm_route_ops {
*/ */
static const char *syms[] = { static const char *syms[] = {
"route_p_split_hostlist", "route_p_split_hostlist",
"route_p_reconfigure" "route_p_reconfigure",
"route_p_next_collector",
"route_p_next_collector_backup"
}; };
static slurm_route_ops_t ops; static slurm_route_ops_t ops;
...@@ -75,8 +85,183 @@ static pthread_mutex_t g_context_lock = PTHREAD_MUTEX_INITIALIZER; ...@@ -75,8 +85,183 @@ static pthread_mutex_t g_context_lock = PTHREAD_MUTEX_INITIALIZER;
static bool init_run = false; static bool init_run = false;
static uint32_t debug_flags = 0; static uint32_t debug_flags = 0;
static uint16_t tree_width; static uint16_t tree_width;
static bool this_is_collector = false; /* this node is a collector node */
static slurm_addr_t *msg_collect_node = NULL; /* address of node to aggregate
messages from this node */
static slurm_addr_t *msg_collect_backup = NULL; /* address of backup node to
aggregate messages from this node */
/* _get_all_nodes creates a hostlist containing all the nodes in the
* node_record_table.
*
* Caller must destroy the list.
*/
static hostlist_t _get_all_nodes( void )
{
int i;
hostlist_t nodes = hostlist_create(NULL);
for (i = 0; i < node_record_count; i++) {
hostlist_push_host(nodes, node_record_table_ptr[i].name);
}
return nodes;
}
/*
* _set_collectors call the split_hostlist API on the all nodes hostlist
* to set the node to be used as a collector for unsolicited node aggregation.
*
* If this node is a forwarding node (first node in any hostlist),
* then its collector and backup are the ControlMachine and it's backup.
*
* Otherwise, we find the hostlist containing this node.
* The forwarding node in that hostlist becomes a collector, the next node
* which is not this node becomes the backup.
* That list is split, we iterate through it and searching for a list in
* which this node is a forwarding node. If found, we set the collector and
* backup, else this process is repeated.
*/
static void _set_collectors( void )
{
slurm_ctl_conf_t *conf;
hostlist_t nodes;
hostlist_t* hll = NULL;
char *this_node_name = NULL, *parent = NULL, *backup = NULL;
char addrbuf[32];
int i, j, f;
int hl_count = 0;
uint16_t port;
uint16_t slurmd_port;
bool found = false;
bool ctldparent = true;
if (!run_in_daemon("slurmd"))
return; /* Only compute nodes have collectors */
/* Set the initial iteration, collector is controller,
* full list is split */
this_node_name = slurm_conf_get_aliased_nodename();
slurmd_port = slurm_get_slurmd_port();
conf = slurm_conf_lock();
nodes = _get_all_nodes();
parent = strdup(conf->control_addr);
if (conf->backup_addr) {
backup = strdup(conf->backup_addr);
}
port = conf->slurmctld_port;
slurm_conf_unlock();
while (!found) {
if ( route_g_split_hostlist(nodes, &hll, &hl_count) ) {
error("unable to split forward hostlist");
goto clean; /* collector addrs remains null */
}
/* Find which hostlist contains this node */
for (i=0; i < hl_count; i++) {
f = hostlist_find(hll[i], this_node_name);
if (f != -1)
break;
}
if (i == hl_count) {
fatal("ROUTE -- %s not found in node_record_table",
this_node_name);
}
if (f == 0) {
/* we are a forwarded to node,
* so our parent is parent */
if (hostlist_count(hll[i]) > 1)
this_is_collector = true;
xfree(msg_collect_node);
msg_collect_node = xmalloc(sizeof(slurm_addr_t));
if (ctldparent)
slurm_set_addr(msg_collect_node, port, parent);
else {
slurm_conf_get_addr(parent, msg_collect_node);
msg_collect_node->sin_port = htons(port);
}
if (debug_flags & DEBUG_FLAG_ROUTE) {
slurm_print_slurm_addr(msg_collect_node,
addrbuf, 32);
info("ROUTE -- message collector address is %s",
addrbuf);
}
xfree(msg_collect_backup);
if (backup) {
msg_collect_backup =
xmalloc(sizeof(slurm_addr_t));
if (ctldparent) {
slurm_set_addr(msg_collect_backup,
port, backup);
} else {
slurm_conf_get_addr(backup,
msg_collect_backup);
msg_collect_backup->sin_port =
htons(port);
}
if (debug_flags & DEBUG_FLAG_ROUTE) {
slurm_print_slurm_addr(
msg_collect_backup,
addrbuf, 32);
info("ROUTE -- message collector backup"
" address is %s", addrbuf);
}
} else {
if (debug_flags & DEBUG_FLAG_ROUTE) {
info("ROUTE -- no message collector "
"backup");
}
}
found = true;
goto clean;
}
/* We are not a forwarding node, the first node in this list
* will split the forward_list.
* We also know that the forwarding node is not a controller.
*
* clean up parent context */
port = slurmd_port;
ctldparent = false;
hostlist_destroy(nodes);
if (parent)
free(parent);
if (backup)
free(backup);
nodes = hostlist_copy(hll[i]);
for (j=0; j < hl_count; j++) {
hostlist_destroy(hll[j]);
}
xfree(hll);
/* set our parent, backup, and continue search */
parent = hostlist_shift(nodes);
backup = hostlist_nth(nodes, 0);
if (strcmp(backup, this_node_name) == 0) {
free(backup);
backup = NULL;
if (hostlist_count(nodes) > 1)
backup = hostlist_nth(nodes, 1);
}
}
clean:
if (debug_flags & DEBUG_FLAG_ROUTE) {
if (this_is_collector)
info("ROUTE -- %s is a collector node", this_node_name);
else
info("ROUTE -- %s is a leaf node", this_node_name);
}
hostlist_destroy(nodes);
if (parent)
free(parent);
if (backup)
free(backup);
for (i=0; i < hl_count; i++) {
hostlist_destroy(hll[i]);
}
xfree(hll);
xfree(this_node_name);
}
/* ************************************************************************** */ /* ************************************************************************** */
/* TAG( slurm_route_init ) */ /* TAG( slurm_route_init ) */
/* ************************************************************************** */ /* ************************************************************************** */
...@@ -108,6 +293,7 @@ extern int route_g_init(void) ...@@ -108,6 +293,7 @@ extern int route_g_init(void)
tree_width = slurm_get_tree_width(); tree_width = slurm_get_tree_width();
debug_flags = slurm_get_debug_flags(); debug_flags = slurm_get_debug_flags();
init_run = true; init_run = true;
_set_collectors();
done: done:
slurm_mutex_unlock(&g_context_lock); slurm_mutex_unlock(&g_context_lock);
...@@ -128,6 +314,8 @@ extern int route_g_fini(void) ...@@ -128,6 +314,8 @@ extern int route_g_fini(void)
init_run = false; init_run = false;
rc = plugin_context_destroy(g_context); rc = plugin_context_destroy(g_context);
g_context = NULL; g_context = NULL;
xfree(msg_collect_node);
msg_collect_node = NULL;
return rc; return rc;
} }
...@@ -200,12 +388,44 @@ extern int route_g_reconfigure(void) ...@@ -200,12 +388,44 @@ extern int route_g_reconfigure(void)
{ {
if (route_g_init() < 0) if (route_g_init() < 0)
return SLURM_ERROR; return SLURM_ERROR;
debug_flags = slurm_get_debug_flags(); debug_flags = slurm_get_debug_flags();
tree_width = slurm_get_tree_width(); tree_width = slurm_get_tree_width();
return (*(ops.reconfigure))(); return (*(ops.reconfigure))();
}
/* ************************************************************************** */
/* TAG( route_g_next_collector ) */
/* ************************************************************************** */
/*
* route_g_next_collector - return address of next collector
*
* IN: is_collector - bool* - flag indication if this node is a collector
*
* RET: slurm_addr_t* - address of node to send messages to be aggregated.
*/
extern slurm_addr_t* route_g_next_collector ( bool *is_collector )
{
if (route_g_init() < 0)
return NULL;
return (*(ops.next_collector))(is_collector);
}
/* ************************************************************************** */
/* TAG( route_g_next_collector_backup ) */
/* ************************************************************************** */
/*
* route_g_next_collector_backup
*
* RET: slurm_addr_t* - address of backup node to send messages to be aggregated.
*/
extern slurm_addr_t* route_g_next_collector_backup ( void )
{
if (route_g_init() < 0)
return NULL;
return (*(ops.next_collector_backup))();
} }
...@@ -270,3 +490,32 @@ extern int route_split_hostlist_treewidth(hostlist_t hl, ...@@ -270,3 +490,32 @@ extern int route_split_hostlist_treewidth(hostlist_t hl,
return SLURM_SUCCESS; return SLURM_SUCCESS;
} }
/* ************************************************************************** */
/* TAG( route_next_collector ) */
/* ************************************************************************** */
/*
* route_next_collector - get collector node address based
*
* IN: is_collector - bool* - flag indication if this node is a collector
*
* RET: slurm_addr_t* - address of node to send messages to be aggregated.
*/
extern slurm_addr_t* route_next_collector ( bool *is_collector )
{
*is_collector = this_is_collector;
return msg_collect_node;
}
/* ************************************************************************** */
/* TAG( route_next_collector_backup ) */
/* ************************************************************************** */
/*
* route_next_collector_backup - get collector backup address based
*
* RET: slurm_addr_t* - address of backup node to send messages to be aggregated
*/
extern slurm_addr_t* route_next_collector_backup ( void )
{
return msg_collect_backup;
}
...@@ -87,6 +87,23 @@ extern int route_g_split_hostlist(hostlist_t hl, ...@@ -87,6 +87,23 @@ extern int route_g_split_hostlist(hostlist_t hl,
*/ */
extern int route_g_reconfigure(void); extern int route_g_reconfigure(void);
/*
* route_g_next_collector - return address of next collector
*
* IN: is_collector - bool* - flag indication if this node is a collector
*
* RET: slurm_addr_t* - address of node to send messages to be aggregated.
*/
extern slurm_addr_t* route_g_next_collector ( bool *is_collector );
/*
* route_g_next_collector_backup
*
* RET: slurm_addr_t* - address of backup node to send messages to be aggregated
*/
extern slurm_addr_t* route_g_next_collector_backup ( void );
/*****************************************************************************\ /*****************************************************************************\
* Plugin Common Functions * Plugin Common Functions
\*****************************************************************************/ \*****************************************************************************/
...@@ -114,4 +131,20 @@ extern int route_split_hostlist_treewidth(hostlist_t hl, ...@@ -114,4 +131,20 @@ extern int route_split_hostlist_treewidth(hostlist_t hl,
hostlist_t** sp_hl, hostlist_t** sp_hl,
int* count); int* count);
/*
* route_next_collector - return address of next collector
*
* IN: is_collector - bool* - flag indication if this node is a collector
*
* RET: slurm_addr_t* - address of node to send messages to be aggregated.
*/
extern slurm_addr_t* route_next_collector ( bool *is_collector );
/*
* route_next_collector_backup -- return address of backup collector
*
* RET: slurm_addr_t* - address of backup node to send messages to be aggregated
*/
extern slurm_addr_t* route_next_collector_backup ( void );
#endif /*___SLURM_ROUTE_PLUGIN_API_H__*/ #endif /*___SLURM_ROUTE_PLUGIN_API_H__*/
...@@ -138,3 +138,25 @@ extern int route_p_reconfigure (void) ...@@ -138,3 +138,25 @@ extern int route_p_reconfigure (void)
return SLURM_SUCCESS; return SLURM_SUCCESS;
} }
/*
* route_p_next_collector - return address of next collector
*
* IN: is_collector - bool* - flag indication if this node is a collector
*
* RET: slurm_addr_t* - address of node to send messages to be aggregated.
*/
extern slurm_addr_t* route_p_next_collector ( bool *is_collector )
{
return route_next_collector(is_collector);
}
/*
* route_g_next_collector_backup
*
* RET: slurm_addr_t* - address of backup node to send messages to be aggregated.
*/
extern slurm_addr_t* route_p_next_collector_backup ( void )
{
return route_next_collector_backup();
}
...@@ -263,3 +263,26 @@ extern int route_p_reconfigure (void) ...@@ -263,3 +263,26 @@ extern int route_p_reconfigure (void)
debug_flags = slurm_get_debug_flags(); debug_flags = slurm_get_debug_flags();
return SLURM_SUCCESS; return SLURM_SUCCESS;
} }
/*
* route_p_next_collector - return address of next collector
*
* IN: is_collector - bool* - flag indication if this node is a collector
*
* RET: slurm_addr_t* - address of node to send messages to be aggregated.
*/
extern slurm_addr_t* route_p_next_collector ( bool *is_collector )
{
return route_next_collector(is_collector);
}
/*
* route_g_next_collector_backup
*
* RET: slurm_addr_t* - address of backup node to send messages to be aggregated.
*/
extern slurm_addr_t* route_p_next_collector_backup ( void )
{
return route_next_collector_backup();
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment