Skip to content
Snippets Groups Projects
Commit 0a1fb314 authored by tewk's avatar tewk
Browse files

Continued clean up and restructuring.

I am implementing the new comm layer in all the api functions
added sockopt parameter SO_REUSEADDR to eliminate socket turn around time during debug.
parent 0c60d996
No related branches found
No related tags found
No related merge requests found
...@@ -159,6 +159,37 @@ int slurm_receive_msg ( slurm_fd open_fd , slurm_msg_t * msg ) ...@@ -159,6 +159,37 @@ int slurm_receive_msg ( slurm_fd open_fd , slurm_msg_t * msg )
return rc ; return rc ;
} }
/* looks like a receive message, except the buffer isn't unpacked it is just memcopied */
int slurm_receive_buffer2 ( slurm_fd open_fd , slurm_msg_t * msg )
{
char buftemp[SLURM_PROTOCOL_MAX_MESSAGE_BUFFER_SIZE] ;
char * buffer = buftemp ;
header_t header ;
int rc ;
unsigned int unpack_len ;
unsigned int receive_len = SLURM_PROTOCOL_MAX_MESSAGE_BUFFER_SIZE ;
if ( ( rc = _slurm_msg_recvfrom ( open_fd , buffer , receive_len, SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , & (msg)->address ) ) == SLURM_SOCKET_ERROR )
{
debug ( "Error recieving msg socket: errno %i\n", errno ) ;
return rc ;
}
/* unpack header */
unpack_len = rc ;
unpack_header ( &header , & buffer , & unpack_len ) ;
if ( (rc = check_header_version ( & header ) ) < 0 )
{
return rc;
}
msg->data = malloc ( unpack_len ) ;
msg->data_size = unpack_len ;
memcpy ( msg->data , buffer , unpack_len ) ;
return unpack_len ;
}
/***** send msg functions */ /***** send msg functions */
/* sends a slurm_protocol msg to the slurmctld based on location information retrieved from the slurmd.conf /* sends a slurm_protocol msg to the slurmctld based on location information retrieved from the slurmd.conf
* if unable to contant the primary slurmctld attempts will be made to contact the backup controller * if unable to contant the primary slurmctld attempts will be made to contact the backup controller
...@@ -191,6 +222,30 @@ int slurm_send_controller_msg ( slurm_fd open_fd , slurm_msg_t * msg ) ...@@ -191,6 +222,30 @@ int slurm_send_controller_msg ( slurm_fd open_fd , slurm_msg_t * msg )
return rc ; return rc ;
} }
int slurm_send_controller_buffer2 ( slurm_fd open_fd , slurm_msg_t * msg )
{
int rc ;
slurm_addr primary_destination_address ;
slurm_addr secondary_destination_address ;
/* set slurm_addr structures */
slurm_set_addr ( & primary_destination_address , SLURM_PORT , PRIMARY_SLURM_CONTROLLER ) ;
slurm_set_addr ( & secondary_destination_address , SLURM_PORT , SECONDARY_SLURM_CONTROLLER ) ;
/* try to send to primary first then secondary */
msg -> address = primary_destination_address ;
if ( (rc = slurm_send_node_msg ( open_fd , msg ) ) == SLURM_SOCKET_ERROR )
{
debug ( "Send message to primary controller failed" ) ;
msg -> address = secondary_destination_address ;
if ( (rc = slurm_send_node_msg ( open_fd , msg ) ) == SLURM_SOCKET_ERROR )
{
debug ( "Send messge to secondary controller failed" ) ;
}
}
return rc ;
}
/* sends a message to an arbitrary node /* sends a message to an arbitrary node
* *
* open_fd - file descriptor to send msg on * open_fd - file descriptor to send msg on
...@@ -223,6 +278,33 @@ int slurm_send_node_msg ( slurm_fd open_fd , slurm_msg_t * msg ) ...@@ -223,6 +278,33 @@ int slurm_send_node_msg ( slurm_fd open_fd , slurm_msg_t * msg )
return rc ; return rc ;
} }
int slurm_send_node_buffer2 ( slurm_fd open_fd , slurm_msg_t * msg )
{
char buf_temp[SLURM_PROTOCOL_MAX_MESSAGE_BUFFER_SIZE] ;
char * buffer = buf_temp ;
header_t header ;
int rc ;
unsigned int pack_len ;
/* initheader */
init_header ( & header , msg->msg_type , SLURM_PROTOCOL_NO_FLAGS ) ;
/* pack header */
pack_len = SLURM_PROTOCOL_MAX_MESSAGE_BUFFER_SIZE ;
pack_header ( &header , & buffer , & pack_len ) ;
/* pack msg */
memcpy ( buffer , msg->data , msg->data_size ) ;
pack_len -= msg->data_size ;
/* send msg */
if ( ( rc = _slurm_msg_sendto ( open_fd , buf_temp , SLURM_PROTOCOL_MAX_MESSAGE_BUFFER_SIZE - pack_len , SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , &msg->address ) ) == SLURM_SOCKET_ERROR )
{
debug ( "Error sending msg socket: errno %i\n", errno ) ;
}
return rc ;
}
/* /*
* *
* open_fd - file descriptor to receive msg on * open_fd - file descriptor to receive msg on
...@@ -238,9 +320,8 @@ int slurm_receive_buffer ( slurm_fd open_fd , slurm_addr * source_address , slur ...@@ -238,9 +320,8 @@ int slurm_receive_buffer ( slurm_fd open_fd , slurm_addr * source_address , slur
char * buffer = buftemp ; char * buffer = buftemp ;
header_t header ; header_t header ;
int rc ; int rc ;
int bytes_read; unsigned int unpack_len ; /* length left to upack */
unsigned int unpack_len ; unsigned int receive_len = SLURM_PROTOCOL_MAX_MESSAGE_BUFFER_SIZE ; /* buffer size */
unsigned int receive_len = SLURM_PROTOCOL_MAX_MESSAGE_BUFFER_SIZE ;
if ( ( rc = _slurm_msg_recvfrom ( open_fd , buffer , receive_len, SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , source_address ) ) == SLURM_SOCKET_ERROR ) ; if ( ( rc = _slurm_msg_recvfrom ( open_fd , buffer , receive_len, SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , source_address ) ) == SLURM_SOCKET_ERROR ) ;
{ {
...@@ -249,15 +330,20 @@ int slurm_receive_buffer ( slurm_fd open_fd , slurm_addr * source_address , slur ...@@ -249,15 +330,20 @@ int slurm_receive_buffer ( slurm_fd open_fd , slurm_addr * source_address , slur
} }
/* unpack header */ /* unpack header */
bytes_read = rc ;
unpack_len = rc ; unpack_len = rc ;
unpack_header ( &header , & buffer , & unpack_len ) ; unpack_header ( &header , & buffer , & unpack_len ) ;
rc = check_header_version ( & header ) ; /* unpack_header decrements the unpack_len by the size of the header, so
if ( rc < 0 ) return rc ; * unpack_len not holds the size of the data left in the buffer */
if ( ( rc = check_header_version ( & header ) ) < 0 )
{
return rc ;
}
*msg_type = header . msg_type ; *msg_type = header . msg_type ;
data_buffer = buffer ; /* *data_buffer = malloc ( unpack_len ) ; */
return bytes_read ; memcpy ( data_buffer , buffer , unpack_len ) ;
return unpack_len ;
} }
/* /*
......
...@@ -15,6 +15,7 @@ this may need to be increased to 350k-512k */ ...@@ -15,6 +15,7 @@ this may need to be increased to 350k-512k */
#define SLURM_PROTOCOL_VERSION 1 #define SLURM_PROTOCOL_VERSION 1
#define SLURM_PROTOCOL_NO_FLAGS 0 /* used in the header to set flags to empty */ #define SLURM_PROTOCOL_NO_FLAGS 0 /* used in the header to set flags to empty */
typedef uint16_t slurm_msg_type_t ;
#if MONG_IMPLEMENTATION #if MONG_IMPLEMENTATION
# include <src/common/slurm_protocol_mongo_common.h> # include <src/common/slurm_protocol_mongo_common.h>
......
#include <src/common/slurm_protocol_defs.h> #include <src/common/slurm_protocol_defs.h>
#include <stdlib.h> #include <stdlib.h>
#include <src/slurmctld/slurmctld.h> #include <src/slurmctld/slurmctld.h>
/* short messages*/
void slurm_free_last_update_msg ( last_update_msg_t * msg ) void slurm_free_last_update_msg ( last_update_msg_t * msg )
{ {
free ( msg ) ; free ( msg ) ;
...@@ -12,39 +12,74 @@ void slurm_free_job_id_msg ( job_id_msg_t * msg ) ...@@ -12,39 +12,74 @@ void slurm_free_job_id_msg ( job_id_msg_t * msg )
free ( msg ) ; free ( msg ) ;
} }
void slurm_free_job_desc_msg ( job_desc_msg_t * msg ) void slurm_free_return_code_msg ( return_code_msg_t * msg )
{ {
free ( msg->features ) ;
free ( msg->groups ) ;
free ( msg->name ) ;
free ( msg->partition_key ) ;
free ( msg->partition ) ;
free ( msg->req_nodes ) ;
free ( msg->job_script ) ;
free ( msg ) ; free ( msg ) ;
} }
void slurm_free_job_info_msg ( job_info_msg_t * msg )
void slurm_free_build_info ( build_info_msg_t * build_ptr )
{
if ( build_ptr )
{
free ( build_ptr->backup_location ) ;
free ( build_ptr->backup_machine ) ;
free ( build_ptr->control_daemon ) ;
free ( build_ptr->control_machine ) ;
free ( build_ptr->epilog ) ;
free ( build_ptr->init_program ) ;
free ( build_ptr->prolog ) ;
free ( build_ptr->server_daemon ) ;
free ( build_ptr->slurm_conf ) ;
free ( build_ptr->tmp_fs ) ;
free ( build_ptr ) ;
}
}
void slurm_free_job_desc_msg ( job_desc_msg_t * msg )
{
if ( msg )
{
free ( msg->features ) ;
free ( msg->groups ) ;
free ( msg->name ) ;
free ( msg->partition_key ) ;
free ( msg->partition ) ;
free ( msg->req_nodes ) ;
free ( msg->job_script ) ;
free ( msg ) ;
}
}
void slurm_free_job_info ( job_info_msg_t * msg )
{ {
int i; int i;
for (i = 0; i < msg -> record_count; i++) { if ( msg )
slurm_free_job_table ( & ( msg->job_array[i] ) ) ; {
if ( msg -> job_array )
{
for (i = 0; i < msg -> record_count; i++) {
slurm_free_job_table ( & ( msg->job_array[i] ) ) ;
}
}
free ( msg );
} }
free ( msg );
} }
void slurm_free_job_table ( job_table_t * job ) void slurm_free_job_table ( job_table_t * job )
{ {
if ( job ) if ( job )
{ {
if (job->nodes) free (job->nodes) ; free (job->nodes) ;
if (job->partition) free (job->partition) ; free (job->partition) ;
if (job->name) free (job->name) ; free (job->name) ;
if (job->node_inx) free (job->node_inx) ; free (job->node_inx) ;
if (job->req_nodes) free (job->req_nodes) ; free (job->req_nodes) ;
if (job->features) free (job->features) ; free (job->features) ;
if (job->job_script) free (job->job_script) ; free (job->job_script) ;
if (job->req_node_inx) free (job->req_node_inx) ; free (job->req_node_inx) ;
free ( job ) ; free ( job ) ;
} }
} }
...@@ -21,72 +21,32 @@ ...@@ -21,72 +21,32 @@
#include <src/common/slurm_protocol_common.h> #include <src/common/slurm_protocol_common.h>
/* SLURM Message types */ /* SLURM Message types */
typedef enum { typedef enum { test1, test2
REQUEST_NODE_REGISRATION_STATUS = 1,
MESSAGE_NODE_REGISRATION_STATUS = 2,
REQUEST_RESOURCE_ALLOCATION = 3,
RESPONSE_RESOURCE_ALLOCATION = 4,
REQUEST_CANCEL_JOB = 5,
REQUEST_RECONFIGURE = 6,
RESPONSE_CANCEL_JOB = 7,
REQUEST_JOB_INFO = 8,
RESPONSE_JOB_INFO = 9,
REQUEST_JOB_STEP_INFO = 10,
RESPONSE_JOB_STEP_INFO = 11,
REQUEST_NODE_INFO = 12,
RESPONSE_NODE_INFO = 13,
REQUEST_PARTITION_INFO = 14,
RESPONSE_PARTITION_INFO = 15,
REQUEST_ACCTING_INFO = 16,
RESPONSE_ACCOUNTING_INFO = 17,
REQUEST_BUILD_INFO = 18,
RESPONSE_BUILD_INFO = 19,
MESSAGE_UPLOAD_ACCOUNTING_INFO = 20,
RESPONSE_RECONFIGURE = 21,
REQUEST_SUBMIT_BATCH_JOB = 22,
RESPONSE_SUBMIT_BATCH_JOB = 23,
REQUEST_CANCEL_JOB_STEP = 24,
RESPONSE_CANCEL_JOB_STEP = 25,
REQUEST_SIGNAL_JOB = 26,
RESPONSE_SIGNAL_JOB = 27,
REQUEST_SIGNAL_JOB_STEP = 28,
RESPONSE_SIGNAL_JOB_STEP = 29,
REQUEST_BATCH_JOB_LAUNCH = 30,
RESPONSE_BATCH_JOB_LAUNCH = 31,
MESSAGE_TASK_EXIT = 32,
MESSAGE_REVOKE_JOB_CREDENTIAL = 33,
REQUEST_LAUNCH_TASKS = 34,
REQUEST_CREATE_JOB_STEP = 35,
RESPONSE_CREATE_JOB_STEP = 36,
REQUEST_RUN_JOB_STEP = 37,
RESPONSE_RUN_JOB_STEP = 38,
REQUEST_JOB_ATTACH = 39,
RESPONSE_JOB_ATTACH = 40,
RESPONSE_LAUNCH_TASKS = 41,
REQUEST_GET_KEY = 42,
RESPONSE_GET_KEY = 43,
REQUEST_GET_JOB_STEP_INFO = 44,
RESPONSE_GET_JOB_STEP_INFO = 45,
REQUEST_JOB_RESOURCE = 46,
RESPONSE_JOB_RESOURCE = 47
/*
REQUEST_RUN_JOB_STEP = 48,
RESPONSE_RUN_JOB_STEP = 49
*/
} SLURM_MSG_TYPE_T ; } SLURM_MSG_TYPE_T ;
typedef uint16_t slurm_msg_type_t ;
#define REQUEST_NODE_REGISRATION_STATUS 1 #define REQUEST_NODE_REGISRATION_STATUS 1001
#define MESSAGE_NODE_REGISRATION_STATUS 2 #define MESSAGE_NODE_REGISRATION_STATUS 1002
#define REQUEST_RECONFIGURE 1011
#define REQUEST_RESOURCE_ALLOCATION 3 #define RESPONSE_RECONFIGURE 1012
#define RESPONSE_RESOURCE_ALLOCATION 4
#define REQUEST_RESOURCE_ALLOCATION 4001
#define REQUEST_CANCEL_JOB 5 #define RESPONSE_RESOURCE_ALLOCATION 4002
#define RESPONSE_CANCEL_JOB 7 #define REQUEST_SUBMIT_BATCH_JOB 4011
#define RESPONSE_SUBMIT_BATCH_JOB 4012
#define REQUEST_RECONFIGURE 6 #define REQUEST_BATCH_JOB_LAUNCH 4021
#define RESPONSE_BATCH_JOB_LAUNCH 4022
#define REQUEST_SIGNAL_JOB 4031
#define RESPONSE_SIGNAL_JOB 4032
#define REQUEST_CANCEL_JOB 4041
#define RESPONSE_CANCEL_JOB 4042
#define REQUEST_JOB_RESOURCE 4051
#define RESPONSE_JOB_RESOURCE 4052
#define REQUEST_JOB_ATTACH 4061
#define RESPONSE_JOB_ATTACH 4062
#define MESSAGE_REVOKE_JOB_CREDENTIAL 4901
#define REQUEST_BUILD_INFO 3011
#define RESPONSE_BUILD_INFO 3012
#define REQUEST_JOB_INFO 3021 #define REQUEST_JOB_INFO 3021
#define RESPONSE_JOB_INFO 3022 #define RESPONSE_JOB_INFO 3022
#define REQUEST_JOB_STEP_INFO 3031 #define REQUEST_JOB_STEP_INFO 3031
...@@ -94,43 +54,33 @@ typedef uint16_t slurm_msg_type_t ; ...@@ -94,43 +54,33 @@ typedef uint16_t slurm_msg_type_t ;
#define REQUEST_NODE_INFO 3041 #define REQUEST_NODE_INFO 3041
#define RESPONSE_NODE_INFO 3042 #define RESPONSE_NODE_INFO 3042
#define REQUEST_PARTITION_INFO 3051 #define REQUEST_PARTITION_INFO 3051
#define RESPONSE_PATITION_INFO 3052 #define RESPONSE_PARTITION_INFO 3052
#define REQUEST_ACCTING_INFO 3061 #define REQUEST_ACCTING_INFO 3061
#define RESPONSE_ACCOUNTING_INFO 3062 #define RESPONSE_ACCOUNTING_INFO 3062
#define REQUEST_BUILD_INFO 3011 #define REQUEST_GET_JOB_STEP_INFO 3071
#define RESPONSE_BUILD_INFO 3012 #define RESPONSE_GET_JOB_STEP_INFO 4072
#define RESPONSE_BUILD_INFO_RC 3013
#define REQUEST_CREATE_JOB_STEP 5001
#define MESSAGE_UPLOAD_ACCOUNTING_INFO 20 #define RESPONSE_CREATE_JOB_STEP 5002
#define RESPONSE_RECONFIGURE 21 #define REQUEST_RUN_JOB_STEP 5011
#define REQUEST_SUBMIT_BATCH_JOB 22 #define RESPONSE_RUN_JOB_STEP 5012
#define RESPONSE_SUBMIT_BATCH_JOB 23 #define REQUEST_SIGNAL_JOB_STEP 5021
#define REQUEST_CANCEL_JOB_STEP 24 #define RESPONSE_SIGNAL_JOB_STEP 5022
#define RESPONSE_CANCEL_JOB_STEP 25 #define REQUEST_CANCEL_JOB_STEP 5031
#define REQUEST_SIGNAL_JOB 26 #define RESPONSE_CANCEL_JOB_STEP 5032
#define RESPONSE_SIGNAL_JOB 27
#define REQUEST_SIGNAL_JOB_STEP 28 #define REQUEST_LAUNCH_TASKS 6001
#define RESPONSE_SIGNAL_JOB_STEP 29 #define RESPONSE_LAUNCH_TASKS 6002
#define REQUEST_BATCH_JOB_LAUNCH 30 #define MESSAGE_TASK_EXIT 6003
#define RESPONSE_BATCH_JOB_LAUNCH 31
#define MESSAGE_TASK_EXIT 32 /*DPCS get key to sign submissions*/
#define MESSAGE_REVOKE_JOB_CREDENTIAL 33 #define REQUEST_GET_KEY 8001
#define REQUEST_LAUNCH_TASKS 34 #define RESPONSE_GET_KEY 8002
#define REQUEST_CREATE_JOB_STEP 35
#define RESPONSE_CREATE_JOB_STEP 36 #define RESPONSE_SLURM_RC 9000
#define REQUEST_RUN_JOB_STEP 37 #define MESSAGE_UPLOAD_ACCOUNTING_INFO 9010
#define RESPONSE_RUN_JOB_STEP 38
#define REQUEST_JOB_ATTACH 39 /*core api protocol message structures */
#define RESPONSE_JOB_ATTACH 40
#define RESPONSE_LAUNCH_TASKS 41
#define REQUEST_GET_KEY 42
#define RESPONSE_GET_KEY 43
#define REQUEST_GET_JOB_STEP_INFO 44
#define RESPONSE_GET_JOB_STEP_INFO 45
#define REQUEST_JOB_RESOURCE 46
#define RESPONSE_JOB_RESOURCE 47
#define RESPONSE_SLURM_RC 47
typedef struct slurm_protocol_header typedef struct slurm_protocol_header
{ {
uint16_t version ; uint16_t version ;
...@@ -148,6 +98,20 @@ typedef struct slurm_msg ...@@ -148,6 +98,20 @@ typedef struct slurm_msg
uint32_t data_size ; uint32_t data_size ;
} slurm_msg_t ; } slurm_msg_t ;
/* really short messages */
typedef struct last_update_msg {
uint32_t last_update;
} last_update_msg_t ;
typedef struct return_code_msg {
int32_t return_code;
} return_code_msg_t ;
typedef struct job_id_msg {
uint32_t job_id;
} job_id_msg_t ;
typedef struct slurm_node_registration_status_msg typedef struct slurm_node_registration_status_msg
{ {
uint32_t timestamp ; uint32_t timestamp ;
...@@ -187,18 +151,6 @@ typedef struct job_desc_msg { /* Job descriptor for submit, allocate, and upd ...@@ -187,18 +151,6 @@ typedef struct job_desc_msg { /* Job descriptor for submit, allocate, and upd
* to UID by API, can only be set if user is root */ * to UID by API, can only be set if user is root */
} job_desc_msg_t ; } job_desc_msg_t ;
typedef struct last_update_msg {
uint32_t last_update;
} last_update_msg_t ;
typedef struct return_code_msg {
int32_t return_code;
} return_code_msg_t ;
typedef struct job_id_msg {
uint32_t job_id;
} job_id_msg_t ;
struct build_table { struct build_table {
uint32_t last_update; /* last update time of the build parameters*/ uint32_t last_update; /* last update time of the build parameters*/
uint16_t backup_interval;/* slurmctld save state interval, seconds */ uint16_t backup_interval;/* slurmctld save state interval, seconds */
...@@ -263,9 +215,13 @@ typedef struct job_info_msg { ...@@ -263,9 +215,13 @@ typedef struct job_info_msg {
/* free message functions */ /* free message functions */
void inline slurm_free_last_update_msg ( last_update_msg_t * msg ) ; void inline slurm_free_last_update_msg ( last_update_msg_t * msg ) ;
void inline slurm_free_job_desc_msg ( job_desc_msg_t * msg ) ; void inline slurm_free_return_code_msg ( return_code_msg_t * msg ) ;
void inline slurm_free_job_info_msg ( job_info_msg_t * msg ) ;
void inline slurm_free_job_table ( job_table_t * job ) ;
void inline slurm_free_job_id_msg ( job_id_msg_t * msg ) ; void inline slurm_free_job_id_msg ( job_id_msg_t * msg ) ;
void inline slurm_free_build_info ( build_info_msg_t * build_ptr ) ;
void inline slurm_free_job_info ( job_info_msg_t * msg ) ;
void inline slurm_free_job_table ( job_table_t * job ) ;
void inline slurm_free_job_desc_msg ( job_desc_msg_t * msg ) ;
#endif #endif
...@@ -45,14 +45,17 @@ int pack_msg ( slurm_msg_t const * msg , char ** buffer , uint32_t * buf_len ) ...@@ -45,14 +45,17 @@ int pack_msg ( slurm_msg_t const * msg , char ** buffer , uint32_t * buf_len )
{ {
switch ( msg -> msg_type ) switch ( msg -> msg_type )
{ {
case REQUEST_BUILD_INFO: case REQUEST_BUILD_INFO :
case REQUEST_JOB_INFO :
case REQUEST_JOB_STEP_INFO :
case REQUEST_NODE_INFO :
case REQUEST_PARTITION_INFO :
case REQUEST_ACCTING_INFO :
pack_last_update ( ( last_update_msg_t * ) msg -> data , ( void ** ) buffer , buf_len ) ; pack_last_update ( ( last_update_msg_t * ) msg -> data , ( void ** ) buffer , buf_len ) ;
break; break;
case RESPONSE_BUILD_INFO: case RESPONSE_BUILD_INFO:
pack_build_info ( ( build_info_msg_t * ) msg -> data , (void ** ) buffer , buf_len ) ; pack_build_info ( ( build_info_msg_t * ) msg -> data , (void ** ) buffer , buf_len ) ;
break; break ;
case RESPONSE_BUILD_INFO_RC:
break;
case REQUEST_NODE_REGISRATION_STATUS : case REQUEST_NODE_REGISRATION_STATUS :
break ; break ;
case MESSAGE_NODE_REGISRATION_STATUS : case MESSAGE_NODE_REGISRATION_STATUS :
...@@ -81,7 +84,7 @@ int pack_msg ( slurm_msg_t const * msg , char ** buffer , uint32_t * buf_len ) ...@@ -81,7 +84,7 @@ int pack_msg ( slurm_msg_t const * msg , char ** buffer , uint32_t * buf_len )
case RESPONSE_SIGNAL_JOB : case RESPONSE_SIGNAL_JOB :
case RESPONSE_SIGNAL_JOB_STEP : case RESPONSE_SIGNAL_JOB_STEP :
break ; break ;
case REQUEST_JOB_INFO : case RESPONSE_JOB_INFO :
break ; break ;
case REQUEST_JOB_ATTACH : case REQUEST_JOB_ATTACH :
break ; break ;
...@@ -130,14 +133,17 @@ int unpack_msg ( slurm_msg_t * msg , char ** buffer , uint32_t * buf_len ) ...@@ -130,14 +133,17 @@ int unpack_msg ( slurm_msg_t * msg , char ** buffer , uint32_t * buf_len )
{ {
switch ( msg-> msg_type ) switch ( msg-> msg_type )
{ {
case REQUEST_BUILD_INFO: case REQUEST_BUILD_INFO :
case REQUEST_JOB_INFO :
case REQUEST_JOB_STEP_INFO :
case REQUEST_NODE_INFO :
case REQUEST_PARTITION_INFO :
case REQUEST_ACCTING_INFO :
unpack_last_update ( ( last_update_msg_t **) &(msg -> data) , ( void ** ) buffer , buf_len ) ; unpack_last_update ( ( last_update_msg_t **) &(msg -> data) , ( void ** ) buffer , buf_len ) ;
break; break;
case RESPONSE_BUILD_INFO: case RESPONSE_BUILD_INFO:
unpack_build_info ( ( build_info_msg_t ** ) &(msg -> data) , (void ** ) buffer , buf_len ) ; unpack_build_info ( ( build_info_msg_t ** ) &(msg -> data) , (void ** ) buffer , buf_len ) ;
break; break;
case RESPONSE_BUILD_INFO_RC:
break;
case REQUEST_NODE_REGISRATION_STATUS : case REQUEST_NODE_REGISRATION_STATUS :
break ; break ;
case MESSAGE_NODE_REGISRATION_STATUS : case MESSAGE_NODE_REGISRATION_STATUS :
...@@ -166,8 +172,6 @@ int unpack_msg ( slurm_msg_t * msg , char ** buffer , uint32_t * buf_len ) ...@@ -166,8 +172,6 @@ int unpack_msg ( slurm_msg_t * msg , char ** buffer , uint32_t * buf_len )
case RESPONSE_SIGNAL_JOB : case RESPONSE_SIGNAL_JOB :
case RESPONSE_SIGNAL_JOB_STEP : case RESPONSE_SIGNAL_JOB_STEP :
break ; break ;
case REQUEST_JOB_INFO :
break ;
case REQUEST_JOB_ATTACH : case REQUEST_JOB_ATTACH :
break ; break ;
case RESPONSE_JOB_ATTACH : case RESPONSE_JOB_ATTACH :
......
...@@ -136,12 +136,18 @@ slurm_fd _slurm_listen_stream ( slurm_addr * slurm_address ) ...@@ -136,12 +136,18 @@ slurm_fd _slurm_listen_stream ( slurm_addr * slurm_address )
{ {
int rc ; int rc ;
slurm_fd connection_fd ; slurm_fd connection_fd ;
const int one = 1;
if ( ( connection_fd =_slurm_create_socket ( SLURM_STREAM ) ) == SLURM_SOCKET_ERROR ) if ( ( connection_fd =_slurm_create_socket ( SLURM_STREAM ) ) == SLURM_SOCKET_ERROR )
{ {
debug( "Error creating slurm stream socket: errno %i\n", errno ) ; debug( "Error creating slurm stream socket: errno %i\n", errno ) ;
return connection_fd ; return connection_fd ;
} }
if ( ( rc = _slurm_setsockopt(connection_fd , SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one) ) ) ) {
debug("setsockopt SO_REUSEADDR");
return rc ;
}
if ( ( rc = _slurm_bind ( connection_fd , ( struct sockaddr const * ) slurm_address , sizeof ( slurm_addr ) ) ) == SLURM_SOCKET_ERROR ) if ( ( rc = _slurm_bind ( connection_fd , ( struct sockaddr const * ) slurm_address , sizeof ( slurm_addr ) ) ) == SLURM_SOCKET_ERROR )
{ {
debug( "Error binding slurm stream socket: errno %i\n" , errno ) ; debug( "Error binding slurm stream socket: errno %i\n" , errno ) ;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment