Skip to content
Snippets Groups Projects
Commit 8cd2194c authored by Artem Polyakov's avatar Artem Polyakov Committed by David Bigagli
Browse files

Fix EIO: try to process message right in the place if possible instead of...

Fix EIO: try to process message right in the place if possible instead of always switching to the AIO event handling.
parent dc54e68d
No related branches found
No related tags found
No related merge requests found
...@@ -81,6 +81,7 @@ static int _recv_unpack_hdr(void *net, void *host); ...@@ -81,6 +81,7 @@ static int _recv_unpack_hdr(void *net, void *host);
static bool _serv_readable(eio_obj_t *obj); static bool _serv_readable(eio_obj_t *obj);
static int _serv_read(eio_obj_t *obj, List objs); static int _serv_read(eio_obj_t *obj, List objs);
static void _process_server_request(recv_header_t *_hdr, void *payload); static void _process_server_request(recv_header_t *_hdr, void *payload);
static int _process_message(pmixp_io_engine_t *me);
static struct io_operations peer_ops = { static struct io_operations peer_ops = {
.readable = _serv_readable, .readable = _serv_readable,
...@@ -190,6 +191,29 @@ int pmixp_stepd_finalize(void) ...@@ -190,6 +191,29 @@ int pmixp_stepd_finalize(void)
return SLURM_SUCCESS; return SLURM_SUCCESS;
} }
/* this routine tries to complete message processing on message
* engine (me). Return value:
* - 0: no progress was observed on the descriptor
* - 1: one more message was successfuly processed
* - 2: all messages are completed
*/
static int _process_message(pmixp_io_engine_t *me)
{
int ret = 0;
pmix_io_rcvd(me);
if (pmix_io_rcvd_ready(me)) {
recv_header_t hdr;
void *msg = pmix_io_rcvd_extract(me, &hdr);
_process_server_request(&hdr, msg);
ret = 1;
}
if (pmix_io_finalized(me)) {
ret = 2;
}
return ret;
}
/* /*
* TODO: we need to keep track of the "me" * TODO: we need to keep track of the "me"
* structures created here, because we need to * structures created here, because we need to
...@@ -211,10 +235,14 @@ void pmix_server_new_conn(int fd) ...@@ -211,10 +235,14 @@ void pmix_server_new_conn(int fd)
*/ */
pmix_io_rcvd_padding(me, sizeof(uint32_t)); pmix_io_rcvd_padding(me, sizeof(uint32_t));
/* TODO: in future try to process the request right here if( 2 == _process_message(me) ){
* use eio only in case of blocking operation /* connection was fully processed here */
* NOW: always defer to debug the blocking case xfree(me);
*/ return;
}
/* If it is a blocking operation: create AIO object to
* handle it */
obj = eio_obj_create(fd, &peer_ops, (void *)me); obj = eio_obj_create(fd, &peer_ops, (void *)me);
eio_new_obj(pmixp_info_io(), obj); eio_new_obj(pmixp_info_io(), obj);
} }
...@@ -389,27 +417,24 @@ static void _process_server_request(recv_header_t *_hdr, void *payload) ...@@ -389,27 +417,24 @@ static void _process_server_request(recv_header_t *_hdr, void *payload)
static int _serv_read(eio_obj_t *obj, List objs) static int _serv_read(eio_obj_t *obj, List objs)
{ {
PMIXP_DEBUG("fd = %d", obj->fd); PMIXP_DEBUG("fd = %d", obj->fd);
pmixp_io_engine_t *me = (pmixp_io_engine_t *)obj->arg; pmixp_io_engine_t *me = (pmixp_io_engine_t *)obj->arg;
bool proceed = true;
pmixp_debug_hang(0); pmixp_debug_hang(0);
/* Read and process all received messages */ /* Read and process all received messages */
while (1) { while (proceed) {
pmix_io_rcvd(me); switch( _process_message(me) ){
if (pmix_io_finalized(me)) { case 2:
obj->shutdown = true; obj->shutdown = true;
PMIXP_DEBUG("Connection finalized fd = %d", obj->fd); PMIXP_DEBUG("Connection finalized fd = %d", obj->fd);
/* cleanup after this connection */
eio_remove_obj(obj, objs); eio_remove_obj(obj, objs);
return 0; xfree(me);
} case 0:
if (pmix_io_rcvd_ready(me)) { proceed = 0;
recv_header_t hdr; case 1:
void *msg = pmix_io_rcvd_extract(me, &hdr);
_process_server_request(&hdr, msg);
} else {
/* No more complete messages */
break; break;
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment