Skip to content
Snippets Groups Projects
Commit 679d4bfb authored by Moe Jette's avatar Moe Jette
Browse files

some clean-up and work on KVS return receipt.

parent c8e9feb1
No related branches found
No related tags found
No related merge requests found
...@@ -139,28 +139,43 @@ int slurm_get_kvs_comm_set(struct kvs_comm_set **kvs_set_ptr, ...@@ -139,28 +139,43 @@ int slurm_get_kvs_comm_set(struct kvs_comm_set **kvs_set_ptr,
} }
/* get the message after all tasks reach the barrier */ /* get the message after all tasks reach the barrier */
info("waiting for msg on port %u", port);
srun_fd = slurm_accept_msg_conn(pmi_fd, &srun_addr); srun_fd = slurm_accept_msg_conn(pmi_fd, &srun_addr);
if (srun_fd < 0) { if (srun_fd < 0) {
error("slurm_accept_msg_conn: %m"); error("slurm_accept_msg_conn: %m");
return errno; return errno;
} }
again: if (slurm_receive_msg(srun_fd, &msg_rcv, 0) < 0) { while (slurm_receive_msg(srun_fd, &msg_rcv, 0) < 0) {
if (errno == EINTR) if (errno == EINTR)
goto again; continue;
error("slurm_receive_msg: %m"); error("slurm_receive_msg: %m");
return errno; return errno;
} }
info("got msg"); msg_rcv.conn_fd = srun_fd;
if (msg_rcv.msg_type != PMI_KVS_GET_RESP) { if (msg_rcv.msg_type != PMI_KVS_GET_RESP) {
error("slurm_get_kvs_comm_set msg_type=%d", msg_rcv.msg_type); error("slurm_get_kvs_comm_set msg_type=%d", msg_rcv.msg_type);
return SLURM_UNEXPECTED_MSG_ERROR; return SLURM_UNEXPECTED_MSG_ERROR;
} }
slurm_send_rc_msg(&msg_rcv, SLURM_SUCCESS); #if 0
info("sent reply"); info("send kvs confirmation @ %ld", (long)time(NULL));
if (slurm_send_rc_msg(&msg_rcv, SLURM_SUCCESS) < 0)
error("slurm_send_rc_msg: %m");
info("sent kvs confirmation @ %ld", (long)time(NULL));
#endif
#if 0
info("send kvs confirmation @ %ld", (long)time(NULL));
{
return_code_msg_t rc_msg;
rc_msg.return_code = SLURM_SUCCESS;
msg_send.msg_type = RESPONSE_SLURM_RC;
msg_send.address = msg_rcv.address;
msg_send.data = &rc_msg;
if (slurm_send_node_msg(srun_fd, &msg_send) < 0)
error("slurm_send_node_msg");
}
info("sent kvs confirmation @ %ld", (long)time(NULL));
#endif
slurm_close_accepted_conn(srun_fd); slurm_close_accepted_conn(srun_fd);
*kvs_set_ptr = msg_rcv.data; *kvs_set_ptr = msg_rcv.data;
return SLURM_SUCCESS; return SLURM_SUCCESS;
} }
......
...@@ -96,17 +96,20 @@ static void _kvs_xmit_tasks(void) ...@@ -96,17 +96,20 @@ static void _kvs_xmit_tasks(void)
/* Spawn a pthread to transmit it */ /* Spawn a pthread to transmit it */
slurm_attr_init(&attr); slurm_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
// if (pthread_create(&agent_id, &attr, _agent, (void *) &args)) #if 0
// fatal("pthread_create");
/* FIXME: signaling problem if pthread */ /* FIXME: signaling problem if pthread */
if (pthread_create(&agent_id, &attr, _agent, (void *) &args))
fatal("pthread_create");
#else
_agent((void *) &args); _agent((void *) &args);
#endif
} }
static void *_agent(void *x) static void *_agent(void *x)
{ {
struct agent_arg *args = (struct agent_arg *) x; struct agent_arg *args = (struct agent_arg *) x;
struct kvs_comm_set kvs_set; struct kvs_comm_set kvs_set;
int i, j, rc; int i, j, rc, task_fd;
slurm_msg_t msg_send, msg_rcv; slurm_msg_t msg_send, msg_rcv;
/* send the message */ /* send the message */
...@@ -121,23 +124,27 @@ static void *_agent(void *x) ...@@ -121,23 +124,27 @@ static void *_agent(void *x)
slurm_set_addr(&msg_send.address, slurm_set_addr(&msg_send.address,
args->barrier_xmit_ptr[i].port, args->barrier_xmit_ptr[i].port,
args->barrier_xmit_ptr[i].hostname); args->barrier_xmit_ptr[i].hostname);
//if (slurm_send_recv_node_msg(&msg_send, &msg_rcv, 0) < 0) { if ((task_fd = slurm_open_msg_conn(&msg_send.address)) < 0) {
if (slurm_send_only_node_msg(&msg_send) < 0) { error("slurm_init_msg_engine_port: %m");
error("KVS_Barrier reply fail to %s","TEST", break;
}
rc = slurm_send_node_msg(task_fd, &msg_send);
if (rc < 0) {
error("KVS_Barrier send data fail to %s",
args->barrier_xmit_ptr[i].hostname); args->barrier_xmit_ptr[i].hostname);
(void) slurm_shutdown_msg_conn(task_fd);
continue; continue;
} }
continue; #if 0
/* FIXME: timing problem waiting for reply */ info("get reply @ %ld", (long)time(NULL));
if (msg_rcv.msg_type != RESPONSE_SLURM_RC) { rc = slurm_receive_msg(task_fd, &msg_rcv, 0);
error("KVS_Barrier msg reply type %d bad", info("got reply, rc=%d @ %ld", rc, (long)time(NULL));
msg_rcv.msg_type); #endif
continue; (void) slurm_shutdown_msg_conn(task_fd);
if (rc < 0) {
error("KVS_Barrier confirm fail from %s",
args->barrier_xmit_ptr[i].hostname);
} }
rc = ((return_code_msg_t *) msg_rcv.data)->return_code;
slurm_free_return_code_msg((return_code_msg_t *) msg_rcv.data);
if (rc != SLURM_SUCCESS)
error("KVS_Barrier rc=%d", rc);
} }
/* Release allocated memory */ /* Release allocated memory */
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment