diff --git a/src/common/forward.c b/src/common/forward.c index f3fda37ce7ce5879f23785e5f8c72490caccfdb1..cd4fddcd7fd0aaa3c6a1651272253f67716c03ac 100644 --- a/src/common/forward.c +++ b/src/common/forward.c @@ -446,7 +446,8 @@ extern int no_resp_forwards(forward_t *forward, List *ret_list, int err) type->err = err; type->ret_data_list = list_create(destroy_data_info); for(i=0; i<forward->cnt; i++) { - strncpy(name, &forward->name[i * MAX_SLURM_NAME], MAX_SLURM_NAME); + strncpy(name, + &forward->name[i * MAX_SLURM_NAME], MAX_SLURM_NAME); ret_data_info = xmalloc(sizeof(ret_data_info_t)); list_push(type->ret_data_list, ret_data_info); ret_data_info->node_name = xstrdup(name); @@ -470,8 +471,6 @@ void destroy_data_info(void *object) void destroy_forward(forward_t *forward) { - //return; - //forward_t *forward = (forward_t *)object; if(forward->cnt > 0) { xfree(forward->addr); xfree(forward->name); @@ -482,11 +481,11 @@ void destroy_forward(forward_t *forward) void destroy_forward_struct(forward_struct_t *forward_struct) { - //forward_msg_t *forward_msg = (forward_msg_t *)object; if(forward_struct) { xfree(forward_struct->buf); xfree(forward_struct->forward_msg); slurm_mutex_destroy(&forward_struct->forward_mutex); + pthread_cond_destroy(&forward_struct->notify); xfree(forward_struct); } } diff --git a/src/common/read_config.c b/src/common/read_config.c index cf0b6f22067004953eedb056980a7dfb26402ba8..ba74cb67d501804c518da152a87d03803ef3a41d 100644 --- a/src/common/read_config.c +++ b/src/common/read_config.c @@ -1434,10 +1434,10 @@ validate_and_set_defaults(slurm_ctl_conf_t *conf, s_p_hashtbl_t *hashtbl) if (!s_p_get_uint16(&conf->wait_time, "WaitTime", hashtbl)) conf->wait_time = DEFAULT_WAIT_TIME; - if (s_p_get_uint16(&conf->schedport, "TreeWidth", hashtbl)) { + if (s_p_get_uint16(&conf->tree_width, "TreeWidth", hashtbl)) { if (conf->tree_width == 0) { error("TreeWidth=0 is invalid"); - conf->tree_width = 50; /* default? */ + conf->tree_width = DEFAULT_TREE_WIDTH; /* default? */ } } else { conf->tree_width = DEFAULT_TREE_WIDTH; diff --git a/src/common/slurm_protocol_api.c b/src/common/slurm_protocol_api.c index 8529937230d90910943adb0c0f98d76b48dc9675..98ed476a67cf1812c69726db6d0b5821d24402e3 100644 --- a/src/common/slurm_protocol_api.c +++ b/src/common/slurm_protocol_api.c @@ -697,13 +697,12 @@ List slurm_receive_msg(slurm_fd fd, slurm_msg_t *msg, int timeout) int rc; void *auth_cred = NULL; Buf buffer; - int count = 0; - forward_struct_t *forward_struct = NULL; + /* int count = 0; */ ret_types_t *ret_type = NULL; - ListIterator itr; - int16_t fwd_cnt = 0; - +/* ListIterator itr; */ + List ret_list = list_create(destroy_ret_types); + msg->forward_struct = NULL; xassert(fd >= 0); @@ -750,23 +749,26 @@ List slurm_receive_msg(slurm_fd fd, slurm_msg_t *msg, int timeout) } else { memcpy(&header.orig_addr, &msg->orig_addr, sizeof(slurm_addr)); } - fwd_cnt = header.forward.cnt; + /* Forward message to other nodes */ - if(fwd_cnt > 0) { - forward_struct = xmalloc(sizeof(forward_struct_t)); - forward_struct->buf_len = remaining_buf(buffer); - forward_struct->buf = - xmalloc(sizeof(char) * forward_struct->buf_len); - memcpy(forward_struct->buf, + if(header.forward.cnt > 0) { + msg->forward_struct = xmalloc(sizeof(forward_struct_t)); + msg->forward_struct_init = FORWARD_INIT; + msg->forward_struct->buf_len = remaining_buf(buffer); + msg->forward_struct->buf = + xmalloc(sizeof(char) * msg->forward_struct->buf_len); + memcpy(msg->forward_struct->buf, &buffer->head[buffer->processed], - forward_struct->buf_len); - forward_struct->ret_list = ret_list; + msg->forward_struct->buf_len); + msg->forward_struct->ret_list = ret_list; - forward_struct->timeout = timeout - header.forward.timeout; - - debug3("forwarding messages to %d nodes!!!!", fwd_cnt); + msg->forward_struct->timeout = timeout-header.forward.timeout; + msg->forward_struct->fwd_cnt = header.forward.cnt; + + debug("forwarding messages to %d nodes!!!!", + msg->forward_struct->fwd_cnt); - if(forward_msg(forward_struct, &header) == SLURM_ERROR) { + if(forward_msg(msg->forward_struct, &header) == SLURM_ERROR) { error("problem with forward msg"); } } @@ -807,36 +809,36 @@ List slurm_receive_msg(slurm_fd fd, slurm_msg_t *msg, int timeout) free_buf(buffer); rc = SLURM_SUCCESS; - if(forward_struct) { - slurm_mutex_lock(&forward_struct->forward_mutex); - count = 0; - itr = list_iterator_create(ret_list); - while((ret_type = (ret_types_t *) list_next(itr)) - != NULL) { - count += list_count(ret_type->ret_data_list); - } - list_iterator_destroy(itr); - debug3("Got back %d",count); - while((count < fwd_cnt)) { - pthread_cond_wait(&forward_struct->notify, - &forward_struct->forward_mutex); - count = 0; - itr = list_iterator_create(ret_list); - while((ret_type = (ret_types_t *) list_next(itr)) - != NULL) { - count += list_count(ret_type->ret_data_list); - } - list_iterator_destroy(itr); - debug3("Got back %d",count); + /* if(forward_struct) { */ +/* slurm_mutex_lock(&forward_struct->forward_mutex); */ +/* count = 0; */ +/* itr = list_iterator_create(ret_list); */ +/* while((ret_type = (ret_types_t *) list_next(itr)) */ +/* != NULL) { */ +/* count += list_count(ret_type->ret_data_list); */ +/* } */ +/* list_iterator_destroy(itr); */ +/* debug3("Got back %d",count); */ +/* while((count < fwd_cnt)) { */ +/* pthread_cond_wait(&forward_struct->notify, */ +/* &forward_struct->forward_mutex); */ +/* count = 0; */ +/* itr = list_iterator_create(ret_list); */ +/* while((ret_type = (ret_types_t *) list_next(itr)) */ +/* != NULL) { */ +/* count += list_count(ret_type->ret_data_list); */ +/* } */ +/* list_iterator_destroy(itr); */ +/* debug3("Got back %d",count); */ - } - debug2("Got them all"); - slurm_mutex_unlock(&forward_struct->forward_mutex); - } +/* } */ +/* debug2("Got them all"); */ +/* slurm_mutex_unlock(&forward_struct->forward_mutex); */ +/* } */ total_return: destroy_forward(&header.forward); - destroy_forward_struct(forward_struct); + //destroy_forward_struct(forward_struct); if(rc != SLURM_SUCCESS) { error("slurm_receive_msg: %s", slurm_strerror(rc)); @@ -955,6 +957,9 @@ int slurm_send_node_msg(slurm_fd fd, slurm_msg_t * msg) Buf buffer; int rc; void * auth_cred; + int count = 0; + ret_types_t *ret_type = NULL; + ListIterator itr; /* * Initialize header with Auth credential and message type. @@ -970,6 +975,37 @@ int slurm_send_node_msg(slurm_fd fd, slurm_msg_t * msg) forward_init(&msg->forward, NULL); msg->ret_list = NULL; } + + /* wait for all the other messages on the tree under us */ + if(msg->forward_struct_init == FORWARD_INIT && msg->forward_struct) { + debug("looking for %d", msg->forward_struct->fwd_cnt); + slurm_mutex_lock(&msg->forward_struct->forward_mutex); + count = 0; + itr = list_iterator_create(msg->ret_list); + while((ret_type = (ret_types_t *) list_next(itr)) + != NULL) { + count += list_count(ret_type->ret_data_list); + } + list_iterator_destroy(itr); + debug("Got back %d", count); + while((count < msg->forward_struct->fwd_cnt)) { + pthread_cond_wait(&msg->forward_struct->notify, + &msg->forward_struct->forward_mutex); + count = 0; + itr = list_iterator_create(msg->ret_list); + while((ret_type = (ret_types_t *) list_next(itr)) + != NULL) { + count += list_count(ret_type->ret_data_list); + } + list_iterator_destroy(itr); + debug("Got back %d", count); + + } + debug("Got them all"); + slurm_mutex_unlock(&msg->forward_struct->forward_mutex); + destroy_forward_struct(msg->forward_struct); + } + init_header(&header, msg, SLURM_PROTOCOL_NO_FLAGS); /* @@ -1309,6 +1345,8 @@ int slurm_send_rc_msg(slurm_msg_t *msg, int rc) resp_msg.msg_type = RESPONSE_SLURM_RC; resp_msg.data = &rc_msg; resp_msg.forward = msg->forward; + resp_msg.forward_struct = msg->forward_struct; + resp_msg.forward_struct_init = msg->forward_struct_init; resp_msg.ret_list = msg->ret_list; resp_msg.orig_addr = msg->orig_addr; resp_msg.srun_node_id = msg->srun_node_id; diff --git a/src/common/slurm_protocol_defs.h b/src/common/slurm_protocol_defs.h index f6fe1b3df9f9c4beb3da26908887e9a309a70c3a..a8523d1fbc37f6a6936b396469e2ed51bba995b6 100644 --- a/src/common/slurm_protocol_defs.h +++ b/src/common/slurm_protocol_defs.h @@ -192,11 +192,6 @@ typedef struct forward { uint16_t init; /* tell me it has been set (FORWARD_INIT) */ } forward_t; -typedef struct slurm_protocol_config { - slurm_addr primary_controller; - slurm_addr secondary_controller; -} slurm_protocol_config_t; - /*core api protocol message structures */ typedef struct slurm_protocol_header { uint16_t version; @@ -210,6 +205,34 @@ typedef struct slurm_protocol_header { List ret_list; } header_t; +typedef struct forward_message { + header_t header; + char *buf; + int buf_len; + slurm_addr addr; + int timeout; + List ret_list; + pthread_mutex_t *forward_mutex; + pthread_cond_t *notify; + char node_name[MAX_SLURM_NAME]; +} forward_msg_t; + +typedef struct forward_struct { + int timeout; + uint16_t fwd_cnt; + pthread_mutex_t forward_mutex; + pthread_cond_t notify; + forward_msg_t *forward_msg; + char *buf; + int buf_len; + List ret_list; +} forward_struct_t; + +typedef struct slurm_protocol_config { + slurm_addr primary_controller; + slurm_addr secondary_controller; +} slurm_protocol_config_t; + typedef struct slurm_msg { slurm_msg_type_t msg_type; slurm_addr address; @@ -219,6 +242,8 @@ typedef struct slurm_msg { uint32_t data_size; uint32_t srun_node_id; /* node id of this node (relative to job) */ forward_t forward; + forward_struct_t *forward_struct; + uint16_t forward_struct_init; slurm_addr orig_addr; List ret_list; Buf buffer; @@ -237,28 +262,6 @@ typedef struct ret_types { List ret_data_list; } ret_types_t; -typedef struct forward_message { - header_t header; - char *buf; - int buf_len; - slurm_addr addr; - int timeout; - List ret_list; - pthread_mutex_t *forward_mutex; - pthread_cond_t *notify; - char node_name[MAX_SLURM_NAME]; -} forward_msg_t; - -typedef struct forward_struct { - int timeout; - pthread_mutex_t forward_mutex; - pthread_cond_t notify; - forward_msg_t *forward_msg; - char *buf; - int buf_len; - List ret_list; -} forward_struct_t; - /*****************************************************************************\ * Slurm Protocol Data Structures \*****************************************************************************/