diff --git a/src/plugins/mpi/pmix/pmixp_server.c b/src/plugins/mpi/pmix/pmixp_server.c index ead8cff492529a1ec9ee7e165b4e1c06b4c2e480..bb0201cf5fe13ba1facee60f8a06d0c4e5d0c95e 100644 --- a/src/plugins/mpi/pmix/pmixp_server.c +++ b/src/plugins/mpi/pmix/pmixp_server.c @@ -81,6 +81,7 @@ static int _recv_unpack_hdr(void *net, void *host); static bool _serv_readable(eio_obj_t *obj); static int _serv_read(eio_obj_t *obj, List objs); static void _process_server_request(recv_header_t *_hdr, void *payload); +static int _process_message(pmixp_io_engine_t *me); static struct io_operations peer_ops = { .readable = _serv_readable, @@ -190,6 +191,29 @@ int pmixp_stepd_finalize(void) return SLURM_SUCCESS; } +/* this routine tries to complete message processing on message + * engine (me). Return value: + * - 0: no progress was observed on the descriptor + * - 1: one more message was successfuly processed + * - 2: all messages are completed + */ +static int _process_message(pmixp_io_engine_t *me) +{ + int ret = 0; + pmix_io_rcvd(me); + if (pmix_io_rcvd_ready(me)) { + recv_header_t hdr; + void *msg = pmix_io_rcvd_extract(me, &hdr); + _process_server_request(&hdr, msg); + ret = 1; + } + if (pmix_io_finalized(me)) { + ret = 2; + } + return ret; +} + + /* * TODO: we need to keep track of the "me" * structures created here, because we need to @@ -211,10 +235,14 @@ void pmix_server_new_conn(int fd) */ pmix_io_rcvd_padding(me, sizeof(uint32_t)); - /* TODO: in future try to process the request right here - * use eio only in case of blocking operation - * NOW: always defer to debug the blocking case - */ + if( 2 == _process_message(me) ){ + /* connection was fully processed here */ + xfree(me); + return; + } + + /* If it is a blocking operation: create AIO object to + * handle it */ obj = eio_obj_create(fd, &peer_ops, (void *)me); eio_new_obj(pmixp_info_io(), obj); } @@ -389,27 +417,24 @@ static void _process_server_request(recv_header_t *_hdr, void *payload) static int _serv_read(eio_obj_t *obj, List objs) { - PMIXP_DEBUG("fd = %d", obj->fd); pmixp_io_engine_t *me = (pmixp_io_engine_t *)obj->arg; + bool proceed = true; pmixp_debug_hang(0); /* Read and process all received messages */ - while (1) { - pmix_io_rcvd(me); - if (pmix_io_finalized(me)) { + while (proceed) { + switch( _process_message(me) ){ + case 2: obj->shutdown = true; PMIXP_DEBUG("Connection finalized fd = %d", obj->fd); + /* cleanup after this connection */ eio_remove_obj(obj, objs); - return 0; - } - if (pmix_io_rcvd_ready(me)) { - recv_header_t hdr; - void *msg = pmix_io_rcvd_extract(me, &hdr); - _process_server_request(&hdr, msg); - } else { - /* No more complete messages */ + xfree(me); + case 0: + proceed = 0; + case 1: break; } }