Skip to content
Snippets Groups Projects
Commit d11e103c authored by tewk's avatar tewk
Browse files

Added files allow for mongo/socket differences in the slurm_protocol_common.h file

slurm_protocol_errno.h provides a default place for slurm specific error definitions until Jim completes his general errno implementation

Cleaned up comm layer , this never ends :)
added pack and unpack routines for new communcations paradigm
parent 697d1f03
No related branches found
No related tags found
No related merge requests found
...@@ -31,6 +31,7 @@ libcommon_la_SOURCES = xmalloc.c \ ...@@ -31,6 +31,7 @@ libcommon_la_SOURCES = xmalloc.c \
slurm_protocol_pack.c \ slurm_protocol_pack.c \
slurm_protocol_util.c \ slurm_protocol_util.c \
slurm_protocol_socket_implementation.c \ slurm_protocol_socket_implementation.c \
slurm_protocol_defs.c \
$(elan_sources) $(elan_sources)
noinst_HEADERS = xmalloc.h \ noinst_HEADERS = xmalloc.h \
...@@ -47,6 +48,10 @@ noinst_HEADERS = xmalloc.h \ ...@@ -47,6 +48,10 @@ noinst_HEADERS = xmalloc.h \
slurm_protocol_api.h \ slurm_protocol_api.h \
slurm_protocol_pack.h \ slurm_protocol_pack.h \
slurm_protocol_util.h \ slurm_protocol_util.h \
slurm_protocol_defs.h \
slurm_protocol_common.h \
slurm_protocol_socket_common.h \
slurm_protocol_mongo_common.h \
qsw.h qsw.h
EXTRA_libcommon_la_SOURCES = \ EXTRA_libcommon_la_SOURCES = \
......
...@@ -130,14 +130,14 @@ int slurm_close_accepted_conn ( slurm_fd open_fd ) ...@@ -130,14 +130,14 @@ int slurm_close_accepted_conn ( slurm_fd open_fd )
*/ */
int slurm_receive_msg ( slurm_fd open_fd , slurm_msg_t * msg ) int slurm_receive_msg ( slurm_fd open_fd , slurm_msg_t * msg )
{ {
char buftemp[MAX_MESSAGE_BUFFER_SIZE] ; char buftemp[SLURM_PROTOCOL_MAX_MESSAGE_BUFFER_SIZE] ;
char * buffer = buftemp ; char * buffer = buftemp ;
header_t header ; header_t header ;
int rc ; int rc ;
unsigned int unpack_len ; unsigned int unpack_len ;
unsigned int receive_len = MAX_MESSAGE_BUFFER_SIZE ; unsigned int receive_len = SLURM_PROTOCOL_MAX_MESSAGE_BUFFER_SIZE ;
if ( ( rc = _slurm_msg_recvfrom ( open_fd , buffer , receive_len, NO_SEND_RECV_FLAGS , & msg->address ) ) == SLURM_SOCKET_ERROR ) if ( ( rc = _slurm_msg_recvfrom ( open_fd , buffer , receive_len, SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , & (msg)->address ) ) == SLURM_SOCKET_ERROR )
{ {
debug ( "Error recieving msg socket: errno %i\n", errno ) ; debug ( "Error recieving msg socket: errno %i\n", errno ) ;
return rc ; return rc ;
...@@ -153,7 +153,7 @@ int slurm_receive_msg ( slurm_fd open_fd , slurm_msg_t * msg ) ...@@ -153,7 +153,7 @@ int slurm_receive_msg ( slurm_fd open_fd , slurm_msg_t * msg )
} }
/* unpack msg body */ /* unpack msg body */
msg -> msg_type = header . msg_type ; (msg) -> msg_type = header . msg_type ;
unpack_msg ( msg , & buffer , & unpack_len ) ; unpack_msg ( msg , & buffer , & unpack_len ) ;
return rc ; return rc ;
...@@ -199,7 +199,7 @@ int slurm_send_controller_msg ( slurm_fd open_fd , slurm_msg_t * msg ) ...@@ -199,7 +199,7 @@ int slurm_send_controller_msg ( slurm_fd open_fd , slurm_msg_t * msg )
*/ */
int slurm_send_node_msg ( slurm_fd open_fd , slurm_msg_t * msg ) int slurm_send_node_msg ( slurm_fd open_fd , slurm_msg_t * msg )
{ {
char buf_temp[MAX_MESSAGE_BUFFER_SIZE] ; char buf_temp[SLURM_PROTOCOL_MAX_MESSAGE_BUFFER_SIZE] ;
char * buffer = buf_temp ; char * buffer = buf_temp ;
header_t header ; header_t header ;
int rc ; int rc ;
...@@ -209,14 +209,14 @@ int slurm_send_node_msg ( slurm_fd open_fd , slurm_msg_t * msg ) ...@@ -209,14 +209,14 @@ int slurm_send_node_msg ( slurm_fd open_fd , slurm_msg_t * msg )
init_header ( & header , msg->msg_type , SLURM_PROTOCOL_NO_FLAGS ) ; init_header ( & header , msg->msg_type , SLURM_PROTOCOL_NO_FLAGS ) ;
/* pack header */ /* pack header */
pack_len = MAX_MESSAGE_BUFFER_SIZE ; pack_len = SLURM_PROTOCOL_MAX_MESSAGE_BUFFER_SIZE ;
pack_header ( &header , & buffer , & pack_len ) ; pack_header ( &header , & buffer , & pack_len ) ;
/* pack msg */ /* pack msg */
pack_msg ( msg , & buffer , & pack_len ) ; pack_msg ( msg , & buffer , & pack_len ) ;
/* send msg */ /* send msg */
if ( ( rc = _slurm_msg_sendto ( open_fd , buf_temp , MAX_MESSAGE_BUFFER_SIZE - pack_len , NO_SEND_RECV_FLAGS , &msg->address ) ) == SLURM_SOCKET_ERROR ) if ( ( rc = _slurm_msg_sendto ( open_fd , buf_temp , SLURM_PROTOCOL_MAX_MESSAGE_BUFFER_SIZE - pack_len , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , &msg->address ) ) == SLURM_SOCKET_ERROR )
{ {
debug ( "Error sending msg socket: errno %i\n", errno ) ; debug ( "Error sending msg socket: errno %i\n", errno ) ;
} }
...@@ -234,15 +234,15 @@ int slurm_send_node_msg ( slurm_fd open_fd , slurm_msg_t * msg ) ...@@ -234,15 +234,15 @@ int slurm_send_node_msg ( slurm_fd open_fd , slurm_msg_t * msg )
*/ */
int slurm_receive_buffer ( slurm_fd open_fd , slurm_addr * source_address , slurm_msg_type_t * msg_type , char * data_buffer , size_t buf_len ) int slurm_receive_buffer ( slurm_fd open_fd , slurm_addr * source_address , slurm_msg_type_t * msg_type , char * data_buffer , size_t buf_len )
{ {
char buftemp[MAX_MESSAGE_BUFFER_SIZE] ; char buftemp[SLURM_PROTOCOL_MAX_MESSAGE_BUFFER_SIZE] ;
char * buffer = buftemp ; char * buffer = buftemp ;
header_t header ; header_t header ;
int rc ; int rc ;
int bytes_read; int bytes_read;
unsigned int unpack_len ; unsigned int unpack_len ;
unsigned int receive_len = MAX_MESSAGE_BUFFER_SIZE ; unsigned int receive_len = SLURM_PROTOCOL_MAX_MESSAGE_BUFFER_SIZE ;
if ( ( rc = _slurm_msg_recvfrom ( open_fd , buffer , receive_len, NO_SEND_RECV_FLAGS , source_address ) ) == SLURM_SOCKET_ERROR ) ; if ( ( rc = _slurm_msg_recvfrom ( open_fd , buffer , receive_len, SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , source_address ) ) == SLURM_SOCKET_ERROR ) ;
{ {
debug ( "Error recieving msg socket: errno %i\n", errno ) ; debug ( "Error recieving msg socket: errno %i\n", errno ) ;
return rc ; return rc ;
...@@ -302,7 +302,7 @@ int slurm_send_controller_buffer ( slurm_fd open_fd , slurm_msg_type_t msg_type ...@@ -302,7 +302,7 @@ int slurm_send_controller_buffer ( slurm_fd open_fd , slurm_msg_type_t msg_type
*/ */
int slurm_send_node_buffer ( slurm_fd open_fd , slurm_addr * destination_address , slurm_msg_type_t msg_type , char * data_buffer , size_t buf_len ) int slurm_send_node_buffer ( slurm_fd open_fd , slurm_addr * destination_address , slurm_msg_type_t msg_type , char * data_buffer , size_t buf_len )
{ {
char buf_temp[MAX_MESSAGE_BUFFER_SIZE] ; char buf_temp[SLURM_PROTOCOL_MAX_MESSAGE_BUFFER_SIZE] ;
char * buffer = buf_temp ; char * buffer = buf_temp ;
header_t header ; header_t header ;
unsigned int rc ; unsigned int rc ;
...@@ -312,14 +312,14 @@ int slurm_send_node_buffer ( slurm_fd open_fd , slurm_addr * destination_address ...@@ -312,14 +312,14 @@ int slurm_send_node_buffer ( slurm_fd open_fd , slurm_addr * destination_address
init_header ( & header , msg_type , SLURM_PROTOCOL_NO_FLAGS ) ; init_header ( & header , msg_type , SLURM_PROTOCOL_NO_FLAGS ) ;
/* pack header */ /* pack header */
pack_len = MAX_MESSAGE_BUFFER_SIZE ; pack_len = SLURM_PROTOCOL_MAX_MESSAGE_BUFFER_SIZE ;
pack_header ( &header, & buffer , & pack_len ) ; pack_header ( &header, & buffer , & pack_len ) ;
/* pack msg */ /* pack msg */
memcpy ( buffer , data_buffer , buf_len ) ; memcpy ( buffer , data_buffer , buf_len ) ;
pack_len -= buf_len ; pack_len -= buf_len ;
if ( ( rc = _slurm_msg_sendto ( open_fd , buf_temp , MAX_MESSAGE_BUFFER_SIZE - pack_len , NO_SEND_RECV_FLAGS , destination_address ) ) == SLURM_SOCKET_ERROR ) if ( ( rc = _slurm_msg_sendto ( open_fd , buf_temp , SLURM_PROTOCOL_MAX_MESSAGE_BUFFER_SIZE - pack_len , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , destination_address ) ) == SLURM_SOCKET_ERROR )
{ {
debug ( "Error sending msg socket: errno %i", errno ) ; debug ( "Error sending msg socket: errno %i", errno ) ;
} }
...@@ -346,12 +346,12 @@ slurm_fd slurm_open_stream ( slurm_addr * slurm_address ) ...@@ -346,12 +346,12 @@ slurm_fd slurm_open_stream ( slurm_addr * slurm_address )
size_t slurm_write_stream ( slurm_fd open_fd , char * buffer , size_t size ) size_t slurm_write_stream ( slurm_fd open_fd , char * buffer , size_t size )
{ {
return _slurm_send ( open_fd , buffer , size , NO_SEND_RECV_FLAGS ) ; return _slurm_send ( open_fd , buffer , size , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS ) ;
} }
size_t slurm_read_stream ( slurm_fd open_fd , char * buffer , size_t size ) size_t slurm_read_stream ( slurm_fd open_fd , char * buffer , size_t size )
{ {
return _slurm_recv ( open_fd , buffer , size , NO_SEND_RECV_FLAGS ) ; return _slurm_recv ( open_fd , buffer , size , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS ) ;
} }
int slurm_close_stream ( slurm_fd open_fd ) int slurm_close_stream ( slurm_fd open_fd )
...@@ -388,13 +388,41 @@ void slurm_get_addr ( slurm_addr * slurm_address , uint16_t * port , char * host ...@@ -388,13 +388,41 @@ void slurm_get_addr ( slurm_addr * slurm_address , uint16_t * port , char * host
_slurm_get_addr ( slurm_address , port , host , buf_len ) ; _slurm_get_addr ( slurm_address , port , host , buf_len ) ;
} }
/* slurm msg type */ /************************/
/* frees the inner message data then frees the msg struct */ /***** slurm addr functions */
void slurm_msg_destroy ( slurm_msg_t * location , int destroy_data ) /************************/
/* sets/gets the fields of a slurm_addri */
void slurm_free_msg ( slurm_msg_t * msg )
{ {
if ( destroy_data ) /*
switch ( msg -> msg_type )
{ {
free ( location->data ) ; case REQUEST_BUID_INFO :
( ( build_table *) msg -> data ) -> free ( msg -> data ) ;
} }
free ( location ) ; */
free ( msg ) ;
}
/*******************************************/
/***** slurm send highlevel msg functions */
/*******************************************/
void slurm_send_rc_msg ( slurm_msg_t * request_msg , int rc )
{
slurm_msg_t response_msg ;
return_code_msg_t rc_msg ;
/* no change */
rc_msg . return_code = rc ;
/* init response_msg structure */
response_msg . address = request_msg -> address ;
response_msg . msg_type = RESPONSE_SLURM_RC ;
response_msg . data = & rc_msg ;
/* send message */
slurm_send_node_msg( request_msg -> conn_fd , &response_msg ) ;
} }
...@@ -19,10 +19,13 @@ ...@@ -19,10 +19,13 @@
#include <src/common/slurm_protocol_common.h> #include <src/common/slurm_protocol_common.h>
#include <src/common/slurm_protocol_util.h> #include <src/common/slurm_protocol_util.h>
#include <src/common/slurm_protocol_defs.h> #include <src/common/slurm_protocol_defs.h>
#include <src/common/slurm_protocol_errno.h>
#define SLURM_PORT 7000 #define SLURM_PORT 7000
#define PRIMARY_SLURM_CONTROLLER "pri_slrumctld.llnl.gov" /*#define PRIMARY_SLURM_CONTROLLER "pri_slrumctld.llnl.gov" */
#define SECONDARY_SLURM_CONTROLLER "sec_slrumctld.llnl.gov" /*#define SECONDARY_SLURM_CONTROLLER "sec_slrumctld.llnl.gov" */
#define PRIMARY_SLURM_CONTROLLER "localhost"
#define SECONDARY_SLURM_CONTROLLER "localhost"
#define REQUEST_BUFFER_SIZE 64 #define REQUEST_BUFFER_SIZE 64
#define RESPONSE_BUFFER_SIZE 1024 #define RESPONSE_BUFFER_SIZE 1024
...@@ -30,32 +33,33 @@ ...@@ -30,32 +33,33 @@
/* high level routines */ /* high level routines */
/* msg functions */ /* msg functions */
slurm_fd slurm_init_msg_engine_port ( uint16_t port ) ; slurm_fd inline slurm_init_msg_engine_port ( uint16_t port ) ;
slurm_fd slurm_init_msg_engine ( slurm_addr * slurm_address ) ; slurm_fd inline slurm_init_msg_engine ( slurm_addr * slurm_address ) ;
slurm_fd slurm_accept_msg_conn ( slurm_fd open_fd , slurm_addr * slurm_address ) ; slurm_fd inline slurm_accept_msg_conn ( slurm_fd open_fd , slurm_addr * slurm_address ) ;
int slurm_close_accepted_conn ( slurm_fd open_fd ) ; int inline slurm_close_accepted_conn ( slurm_fd open_fd ) ;
int slurm_shutdown_msg_engine ( slurm_fd open_fd ) ; int inline slurm_shutdown_msg_engine ( slurm_fd open_fd ) ;
int slurm_receive_msg ( slurm_fd open_fd , slurm_msg_t * msg ) ; int slurm_receive_msg ( slurm_fd open_fd , slurm_msg_t * msg ) ;
int slurm_send_controller_msg ( slurm_fd open_fd , slurm_msg_t * msg ) ;
int slurm_send_node_msg ( slurm_fd open_fd , slurm_msg_t * msg ) ;
slurm_fd slurm_open_controller_conn ( ) ; slurm_fd inline slurm_open_controller_conn ( ) ;
slurm_fd slurm_open_msg_conn ( slurm_addr * slurm_address ) ; slurm_fd inline slurm_open_msg_conn ( slurm_addr * slurm_address ) ;
int slurm_shutdown_msg_conn ( slurm_fd open_fd ) ; int inline slurm_shutdown_msg_conn ( slurm_fd open_fd ) ;
/* send msg functions */ /* send msg functions */
/* stream functions */ /* stream functions */
slurm_fd slurm_listen_stream ( slurm_addr * slurm_address ) ; slurm_fd inline slurm_listen_stream ( slurm_addr * slurm_address ) ;
slurm_fd slurm_accept_stream ( slurm_fd open_fd , slurm_addr * slurm_address ) ; slurm_fd inline slurm_accept_stream ( slurm_fd open_fd , slurm_addr * slurm_address ) ;
slurm_fd slurm_open_stream ( slurm_addr * slurm_address ) ; slurm_fd inline slurm_open_stream ( slurm_addr * slurm_address ) ;
size_t slurm_write_stream ( slurm_fd open_fd , char * buffer , size_t size ) ; size_t inline slurm_write_stream ( slurm_fd open_fd , char * buffer , size_t size ) ;
size_t slurm_read_stream ( slurm_fd open_fd , char * buffer , size_t size ) ; size_t inline slurm_read_stream ( slurm_fd open_fd , char * buffer , size_t size ) ;
int slurm_close_stream ( slurm_fd open_fd ) ; int inline slurm_close_stream ( slurm_fd open_fd ) ;
/* Low level routines */ /* Low level routines */
/* msg functions */ /* msg functions */
...@@ -63,14 +67,20 @@ int slurm_receive_buffer ( slurm_fd open_fd , slurm_addr * source_address , slur ...@@ -63,14 +67,20 @@ int slurm_receive_buffer ( slurm_fd open_fd , slurm_addr * source_address , slur
int slurm_send_controller_buffer ( slurm_fd open_fd , slurm_msg_type_t msg_type , char * data_buffer , size_t buf_len ) ; int slurm_send_controller_buffer ( slurm_fd open_fd , slurm_msg_type_t msg_type , char * data_buffer , size_t buf_len ) ;
int slurm_send_node_buffer ( slurm_fd open_fd , slurm_addr * destination_address , slurm_msg_type_t msg_type , char * data_buffer , size_t buf_len ) ; int slurm_send_node_buffer ( slurm_fd open_fd , slurm_addr * destination_address , slurm_msg_type_t msg_type , char * data_buffer , size_t buf_len ) ;
int slurm_send_controller_msg ( slurm_fd open_fd , slurm_msg_t * msg ) ;
int slurm_send_node_msg ( slurm_fd open_fd , slurm_msg_t * msg ) ;
/* Address Conversion Functions */ /* Address Conversion Functions */
void slurm_set_addr_uint ( slurm_addr * slurm_address , uint16_t port , uint32_t ip_address ) ; void inline slurm_set_addr_uint ( slurm_addr * slurm_address , uint16_t port , uint32_t ip_address ) ;
void slurm_set_addr ( slurm_addr * slurm_address , uint16_t port , char * host ) ; void inline slurm_set_addr ( slurm_addr * slurm_address , uint16_t port , char * host ) ;
void slurm_set_addr_any ( slurm_addr * slurm_address , uint16_t port ) ; void inline slurm_set_addr_any ( slurm_addr * slurm_address , uint16_t port ) ;
void slurm_set_addr_char ( slurm_addr * slurm_address , uint16_t port , char * host ) ; void inline slurm_set_addr_char ( slurm_addr * slurm_address , uint16_t port , char * host ) ;
void slurm_get_addr ( slurm_addr * slurm_address , uint16_t * port , char * host , uint32_t buf_len ) ; void inline slurm_get_addr ( slurm_addr * slurm_address , uint16_t * port , char * host , uint32_t buf_len ) ;
/* Slurm message functions */
void slurm_free_msg ( slurm_msg_t * msg ) ;
/*******************************************/
/***** slurm send highlevel msg functions */
/*******************************************/
void slurm_send_rc_msg ( slurm_msg_t * request_msg , int rc );
#endif #endif
#ifndef _SLURM_PROTOCOL_COMMON_H #ifndef _SLURM_PROTOCOL_COMMON_H
#define _SLURM_PROTOCOL_COMMON_H #define _SLURM_PROTOCOL_COMMON_H
#if HAVE_CONFIG_H #include <src/common/slurm_protocol_errno.h>
# include <config.h> /* for sendto and recvfrom commands */
# if HAVE_INTTYPES_H #define SLURM_PROTOCOL_NO_SEND_RECV_FLAGS 0
# include <inttypes.h> /* for accpet commands */
# else #define SLURM_PROTOCOL_DEFAULT_LISTEN_BACKLOG 10
# if HAVE_STDINT_H /* used in interface methods */
# include <stdint.h> #define SLURM_PROTOCOL_FUNCTION_NOT_IMPLEMENTED -2
# endif /* max slurm message send and receive buffer size
# endif /* HAVE_INTTYPES_H */ this may need to be increased to 350k-512k */
#else /* !HAVE_CONFIG_H */ #define SLURM_PROTOCOL_MAX_MESSAGE_BUFFER_SIZE 4096
# include <inttypes.h> /* slurm protocol header defines */
#endif /* HAVE_CONFIG_H */ #define SLURM_PROTOCOL_VERSION 1
#define SLURM_PROTOCOL_NO_FLAGS 0 /* used in the header to set flags to empty */
#include <netinet/in.h>
#define AF_SLURM AF_INET #if MONG_IMPLEMENTATION
#define SLURM_INADDR_ANY 0x00000000 # include <src/common/slurm_protocol_mongo_common.h>
#else
/* LINUX SPECIFIC */ # include <src/common/slurm_protocol_socket_common.h>
/* this is the slurm equivalent of the operating system file descriptor, which in linux is just an int */ #endif
typedef uint32_t slurm_fd ;
/* this is the slurm equivalent of the BSD sockets sockaddr */
typedef struct sockaddr_in slurm_addr ;
/*struct kevin {
int16_t family ;
uint16_t port ;
uint32_t address ;
char pad[16 - sizeof ( int16_t ) - sizeof (uint16_t) - sizeof (uint32_t) ] ;
} ;
*/
/* SLURM datatypes */
/* this is a custom data type to describe the slurm msg type type that is placed in the slurm protocol header
* while just an short now, it may change in the future */
/* Now defined in ../../src/common/slurm_protocol_defs.h
* typedef uint16_t slurm_msg_type_t ;
*/
#endif #endif
#include <src/common/slurm_protocol_defs.h>
#include <stdlib.h>
#include <src/slurmctld/slurmctld.h>
void slurm_free_last_update_msg ( last_update_msg_t * msg )
{
free ( msg ) ;
}
void slurm_free_job_id_msg ( job_id_msg_t * msg )
{
free ( msg ) ;
}
void slurm_free_job_desc_msg ( job_desc_msg_t * msg )
{
free ( msg->features ) ;
free ( msg->groups ) ;
free ( msg->name ) ;
free ( msg->partition_key ) ;
free ( msg->partition ) ;
free ( msg->req_nodes ) ;
free ( msg->job_script ) ;
free ( msg ) ;
}
void slurm_free_job_info_msg ( job_info_msg_t * msg )
{
int i;
for (i = 0; i < msg -> record_count; i++) {
slurm_free_job_table ( & ( msg->job_array[i] ) ) ;
}
free ( msg );
}
void slurm_free_job_table ( job_table_t * job )
{
if ( job )
{
if (job->nodes) free (job->nodes) ;
if (job->partition) free (job->partition) ;
if (job->name) free (job->name) ;
if (job->node_inx) free (job->node_inx) ;
if (job->req_nodes) free (job->req_nodes) ;
if (job->features) free (job->features) ;
if (job->job_script) free (job->job_script) ;
if (job->req_node_inx) free (job->req_node_inx) ;
free ( job ) ;
}
}
...@@ -20,30 +20,6 @@ ...@@ -20,30 +20,6 @@
#include <src/common/slurm_protocol_common.h> #include <src/common/slurm_protocol_common.h>
/* for sendto and recvfrom commands */
#define NO_SEND_RECV_FLAGS 0
#define DEFAULT_LISTEN_BACKLOG 10
/* used in interface methods */
#define SLURM_NOT_IMPLEMENTED -2
#define SLURM_PROTOCOL_SUCCESS 0
#define SLURM_PROTOCOL_FAILURE -1
#define SLURM_SOCKET_ERROR -1
#define MAX_MESSAGE_BUFFER_SIZE 4096
#define SLURM_PROTOCOL_VERSION 1
#define SLURM_PROTOCOL_NO_FLAGS 0
#define SLURM_PROTOCOL_VERSION_ERROR -100
/* SLURM Message types */ /* SLURM Message types */
typedef enum { typedef enum {
REQUEST_NODE_REGISRATION_STATUS = 1, REQUEST_NODE_REGISRATION_STATUS = 1,
...@@ -102,23 +78,29 @@ typedef uint16_t slurm_msg_type_t ; ...@@ -102,23 +78,29 @@ typedef uint16_t slurm_msg_type_t ;
#define REQUEST_NODE_REGISRATION_STATUS 1 #define REQUEST_NODE_REGISRATION_STATUS 1
#define MESSAGE_NODE_REGISRATION_STATUS 2 #define MESSAGE_NODE_REGISRATION_STATUS 2
#define REQUEST_RESOURCE_ALLOCATION 3 #define REQUEST_RESOURCE_ALLOCATION 3
#define RESPONSE_RESOURCE_ALLOCATION 4 #define RESPONSE_RESOURCE_ALLOCATION 4
#define REQUEST_CANCEL_JOB 5 #define REQUEST_CANCEL_JOB 5
#define REQUEST_RECONFIGURE 6
#define RESPONSE_CANCEL_JOB 7 #define RESPONSE_CANCEL_JOB 7
#define REQUEST_JOB_INFO 8
#define RESPONSE_JOB_INFO 9 #define REQUEST_RECONFIGURE 6
#define REQUEST_JOB_STEP_INFO 10
#define RESPONSE_JOB_STEP_INFO 11 #define REQUEST_JOB_INFO 3021
#define REQUEST_NODE_INFO 12 #define RESPONSE_JOB_INFO 3022
#define RESPONSE_NODE_INFO 13 #define REQUEST_JOB_STEP_INFO 3031
#define REQUEST_PARTITION_INFO 14 #define RESPONSE_JOB_STEP_INFO 3032
#define RESPONSE_PATITION_INFO 15 #define REQUEST_NODE_INFO 3041
#define REQUEST_ACCTING_INFO 16 #define RESPONSE_NODE_INFO 3042
#define RESPONSE_ACCOUNTING_INFO 17 #define REQUEST_PARTITION_INFO 3051
#define REQUEST_BUILD_INFO 18 #define RESPONSE_PATITION_INFO 3052
#define RESPONSE_BUILD_INFO 19 #define REQUEST_ACCTING_INFO 3061
#define RESPONSE_ACCOUNTING_INFO 3062
#define REQUEST_BUILD_INFO 3011
#define RESPONSE_BUILD_INFO 3012
#define RESPONSE_BUILD_INFO_RC 3013
#define MESSAGE_UPLOAD_ACCOUNTING_INFO 20 #define MESSAGE_UPLOAD_ACCOUNTING_INFO 20
#define RESPONSE_RECONFIGURE 21 #define RESPONSE_RECONFIGURE 21
#define REQUEST_SUBMIT_BATCH_JOB 22 #define REQUEST_SUBMIT_BATCH_JOB 22
...@@ -147,6 +129,7 @@ typedef uint16_t slurm_msg_type_t ; ...@@ -147,6 +129,7 @@ typedef uint16_t slurm_msg_type_t ;
#define RESPONSE_GET_JOB_STEP_INFO 45 #define RESPONSE_GET_JOB_STEP_INFO 45
#define REQUEST_JOB_RESOURCE 46 #define REQUEST_JOB_RESOURCE 46
#define RESPONSE_JOB_RESOURCE 47 #define RESPONSE_JOB_RESOURCE 47
#define RESPONSE_SLURM_RC 47
typedef struct slurm_protocol_header typedef struct slurm_protocol_header
{ {
...@@ -156,14 +139,25 @@ typedef struct slurm_protocol_header ...@@ -156,14 +139,25 @@ typedef struct slurm_protocol_header
uint32_t body_length ; uint32_t body_length ;
} header_t ; } header_t ;
typedef struct slurm_msg
{
slurm_msg_type_t msg_type ;
slurm_addr address ;
slurm_fd conn_fd ;
void * data ;
uint32_t data_size ;
} slurm_msg_t ;
typedef struct slurm_node_registration_status_msg typedef struct slurm_node_registration_status_msg
{ {
uint32_t timestamp ; uint32_t timestamp ;
uint32_t memory_size ; char* node_name;
uint32_t cpus;
uint32_t real_memory_size ;
uint32_t temporary_disk_space ; uint32_t temporary_disk_space ;
} node_registration_status_msg_t ; } node_registration_status_msg_t ;
typedef struct job_desc { /* Job descriptor for submit, allocate, and update requests */ typedef struct job_desc_msg { /* Job descriptor for submit, allocate, and update requests */
uint16_t contiguous; /* 1 if job requires contiguous nodes, 0 otherwise, uint16_t contiguous; /* 1 if job requires contiguous nodes, 0 otherwise,
* default=0 */ * default=0 */
char *features; /* comma separated list of required features, default NONE */ char *features; /* comma separated list of required features, default NONE */
...@@ -191,12 +185,86 @@ typedef struct job_desc { /* Job descriptor for submit, allocate, and update ...@@ -191,12 +185,86 @@ typedef struct job_desc { /* Job descriptor for submit, allocate, and update
uint32_t num_nodes; /* number of nodes required by job, default=0 */ uint32_t num_nodes; /* number of nodes required by job, default=0 */
uint32_t user_id; /* set only if different from current UID, default set uint32_t user_id; /* set only if different from current UID, default set
* to UID by API, can only be set if user is root */ * to UID by API, can only be set if user is root */
} job_desc_t ; } job_desc_msg_t ;
typedef struct last_update_msg {
uint32_t last_update;
} last_update_msg_t ;
typedef struct return_code_msg {
int32_t return_code;
} return_code_msg_t ;
typedef struct job_id_msg {
uint32_t job_id;
} job_id_msg_t ;
struct build_table {
uint32_t last_update; /* last update time of the build parameters*/
uint16_t backup_interval;/* slurmctld save state interval, seconds */
char *backup_location; /* pathname of state save directory */
char *backup_machine; /* name of slurmctld secondary server */
char *control_daemon; /* pathname of slurmctld */
char *control_machine; /* name of slurmctld primary server */
uint16_t controller_timeout; /* seconds for secondary slurmctld to take over */
char *epilog; /* pathname of job epilog */
uint16_t fast_schedule; /* 1 to *not* check configurations by node
* (only check configuration file, faster) */
uint16_t hash_base; /* base used for hashing node table */
uint16_t heartbeat_interval; /* interval between node heartbeats, seconds */
char *init_program; /* pathname of program to complete with exit 0
* before slurmctld or slurmd start on that node */
uint16_t kill_wait; /* seconds from SIGXCPU to SIGKILL on job termination */
char *prioritize; /* pathname of program to set initial job priority */
char *prolog; /* pathname of job prolog */
char *server_daemon; /* pathame of slurmd */
uint16_t server_timeout;/* how long slurmctld waits for setting node DOWN */
char *slurm_conf; /* pathname of slurm config file */
char *tmp_fs; /* pathname of temporary file system */
};
typedef struct build_table build_info_msg_t ;
struct job_table {
uint32_t job_id; /* job ID */
char *name; /* name of the job */
uint32_t user_id; /* user the job runs as */
uint16_t job_state; /* state of the job, see enum job_states */
uint32_t time_limit; /* maximum run time in minutes or INFINITE */
time_t start_time; /* time execution begins, actual or expected*/
time_t end_time; /* time of termination, actual or expected */
uint32_t priority; /* relative priority of the job */
char *nodes; /* comma delimited list of nodes allocated to job */
int *node_inx; /* list index pairs into node_table for *nodes:
start_range_1, end_range_1, start_range_2, .., -1 */
char *partition; /* name of assigned partition */
uint32_t num_procs; /* number of processors required by job */
uint32_t num_nodes; /* number of nodes required by job */
uint16_t shared; /* 1 if job can share nodes with other jobs */
uint16_t contiguous; /* 1 if job requires contiguous nodes */
uint32_t min_procs; /* minimum processors required per node */
uint32_t min_memory; /* minimum real memory required per node */
uint32_t min_tmp_disk; /* minimum temporary disk required per node */
char *req_nodes; /* comma separated list of required nodes */
int *req_node_inx; /* list index pairs into node_table for *req_nodes:
start_range_1, end_range_1, start_range_2, .., -1 */
char *features; /* comma separated list of required features */
char *job_script; /* pathname of required script */
};
typedef struct job_table job_table_t ;
typedef struct job_info_msg {
uint32_t last_update;
uint32_t record_count;
job_table_t * job_array;
} job_info_msg_t ;
/* the following typedefs follow kevin's communication message naming comvention */
/* free message functions */
void inline slurm_free_last_update_msg ( last_update_msg_t * msg ) ;
void inline slurm_free_job_desc_msg ( job_desc_msg_t * msg ) ;
void inline slurm_free_job_info_msg ( job_info_msg_t * msg ) ;
void inline slurm_free_job_table ( job_table_t * job ) ;
void inline slurm_free_job_id_msg ( job_id_msg_t * msg ) ;
typedef struct slurm_msg
{
slurm_msg_type_t msg_type ;
slurm_addr address ;
void * data ;
} slurm_msg_t ;
#endif #endif
#ifndef _SLURM_PROTOCOL_ERRNO_H
#define _SLURM_PROTOCOL_ERRNO_H
/* communcation layer RESPONSE_SLURM_RC message codes */
#define SLURM_NO_CHANGE_IN_DATA 100
/* general communication layer return codes */
#define SLURM_UNEXPECTED_MSG_ERROR 220
#define SLURM_PROTOCOL_VERSION_ERROR -100
#define SLURM_SOCKET_ERROR -1
#define SLURM_PROTOCOL_SUCCESS 0
#define SLURM_PROTOCOL_FAILURE -1
/* general return codes */
#define SLURM_SUCCESS 0
#define SLURM_FAILURE -1
#endif
#ifndef _SLURM_PROTOCOL_IMPLEMENTATION_C
#define _SLURM_PROTOCOL_IMPLEMENTATION_C_
#if MONG_IMPLEMENTATION
# include <src/common/slurm_protocol_mongo_implementation.h>
#else
# include <src/common/slurm_protocol_socket_implementation.h>
#endif
#endif
#ifndef _SLURM_PROTOCOL_MONGO_COMMON_H
#define _SLURM_PROTOCOL_MONGO_COMMON_H
#if HAVE_CONFIG_H
# include <config.h>
# if HAVE_INTTYPES_H
# include <inttypes.h>
# else
# if HAVE_STDINT_H
# include <stdint.h>
# endif
# endif /* HAVE_INTTYPES_H */
#else /* !HAVE_CONFIG_H */
# include <inttypes.h>
#endif /* HAVE_CONFIG_H */
#include <netinet/in.h>
#define AF_SLURM AF_INET
#define SLURM_INADDR_ANY 0x00000000
/* LINUX SPECIFIC */
/* this is the slurm equivalent of the operating system file descriptor, which in linux is just an int */
typedef uint32_t slurm_fd ;
/* this is the slurm equivalent of the BSD sockets sockaddr */
typedef struct mongo_addr_t slurm_addr ;
/*struct kevin {
int16_t family ;
uint16_t port ;
uint32_t address ;
char pad[16 - sizeof ( int16_t ) - sizeof (uint16_t) - sizeof (uint32_t) ] ;
} ;
*/
/* SLURM datatypes */
/* this is a custom data type to describe the slurm msg type type that is placed in the slurm protocol header
* while just an short now, it may change in the future */
/* Now defined in ../../src/common/slurm_protocol_defs.h
* typedef uint16_t slurm_msg_type_t ;
*/
#endif
...@@ -45,13 +45,21 @@ int pack_msg ( slurm_msg_t const * msg , char ** buffer , uint32_t * buf_len ) ...@@ -45,13 +45,21 @@ int pack_msg ( slurm_msg_t const * msg , char ** buffer , uint32_t * buf_len )
{ {
switch ( msg -> msg_type ) switch ( msg -> msg_type )
{ {
case REQUEST_BUILD_INFO:
pack_last_update ( ( last_update_msg_t * ) msg -> data , ( void ** ) buffer , buf_len ) ;
break;
case RESPONSE_BUILD_INFO:
pack_build_info ( ( build_info_msg_t * ) msg -> data , (void ** ) buffer , buf_len ) ;
break;
case RESPONSE_BUILD_INFO_RC:
break;
case REQUEST_NODE_REGISRATION_STATUS : case REQUEST_NODE_REGISRATION_STATUS :
break ; break ;
case MESSAGE_NODE_REGISRATION_STATUS : case MESSAGE_NODE_REGISRATION_STATUS :
break ; break ;
case REQUEST_RESOURCE_ALLOCATION : case REQUEST_RESOURCE_ALLOCATION :
case REQUEST_SUBMIT_BATCH_JOB : case REQUEST_SUBMIT_BATCH_JOB :
pack_job_desc ( (job_desc_t * ) msg -> data , ( void ** ) buffer , buf_len ) ; pack_job_desc ( (job_desc_msg_t * ) msg -> data , ( void ** ) buffer , buf_len ) ;
break ; break ;
case RESPONSE_RESOURCE_ALLOCATION : case RESPONSE_RESOURCE_ALLOCATION :
break ; break ;
...@@ -118,17 +126,25 @@ int pack_msg ( slurm_msg_t const * msg , char ** buffer , uint32_t * buf_len ) ...@@ -118,17 +126,25 @@ int pack_msg ( slurm_msg_t const * msg , char ** buffer , uint32_t * buf_len )
* buffer - destination of the pack, note buffer will be incremented by underlying unpack routines * buffer - destination of the pack, note buffer will be incremented by underlying unpack routines
* length - length of buffer, note length will be decremented by underlying unpack routines * length - length of buffer, note length will be decremented by underlying unpack routines
*/ */
int unpack_msg ( slurm_msg_t * msg ,char ** buffer , uint32_t * buf_len ) int unpack_msg ( slurm_msg_t * msg , char ** buffer , uint32_t * buf_len )
{ {
switch ( msg-> msg_type ) switch ( msg-> msg_type )
{ {
case REQUEST_BUILD_INFO:
unpack_last_update ( ( last_update_msg_t **) &(msg -> data) , ( void ** ) buffer , buf_len ) ;
break;
case RESPONSE_BUILD_INFO:
unpack_build_info ( ( build_info_msg_t ** ) &(msg -> data) , (void ** ) buffer , buf_len ) ;
break;
case RESPONSE_BUILD_INFO_RC:
break;
case REQUEST_NODE_REGISRATION_STATUS : case REQUEST_NODE_REGISRATION_STATUS :
break ; break ;
case MESSAGE_NODE_REGISRATION_STATUS : case MESSAGE_NODE_REGISRATION_STATUS :
break ; break ;
case REQUEST_RESOURCE_ALLOCATION : case REQUEST_RESOURCE_ALLOCATION :
case REQUEST_SUBMIT_BATCH_JOB : case REQUEST_SUBMIT_BATCH_JOB :
unpack_job_desc ( ( job_desc_t **) & ( msg-> data ), ( void ** ) buffer , buf_len ) ; unpack_job_desc ( ( job_desc_msg_t **) & ( msg-> data ), ( void ** ) buffer , buf_len ) ;
break ; break ;
case RESPONSE_RESOURCE_ALLOCATION : case RESPONSE_RESOURCE_ALLOCATION :
break ; break ;
...@@ -192,24 +208,61 @@ int unpack_msg ( slurm_msg_t * msg ,char ** buffer , uint32_t * buf_len ) ...@@ -192,24 +208,61 @@ int unpack_msg ( slurm_msg_t * msg ,char ** buffer , uint32_t * buf_len )
void pack_node_registration_status_msg ( node_registration_status_msg_t * msg, char ** buffer , uint32_t * length ) void pack_node_registration_status_msg ( node_registration_status_msg_t * msg, char ** buffer , uint32_t * length )
{ {
pack32 ( msg -> timestamp , ( void ** ) buffer , length ) ; pack32 ( msg -> timestamp , ( void ** ) buffer , length ) ;
pack32 ( msg -> memory_size , ( void ** ) buffer , length ) ; packstr ( msg -> node_name , ( void ** ) buffer , length ) ;
pack32 ( msg -> cpus , ( void ** ) buffer , length ) ;
pack32 ( msg -> real_memory_size , ( void ** ) buffer , length ) ;
pack32 ( msg -> temporary_disk_space , ( void ** ) buffer , length ) ; pack32 ( msg -> temporary_disk_space , ( void ** ) buffer , length ) ;
} }
void unpack_node_registration_status_msg ( node_registration_status_msg_t * msg , char ** buffer , uint32_t * length ) int unpack_node_registration_status_msg ( node_registration_status_msg_t ** msg , char ** buffer , uint32_t * length )
{ {
unpack32 ( & msg -> timestamp , ( void ** ) buffer , length ) ; uint16_t uint16_tmp;
unpack32 ( & msg -> memory_size , ( void ** ) buffer , length ) ; node_registration_status_msg_t * node_reg_ptr ;
unpack32 ( & msg -> temporary_disk_space , ( void ** ) buffer , length ) ; /* alloc memory for structure */
node_reg_ptr = malloc ( sizeof ( node_registration_status_msg_t ) ) ;
if (node_reg_ptr == NULL)
{
return ENOMEM;
}
/* load the data values */
/* unpack timestamp of snapshot */
unpack32 ( & node_reg_ptr -> timestamp , ( void ** ) buffer , length ) ;
unpackstr_ptr_malloc ( & node_reg_ptr -> node_name , &uint16_tmp, ( void ** ) buffer , length ) ;
unpack32 ( & node_reg_ptr -> cpus , ( void ** ) buffer , length ) ;
unpack32 ( & node_reg_ptr -> real_memory_size , ( void ** ) buffer , length ) ;
unpack32 ( & node_reg_ptr -> temporary_disk_space , ( void ** ) buffer , length ) ;
*msg = node_reg_ptr ;
return 0 ;
} }
int unpack_build_info ( struct build_buffer **build_buffer_ptr, void * buffer , int buffer_size ) void pack_build_info ( build_info_msg_t * build_ptr, void ** buf_ptr , int * buffer_size )
{ {
uint16_t uint16_tmp; pack32 (build_ptr->last_update, buf_ptr, buffer_size);
uint32_t uint32_time; pack16 (build_ptr->backup_interval, buf_ptr, buffer_size);
struct build_table * build_ptr ; packstr (build_ptr->backup_location, buf_ptr, buffer_size);
void * buf_ptr; packstr (build_ptr->backup_machine, buf_ptr, buffer_size);
packstr (build_ptr->control_daemon, buf_ptr, buffer_size);
packstr (build_ptr->control_machine, buf_ptr, buffer_size);
pack16 (build_ptr->controller_timeout, buf_ptr, buffer_size);
packstr (build_ptr->epilog, buf_ptr, buffer_size);
pack16 (build_ptr->fast_schedule, buf_ptr, buffer_size);
pack16 (build_ptr->hash_base, buf_ptr, buffer_size);
pack16 (build_ptr->heartbeat_interval, buf_ptr, buffer_size);
packstr (build_ptr->init_program, buf_ptr, buffer_size);
pack16 (build_ptr->kill_wait, buf_ptr, buffer_size);
packstr (build_ptr->prioritize, buf_ptr, buffer_size);
packstr (build_ptr->prolog, buf_ptr, buffer_size);
packstr (build_ptr->server_daemon, buf_ptr, buffer_size);
pack16 (build_ptr->server_timeout, buf_ptr, buffer_size);
packstr (build_ptr->slurm_conf, buf_ptr, buffer_size);
packstr (build_ptr->tmp_fs, buf_ptr, buffer_size);
}
int unpack_build_info ( build_info_msg_t **build_buffer_ptr, void ** buffer , int * buffer_size )
{
uint16_t uint16_tmp;
build_info_msg_t * build_ptr ;
/* alloc memory for structure */ /* alloc memory for structure */
build_ptr = malloc ( sizeof ( struct build_table ) ) ; build_ptr = malloc ( sizeof ( struct build_table ) ) ;
if (build_ptr == NULL) if (build_ptr == NULL)
...@@ -219,37 +272,27 @@ int unpack_build_info ( struct build_buffer **build_buffer_ptr, void * buffer , ...@@ -219,37 +272,27 @@ int unpack_build_info ( struct build_buffer **build_buffer_ptr, void * buffer ,
/* load the data values */ /* load the data values */
/* unpack timestamp of snapshot */ /* unpack timestamp of snapshot */
buf_ptr = buffer ; unpack32 (&build_ptr->last_update, buffer, buffer_size);
unpack32 (&uint32_time, &buf_ptr, &buffer_size); unpack16 (&build_ptr->backup_interval, buffer, buffer_size);
unpackstr_ptr_malloc (&build_ptr->backup_location, &uint16_tmp, buffer, buffer_size);
unpack16 (&build_ptr->backup_interval, &buf_ptr, &buffer_size); unpackstr_ptr_malloc (&build_ptr->backup_machine, &uint16_tmp, buffer, buffer_size);
unpackstr_ptr (&build_ptr->backup_location, &uint16_tmp, &buf_ptr, &buffer_size); unpackstr_ptr_malloc (&build_ptr->control_daemon, &uint16_tmp, buffer, buffer_size);
unpackstr_ptr (&build_ptr->backup_machine, &uint16_tmp, &buf_ptr, &buffer_size); unpackstr_ptr_malloc (&build_ptr->control_machine, &uint16_tmp, buffer, buffer_size);
unpackstr_ptr (&build_ptr->control_daemon, &uint16_tmp, &buf_ptr, &buffer_size); unpack16 (&build_ptr->controller_timeout, buffer, buffer_size);
unpackstr_ptr (&build_ptr->control_machine, &uint16_tmp, &buf_ptr, &buffer_size); unpackstr_ptr_malloc (&build_ptr->epilog, &uint16_tmp, buffer, buffer_size);
unpack16 (&build_ptr->controller_timeout, &buf_ptr, &buffer_size); unpack16 (&build_ptr->fast_schedule, buffer, buffer_size);
unpackstr_ptr (&build_ptr->epilog, &uint16_tmp, &buf_ptr, &buffer_size); unpack16 (&build_ptr->hash_base, buffer, buffer_size);
unpack16 (&build_ptr->fast_schedule, &buf_ptr, &buffer_size); unpack16 (&build_ptr->heartbeat_interval, buffer, buffer_size);
unpack16 (&build_ptr->hash_base, &buf_ptr, &buffer_size); unpackstr_ptr_malloc (&build_ptr->init_program, &uint16_tmp, buffer, buffer_size);
unpack16 (&build_ptr->heartbeat_interval, &buf_ptr, &buffer_size); unpack16 (&build_ptr->kill_wait, buffer, buffer_size);
unpackstr_ptr (&build_ptr->init_program, &uint16_tmp, &buf_ptr, &buffer_size); unpackstr_ptr_malloc (&build_ptr->prioritize, &uint16_tmp, buffer, buffer_size);
unpack16 (&build_ptr->kill_wait, &buf_ptr, &buffer_size); unpackstr_ptr_malloc (&build_ptr->prolog, &uint16_tmp, buffer, buffer_size);
unpackstr_ptr (&build_ptr->prioritize, &uint16_tmp, &buf_ptr, &buffer_size); unpackstr_ptr_malloc (&build_ptr->server_daemon, &uint16_tmp, buffer, buffer_size);
unpackstr_ptr (&build_ptr->prolog, &uint16_tmp, &buf_ptr, &buffer_size); unpack16 (&build_ptr->server_timeout, buffer, buffer_size);
unpackstr_ptr (&build_ptr->server_daemon, &uint16_tmp, &buf_ptr, &buffer_size); unpackstr_ptr_malloc (&build_ptr->slurm_conf, &uint16_tmp, buffer, buffer_size);
unpack16 (&build_ptr->server_timeout, &buf_ptr, &buffer_size); unpackstr_ptr_malloc (&build_ptr->tmp_fs, &uint16_tmp, buffer, buffer_size);
unpackstr_ptr (&build_ptr->slurm_conf, &uint16_tmp, &buf_ptr, &buffer_size); *build_buffer_ptr = build_ptr ;
unpackstr_ptr (&build_ptr->tmp_fs, &uint16_tmp, &buf_ptr, &buffer_size); return 0 ;
*build_buffer_ptr = malloc (sizeof (struct build_buffer));
if (*build_buffer_ptr == NULL) {
free (build_ptr);
return ENOMEM;
}
(*build_buffer_ptr)->last_update = (time_t) uint32_time;
(*build_buffer_ptr)->raw_buffer_ptr = buffer;
(*build_buffer_ptr)->build_table_ptr = build_ptr;
return 0;
} }
/* pack_job_desc /* pack_job_desc
...@@ -258,7 +301,7 @@ int unpack_build_info ( struct build_buffer **build_buffer_ptr, void * buffer , ...@@ -258,7 +301,7 @@ int unpack_build_info ( struct build_buffer **build_buffer_ptr, void * buffer ,
* buf_ptr - destination of the pack, note buffer will be incremented by underlying pack routines * buf_ptr - destination of the pack, note buffer will be incremented by underlying pack routines
* buffer_size - length of buffer, note length will be decremented by underlying pack routines * buffer_size - length of buffer, note length will be decremented by underlying pack routines
*/ */
void pack_job_desc ( job_desc_t * job_desc_ptr, void ** buf_ptr , int * buffer_size ) void pack_job_desc ( job_desc_msg_t * job_desc_ptr, void ** buf_ptr , int * buffer_size )
{ {
/* load the data values */ /* load the data values */
/* unpack timestamp of snapshot */ /* unpack timestamp of snapshot */
...@@ -295,38 +338,38 @@ void pack_job_desc ( job_desc_t * job_desc_ptr, void ** buf_ptr , int * buffer_s ...@@ -295,38 +338,38 @@ void pack_job_desc ( job_desc_t * job_desc_ptr, void ** buf_ptr , int * buffer_s
* buf_ptr - destination of the pack, note buffer will be incremented by underlying unpack routines * buf_ptr - destination of the pack, note buffer will be incremented by underlying unpack routines
* buffer_size - length of buffer, note length will be decremented by underlying unpack routines * buffer_size - length of buffer, note length will be decremented by underlying unpack routines
*/ */
void unpack_job_desc ( job_desc_t **job_desc_buffer_ptr, void ** buf_ptr , int * buffer_size ) int unpack_job_desc ( job_desc_msg_t **job_desc_buffer_ptr, void ** buf_ptr , int * buffer_size )
{ {
uint16_t uint16_tmp; uint16_t uint16_tmp;
job_desc_t * job_desc_ptr ; job_desc_msg_t * job_desc_ptr ;
/* alloc memory for structure */ /* alloc memory for structure */
job_desc_ptr = malloc ( sizeof ( job_desc_t ) ) ; job_desc_ptr = malloc ( sizeof ( job_desc_msg_t ) ) ;
if (job_desc_ptr== NULL) if (job_desc_ptr== NULL)
{ {
*job_desc_buffer_ptr = NULL ; *job_desc_buffer_ptr = NULL ;
return ; return ENOMEM ;
} }
/* load the data values */ /* load the data values */
/* unpack timestamp of snapshot */ /* unpack timestamp of snapshot */
unpack16 (&job_desc_ptr->contiguous, buf_ptr, buffer_size); unpack16 (&job_desc_ptr->contiguous, buf_ptr, buffer_size);
unpackstr_ptr (&job_desc_ptr->features, &uint16_tmp, buf_ptr, buffer_size); unpackstr_ptr_malloc (&job_desc_ptr->features, &uint16_tmp, buf_ptr, buffer_size);
unpackstr_ptr (&job_desc_ptr->groups, &uint16_tmp, buf_ptr, buffer_size); unpackstr_ptr_malloc (&job_desc_ptr->groups, &uint16_tmp, buf_ptr, buffer_size);
unpack32 (&job_desc_ptr->job_id, buf_ptr, buffer_size); unpack32 (&job_desc_ptr->job_id, buf_ptr, buffer_size);
unpackstr_ptr (&job_desc_ptr->name, &uint16_tmp, buf_ptr, buffer_size); unpackstr_ptr_malloc (&job_desc_ptr->name, &uint16_tmp, buf_ptr, buffer_size);
unpackmem_ptr ( ( char ** ) &job_desc_ptr->partition_key, &uint16_tmp, buf_ptr, buffer_size); unpackmem_malloc ( ( char ** ) &job_desc_ptr->partition_key, &uint16_tmp, buf_ptr, buffer_size);
unpack32 (&job_desc_ptr->min_procs, buf_ptr, buffer_size); unpack32 (&job_desc_ptr->min_procs, buf_ptr, buffer_size);
unpack32 (&job_desc_ptr->min_memory, buf_ptr, buffer_size); unpack32 (&job_desc_ptr->min_memory, buf_ptr, buffer_size);
unpack32 (&job_desc_ptr->min_tmp_disk, buf_ptr, buffer_size); unpack32 (&job_desc_ptr->min_tmp_disk, buf_ptr, buffer_size);
unpackstr_ptr (&job_desc_ptr->partition, &uint16_tmp, buf_ptr, buffer_size); unpackstr_ptr_malloc (&job_desc_ptr->partition, &uint16_tmp, buf_ptr, buffer_size);
unpack32 (&job_desc_ptr->priority, buf_ptr, buffer_size); unpack32 (&job_desc_ptr->priority, buf_ptr, buffer_size);
unpackstr_ptr (&job_desc_ptr->partition, &uint16_tmp, buf_ptr, buffer_size); unpackstr_ptr_malloc (&job_desc_ptr->partition, &uint16_tmp, buf_ptr, buffer_size);
unpackstr_ptr (&job_desc_ptr->partition, &uint16_tmp, buf_ptr, buffer_size); unpackstr_ptr_malloc (&job_desc_ptr->partition, &uint16_tmp, buf_ptr, buffer_size);
unpack16 (&job_desc_ptr->shared, buf_ptr, buffer_size); unpack16 (&job_desc_ptr->shared, buf_ptr, buffer_size);
unpack32 (&job_desc_ptr->time_limit, buf_ptr, buffer_size); unpack32 (&job_desc_ptr->time_limit, buf_ptr, buffer_size);
...@@ -336,17 +379,49 @@ void unpack_job_desc ( job_desc_t **job_desc_buffer_ptr, void ** buf_ptr , int * ...@@ -336,17 +379,49 @@ void unpack_job_desc ( job_desc_t **job_desc_buffer_ptr, void ** buf_ptr , int *
unpack32 (&job_desc_ptr->user_id, buf_ptr, buffer_size); unpack32 (&job_desc_ptr->user_id, buf_ptr, buffer_size);
*job_desc_buffer_ptr = job_desc_ptr ; *job_desc_buffer_ptr = job_desc_ptr ;
return 0 ;
}
void pack_last_update ( last_update_msg_t * msg , void ** buffer , uint32_t * length )
{
pack32 ( msg -> last_update , buffer , length ) ;
}
int unpack_last_update ( last_update_msg_t ** msg , void ** buffer , uint32_t * length )
{
last_update_msg_t * last_update_msg ;
last_update_msg = malloc ( sizeof ( last_update_msg_t ) ) ;
if ( last_update_msg == NULL)
{
*msg = NULL ;
return ENOMEM ;
}
unpack32 ( & last_update_msg -> last_update , buffer , length ) ;
*msg = last_update_msg ;
return 0 ;
} }
/* template /* template
void pack_ ( * msg , char ** buffer , uint32_t * length ) void pack_ ( * msg , void ** buffer , uint32_t * length )
{ {
pack16 ( msg -> , buffer , length ) ; pack16 ( msg -> , buffer , length ) ;
pack32 ( msg -> , buffer , length ) ; pack32 ( msg -> , buffer , length ) ;
} }
void unpack_ ( * msg , char ** buffer , uint32_t * length ) void unpack_ ( ** msg , void char ** buffer , uint32_t * length )
{ {
uint16_t uint16_tmp;
job_desc_msg_t * job_desc_ptr ;
job_desc_ptr = malloc ( sizeof ( job_desc_msg_t ) ) ;
if (job_desc_ptr== NULL)
{
*msg = NULL ;
return ;
}
unpack16 ( & msg -> , buffer , length ) ; unpack16 ( & msg -> , buffer , length ) ;
unpack32 ( & msg -> , buffer , length ) ; unpack32 ( & msg -> , buffer , length ) ;
} }
......
...@@ -29,21 +29,14 @@ int unpack_msg ( slurm_msg_t * msgi , char ** buffer , uint32_t * buf_len ) ; ...@@ -29,21 +29,14 @@ int unpack_msg ( slurm_msg_t * msgi , char ** buffer , uint32_t * buf_len ) ;
/* specific Pack / Unpack methods for slurm protocol bodies */ /* specific Pack / Unpack methods for slurm protocol bodies */
void pack_node_registration_status_msg ( node_registration_status_msg_t * msg , char ** buffer , uint32_t * length ) ; void pack_node_registration_status_msg ( node_registration_status_msg_t * msg , char ** buffer , uint32_t * length ) ;
void unpack_node_registration_status_msg (node_registration_status_msg_t * msg , char ** buffer , uint32_t * length ) ; int unpack_node_registration_status_msg (node_registration_status_msg_t ** msg , char ** buffer , uint32_t * length ) ;
void pack_job_desc ( job_desc_t *job_desc_ptr, void ** buffer , int * buf_len ) ; void pack_job_desc ( job_desc_msg_t *job_desc_msg_ptr, void ** buffer , int * buf_len ) ;
void unpack_job_desc ( job_desc_t **job_desc_buffer_ptr, void ** buffer , int * buffer_size ) ; int unpack_job_desc ( job_desc_msg_t **job_desc_msg_ptr, void ** buffer , int * buffer_size ) ;
/* template
void pack_ ( char ** buffer , uint32_t * length , * msg ) void pack_last_update ( last_update_msg_t * msg , void ** buffer , uint32_t * length ) ;
{ int unpack_last_update ( last_update_msg_t ** msg , void ** buffer , uint32_t * length ) ;
pack16 ( msg -> , buffer , length ) ;
pack32 ( msg -> , buffer , length ) ; void pack_build_info ( struct build_table * build_ptr, void ** buffer , int * buffer_size ) ;
} int unpack_build_info ( struct build_table **build_buffer_ptr, void ** buffer , int * buffer_size ) ;
void unpack_ ( char ** buffer , uint32_t * length , * messge )
{
unpack16 ( & msg -> , buffer , length ) ;
unpack32 ( & msg -> , buffer , length ) ;
}
*/
#endif #endif
#ifndef _SLURM_PROTOCOL_SOCKET_COMMON_H
#define _SLURM_PROTOCOL_SOCKET_COMMON_H
#if HAVE_CONFIG_H
# include <config.h>
# if HAVE_INTTYPES_H
# include <inttypes.h>
# else
# if HAVE_STDINT_H
# include <stdint.h>
# endif
# endif /* HAVE_INTTYPES_H */
#else /* !HAVE_CONFIG_H */
# include <inttypes.h>
#endif /* HAVE_CONFIG_H */
#include <netinet/in.h>
#define AF_SLURM AF_INET
#define SLURM_INADDR_ANY 0x00000000
/* LINUX SPECIFIC */
/* this is the slurm equivalent of the operating system file descriptor, which in linux is just an int */
typedef uint32_t slurm_fd ;
/* this is the slurm equivalent of the BSD sockets sockaddr */
typedef struct sockaddr_in slurm_addr ;
/*struct kevin {
int16_t family ;
uint16_t port ;
uint32_t address ;
char pad[16 - sizeof ( int16_t ) - sizeof (uint16_t) - sizeof (uint32_t) ] ;
} ;
*/
/* SLURM datatypes */
/* this is a custom data type to describe the slurm msg type type that is placed in the slurm protocol header
* while just an short now, it may change in the future */
/* Now defined in ../../src/common/slurm_protocol_defs.h
* typedef uint16_t slurm_msg_type_t ;
*/
#endif
...@@ -65,7 +65,7 @@ ssize_t _slurm_msg_recvfrom ( slurm_fd open_fd, char *buffer , size_t size , uin ...@@ -65,7 +65,7 @@ ssize_t _slurm_msg_recvfrom ( slurm_fd open_fd, char *buffer , size_t size , uin
} }
if ( ( recv_len = _slurm_recv ( connection_fd , size_buffer_temp , sizeof ( uint32_t ) , NO_SEND_RECV_FLAGS ) ) != sizeof ( uint32_t ) ) if ( ( recv_len = _slurm_recv ( connection_fd , size_buffer_temp , sizeof ( uint32_t ) , NO_SEND_RECV_FLAGS ) ) != sizeof ( uint32_t ) )
*/ */
if ( ( recv_len = _slurm_recv ( open_fd , size_buffer_temp , sizeof ( uint32_t ) , NO_SEND_RECV_FLAGS ) ) != sizeof ( uint32_t ) ) if ( ( recv_len = _slurm_recv ( open_fd , size_buffer_temp , sizeof ( uint32_t ) , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS ) ) != sizeof ( uint32_t ) )
{ {
debug ( "Error receiving legth of datagram. Total Bytes Sent %i \n", recv_len ) ; debug ( "Error receiving legth of datagram. Total Bytes Sent %i \n", recv_len ) ;
} }
...@@ -75,7 +75,7 @@ ssize_t _slurm_msg_recvfrom ( slurm_fd open_fd, char *buffer , size_t size , uin ...@@ -75,7 +75,7 @@ ssize_t _slurm_msg_recvfrom ( slurm_fd open_fd, char *buffer , size_t size , uin
while ( total_len < transmit_size ) while ( total_len < transmit_size )
{ {
/* if ( ( recv_len = _slurm_recv ( connection_fd , buffer , transmit_size , NO_SEND_RECV_FLAGS ) ) == SLURM_SOCKET_ERROR ) */ /* if ( ( recv_len = _slurm_recv ( connection_fd , buffer , transmit_size , NO_SEND_RECV_FLAGS ) ) == SLURM_SOCKET_ERROR ) */
if ( ( recv_len = _slurm_recv ( open_fd , buffer , transmit_size , NO_SEND_RECV_FLAGS ) ) == SLURM_SOCKET_ERROR ) if ( ( recv_len = _slurm_recv ( open_fd , buffer , transmit_size , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS ) ) == SLURM_SOCKET_ERROR )
{ {
debug ( "Error receiving legth of datagram. errno %i \n", errno ) ; debug ( "Error receiving legth of datagram. errno %i \n", errno ) ;
return recv_len ; return recv_len ;
...@@ -88,7 +88,7 @@ ssize_t _slurm_msg_recvfrom ( slurm_fd open_fd, char *buffer , size_t size , uin ...@@ -88,7 +88,7 @@ ssize_t _slurm_msg_recvfrom ( slurm_fd open_fd, char *buffer , size_t size , uin
/* /*
_slurm_close ( connection_fd ) ; _slurm_close ( connection_fd ) ;
*/ */
return recv_len ; return total_len ;
} }
ssize_t _slurm_msg_sendto ( slurm_fd open_fd, char *buffer , size_t size , uint32_t flags, slurm_addr * slurm_address ) ssize_t _slurm_msg_sendto ( slurm_fd open_fd, char *buffer , size_t size , uint32_t flags, slurm_addr * slurm_address )
...@@ -109,13 +109,13 @@ ssize_t _slurm_msg_sendto ( slurm_fd open_fd, char *buffer , size_t size , uint3 ...@@ -109,13 +109,13 @@ ssize_t _slurm_msg_sendto ( slurm_fd open_fd, char *buffer , size_t size , uint3
} }
if ( ( send_len = _slurm_send ( connection_fd , size_buffer_temp , sizeof ( uint32_t ) , NO_SEND_RECV_FLAGS ) ) != sizeof ( uint32_t ) ) if ( ( send_len = _slurm_send ( connection_fd , size_buffer_temp , sizeof ( uint32_t ) , NO_SEND_RECV_FLAGS ) ) != sizeof ( uint32_t ) )
*/ */
if ( ( send_len = _slurm_send ( open_fd , size_buffer_temp , sizeof ( uint32_t ) , NO_SEND_RECV_FLAGS ) ) != sizeof ( uint32_t ) ) if ( ( send_len = _slurm_send ( open_fd , size_buffer_temp , sizeof ( uint32_t ) , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS ) ) != sizeof ( uint32_t ) )
{ {
debug ( "Error sending length of datagram\n" ) ; debug ( "Error sending length of datagram\n" ) ;
} }
/* send_len = _slurm_send ( connection_fd , buffer , size , NO_SEND_RECV_FLAGS ) ; */ /* send_len = _slurm_send ( connection_fd , buffer , size , NO_SEND_RECV_FLAGS ) ; */
send_len = _slurm_send ( open_fd , buffer , size , NO_SEND_RECV_FLAGS ) ; send_len = _slurm_send ( open_fd , buffer , size , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS ) ;
if ( send_len != size ) if ( send_len != size )
{ {
debug ( "_slurm_msg_sendto only transmitted %i of %i bytes\n", send_len , size ) ; debug ( "_slurm_msg_sendto only transmitted %i of %i bytes\n", send_len , size ) ;
...@@ -148,7 +148,7 @@ slurm_fd _slurm_listen_stream ( slurm_addr * slurm_address ) ...@@ -148,7 +148,7 @@ slurm_fd _slurm_listen_stream ( slurm_addr * slurm_address )
return rc ; return rc ;
} }
if ( ( rc = _slurm_listen ( connection_fd , DEFAULT_LISTEN_BACKLOG ) ) == SLURM_SOCKET_ERROR ) if ( ( rc = _slurm_listen ( connection_fd , SLURM_PROTOCOL_DEFAULT_LISTEN_BACKLOG ) ) == SLURM_SOCKET_ERROR )
{ {
debug( "Error listening on slurm stream socket: errno %i\n" , errno ) ; debug( "Error listening on slurm stream socket: errno %i\n" , errno ) ;
return rc ; return rc ;
...@@ -220,7 +220,7 @@ extern slurm_fd _slurm_create_socket ( slurm_socket_type_t type ) ...@@ -220,7 +220,7 @@ extern slurm_fd _slurm_create_socket ( slurm_socket_type_t type )
* one will be chosen automatically. Returns 0 on success, -1 for errors. */ * one will be chosen automatically. Returns 0 on success, -1 for errors. */
extern int _slurm_socketpair (int __domain, int __type, int __protocol, int __fds[2]) extern int _slurm_socketpair (int __domain, int __type, int __protocol, int __fds[2])
{ {
return SLURM_NOT_IMPLEMENTED ; return SLURM_PROTOCOL_FUNCTION_NOT_IMPLEMENTED ;
} }
/* Give the socket FD the local address ADDR (which is LEN bytes long). */ /* Give the socket FD the local address ADDR (which is LEN bytes long). */
...@@ -404,7 +404,7 @@ void _slurm_set_addr ( slurm_addr * slurm_address , uint16_t port , char * host ...@@ -404,7 +404,7 @@ void _slurm_set_addr ( slurm_addr * slurm_address , uint16_t port , char * host
void _slurm_set_addr_char ( slurm_addr * slurm_address , uint16_t port , char * host ) void _slurm_set_addr_char ( slurm_addr * slurm_address , uint16_t port , char * host )
{ {
struct hostent * host_info = gethostbyname ( host ) ; struct hostent * host_info = gethostbyname ( host ) ;
memcpy ( & slurm_address -> sin_addr . s_addr , & host_info -> h_addr , host_info -> h_length ) ; memcpy ( & slurm_address -> sin_addr . s_addr , host_info -> h_addr , host_info -> h_length ) ;
slurm_address -> sin_family = AF_SLURM ; slurm_address -> sin_family = AF_SLURM ;
slurm_address -> sin_port = htons ( port ) ; slurm_address -> sin_port = htons ( port ) ;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment