Skip to content
Snippets Groups Projects
Commit c3a6f92c authored by Moe Jette
Browse files

Multi-thread sbcast logic (4-way fanout from sbcast itself).

parent f396c66b
No related branches found
No related tags found
No related merge requests found
......@@ -32,6 +32,7 @@
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
......@@ -39,14 +40,19 @@
#include "src/common/hostlist.h"
#include "src/common/log.h"
#include "src/common/read_config.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_protocol_defs.h"
#include "src/common/slurm_protocol_interface.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/common/forward.h"
#include "src/sbcast/sbcast.h"
/* NOTE(review): AGENT_THREAD_COUNT is not referenced by any code visible in
 * this listing -- confirm whether it is still needed. */
#define AGENT_THREAD_COUNT 10
/* Upper bound on failed pthread_create() attempts in send_rpc() before it
 * calls fatal(). The retry counter is never reset there, so the limit is
 * cumulative over all threads spawned by one send_rpc() call. */
#define MAX_RETRIES 10
#define MAX_THREADS 4 /* These are huge messages, so only
* run MAX_THREADS at one time */
/* Request state values. Only the first two enumerators are visible in this
 * listing; the remaining ones fall in the elided region marked below. */
typedef enum {
DSH_NEW, /* Request not yet started */
DSH_ACTIVE, /* Request in progress */
......@@ -56,32 +62,31 @@ typedef enum {
} state_t;
/*
 * Per-thread work record. send_rpc() fills in one record per agent thread
 * and passes its address to _agent_thread() as the thread argument.
 * _agent_thread() reads `msg` and stores its result in `rc`; send_rpc()
 * folds every `rc` together with MAX() once all threads have finished.
 *
 * NOTE(review): `thread` and `attr` are each declared twice in this listing,
 * which cannot compile as-is. This looks like pre-change and post-change
 * diff lines shown together -- confirm which member set is current. Of the
 * members below, the code visible here only touches thread, attr, msg and rc.
 */
typedef struct thd {
pthread_t thread; /* thread ID */
pthread_attr_t attr; /* thread attributes */
state_t state; /* thread state */
time_t start_time; /* start time */
time_t end_time; /* end time or delta time
* upon termination */
struct sockaddr_in slurm_addr; /* structure holding info for all
* forwarding info */
char node_name[MAX_SLURM_NAME]; /* node's name */
List ret_list;
pthread_t thread; /* thread ID */
pthread_attr_t attr; /* thread attributes */
slurm_msg_t *msg; /* message to send */
int rc; /* highest return codes from RPC */
} thd_t;
/*
 * Bookkeeping for the agent threads. agent_cnt is incremented by send_rpc()
 * just before it creates a thread and decremented by _agent_thread() just
 * before that thread returns. Both updates happen with agent_cnt_mutex held.
 * agent_cnt_cond is broadcast after each decrement; send_rpc() waits on it
 * both to stay below MAX_THREADS and to detect that every thread is done.
 */
static pthread_mutex_t agent_cnt_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t agent_cnt_cond = PTHREAD_COND_INITIALIZER;
static int agent_cnt = 0;
/* Thread entry point; args is the thd_t record for that thread */
static void *_agent_thread(void *args);
/*
 * send_rpc - issue the RPC to ship the file's data
 * IN bcast_msg - payload; every per-thread message points at this same
 *	object through its data field
 * IN alloc_resp - allocation response supplying node_cnt, node_list and
 *	node_addr for the target nodes
 *
 * Builds one REQUEST_FILE_BCAST message per agent thread, runs the threads
 * with at most MAX_THREADS active at once, waits for all of them to finish,
 * and calls exit(1) if the combined return code is non-zero.
 *
 * NOTE(review): this listing shows pre-change and post-change diff lines
 * together. `from`/`forward` and `i`/`rc` are each declared twice, and the
 * single-message code (`msg.*` assignments and the unterminated
 * `ret_list = slurm_send_recv_rc_msg(&msg,` line) sits directly in front of
 * the threaded loop. Confirm which lines belong to the current source
 * before changing any logic here.
 */
extern void send_rpc(file_bcast_msg_t *bcast_msg,
resource_allocation_response_msg_t *alloc_resp)
{
/* This code will handle message fanout to multiple slurmd */
forward_t from, forward;
slurm_msg_t msg;
forward_t from, *forward;
hostlist_t hl;
List ret_list = NULL;
ListIterator itr, data_itr;
ret_types_t *ret_type = NULL;
ret_data_info_t *ret_data_info = NULL;
int i, rc = SLURM_SUCCESS;
int i, rc = 0, retries = 0;
/* span[] is indexed by thread number below and freed after the setup
 * loop; presumably it holds a per-thread node count -- confirm against
 * set_span() */
int thr_count = 0, *span = set_span(alloc_resp->node_cnt);
thd_t *thread_info;
/* Both per-thread arrays are sized by the tree width, not by node_cnt */
thread_info = xmalloc(slurm_get_tree_width() * sizeof(thd_t));
forward = xmalloc(slurm_get_tree_width() * sizeof(forward_t));
from.cnt = alloc_resp->node_cnt;
from.name = xmalloc(MAX_SLURM_NAME * alloc_resp->node_cnt);
hl = hostlist_create(alloc_resp->node_list);
......@@ -94,18 +99,73 @@ extern void send_rpc(file_bcast_msg_t *bcast_msg,
from.addr = alloc_resp->node_addr;
from.node_id = NULL;
from.timeout = SLURM_MESSAGE_TIMEOUT_MSEC_STATIC;
i = 0;
forward_set(&forward, alloc_resp->node_cnt, &i, &from);
msg.msg_type = REQUEST_FILE_BCAST;
msg.address = alloc_resp->node_addr[0];
msg.data = bcast_msg;
msg.forward = forward;
msg.ret_list = NULL;
msg.orig_addr.sin_addr.s_addr = 0;
msg.srun_node_id = 0;
ret_list = slurm_send_recv_rc_msg(&msg,
for (i=0; i<alloc_resp->node_cnt; i++) {
/* Set up one message per agent thread.
 * NOTE(review): forward_set() receives &i and so may advance i. The
 * records here are indexed by i, while the spawn and cleanup loops
 * below index thread_info[] and forward[] by 0..thr_count-1. If
 * forward_set() does change i, forward[i] read after the call is not
 * the element that was just set, and the filled-in thread_info[]
 * slots will not line up with the ones later handed to the threads.
 * i is also bounded by node_cnt while the arrays hold only
 * slurm_get_tree_width() elements. Confirm against forward_set(). */
forward_set(&forward[i], span[thr_count], &i, &from);
thread_info[i].msg = xmalloc(sizeof(slurm_msg_t));
thread_info[i].msg->msg_type = REQUEST_FILE_BCAST;
thread_info[i].msg->data = bcast_msg;
thread_info[i].msg->forward = forward[i];
thread_info[i].msg->ret_list = NULL;
thread_info[i].msg->orig_addr.sin_addr.s_addr = 0;
thread_info[i].msg->srun_node_id= 0;
thread_info[i].rc = -1;
thread_info[i].msg->address = alloc_resp->node_addr[i];
thr_count++;
}
xfree(span);
debug("spawning %d threads", thr_count);
for (i=0; i<thr_count; i++) {
/* Throttle: block until fewer than MAX_THREADS agents are running,
 * then count the new thread before it is created */
slurm_mutex_lock(&agent_cnt_mutex);
while (agent_cnt >= MAX_THREADS)
pthread_cond_wait(&agent_cnt_cond, &agent_cnt_mutex);
agent_cnt++;
slurm_mutex_unlock(&agent_cnt_mutex);
slurm_attr_init(&thread_info[i].attr);
/* NOTE(review): threads are created joinable, but no pthread_join()
 * (and no destroy of attr) is visible in this function; completion
 * is detected through agent_cnt only -- confirm this is intended */
if (pthread_attr_setdetachstate (&thread_info[i].attr,
PTHREAD_CREATE_JOINABLE))
error("pthread_attr_setdetachstate error %m");
/* Retry thread creation once per second; `retries` is not reset
 * between threads, so MAX_RETRIES is a total for the whole call */
while (pthread_create(&thread_info[i].thread,
&thread_info[i].attr,
_agent_thread,
(void *) &thread_info[i])) {
error("pthread_create error %m");
if (++retries > MAX_RETRIES)
fatal("Can't create pthread");
sleep(1); /* sleep and again */
}
}
/* wait until pthreads complete */
slurm_mutex_lock(&agent_cnt_mutex);
while (agent_cnt)
pthread_cond_wait(&agent_cnt_cond, &agent_cnt_mutex);
slurm_mutex_unlock(&agent_cnt_mutex);
/* Combine the per-thread results and release per-thread storage.
 * NOTE(review): rc starts at 0 and the per-thread rc starts at -1, so
 * MAX() discards a record that still holds its initial -1 -- confirm
 * that an agent which never reported a result should count as success */
for (i=0; i<thr_count; i++) {
rc = MAX(rc, thread_info[i].rc);
xfree(thread_info[i].msg);
destroy_forward(&forward[i]);
}
xfree(from.name);
xfree(forward);
xfree(thread_info);
/* Any non-zero combined return code terminates the process */
if (rc)
exit(1);
}
/*
 * _agent_thread - thread entry point: send one message and record the result
 * IN args - pointer to the thd_t record for this thread; its msg is sent
 *	and its rc is overwritten with the result
 * RET always NULL
 *
 * Sends the message with slurm_send_recv_rc_msg(), walks the returned list,
 * logs an error for each returned record whose msg_rc is not SLURM_SUCCESS,
 * and stores the largest such msg_rc in thread_ptr->rc. Before returning it
 * decrements agent_cnt under agent_cnt_mutex and broadcasts agent_cnt_cond.
 *
 * NOTE(review): this listing shows pre-change and post-change diff lines
 * together. There are two inner `while` loops over data_itr, several lines
 * use `i`, `from` and `forward`, none of which is declared in this
 * function, and an `if (rc) exit(1);` sits ahead of the agent_cnt update.
 * Part of the body is also elided at the hunk marker. Confirm which lines
 * belong to the current source before changing any logic here.
 */
static void *_agent_thread(void *args)
{
List ret_list = NULL;
ret_types_t *ret_type = NULL;
thd_t *thread_ptr = (thd_t *) args;
slurm_msg_t *msg = thread_ptr->msg;
ListIterator itr, data_itr;
ret_data_info_t *ret_data_info = NULL;
int rc = 0;
ret_list = slurm_send_recv_rc_msg(msg,
SLURM_MESSAGE_TIMEOUT_MSEC_STATIC);
/* The rest of the NULL-result handling is in the elided region below */
if (ret_list == NULL) {
error("slurm_send_recv_rc_msg: %m");
......@@ -115,28 +175,33 @@ extern void send_rpc(file_bcast_msg_t *bcast_msg,
/* Outer loop: one ret_types_t per distinct return code; inner loop: the
 * per-node records that reported that code */
itr = list_iterator_create(ret_list);
while ((ret_type = list_next(itr)) != NULL) {
data_itr = list_iterator_create(ret_type->ret_data_list);
while((ret_data_info = list_next(data_itr)) != NULL) {
i = ret_type->msg_rc;
if (i != SLURM_SUCCESS) {
if (!strcmp(ret_data_info->node_name,
"localhost")) {
xfree(ret_data_info->node_name);
ret_data_info->node_name =
xstrdup(from.name);
}
error("REQUEST_FILE_BCAST(%s): %s",
ret_data_info->node_name,
slurm_strerror(i));
rc = i;
while ((ret_data_info = list_next(data_itr)) != NULL) {
if (ret_type->msg_rc == SLURM_SUCCESS)
continue;
/* A record named "localhost" gets its name replaced by the result of
 * getnodename() before it is logged */
if (!strcmp(ret_data_info->node_name,
"localhost")) {
xfree(ret_data_info->node_name);
ret_data_info->node_name =
xmalloc(MAX_SLURM_NAME);
getnodename(ret_data_info->node_name,
MAX_SLURM_NAME);
}
error("REQUEST_FILE_BCAST(%s): %s",
ret_data_info->node_name,
slurm_strerror(ret_type->msg_rc));
/* NOTE(review): rc starts at 0, so a negative msg_rc is logged above
 * but not recorded here -- confirm error codes are always positive */
rc = MAX(rc, ret_type->msg_rc);
}
list_iterator_destroy(data_itr);
}
/* Publish the result for send_rpc() to collect */
thread_ptr->rc = rc;
list_iterator_destroy(itr);
if (ret_list)
list_destroy(ret_list);
xfree(from.name);
destroy_forward(&forward);
if (rc)
exit(1);
/* Signal completion: drop the running-agent count, then wake every
 * waiter (the broadcast is issued after the mutex is released) */
slurm_mutex_lock(&agent_cnt_mutex);
agent_cnt--;
slurm_mutex_unlock(&agent_cnt_mutex);
pthread_cond_broadcast(&agent_cnt_cond);
return NULL;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment