diff --git a/src/api/slurm_pmi.c b/src/api/slurm_pmi.c index acb4eb085aa1c8700da63d37049a565c528217ac..04a7bd57cd63cbd6bd36d4224f8eeb5fe3613968 100644 --- a/src/api/slurm_pmi.c +++ b/src/api/slurm_pmi.c @@ -139,28 +139,43 @@ int slurm_get_kvs_comm_set(struct kvs_comm_set **kvs_set_ptr, } /* get the message after all tasks reach the barrier */ -info("waiting for msg on port %u", port); srun_fd = slurm_accept_msg_conn(pmi_fd, &srun_addr); if (srun_fd < 0) { error("slurm_accept_msg_conn: %m"); return errno; } -again: if (slurm_receive_msg(srun_fd, &msg_rcv, 0) < 0) { + while (slurm_receive_msg(srun_fd, &msg_rcv, 0) < 0) { if (errno == EINTR) - goto again; + continue; error("slurm_receive_msg: %m"); return errno; } -info("got msg"); + msg_rcv.conn_fd = srun_fd; if (msg_rcv.msg_type != PMI_KVS_GET_RESP) { error("slurm_get_kvs_comm_set msg_type=%d", msg_rcv.msg_type); return SLURM_UNEXPECTED_MSG_ERROR; } - slurm_send_rc_msg(&msg_rcv, SLURM_SUCCESS); -info("sent reply"); +#if 0 +info("send kvs confirmation @ %ld", (long)time(NULL)); + if (slurm_send_rc_msg(&msg_rcv, SLURM_SUCCESS) < 0) + error("slurm_send_rc_msg: %m"); +info("sent kvs confirmation @ %ld", (long)time(NULL)); +#endif +#if 0 +info("send kvs confirmation @ %ld", (long)time(NULL)); +{ + return_code_msg_t rc_msg; + rc_msg.return_code = SLURM_SUCCESS; + msg_send.msg_type = RESPONSE_SLURM_RC; + msg_send.address = msg_rcv.address; + msg_send.data = &rc_msg; + if (slurm_send_node_msg(srun_fd, &msg_send) < 0) + error("slurm_send_node_msg"); +} +info("sent kvs confirmation @ %ld", (long)time(NULL)); +#endif slurm_close_accepted_conn(srun_fd); *kvs_set_ptr = msg_rcv.data; - return SLURM_SUCCESS; } diff --git a/src/srun/pmi.c b/src/srun/pmi.c index 4e3258e8e6668cd0cccaa1f0e0d26f57a52e8126..864b47b5da28d58de825366ee5810f46b2dd8c6e 100644 --- a/src/srun/pmi.c +++ b/src/srun/pmi.c @@ -96,17 +96,20 @@ static void _kvs_xmit_tasks(void) /* Spawn a pthread to transmit it */ slurm_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); -// if (pthread_create(&agent_id, &attr, _agent, (void *) &args)) -// fatal("pthread_create"); +#if 0 /* FIXME: signaling problem if pthread */ + if (pthread_create(&agent_id, &attr, _agent, (void *) &args)) + fatal("pthread_create"); +#else _agent((void *) &args); +#endif } static void *_agent(void *x) { struct agent_arg *args = (struct agent_arg *) x; struct kvs_comm_set kvs_set; - int i, j, rc; + int i, j, rc, task_fd; slurm_msg_t msg_send, msg_rcv; /* send the message */ @@ -121,23 +124,27 @@ static void *_agent(void *x) slurm_set_addr(&msg_send.address, args->barrier_xmit_ptr[i].port, args->barrier_xmit_ptr[i].hostname); - //if (slurm_send_recv_node_msg(&msg_send, &msg_rcv, 0) < 0) { - if (slurm_send_only_node_msg(&msg_send) < 0) { - error("KVS_Barrier reply fail to %s","TEST", + if ((task_fd = slurm_open_msg_conn(&msg_send.address)) < 0) { + error("slurm_init_msg_engine_port: %m"); + break; + } + rc = slurm_send_node_msg(task_fd, &msg_send); + if (rc < 0) { + error("KVS_Barrier send data fail to %s", args->barrier_xmit_ptr[i].hostname); + (void) slurm_shutdown_msg_conn(task_fd); continue; } -continue; -/* FIXME: timing problem waiting for reply */ - if (msg_rcv.msg_type != RESPONSE_SLURM_RC) { - error("KVS_Barrier msg reply type %d bad", - msg_rcv.msg_type); - continue; +#if 0 +info("get reply @ %ld", (long)time(NULL)); + rc = slurm_receive_msg(task_fd, &msg_rcv, 0); +info("got reply, rc=%d @ %ld", rc, (long)time(NULL)); +#endif + (void) slurm_shutdown_msg_conn(task_fd); + if (rc < 0) { + error("KVS_Barrier confirm fail from %s", + args->barrier_xmit_ptr[i].hostname); } - rc = ((return_code_msg_t *) msg_rcv.data)->return_code; - slurm_free_return_code_msg((return_code_msg_t *) msg_rcv.data); - if (rc != SLURM_SUCCESS) - error("KVS_Barrier rc=%d", rc); } /* Release allocated memory */