Skip to content
Snippets Groups Projects
Commit 510780bd authored by Moe Jette's avatar Moe Jette
Browse files

Update code to check reply to KVS xmit for errors.

parent 06a63ce7
No related branches found
No related tags found
No related merge requests found
......@@ -39,7 +39,9 @@
#include "src/common/xstring.h"
#include "src/common/xmalloc.h"
#define _DEBUG 0
#define _DEBUG 0 /* non-zero for extra KVS logging */
#define MSG_RE_TRANSMIT 1 /* re-transmit KVS messages this number of times */
#define MSG_PARALLELISM 50 /* count of simultaneous KVS message threads */
/* Global variables */
pthread_mutex_t kvs_mutex = PTHREAD_MUTEX_INITIALIZER;
......@@ -49,17 +51,18 @@ struct kvs_comm **kvs_comm_ptr = NULL;
struct barrier_resp {
uint16_t port;
char *hostname;
};
}; /* details for barrier task communcations */
struct barrier_resp *barrier_ptr = NULL;
uint16_t barrier_resp_cnt = 0;
uint16_t barrier_cnt = 0;
uint16_t barrier_resp_cnt = 0; /* tasks having reached barrier */
uint16_t barrier_cnt = 0; /* tasks needing to reach barrier */
struct agent_arg {
struct barrier_resp *barrier_xmit_ptr;
int barrier_xmit_cnt;
struct kvs_comm **kvs_xmit_ptr;
int kvs_xmit_cnt;
};
}; /* details for message agent manager */
int agent_cnt = 0; /* number of active message agents */
static void *_agent(void *x);
static struct kvs_comm *_find_kvs_by_name(char *name);
......@@ -106,14 +109,17 @@ static void *_agent(void *x)
struct agent_arg *args = (struct agent_arg *) x;
struct kvs_comm_set kvs_set;
int i, j, rc, task_fd;
int success_xmit = 0, retries = 0;
slurm_msg_t msg_send, msg_rcv;
/* send the message */
/* send the messages */
kvs_set.kvs_comm_recs = args->kvs_xmit_cnt;
kvs_set.kvs_comm_ptr = args->kvs_xmit_ptr;
msg_send.msg_type = PMI_KVS_GET_RESP;
msg_send.data = (void *) &kvs_set;
for (i=0; i<args->barrier_xmit_cnt; i++) {
if (args->barrier_xmit_ptr[i].port == 0)
continue;
debug2("KVS_Barrier msg to %s:%u",
args->barrier_xmit_ptr[i].hostname,
args->barrier_xmit_ptr[i].port);
......@@ -136,8 +142,26 @@ static void *_agent(void *x)
if (rc < 0) {
error("KVS_Barrier confirm fail from %s",
args->barrier_xmit_ptr[i].hostname);
continue;
}
if (msg_rcv.msg_type != RESPONSE_SLURM_RC) {
error("KVS_Barrier confirm type %d from %s",
msg_rcv.msg_type,
args->barrier_xmit_ptr[i].hostname);
continue;
}
rc = ((return_code_msg_t *) msg_rcv.data)->return_code;
slurm_free_return_code_msg((return_code_msg_t *) msg_rcv.data);
if (rc != SLURM_SUCCESS) {
error("KVS_Barrier confirm from %s, rc=%d",
args->barrier_xmit_ptr[i].hostname, rc);
} else {
/* successfully transmitted KVS keypairs */
args->barrier_xmit_ptr[i].port = 0;
success_xmit++;
}
}
/* Release allocated memory */
for (i=0; i<args->barrier_xmit_cnt; i++)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment