Skip to content
Snippets Groups Projects
Commit 5d5be7b2 authored by Christopher J. Morrone's avatar Christopher J. Morrone
Browse files

Add multiple-message-response socket support to slurm_launch_tasks

parent c59d3340
No related branches found
No related tags found
No related merge requests found
...@@ -55,7 +55,8 @@ struct step_launch_state { ...@@ -55,7 +55,8 @@ struct step_launch_state {
/* message thread variables */ /* message thread variables */
eio_handle_t *msg_handle; eio_handle_t *msg_handle;
pthread_t msg_thread; pthread_t msg_thread;
uint16_t msg_port; uint16_t num_resp_port;
uint16_t *resp_port; /* array of message response ports */
/* client side io variables */ /* client side io variables */
client_io_t *client_io; client_io_t *client_io;
......
...@@ -61,8 +61,6 @@ ...@@ -61,8 +61,6 @@
#include "src/api/step_ctx.h" #include "src/api/step_ctx.h"
#include "src/api/step_pmi.h" #include "src/api/step_pmi.h"
#define STEP_LAUNCH_TIMEOUT 10 /* FIXME - should be defined elsewhere */
/********************************************************************** /**********************************************************************
* General declarations for step launch code * General declarations for step launch code
**********************************************************************/ **********************************************************************/
...@@ -78,7 +76,7 @@ static client_io_t *_setup_step_client_io(slurm_step_ctx ctx, ...@@ -78,7 +76,7 @@ static client_io_t *_setup_step_client_io(slurm_step_ctx ctx,
* Message handler declarations * Message handler declarations
**********************************************************************/ **********************************************************************/
static uid_t slurm_uid; static uid_t slurm_uid;
static int _msg_thr_create(struct step_launch_state *sls); static int _msg_thr_create(struct step_launch_state *sls, int num_nodes);
static void _handle_msg(struct step_launch_state *sls, slurm_msg_t *msg); static void _handle_msg(struct step_launch_state *sls, slurm_msg_t *msg);
static bool _message_socket_readable(eio_obj_t *obj); static bool _message_socket_readable(eio_obj_t *obj);
static int _message_socket_accept(eio_obj_t *obj, List objs); static int _message_socket_accept(eio_obj_t *obj, List objs);
...@@ -157,8 +155,8 @@ int slurm_step_launch (slurm_step_ctx ctx, ...@@ -157,8 +155,8 @@ int slurm_step_launch (slurm_step_ctx ctx,
ctx->launch_state->task_start_callback = params->task_start_callback; ctx->launch_state->task_start_callback = params->task_start_callback;
ctx->launch_state->task_finish_callback = params->task_finish_callback; ctx->launch_state->task_finish_callback = params->task_finish_callback;
/* Create message receiving socket and handler thread */ /* Create message receiving sockets and handler thread */
_msg_thr_create(ctx->launch_state); _msg_thr_create(ctx->launch_state, ctx->step_req->node_count);
/* Start tasks on compute nodes */ /* Start tasks on compute nodes */
launch.job_id = ctx->alloc_resp->job_id; launch.job_id = ctx->alloc_resp->job_id;
...@@ -211,17 +209,17 @@ int slurm_step_launch (slurm_step_ctx ctx, ...@@ -211,17 +209,17 @@ int slurm_step_launch (slurm_step_ctx ctx,
if (client_io_handler_start(ctx->launch_state->client_io) != SLURM_SUCCESS) if (client_io_handler_start(ctx->launch_state->client_io) != SLURM_SUCCESS)
return SLURM_ERROR; return SLURM_ERROR;
launch.io_port = xmalloc(sizeof(uint16_t) * launch.nnodes);
launch.resp_port = xmalloc(sizeof(uint16_t) * launch.nnodes);
launch.num_io_port = ctx->launch_state->client_io->num_listen; launch.num_io_port = ctx->launch_state->client_io->num_listen;
launch.io_port = xmalloc(sizeof(uint16_t) * launch.num_io_port);
for (i = 0; i < launch.num_io_port; i++) { for (i = 0; i < launch.num_io_port; i++) {
launch.io_port[i] = launch.io_port[i] =
ntohs(ctx->launch_state->client_io->listenport[i]); ntohs(ctx->launch_state->client_io->listenport[i]);
} }
launch.num_resp_port = 1;
launch.num_resp_port = ctx->launch_state->num_resp_port;
launch.resp_port = xmalloc(sizeof(uint16_t) * launch.num_resp_port);
for (i = 0; i < launch.num_resp_port; i++) { for (i = 0; i < launch.num_resp_port; i++) {
launch.resp_port[i] = ntohs(ctx->launch_state->msg_port); launch.resp_port[i] = ntohs(ctx->launch_state->resp_port[i]);
} }
_launch_tasks(ctx, &launch); _launch_tasks(ctx, &launch);
...@@ -274,6 +272,7 @@ void slurm_step_launch_wait_finish(slurm_step_ctx ctx) ...@@ -274,6 +272,7 @@ void slurm_step_launch_wait_finish(slurm_step_ctx ctx)
/* FIXME - put these in an sls-specific destructor */ /* FIXME - put these in an sls-specific destructor */
pthread_mutex_destroy(&sls->lock); pthread_mutex_destroy(&sls->lock);
pthread_cond_destroy(&sls->cond); pthread_cond_destroy(&sls->cond);
xfree(sls->resp_port);
} }
/********************************************************************** /**********************************************************************
...@@ -288,28 +287,44 @@ static void *_msg_thr_internal(void *arg) ...@@ -288,28 +287,44 @@ static void *_msg_thr_internal(void *arg)
return NULL; return NULL;
} }
static int _msg_thr_create(struct step_launch_state *sls) static inline int
_estimate_nports(int nclients, int cli_per_port)
{
/*
 * Integer ceiling of nclients / cli_per_port, computed with div().
 * The visible caller passes the node count and 48 to size the array of
 * response ports; presumably this is "ports needed when each port
 * serves at most cli_per_port clients" -- confirm against callers.
 * NOTE(review): cli_per_port must be non-zero; div() by zero is undefined.
 */
div_t d;
d = div(nclients, cli_per_port);
/* A positive remainder means the quotient alone is not enough: add one. */
return d.rem > 0 ? d.quot + 1 : d.quot;
}
static int _msg_thr_create(struct step_launch_state *sls, int num_nodes)
{ {
int sock = -1; int sock = -1;
int port = -1; int port = -1;
eio_obj_t *obj; eio_obj_t *obj;
int i;
debug("Entering _msg_thr_create()"); debug("Entering _msg_thr_create()");
slurm_uid = (uid_t) slurm_get_slurm_user_id(); slurm_uid = (uid_t) slurm_get_slurm_user_id();
if (net_stream_listen(&sock, &port) < 0) { sls->msg_handle = eio_handle_create();
error("unable to intialize step launch listening socket: %m"); sls->num_resp_port = _estimate_nports(num_nodes, 48);
return SLURM_ERROR; sls->resp_port = xmalloc(sizeof(uint16_t) * sls->num_resp_port);
for (i = 0; i < sls->num_resp_port; i++) {
if (net_stream_listen(&sock, &port) < 0) {
error("unable to intialize step launch listening socket: %m");
return SLURM_ERROR;
}
sls->resp_port[i] = port;
obj = eio_obj_create(sock, &message_socket_ops, (void *)sls);
eio_new_initial_obj(sls->msg_handle, obj);
} }
sls->msg_port = port;
obj = eio_obj_create(sock, &message_socket_ops, (void *)sls); if (pthread_create(&sls->msg_thread, NULL,
_msg_thr_internal, (void *)sls) != 0) {
sls->msg_handle = eio_handle_create(); error("pthread_create of message thread: %m");
eio_new_initial_obj(sls->msg_handle, obj); return SLURM_ERROR;
/* FIXME check return code */ }
pthread_create(&sls->msg_thread, NULL, _msg_thr_internal, (void *)sls);
return SLURM_SUCCESS;
} }
static bool _message_socket_readable(eio_obj_t *obj) static bool _message_socket_readable(eio_obj_t *obj)
...@@ -339,6 +354,7 @@ static int _message_socket_accept(eio_obj_t *obj, List objs) ...@@ -339,6 +354,7 @@ static int _message_socket_accept(eio_obj_t *obj, List objs)
struct sockaddr_un addr; struct sockaddr_un addr;
slurm_msg_t *msg = NULL; slurm_msg_t *msg = NULL;
int len = sizeof(addr); int len = sizeof(addr);
int timeout = 0; /* slurm default value */
List ret_list = NULL; List ret_list = NULL;
debug3("Called _msg_socket_accept"); debug3("Called _msg_socket_accept");
...@@ -374,11 +390,13 @@ static int _message_socket_accept(eio_obj_t *obj, List objs) ...@@ -374,11 +390,13 @@ static int _message_socket_accept(eio_obj_t *obj, List objs)
msg->conn_fd = fd; msg->conn_fd = fd;
msg->forward_struct_init = 0; msg->forward_struct_init = 0;
/* multiple jobs (easily induced via no_alloc) and highly
* parallel jobs using PMI sometimes result in slow message
* responses and timeouts. Raise the default timeout for srun. */
timeout = slurm_get_msg_timeout() * 8;
again: again:
ret_list = slurm_receive_msg(fd, msg, STEP_LAUNCH_TIMEOUT); ret_list = slurm_receive_msg(fd, msg, timeout);
if(!ret_list || errno != SLURM_SUCCESS) { if(!ret_list || errno != SLURM_SUCCESS) {
printf("error on slurm_recieve_msg\n");
fflush(stdout);
if (errno == EINTR) { if (errno == EINTR) {
list_destroy(ret_list); list_destroy(ret_list);
goto again; goto again;
...@@ -388,10 +406,6 @@ again: ...@@ -388,10 +406,6 @@ again:
goto cleanup; goto cleanup;
} }
if(list_count(ret_list)>0) { if(list_count(ret_list)>0) {
printf("_message_socket_accept connection: "
"got %d from receive, expecting 0\n",
list_count(ret_list));
fflush(stdout);
error("_message_socket_accept connection: " error("_message_socket_accept connection: "
"got %d from receive, expecting 0", "got %d from receive, expecting 0",
list_count(ret_list)); list_count(ret_list));
...@@ -520,6 +534,7 @@ static int _launch_tasks(slurm_step_ctx ctx, ...@@ -520,6 +534,7 @@ static int _launch_tasks(slurm_step_ctx ctx,
ListIterator ret_data_itr; ListIterator ret_data_itr;
ret_types_t *ret; ret_types_t *ret;
ret_data_info_t *ret_data; ret_data_info_t *ret_data;
int timeout;
/* slurm_addr *step_addrs; */ /* array of addresses */ /* slurm_addr *step_addrs; */ /* array of addresses */
/* int num_step_addrs; */ /* int num_step_addrs; */
...@@ -546,17 +561,19 @@ static int _launch_tasks(slurm_step_ctx ctx, ...@@ -546,17 +561,19 @@ static int _launch_tasks(slurm_step_ctx ctx,
msg.buffer = buffer; msg.buffer = buffer;
memcpy(&msg.address, &ctx->alloc_resp->node_addr[0], memcpy(&msg.address, &ctx->alloc_resp->node_addr[0],
sizeof(slurm_addr)); sizeof(slurm_addr));
timeout = slurm_get_msg_timeout();
forward_set_launch(&msg.forward, forward_set_launch(&msg.forward,
ctx->step_req->node_count, ctx->step_req->node_count,
&zero, &zero,
ctx->step_layout, ctx->step_layout,
ctx->alloc_resp->node_addr, ctx->alloc_resp->node_addr,
itr, itr,
STEP_LAUNCH_TIMEOUT); timeout);
hostlist_iterator_destroy(itr); hostlist_iterator_destroy(itr);
hostlist_destroy(hostlist); hostlist_destroy(hostlist);
ret_list = slurm_send_recv_rc_packed_msg(&msg, STEP_LAUNCH_TIMEOUT); ret_list =
slurm_send_recv_rc_packed_msg(&msg, timeout);
if (ret_list == NULL) { if (ret_list == NULL) {
error("slurm_send_recv_rc_packed_msg failed miserably: %m"); error("slurm_send_recv_rc_packed_msg failed miserably: %m");
/* xfree(step_addrs); */ /* xfree(step_addrs); */
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment