Skip to content
Snippets Groups Projects
Commit 821a9e87 authored by tewk's avatar tewk
Browse files

code clean up.

Added a few extra helper funtions
parent c16e3dcf
No related branches found
No related tags found
No related merge requests found
......@@ -17,13 +17,24 @@ extern int errno ;
/* #DEFINES */
#define SLURM_PORT 7000
#define PRIMARY_SLURM_CONTROLLER "pri_slrumctld.llnl.gov"
#define SECONDARY_SLURM_CONTROLLER "sec_slrumctld.llnl.gov"
/***** high level routines */
/***** msg functions */
/* In the socket implementation it creates a socket, binds to it, and listens for connections.
* In the mongo implemenetation is should just create a mongo socket , bind and return.
* slurm_address - for now it is really just a sockaddr_in
* slurm_fd - file descriptor of the connection created
*/
slurm_fd slurm_init_msg_engine_port ( uint16_t port )
{
slurm_addr slurm_address ;
slurm_set_addr_any ( &slurm_address , port ) ;
return _slurm_init_msg_engine ( & slurm_address ) ;
}
/***** msg functions */
/* In the socket implementation it creates a socket, binds to it, and listens for connections.
* In the mongo implemenetation is should just create a mongo socket , bind and return.
* slurm_address - for now it is really just a sockaddr_in
......@@ -43,6 +54,20 @@ uint32_t slurm_shutdown_msg_engine ( slurm_fd open_fd )
return _slurm_close ( open_fd ) ;
}
uint32_t slurm_shutdown_msg_conn ( slurm_fd open_fd )
{
return _slurm_close ( open_fd ) ;
}
/* calls connect to make a connection-less connection to the destination msg engine
* slurm_address - for now it is really just a sockaddr_in
* uint32_t - the return code
*/
uint32_t slurm_open_msg_conn ( slurm_addr * slurm_address )
{
return _slurm_open_msg_conn ( slurm_address ) ;
}
/***** recv msg functions */
/*
* note that a memory is allocated for the returned msg and must be freed at some point
......@@ -61,8 +86,7 @@ uint32_t slurm_receive_msg ( slurm_fd open_fd , slurm_addr * source_address , sl
uint32_t unpack_len ;
uint32_t receive_len = MAX_MESSAGE_BUFFER_SIZE ;
rc = _slurm_msg_recvfrom ( open_fd , buffer , receive_len, NO_SEND_RECV_FLAGS , source_address ) ;
if ( rc == SLURM_SOCKET_ERROR )
if ( ( rc = _slurm_msg_recvfrom ( open_fd , buffer , receive_len, NO_SEND_RECV_FLAGS , source_address ) ) == SLURM_SOCKET_ERROR )
{
debug ( "Error recieving msg socket: errno %i\n", errno ) ;
return rc ;
......@@ -72,8 +96,10 @@ uint32_t slurm_receive_msg ( slurm_fd open_fd , slurm_addr * source_address , sl
unpack_len = rc ;
unpack_header ( & buffer , & unpack_len , & header ) ;
rc = check_header_version ( & header ) ;
if ( rc < 0 ) return rc ;
if ( (rc = check_header_version ( & header ) ) < 0 )
{
return rc;
}
/* unpack msg body */
new_msg = xmalloc ( sizeof ( slurm_msg_t ) ) ;
......@@ -104,13 +130,11 @@ uint32_t slurm_send_controller_msg ( slurm_fd open_fd , slurm_msg_type_t msg_typ
slurm_set_addr ( & secondary_destination_address , SLURM_PORT , SECONDARY_SLURM_CONTROLLER ) ;
/* try to send to primary first then secondary */
rc = slurm_send_node_msg ( open_fd , & primary_destination_address , msg_type , msg ) ;
if ( rc == SLURM_SOCKET_ERROR )
if ( (rc = slurm_send_node_msg ( open_fd , & primary_destination_address , msg_type , msg ) ) == SLURM_SOCKET_ERROR )
{
debug ( "Send message to primary controller failed" ) ;
rc = slurm_send_node_msg ( open_fd , & secondary_destination_address , msg_type , msg ) ;
if ( rc == SLURM_SOCKET_ERROR )
if ( (rc = slurm_send_node_msg ( open_fd , & secondary_destination_address , msg_type , msg ) ) == SLURM_SOCKET_ERROR )
{
debug ( "Send messge to secondary controller failed" ) ;
}
......@@ -145,11 +169,9 @@ uint32_t slurm_send_node_msg ( slurm_fd open_fd , slurm_addr * destination_addre
pack_msg ( & buffer , & pack_len , msg ) ;
/* send msg */
rc = _slurm_msg_sendto ( open_fd , buf_temp , MAX_MESSAGE_BUFFER_SIZE - pack_len , NO_SEND_RECV_FLAGS , destination_address ) ;
if ( rc == SLURM_SOCKET_ERROR )
if ( ( rc = _slurm_msg_sendto ( open_fd , buf_temp , MAX_MESSAGE_BUFFER_SIZE - pack_len , NO_SEND_RECV_FLAGS , destination_address ) ) == SLURM_SOCKET_ERROR )
{
debug ( "Error sending msg socket: errno %i\n", errno ) ;
return rc ;
}
return rc ;
}
......@@ -172,8 +194,7 @@ uint32_t slurm_receive_buffer ( slurm_fd open_fd , slurm_addr * source_address ,
uint32_t unpack_len ;
uint32_t receive_len = MAX_MESSAGE_BUFFER_SIZE ;
rc = _slurm_msg_recvfrom ( open_fd , buffer , receive_len, NO_SEND_RECV_FLAGS , source_address ) ;
if ( rc == SLURM_SOCKET_ERROR )
if ( ( rc = _slurm_msg_recvfrom ( open_fd , buffer , receive_len, NO_SEND_RECV_FLAGS , source_address ) ) == SLURM_SOCKET_ERROR ) ;
{
debug ( "Error recieving msg socket: errno %i\n", errno ) ;
return rc ;
......@@ -210,13 +231,11 @@ uint32_t slurm_send_controller_buffer ( slurm_fd open_fd , slurm_msg_type_t msg_
slurm_set_addr ( & secondary_destination_address , SLURM_PORT , SECONDARY_SLURM_CONTROLLER ) ;
/* try to send to primary first then secondary */
rc = slurm_send_node_buffer ( open_fd , & primary_destination_address , msg_type , data_buffer , buf_len ) ;
if ( rc == SLURM_SOCKET_ERROR )
if ( ( rc = slurm_send_node_buffer ( open_fd , & primary_destination_address , msg_type , data_buffer , buf_len ) ) == SLURM_SOCKET_ERROR )
{
debug ( "Send message to primary controller failed" ) ;
rc = slurm_send_node_buffer ( open_fd , & secondary_destination_address , msg_type , data_buffer , buf_len ) ;
if ( rc == SLURM_SOCKET_ERROR )
if ( ( rc = slurm_send_node_buffer ( open_fd , & secondary_destination_address , msg_type , data_buffer , buf_len ) ) == SLURM_SOCKET_ERROR )
{
debug ( "Send messge to secondary controller failed" ) ;
}
......@@ -252,11 +271,9 @@ uint32_t slurm_send_node_buffer ( slurm_fd open_fd , slurm_addr * destination_ad
memcpy ( buffer , data_buffer , buf_len ) ;
pack_len -= buf_len ;
rc = _slurm_msg_sendto ( open_fd , buf_temp , MAX_MESSAGE_BUFFER_SIZE - pack_len , NO_SEND_RECV_FLAGS , destination_address ) ;
if ( rc == SLURM_SOCKET_ERROR )
if ( ( rc = _slurm_msg_sendto ( open_fd , buf_temp , MAX_MESSAGE_BUFFER_SIZE - pack_len , NO_SEND_RECV_FLAGS , destination_address ) ) == SLURM_SOCKET_ERROR )
{
debug ( "Error sending msg socket: errno %i", errno ) ;
return rc ;
}
return rc ;
}
......@@ -298,6 +315,11 @@ void slurm_set_addr_uint ( slurm_addr * slurm_address , uint16_t port , uint32_t
_slurm_set_addr_uint ( slurm_address , port , ip_address ) ;
}
void slurm_set_addr_any ( slurm_addr * slurm_address , uint16_t port )
{
_slurm_set_addr_uint ( slurm_address , port , SLURM_INADDR_ANY ) ;
}
void slurm_set_addr ( slurm_addr * slurm_address , uint16_t port , char * host )
{
_slurm_set_addr ( slurm_address , port , host ) ;
......
......@@ -20,13 +20,19 @@
#include <src/common/slurm_protocol_util.h>
#include <src/common/slurm_protocol_defs.h>
#define SLURM_PORT 7000
#define PRIMARY_SLURM_CONTROLLER "pri_slrumctld.llnl.gov"
#define SECONDARY_SLURM_CONTROLLER "sec_slrumctld.llnl.gov"
//WHAT ABOUT THESE INCLUDES
/* high level routines */
/* msg functions */
slurm_fd slurm_init_msg_engine_port ( uint16_t port ) ;
slurm_fd slurm_init_msg_engine ( slurm_addr * slurm_address ) ;
uint32_t slurm_receive_msg ( slurm_fd open_fd , slurm_addr * source_address , slurm_msg_t ** msg ) ;
uint32_t slurm_shutdown_msg_engine ( slurm_fd open_fd ) ;
uint32_t slurm_shutdown_msg_conn ( slurm_fd open_fd ) ;
/* send msg functions */
......@@ -55,6 +61,7 @@ uint32_t slurm_send_node_msg ( slurm_fd open_fd , slurm_addr * slurm_address , s
void slurm_set_addr_uint ( slurm_addr * slurm_address , uint16_t port , uint32_t ip_address ) ;
void slurm_set_addr ( slurm_addr * slurm_address , uint16_t port , char * host ) ;
void slurm_set_addr_any ( slurm_addr * slurm_address , uint16_t port ) ;
void slurm_set_addr_char ( slurm_addr * slurm_address , uint16_t port , char * host ) ;
void slurm_get_addr ( slurm_addr * slurm_address , uint16_t * port , char * host , uint32_t buf_len ) ;
#endif
......@@ -16,6 +16,7 @@
#define AF_SLURM AF_INET
#define SLURM_INADDR_ANY 0x00000000
/* LINUX SPECIFIC */
/* this is the slurm equivalent of the operating system file descriptor, which in linux is just an int */
......@@ -32,6 +33,8 @@ typedef struct {
/* SLURM datatypes */
/* this is a custom data type to describe the slurm msg type type that is placed in the slurm protocol header
* while just an short now, it may change in the future */
typedef uint16_t slurm_msg_type_t ;
/* Now defined in ../../src/common/slurm_protocol_defs.h
* typedef uint16_t slurm_msg_type_t ;
*/
#endif
......@@ -44,6 +44,60 @@
/* SLURM Message types */
typedef enum {
REQUEST_NODE_REGISRATION_STATUS = 1,
MESSAGE_NODE_REGISRATION_STATUS = 2,
REQUEST_RESOURCE_ALLOCATION = 3,
RESPONSE_RESOURCE_ALLOCATION = 4,
REQUEST_CANCEL_JOB = 5,
REQUEST_RECONFIGURE = 6,
RESPONSE_CANCEL_JOB = 7,
REQUEST_JOB_INFO = 8,
RESPONSE_JOB_INFO = 9,
REQUEST_JOB_STEP_INFO = 10,
RESPONSE_JOB_STEP_INFO = 11,
REQUEST_NODE_INFO = 12,
RESPONSE_NODE_INFO = 13,
REQUEST_PARTITION_INFO = 14,
RESPONSE_PARTITION_INFO = 15,
REQUEST_ACCTING_INFO = 16,
RESPONSE_ACCOUNTING_INFO = 17,
REQUEST_BUILD_INFO = 18,
RESPONSE_BUILD_INFO = 19,
MESSAGE_UPLOAD_ACCOUNTING_INFO = 20,
RESPONSE_RECONFIGURE = 21,
REQUEST_SUBMIT_BATCH_JOB = 22,
RESPONSE_SUBMIT_BATCH_JOB = 23,
REQUEST_CANCEL_JOB_STEP = 24,
RESPONSE_CANCEL_JOB_STEP = 25,
REQUEST_SIGNAL_JOB = 26,
RESPONSE_SIGNAL_JOB = 27,
REQUEST_SIGNAL_JOB_STEP = 28,
RESPONSE_SIGNAL_JOB_STEP = 29,
REQUEST_BATCH_JOB_LAUNCH = 30,
RESPONSE_BATCH_JOB_LAUNCH = 31,
MESSAGE_TASK_EXIT = 32,
MESSAGE_REVOKE_JOB_CREDENTIAL = 33,
REQUEST_LAUNCH_TASKS = 34,
REQUEST_CREATE_JOB_STEP = 35,
RESPONSE_CREATE_JOB_STEP = 36,
REQUEST_RUN_JOB_STEP = 37,
RESPONSE_RUN_JOB_STEP = 38,
REQUEST_JOB_ATTACH = 39,
RESPONSE_JOB_ATTACH = 40,
RESPONSE_LAUNCH_TASKS = 41,
REQUEST_GET_KEY = 42,
RESPONSE_GET_KEY = 43,
REQUEST_GET_JOB_STEP_INFO = 44,
RESPONSE_GET_JOB_STEP_INFO = 45,
REQUEST_JOB_RESOURCE = 46,
RESPONSE_JOB_RESOURCE = 47
/*
REQUEST_RUN_JOB_STEP = 48,
RESPONSE_RUN_JOB_STEP = 49
*/
} slurm_msg_type_t ;
#define REQUEST_NODE_REGISRATION_STATUS 1
#define MESSAGE_NODE_REGISRATION_STATUS 2
#define REQUEST_RESOURCE_ALLOCATION 3
......@@ -58,7 +112,7 @@
#define REQUEST_NODE_INFO 12
#define RESPONSE_NODE_INFO 13
#define REQUEST_PARTITION_INFO 14
#define RESPONSE_JOB_INFO 15
#define RESPONSE_PATITION_INFO 15
#define REQUEST_ACCTING_INFO 16
#define RESPONSE_ACCOUNTING_INFO 17
#define REQUEST_BUILD_INFO 18
......@@ -91,8 +145,6 @@
#define RESPONSE_GET_JOB_STEP_INFO 45
#define REQUEST_JOB_RESOURCE 46
#define RESPONSE_JOB_RESOURCE 47
#define REQUEST_RUN_JOB_STEP 48
#define RESPONSE_RUN_JOB_STEP 49
typedef struct slurm_protocol_header
{
......
......@@ -66,6 +66,7 @@ typedef enum slurm_socket_type { SLURM_MESSAGE , SLURM_STREAM } slurm_socket_typ
int _slurm_create_socket (slurm_socket_type_t type) ;
/* msg functions */
uint32_t _slurm_init_msg_engine ( slurm_addr * slurm_address ) ;
uint32_t _slurm_open_msg_conn ( slurm_addr * slurm_address ) ;
ssize_t _slurm_msg_recvfrom ( slurm_fd open_fd, char *buffer , size_t size , uint32_t flags, slurm_addr * slurm_address ) ;
ssize_t _slurm_msg_sendto ( slurm_fd open_fd, char *buffer , size_t size , uint32_t flags, slurm_addr * slurm_address ) ;
/* uint32_t _slurm_shutdown_msg_engine ( slurm_fd open_fd ) ; */
......
......@@ -29,6 +29,11 @@ uint32_t _slurm_init_msg_engine ( slurm_addr * slurm_address )
return _slurm_listen_stream ( slurm_address ) ;
}
uint32_t _slurm_open_msg_conn ( slurm_addr * slurm_address )
{
return _slurm_listen_stream ( slurm_address ) ;
}
ssize_t _slurm_msg_recvfrom ( slurm_fd open_fd, char *buffer , size_t size , uint32_t flags, slurm_addr * slurm_address )
{
slurm_fd connection_fd ;
......@@ -41,15 +46,13 @@ ssize_t _slurm_msg_recvfrom ( slurm_fd open_fd, char *buffer , size_t size , uin
uint32_t total_len ;
connection_fd = _slurm_accept_stream ( open_fd , slurm_address ) ;
if ( connection_fd == SLURM_SOCKET_ERROR )
if ( ( connection_fd = _slurm_accept_stream ( open_fd , slurm_address ) ) == SLURM_SOCKET_ERROR )
{
debug ( "Error opening stream socket to receive msg datagram emulation layeri\n" ) ;
return connection_fd ;
}
recv_len = _slurm_recv ( connection_fd , size_buffer_temp , sizeof ( uint32_t ) , NO_SEND_RECV_FLAGS ) ;
if ( recv_len != sizeof ( uint32_t ) )
if ( ( recv_len = _slurm_recv ( connection_fd , size_buffer_temp , sizeof ( uint32_t ) , NO_SEND_RECV_FLAGS ) ) != sizeof ( uint32_t ) )
{
debug ( "Error receiving legth of datagram. Total Bytes Sent %i \n", recv_len ) ;
}
......@@ -58,8 +61,7 @@ ssize_t _slurm_msg_recvfrom ( slurm_fd open_fd, char *buffer , size_t size , uin
total_len = 0 ;
while ( total_len < transmit_size )
{
recv_len = _slurm_recv ( connection_fd , buffer , transmit_size , NO_SEND_RECV_FLAGS ) ;
if ( recv_len == SLURM_SOCKET_ERROR )
if ( ( recv_len = _slurm_recv ( connection_fd , buffer , transmit_size , NO_SEND_RECV_FLAGS ) ) == SLURM_SOCKET_ERROR )
{
debug ( "Error receiving legth of datagram. errno %i \n", errno ) ;
return recv_len ;
......@@ -85,14 +87,12 @@ ssize_t _slurm_msg_sendto ( slurm_fd open_fd, char *buffer , size_t size , uint3
pack32 ( size , ( void ** ) & size_buffer , & size_size ) ;
connection_fd = _slurm_open_stream ( slurm_address ) ;
if ( connection_fd == SLURM_SOCKET_ERROR )
if ( ( connection_fd = _slurm_open_stream ( slurm_address ) ) ==SLURM_SOCKET_ERROR )
{
debug ( "Error opening stream socket to send msg datagram emulation layer\n" ) ;
return connection_fd ;
}
send_len = _slurm_send ( connection_fd , size_buffer_temp , sizeof ( uint32_t ) , NO_SEND_RECV_FLAGS ) ;
if ( send_len != sizeof ( uint32_t ) )
if ( ( send_len = _slurm_send ( connection_fd , size_buffer_temp , sizeof ( uint32_t ) , NO_SEND_RECV_FLAGS ) ) != sizeof ( uint32_t ) )
{
debug ( "Error sending length of datagram\n" ) ;
}
......@@ -116,26 +116,19 @@ uint32_t _slurm_listen_stream ( slurm_addr * slurm_address )
{
uint32_t rc ;
slurm_fd connection_fd ;
rc =_slurm_create_socket ( SLURM_STREAM ) ;
if ( rc == SLURM_SOCKET_ERROR )
if ( ( connection_fd =_slurm_create_socket ( SLURM_STREAM ) ) == SLURM_SOCKET_ERROR )
{
debug( "Error creating slurm stream socket: errno %i\n", errno ) ;
return rc ;
}
else
{
connection_fd = rc ;
return connection_fd ;
}
rc = _slurm_bind ( connection_fd , ( struct sockaddr const * ) slurm_address , sizeof ( slurm_addr ) ) ;
if ( rc == SLURM_SOCKET_ERROR )
if ( ( rc = _slurm_bind ( connection_fd , ( struct sockaddr const * ) slurm_address , sizeof ( slurm_addr ) ) ) == SLURM_SOCKET_ERROR )
{
debug( "Error binding slurm stream socket: errno %i\n" , errno ) ;
return rc ;
}
rc = _slurm_listen ( connection_fd , DEFAULT_LISTEN_BACKLOG ) ;
if ( rc == SLURM_SOCKET_ERROR )
if ( ( rc = _slurm_listen ( connection_fd , DEFAULT_LISTEN_BACKLOG ) ) == SLURM_SOCKET_ERROR )
{
debug( "Error listening on slurm stream socket: errno %i\n" , errno ) ;
return rc ;
......@@ -146,18 +139,11 @@ uint32_t _slurm_listen_stream ( slurm_addr * slurm_address )
uint32_t _slurm_accept_stream ( slurm_fd open_fd , slurm_addr * slurm_address )
{
uint32_t rc ;
uint32_t addr_len = sizeof ( slurm_addr ) ;
slurm_fd connection_fd ;
rc =_slurm_accept ( open_fd , ( struct sockaddr * ) slurm_address , & addr_len ) ;
if ( rc == SLURM_SOCKET_ERROR )
if ( ( connection_fd = _slurm_accept ( open_fd , ( struct sockaddr * ) slurm_address , & addr_len ) ) == SLURM_SOCKET_ERROR )
{
debug( "Error accepting slurm stream socket: errno %i\n", errno ) ;
return rc ;
}
else
{
connection_fd = rc ;
}
return connection_fd ;
......@@ -167,19 +153,13 @@ uint32_t _slurm_open_stream ( slurm_addr * slurm_address )
{
uint32_t rc ;
slurm_fd connection_fd ;
rc =_slurm_create_socket ( SLURM_STREAM ) ;
if ( rc == SLURM_SOCKET_ERROR )
if ( ( connection_fd =_slurm_create_socket ( SLURM_STREAM ) ) == SLURM_SOCKET_ERROR )
{
debug( "Error creating slurm stream socket: errno %i\n", errno ) ;
return rc ;
}
else
{
connection_fd = rc ;
return connection_fd ;
}
rc = _slurm_connect ( connection_fd , ( struct sockaddr const * ) slurm_address , sizeof ( slurm_addr ) ) ;
if ( rc == SLURM_SOCKET_ERROR )
if ( ( rc = _slurm_connect ( connection_fd , ( struct sockaddr const * ) slurm_address , sizeof ( slurm_addr ) ) ) == SLURM_SOCKET_ERROR )
{
debug( "Error listening on slurm stream socket: errno %i\n" , errno ) ;
return rc ;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment