diff --git a/src/common/slurm_protocol_api.c b/src/common/slurm_protocol_api.c index 141b321ad6eb7b2ab0b4ddfc3ff512db45c5d074..a94868262be41c60ec00b4088d2ee4c3ab14f9e3 100644 --- a/src/common/slurm_protocol_api.c +++ b/src/common/slurm_protocol_api.c @@ -17,13 +17,24 @@ extern int errno ; /* #DEFINES */ -#define SLURM_PORT 7000 -#define PRIMARY_SLURM_CONTROLLER "pri_slrumctld.llnl.gov" -#define SECONDARY_SLURM_CONTROLLER "sec_slrumctld.llnl.gov" /***** high level routines */ /***** msg functions */ +/* In the socket implementation it creates a socket, binds to it, and listens for connections. + * In the mongo implemenetation is should just create a mongo socket , bind and return. + * slurm_address - for now it is really just a sockaddr_in + * slurm_fd - file descriptor of the connection created + */ +slurm_fd slurm_init_msg_engine_port ( uint16_t port ) +{ + slurm_addr slurm_address ; + slurm_set_addr_any ( &slurm_address , port ) ; + return _slurm_init_msg_engine ( & slurm_address ) ; +} + +/***** msg functions */ + /* In the socket implementation it creates a socket, binds to it, and listens for connections. * In the mongo implemenetation is should just create a mongo socket , bind and return. * slurm_address - for now it is really just a sockaddr_in @@ -43,6 +54,20 @@ uint32_t slurm_shutdown_msg_engine ( slurm_fd open_fd ) return _slurm_close ( open_fd ) ; } +uint32_t slurm_shutdown_msg_conn ( slurm_fd open_fd ) +{ + return _slurm_close ( open_fd ) ; +} + +/* calls connect to make a connection-less connection to the destination msg engine + * slurm_address - for now it is really just a sockaddr_in + * uint32_t - the return code + */ +uint32_t slurm_open_msg_conn ( slurm_addr * slurm_address ) +{ + return _slurm_open_msg_conn ( slurm_address ) ; +} + /***** recv msg functions */ /* * note that a memory is allocated for the returned msg and must be freed at some point @@ -61,8 +86,7 @@ uint32_t slurm_receive_msg ( slurm_fd open_fd , slurm_addr * source_address , sl uint32_t unpack_len ; uint32_t receive_len = MAX_MESSAGE_BUFFER_SIZE ; - rc = _slurm_msg_recvfrom ( open_fd , buffer , receive_len, NO_SEND_RECV_FLAGS , source_address ) ; - if ( rc == SLURM_SOCKET_ERROR ) + if ( ( rc = _slurm_msg_recvfrom ( open_fd , buffer , receive_len, NO_SEND_RECV_FLAGS , source_address ) ) == SLURM_SOCKET_ERROR ) { debug ( "Error recieving msg socket: errno %i\n", errno ) ; return rc ; @@ -72,8 +96,10 @@ uint32_t slurm_receive_msg ( slurm_fd open_fd , slurm_addr * source_address , sl unpack_len = rc ; unpack_header ( & buffer , & unpack_len , & header ) ; - rc = check_header_version ( & header ) ; - if ( rc < 0 ) return rc ; + if ( (rc = check_header_version ( & header ) ) < 0 ) + { + return rc; + } /* unpack msg body */ new_msg = xmalloc ( sizeof ( slurm_msg_t ) ) ; @@ -104,13 +130,11 @@ uint32_t slurm_send_controller_msg ( slurm_fd open_fd , slurm_msg_type_t msg_typ slurm_set_addr ( & secondary_destination_address , SLURM_PORT , SECONDARY_SLURM_CONTROLLER ) ; /* try to send to primary first then secondary */ - rc = slurm_send_node_msg ( open_fd , & primary_destination_address , msg_type , msg ) ; - if ( rc == SLURM_SOCKET_ERROR ) + if ( (rc = slurm_send_node_msg ( open_fd , & primary_destination_address , msg_type , msg ) ) == SLURM_SOCKET_ERROR ) { debug ( "Send message to primary controller failed" ) ; - rc = slurm_send_node_msg ( open_fd , & secondary_destination_address , msg_type , msg ) ; - if ( rc == SLURM_SOCKET_ERROR ) + if ( (rc = slurm_send_node_msg ( open_fd , & secondary_destination_address , msg_type , msg ) ) == SLURM_SOCKET_ERROR ) { debug ( "Send messge to secondary controller failed" ) ; } @@ -145,11 +169,9 @@ uint32_t slurm_send_node_msg ( slurm_fd open_fd , slurm_addr * destination_addre pack_msg ( & buffer , & pack_len , msg ) ; /* send msg */ - rc = _slurm_msg_sendto ( open_fd , buf_temp , MAX_MESSAGE_BUFFER_SIZE - pack_len , NO_SEND_RECV_FLAGS , destination_address ) ; - if ( rc == SLURM_SOCKET_ERROR ) + if ( ( rc = _slurm_msg_sendto ( open_fd , buf_temp , MAX_MESSAGE_BUFFER_SIZE - pack_len , NO_SEND_RECV_FLAGS , destination_address ) ) == SLURM_SOCKET_ERROR ) { debug ( "Error sending msg socket: errno %i\n", errno ) ; - return rc ; } return rc ; } @@ -172,8 +194,7 @@ uint32_t slurm_receive_buffer ( slurm_fd open_fd , slurm_addr * source_address , uint32_t unpack_len ; uint32_t receive_len = MAX_MESSAGE_BUFFER_SIZE ; - rc = _slurm_msg_recvfrom ( open_fd , buffer , receive_len, NO_SEND_RECV_FLAGS , source_address ) ; - if ( rc == SLURM_SOCKET_ERROR ) + if ( ( rc = _slurm_msg_recvfrom ( open_fd , buffer , receive_len, NO_SEND_RECV_FLAGS , source_address ) ) == SLURM_SOCKET_ERROR ) ; { debug ( "Error recieving msg socket: errno %i\n", errno ) ; return rc ; @@ -210,13 +231,11 @@ uint32_t slurm_send_controller_buffer ( slurm_fd open_fd , slurm_msg_type_t msg_ slurm_set_addr ( & secondary_destination_address , SLURM_PORT , SECONDARY_SLURM_CONTROLLER ) ; /* try to send to primary first then secondary */ - rc = slurm_send_node_buffer ( open_fd , & primary_destination_address , msg_type , data_buffer , buf_len ) ; - if ( rc == SLURM_SOCKET_ERROR ) + if ( ( rc = slurm_send_node_buffer ( open_fd , & primary_destination_address , msg_type , data_buffer , buf_len ) ) == SLURM_SOCKET_ERROR ) { debug ( "Send message to primary controller failed" ) ; - rc = slurm_send_node_buffer ( open_fd , & secondary_destination_address , msg_type , data_buffer , buf_len ) ; - if ( rc == SLURM_SOCKET_ERROR ) + if ( ( rc = slurm_send_node_buffer ( open_fd , & secondary_destination_address , msg_type , data_buffer , buf_len ) ) == SLURM_SOCKET_ERROR ) { debug ( "Send messge to secondary controller failed" ) ; } @@ -252,11 +271,9 @@ uint32_t slurm_send_node_buffer ( slurm_fd open_fd , slurm_addr * destination_ad memcpy ( buffer , data_buffer , buf_len ) ; pack_len -= buf_len ; - rc = _slurm_msg_sendto ( open_fd , buf_temp , MAX_MESSAGE_BUFFER_SIZE - pack_len , NO_SEND_RECV_FLAGS , destination_address ) ; - if ( rc == SLURM_SOCKET_ERROR ) + if ( ( rc = _slurm_msg_sendto ( open_fd , buf_temp , MAX_MESSAGE_BUFFER_SIZE - pack_len , NO_SEND_RECV_FLAGS , destination_address ) ) == SLURM_SOCKET_ERROR ) { debug ( "Error sending msg socket: errno %i", errno ) ; - return rc ; } return rc ; } @@ -298,6 +315,11 @@ void slurm_set_addr_uint ( slurm_addr * slurm_address , uint16_t port , uint32_t _slurm_set_addr_uint ( slurm_address , port , ip_address ) ; } +void slurm_set_addr_any ( slurm_addr * slurm_address , uint16_t port ) +{ + _slurm_set_addr_uint ( slurm_address , port , SLURM_INADDR_ANY ) ; +} + void slurm_set_addr ( slurm_addr * slurm_address , uint16_t port , char * host ) { _slurm_set_addr ( slurm_address , port , host ) ; diff --git a/src/common/slurm_protocol_api.h b/src/common/slurm_protocol_api.h index 6a72830c6ccba1600c958967efe17f2ba9f102fe..68ae622cdf26ab6b69b34edf8fbb7071cc09384d 100644 --- a/src/common/slurm_protocol_api.h +++ b/src/common/slurm_protocol_api.h @@ -20,13 +20,19 @@ #include <src/common/slurm_protocol_util.h> #include <src/common/slurm_protocol_defs.h> +#define SLURM_PORT 7000 +#define PRIMARY_SLURM_CONTROLLER "pri_slrumctld.llnl.gov" +#define SECONDARY_SLURM_CONTROLLER "sec_slrumctld.llnl.gov" + //WHAT ABOUT THESE INCLUDES /* high level routines */ /* msg functions */ +slurm_fd slurm_init_msg_engine_port ( uint16_t port ) ; slurm_fd slurm_init_msg_engine ( slurm_addr * slurm_address ) ; uint32_t slurm_receive_msg ( slurm_fd open_fd , slurm_addr * source_address , slurm_msg_t ** msg ) ; uint32_t slurm_shutdown_msg_engine ( slurm_fd open_fd ) ; +uint32_t slurm_shutdown_msg_conn ( slurm_fd open_fd ) ; /* send msg functions */ @@ -55,6 +61,7 @@ uint32_t slurm_send_node_msg ( slurm_fd open_fd , slurm_addr * slurm_address , s void slurm_set_addr_uint ( slurm_addr * slurm_address , uint16_t port , uint32_t ip_address ) ; void slurm_set_addr ( slurm_addr * slurm_address , uint16_t port , char * host ) ; +void slurm_set_addr_any ( slurm_addr * slurm_address , uint16_t port ) ; void slurm_set_addr_char ( slurm_addr * slurm_address , uint16_t port , char * host ) ; void slurm_get_addr ( slurm_addr * slurm_address , uint16_t * port , char * host , uint32_t buf_len ) ; #endif diff --git a/src/common/slurm_protocol_common.h b/src/common/slurm_protocol_common.h index 16c5a6936c658080ef25a1a153008677c21a4fd6..bd20be8b876aed0438617de117709f30ce06f102 100644 --- a/src/common/slurm_protocol_common.h +++ b/src/common/slurm_protocol_common.h @@ -16,6 +16,7 @@ #define AF_SLURM AF_INET +#define SLURM_INADDR_ANY 0x00000000 /* LINUX SPECIFIC */ /* this is the slurm equivalent of the operating system file descriptor, which in linux is just an int */ @@ -32,6 +33,8 @@ typedef struct { /* SLURM datatypes */ /* this is a custom data type to describe the slurm msg type type that is placed in the slurm protocol header * while just an short now, it may change in the future */ -typedef uint16_t slurm_msg_type_t ; +/* Now defined in ../../src/common/slurm_protocol_defs.h + * typedef uint16_t slurm_msg_type_t ; + */ #endif diff --git a/src/common/slurm_protocol_defs.h b/src/common/slurm_protocol_defs.h index 89475e31aa2a02374d34113c7e33ec662e2f9695..816adb4146c666067992cdc6525fea86b022960f 100644 --- a/src/common/slurm_protocol_defs.h +++ b/src/common/slurm_protocol_defs.h @@ -44,6 +44,60 @@ /* SLURM Message types */ +typedef enum { + REQUEST_NODE_REGISRATION_STATUS = 1, + MESSAGE_NODE_REGISRATION_STATUS = 2, + REQUEST_RESOURCE_ALLOCATION = 3, + RESPONSE_RESOURCE_ALLOCATION = 4, + REQUEST_CANCEL_JOB = 5, + REQUEST_RECONFIGURE = 6, + RESPONSE_CANCEL_JOB = 7, + REQUEST_JOB_INFO = 8, + RESPONSE_JOB_INFO = 9, + REQUEST_JOB_STEP_INFO = 10, + RESPONSE_JOB_STEP_INFO = 11, + REQUEST_NODE_INFO = 12, + RESPONSE_NODE_INFO = 13, + REQUEST_PARTITION_INFO = 14, + RESPONSE_PARTITION_INFO = 15, + REQUEST_ACCTING_INFO = 16, + RESPONSE_ACCOUNTING_INFO = 17, + REQUEST_BUILD_INFO = 18, + RESPONSE_BUILD_INFO = 19, + MESSAGE_UPLOAD_ACCOUNTING_INFO = 20, + RESPONSE_RECONFIGURE = 21, + REQUEST_SUBMIT_BATCH_JOB = 22, + RESPONSE_SUBMIT_BATCH_JOB = 23, + REQUEST_CANCEL_JOB_STEP = 24, + RESPONSE_CANCEL_JOB_STEP = 25, + REQUEST_SIGNAL_JOB = 26, + RESPONSE_SIGNAL_JOB = 27, + REQUEST_SIGNAL_JOB_STEP = 28, + RESPONSE_SIGNAL_JOB_STEP = 29, + REQUEST_BATCH_JOB_LAUNCH = 30, + RESPONSE_BATCH_JOB_LAUNCH = 31, + MESSAGE_TASK_EXIT = 32, + MESSAGE_REVOKE_JOB_CREDENTIAL = 33, + REQUEST_LAUNCH_TASKS = 34, + REQUEST_CREATE_JOB_STEP = 35, + RESPONSE_CREATE_JOB_STEP = 36, + REQUEST_RUN_JOB_STEP = 37, + RESPONSE_RUN_JOB_STEP = 38, + REQUEST_JOB_ATTACH = 39, + RESPONSE_JOB_ATTACH = 40, + RESPONSE_LAUNCH_TASKS = 41, + REQUEST_GET_KEY = 42, + RESPONSE_GET_KEY = 43, + REQUEST_GET_JOB_STEP_INFO = 44, + RESPONSE_GET_JOB_STEP_INFO = 45, + REQUEST_JOB_RESOURCE = 46, + RESPONSE_JOB_RESOURCE = 47 + /* + REQUEST_RUN_JOB_STEP = 48, + RESPONSE_RUN_JOB_STEP = 49 + */ +} slurm_msg_type_t ; + #define REQUEST_NODE_REGISRATION_STATUS 1 #define MESSAGE_NODE_REGISRATION_STATUS 2 #define REQUEST_RESOURCE_ALLOCATION 3 @@ -58,7 +112,7 @@ #define REQUEST_NODE_INFO 12 #define RESPONSE_NODE_INFO 13 #define REQUEST_PARTITION_INFO 14 -#define RESPONSE_JOB_INFO 15 +#define RESPONSE_PATITION_INFO 15 #define REQUEST_ACCTING_INFO 16 #define RESPONSE_ACCOUNTING_INFO 17 #define REQUEST_BUILD_INFO 18 @@ -91,8 +145,6 @@ #define RESPONSE_GET_JOB_STEP_INFO 45 #define REQUEST_JOB_RESOURCE 46 #define RESPONSE_JOB_RESOURCE 47 -#define REQUEST_RUN_JOB_STEP 48 -#define RESPONSE_RUN_JOB_STEP 49 typedef struct slurm_protocol_header { diff --git a/src/common/slurm_protocol_interface.h b/src/common/slurm_protocol_interface.h index fde65ce4865be4236323321db6daf416082ea6e6..dfe989face019a320a49705f2d161885aeda1842 100644 --- a/src/common/slurm_protocol_interface.h +++ b/src/common/slurm_protocol_interface.h @@ -66,6 +66,7 @@ typedef enum slurm_socket_type { SLURM_MESSAGE , SLURM_STREAM } slurm_socket_typ int _slurm_create_socket (slurm_socket_type_t type) ; /* msg functions */ uint32_t _slurm_init_msg_engine ( slurm_addr * slurm_address ) ; +uint32_t _slurm_open_msg_conn ( slurm_addr * slurm_address ) ; ssize_t _slurm_msg_recvfrom ( slurm_fd open_fd, char *buffer , size_t size , uint32_t flags, slurm_addr * slurm_address ) ; ssize_t _slurm_msg_sendto ( slurm_fd open_fd, char *buffer , size_t size , uint32_t flags, slurm_addr * slurm_address ) ; /* uint32_t _slurm_shutdown_msg_engine ( slurm_fd open_fd ) ; */ diff --git a/src/common/slurm_protocol_socket_implementation.c b/src/common/slurm_protocol_socket_implementation.c index 497ad8051739b5b8532ecbfd4f3cd3832a7fef6d..01ddb9d2e24c9a43191c43d4f3eda4b1baad8c8a 100644 --- a/src/common/slurm_protocol_socket_implementation.c +++ b/src/common/slurm_protocol_socket_implementation.c @@ -29,6 +29,11 @@ uint32_t _slurm_init_msg_engine ( slurm_addr * slurm_address ) return _slurm_listen_stream ( slurm_address ) ; } +uint32_t _slurm_open_msg_conn ( slurm_addr * slurm_address ) +{ + return _slurm_listen_stream ( slurm_address ) ; +} + ssize_t _slurm_msg_recvfrom ( slurm_fd open_fd, char *buffer , size_t size , uint32_t flags, slurm_addr * slurm_address ) { slurm_fd connection_fd ; @@ -41,15 +46,13 @@ ssize_t _slurm_msg_recvfrom ( slurm_fd open_fd, char *buffer , size_t size , uin uint32_t total_len ; - connection_fd = _slurm_accept_stream ( open_fd , slurm_address ) ; - if ( connection_fd == SLURM_SOCKET_ERROR ) + if ( ( connection_fd = _slurm_accept_stream ( open_fd , slurm_address ) ) == SLURM_SOCKET_ERROR ) { debug ( "Error opening stream socket to receive msg datagram emulation layeri\n" ) ; return connection_fd ; } - recv_len = _slurm_recv ( connection_fd , size_buffer_temp , sizeof ( uint32_t ) , NO_SEND_RECV_FLAGS ) ; - if ( recv_len != sizeof ( uint32_t ) ) + if ( ( recv_len = _slurm_recv ( connection_fd , size_buffer_temp , sizeof ( uint32_t ) , NO_SEND_RECV_FLAGS ) ) != sizeof ( uint32_t ) ) { debug ( "Error receiving legth of datagram. Total Bytes Sent %i \n", recv_len ) ; } @@ -58,8 +61,7 @@ ssize_t _slurm_msg_recvfrom ( slurm_fd open_fd, char *buffer , size_t size , uin total_len = 0 ; while ( total_len < transmit_size ) { - recv_len = _slurm_recv ( connection_fd , buffer , transmit_size , NO_SEND_RECV_FLAGS ) ; - if ( recv_len == SLURM_SOCKET_ERROR ) + if ( ( recv_len = _slurm_recv ( connection_fd , buffer , transmit_size , NO_SEND_RECV_FLAGS ) ) == SLURM_SOCKET_ERROR ) { debug ( "Error receiving legth of datagram. errno %i \n", errno ) ; return recv_len ; @@ -85,14 +87,12 @@ ssize_t _slurm_msg_sendto ( slurm_fd open_fd, char *buffer , size_t size , uint3 pack32 ( size , ( void ** ) & size_buffer , & size_size ) ; - connection_fd = _slurm_open_stream ( slurm_address ) ; - if ( connection_fd == SLURM_SOCKET_ERROR ) + if ( ( connection_fd = _slurm_open_stream ( slurm_address ) ) ==SLURM_SOCKET_ERROR ) { debug ( "Error opening stream socket to send msg datagram emulation layer\n" ) ; return connection_fd ; } - send_len = _slurm_send ( connection_fd , size_buffer_temp , sizeof ( uint32_t ) , NO_SEND_RECV_FLAGS ) ; - if ( send_len != sizeof ( uint32_t ) ) + if ( ( send_len = _slurm_send ( connection_fd , size_buffer_temp , sizeof ( uint32_t ) , NO_SEND_RECV_FLAGS ) ) != sizeof ( uint32_t ) ) { debug ( "Error sending length of datagram\n" ) ; } @@ -116,26 +116,19 @@ uint32_t _slurm_listen_stream ( slurm_addr * slurm_address ) { uint32_t rc ; slurm_fd connection_fd ; - rc =_slurm_create_socket ( SLURM_STREAM ) ; - if ( rc == SLURM_SOCKET_ERROR ) + if ( ( connection_fd =_slurm_create_socket ( SLURM_STREAM ) ) == SLURM_SOCKET_ERROR ) { debug( "Error creating slurm stream socket: errno %i\n", errno ) ; - return rc ; - } - else - { - connection_fd = rc ; + return connection_fd ; } - rc = _slurm_bind ( connection_fd , ( struct sockaddr const * ) slurm_address , sizeof ( slurm_addr ) ) ; - if ( rc == SLURM_SOCKET_ERROR ) + if ( ( rc = _slurm_bind ( connection_fd , ( struct sockaddr const * ) slurm_address , sizeof ( slurm_addr ) ) ) == SLURM_SOCKET_ERROR ) { debug( "Error binding slurm stream socket: errno %i\n" , errno ) ; return rc ; } - rc = _slurm_listen ( connection_fd , DEFAULT_LISTEN_BACKLOG ) ; - if ( rc == SLURM_SOCKET_ERROR ) + if ( ( rc = _slurm_listen ( connection_fd , DEFAULT_LISTEN_BACKLOG ) ) == SLURM_SOCKET_ERROR ) { debug( "Error listening on slurm stream socket: errno %i\n" , errno ) ; return rc ; @@ -146,18 +139,11 @@ uint32_t _slurm_listen_stream ( slurm_addr * slurm_address ) uint32_t _slurm_accept_stream ( slurm_fd open_fd , slurm_addr * slurm_address ) { - uint32_t rc ; uint32_t addr_len = sizeof ( slurm_addr ) ; slurm_fd connection_fd ; - rc =_slurm_accept ( open_fd , ( struct sockaddr * ) slurm_address , & addr_len ) ; - if ( rc == SLURM_SOCKET_ERROR ) + if ( ( connection_fd = _slurm_accept ( open_fd , ( struct sockaddr * ) slurm_address , & addr_len ) ) == SLURM_SOCKET_ERROR ) { debug( "Error accepting slurm stream socket: errno %i\n", errno ) ; - return rc ; - } - else - { - connection_fd = rc ; } return connection_fd ; @@ -167,19 +153,13 @@ uint32_t _slurm_open_stream ( slurm_addr * slurm_address ) { uint32_t rc ; slurm_fd connection_fd ; - rc =_slurm_create_socket ( SLURM_STREAM ) ; - if ( rc == SLURM_SOCKET_ERROR ) + if ( ( connection_fd =_slurm_create_socket ( SLURM_STREAM ) ) == SLURM_SOCKET_ERROR ) { debug( "Error creating slurm stream socket: errno %i\n", errno ) ; - return rc ; - } - else - { - connection_fd = rc ; + return connection_fd ; } - rc = _slurm_connect ( connection_fd , ( struct sockaddr const * ) slurm_address , sizeof ( slurm_addr ) ) ; - if ( rc == SLURM_SOCKET_ERROR ) + if ( ( rc = _slurm_connect ( connection_fd , ( struct sockaddr const * ) slurm_address , sizeof ( slurm_addr ) ) ) == SLURM_SOCKET_ERROR ) { debug( "Error listening on slurm stream socket: errno %i\n" , errno ) ; return rc ;