From 4faba127de79656bb8e793207caa24627742c330 Mon Sep 17 00:00:00 2001 From: tewk <tewk@unknown> Date: Thu, 25 Jul 2002 19:58:54 +0000 Subject: [PATCH] Added packmem_array function to pack fixed launch_task_response msg added slurm_protocol and slurmd errnos to slurm_errno.h added print_slurm_addr to src/common/slurm_protocol_api.c added print_slurm_credential to src/common/slurm_protocol_util.c cleaned up debug printing and error reporting for src/slurmd/shmem_struct.c more cleanup etc --- src/common/pack.c | 31 +++++ src/common/pack.h | 20 +++ src/common/slurm_errno.h | 2 + src/common/slurm_protocol_api.c | 7 +- src/common/slurm_protocol_api.h | 6 + src/common/slurm_protocol_interface.h | 1 + src/common/slurm_protocol_pack.c | 8 +- src/common/slurm_protocol_pack.h | 3 + src/common/slurm_protocol_socket_common.h | 2 +- .../slurm_protocol_socket_implementation.c | 9 ++ src/common/slurm_protocol_util.c | 122 ++++++++++++++++++ src/common/slurm_protocol_util.h | 9 ++ src/slurmd/shmem_struct.c | 13 +- src/slurmd/slurmd.c | 21 ++- src/slurmd/task_mgr.c | 7 +- 15 files changed, 247 insertions(+), 14 deletions(-) diff --git a/src/common/pack.c b/src/common/pack.c index 2fab8cfc6cd..993fe4a22ef 100644 --- a/src/common/pack.c +++ b/src/common/pack.c @@ -334,3 +334,34 @@ _unpackstrarray (char ***valp, uint16_t *size_valp, void **bufp, int *lenp) *valp = NULL; } + +/* + * Given a pointer to memory (valp) and a size (size_val), + * copy size_val raw bytes (no length prefix) from valp to *bufp. Advance bufp and decrement lenp by size_val. + */ +void +_packmem_array(char *valp, uint16_t size_val, void **bufp, int *lenp) +{ + memcpy(*bufp, valp, size_val); + *bufp = (char *)*bufp + size_val; /* portable advance; cast-as-lvalue is not valid C */ + *lenp -= size_val; +} + +/* + * Given 'bufp' pointing to size_valp bytes of raw data (no length + * prefix), copy them into the caller-supplied buffer 'valp'. + * If size_valp is zero, nothing is read and valp[0] is set to 0. 
+ * Advance bufp and decrement lenp by size_valp, the number of + * bytes copied. + */ +void +_unpackmem_array(char *valp, uint16_t size_valp, void **bufp, int *lenp) +{ + if (size_valp > 0) { + memcpy ( valp, *bufp, size_valp); + *bufp = (char *)*bufp + size_valp; /* portable advance; cast-as-lvalue is not valid C */ + *lenp -= size_valp; + } + else + *valp = 0 ; +} diff --git a/src/common/pack.h b/src/common/pack.h index 7cbafc93e73..e694cbc4af9 100644 --- a/src/common/pack.h +++ b/src/common/pack.h @@ -34,6 +34,9 @@ void _unpack32array( uint32_t **valp, uint16_t* size_val, void **bufp, int *lenp void _packstrarray(char **valp, uint16_t size_val, void **bufp, int *lenp); void _unpackstrarray(char ***valp, uint16_t* size_val, void **bufp, int *lenp); +void _packmem_array(char *valp, uint16_t size_val, void **bufp, int *lenp); +void _unpackmem_array(char *valp, uint16_t size_valp, void **bufp, int *lenp); + void _packmem(char *valp, uint16_t size_val, void **bufp, int *lenp); void _unpackmem(char *valp, uint16_t *size_valp, void **bufp, int *lenp); void _unpackmem_ptr(char **valp, uint16_t *size_valp, void **bufp, int *lenp); @@ -194,5 +197,22 @@ void _unpackmem_malloc(char **valp, uint16_t *size_valp, void **bufp, int *lenp) _unpackmem_xmalloc(valp,(uint16_t *)size_valp,bufp,lenp);\ } while (0) +#define packmem_array(valp,size,bufp,lenp) do { \ + assert(size == 0 || valp != NULL); \ + assert((bufp) != NULL && *(bufp) != NULL); \ + assert((lenp) != NULL); \ + assert(*(lenp) >= (size)); \ + _packmem_array(valp,(uint16_t)size,bufp,lenp); \ +} while (0) + +#define unpackmem_array(valp,size,bufp,lenp) do { \ + assert(valp != NULL); \ + assert(sizeof(size) == sizeof(uint16_t)); \ + assert((bufp) != NULL && *(bufp) != NULL); \ + assert((lenp) != NULL); \ + assert(*(lenp) >= (size)); \ + _unpackmem_array(valp,size,bufp,lenp); \ +} while (0) + #endif /* _PACK_INCLUDED */ diff --git a/src/common/slurm_errno.h b/src/common/slurm_errno.h index 78b835462ff..e42d7c2c796 100644 --- 
a/src/common/slurm_errno.h +++ b/src/common/slurm_errno.h @@ -8,4 +8,6 @@ #define ESLRUMD_PIPE_ERROR_ON_TASK_SPAWN 90000 #define ESLURMD_KILL_TASK_FAILED 90001 #define ESLURMD_OPENSSL_ERROR 90002 +#define ESLURMD_NO_AVAILABLE_JOB_STEP_SLOTS_IN_SHMEM 90003 /* set by alloc_job_step when no slot is free */ +#define ESLURMD_NO_AVAILABLE_TASK_SLOTS_IN_SHMEM 90004 /* set by alloc_task when no slot is free */ #endif diff --git a/src/common/slurm_protocol_api.c b/src/common/slurm_protocol_api.c index 170493105bf..65bc31b745a 100644 --- a/src/common/slurm_protocol_api.c +++ b/src/common/slurm_protocol_api.c @@ -275,7 +275,7 @@ int slurm_receive_msg ( slurm_fd open_fd , slurm_msg_t * msg ) if ( ( rc = _slurm_msg_recvfrom ( open_fd , buffer , receive_len, SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , & (msg)->address ) ) == SLURM_SOCKET_ERROR ) { int local_errno = errno ; - debug ( "Error receiving msg socket: errno %i", errno ) ; + debug ( "Error receiving msg socket: errno %i", local_errno ) ; /* log the errno captured right after the failed receive */ return rc ; } @@ -600,6 +600,11 @@ void slurm_unpack_slurm_addr_no_alloc ( slurm_addr * slurm_address , void ** buf _slurm_unpack_slurm_addr_no_alloc ( slurm_address , buffer , length ) ; } +void slurm_print_slurm_addr ( FILE * stream , slurm_addr * address ) +{ + _slurm_print_slurm_addr ( stream , address ) ; /* forwards both arguments unchanged to _slurm_print_slurm_addr */ +} + /*******************************************/ /***** slurm send highlevel msg functions */ /*******************************************/ diff --git a/src/common/slurm_protocol_api.h b/src/common/slurm_protocol_api.h index 51c4e98d092..ee460c2b46c 100644 --- a/src/common/slurm_protocol_api.h +++ b/src/common/slurm_protocol_api.h @@ -292,6 +292,12 @@ void inline slurm_set_addr_char ( slurm_addr * slurm_address , uint16_t port , c * IN buf_len - length of hostname buffer */ void inline slurm_get_addr ( slurm_addr * slurm_address , uint16_t * port , char * host , uint32_t buf_len ) ; +/* slurm_print_slurm_addr + * prints a slurm_addr to a stream + * IN stream - FILE * stream + * IN address - slurm_addr to print + */ +void inline slurm_print_slurm_addr ( FILE * stream , slurm_addr 
* address ) ; /**********************************************************************/ /* slurm_addr pack routines*/ diff --git a/src/common/slurm_protocol_interface.h b/src/common/slurm_protocol_interface.h index 506accf71b1..d0f5a970fca 100644 --- a/src/common/slurm_protocol_interface.h +++ b/src/common/slurm_protocol_interface.h @@ -90,6 +90,7 @@ extern void _slurm_set_addr_uint ( slurm_addr * slurm_address , uint16_t port , extern void _slurm_set_addr ( slurm_addr * slurm_address , uint16_t port , char * host ) ; extern void _slurm_set_addr_char ( slurm_addr * slurm_address , uint16_t port , char * host ) ; extern void _slurm_get_addr ( slurm_addr * slurm_address , uint16_t * port , char * host , uint32_t buf_len ) ; +extern void _slurm_print_slurm_addr ( FILE * stream , slurm_addr * address ) ; /* prints family, addr and port to stream */ /*****************************/ /* slurm addr pack functions */ diff --git a/src/common/slurm_protocol_pack.c b/src/common/slurm_protocol_pack.c index 5451b07f6c2..8eedbeb9339 100644 --- a/src/common/slurm_protocol_pack.c +++ b/src/common/slurm_protocol_pack.c @@ -68,20 +68,22 @@ void unpack_header ( header_t * header , char ** buffer , uint32_t * length ) void pack_io_stream_header ( slurm_io_stream_header_t * msg , void ** buffer , uint32_t * length ) { + uint16_t tmp=SLURM_SSL_SIGNATURE_LENGTH; /* key length is a compile-time constant, so it is packed raw via packmem_array */ + assert ( msg != NULL ); pack16( msg->version, buffer, length ) ; - packmem( msg->key, SLURM_SSL_SIGNATURE_LENGTH , buffer, length ) ; + packmem_array( msg->key, tmp, buffer, length ) ; pack32( msg->task_id, buffer, length ) ; pack16( msg->type, buffer, length ) ; } void unpack_io_stream_header ( slurm_io_stream_header_t * msg , void ** buffer , uint32_t * length ) { - uint16_t uint16_tmp; + uint16_t tmp=SLURM_SSL_SIGNATURE_LENGTH; /* must match the length used by pack_io_stream_header */ unpack16( & msg->version, buffer, length ) ; - unpackmem( msg->key, & uint16_tmp , buffer , length ) ; + unpackmem_array( msg->key, tmp , buffer , length ) ; unpack32( & msg->task_id, buffer, length ) ; unpack16( & msg->type, buffer, length ) ; } 
diff --git a/src/common/slurm_protocol_pack.h b/src/common/slurm_protocol_pack.h index 1034515acec..08a09e4e9c6 100644 --- a/src/common/slurm_protocol_pack.h +++ b/src/common/slurm_protocol_pack.h @@ -122,4 +122,7 @@ int unpack_reattach_tasks_streams_msg ( reattach_tasks_streams_msg_t ** msg_ptr void pack_revoke_credential_msg ( revoke_credential_msg_t* msg , void ** buffer , uint32_t * length ) ; int unpack_revoke_credential_msg ( revoke_credential_msg_t** msg , void ** buffer , uint32_t * length ) ; + +void pack_io_stream_header ( slurm_io_stream_header_t * msg , void ** buffer , uint32_t * length ) ; +void unpack_io_stream_header ( slurm_io_stream_header_t * msg , void ** buffer , uint32_t * length ) ; #endif diff --git a/src/common/slurm_protocol_socket_common.h b/src/common/slurm_protocol_socket_common.h index ed1d658cf58..7095092a8a9 100644 --- a/src/common/slurm_protocol_socket_common.h +++ b/src/common/slurm_protocol_socket_common.h @@ -21,7 +21,7 @@ /* LINUX SPECIFIC */ /* this is the slurm equivalent of the operating system file descriptor, which in linux is just an int */ -typedef uint32_t slurm_fd ; +typedef int32_t slurm_fd ; /* this is the slurm equivalent of the BSD sockets sockaddr */ typedef struct sockaddr_in slurm_addr ; diff --git a/src/common/slurm_protocol_socket_implementation.c b/src/common/slurm_protocol_socket_implementation.c index 86aa5ba88b0..531e5dbb6a3 100644 --- a/src/common/slurm_protocol_socket_implementation.c +++ b/src/common/slurm_protocol_socket_implementation.c @@ -9,6 +9,7 @@ #include <sys/time.h> #include <sys/types.h> #include <signal.h> +#include <stdio.h> #if HAVE_SYS_SOCKET_H # include <sys/socket.h> @@ -512,6 +513,14 @@ void _slurm_get_addr ( slurm_addr * slurm_address , uint16_t * port , char * hos strncpy ( host , host_info -> h_name , buf_len ) ; } +void _slurm_print_slurm_addr ( FILE * stream , slurm_addr * address ) +{ + fprintf ( stream , "family %x\n", ( unsigned int ) address -> sin_family ) ; /* sin_family is a host-order 16-bit field: no ntohl */ + fprintf ( stream , 
"addr %x\n", ntohl ( address -> sin_addr.s_addr ) ) ; + fprintf ( stream , "port %x\n", ntohs ( address -> sin_port ) ) ; +} + + void _slurm_pack_slurm_addr ( slurm_addr * slurm_address , void ** buffer , int * length ) { pack32 ( ntohl ( slurm_address -> sin_addr.s_addr ) , ( void ** ) buffer , length ) ; diff --git a/src/common/slurm_protocol_util.c b/src/common/slurm_protocol_util.c index 3010f0698ab..014b596b001 100644 --- a/src/common/slurm_protocol_util.c +++ b/src/common/slurm_protocol_util.c @@ -2,6 +2,7 @@ #include <assert.h> #include <src/common/slurm_protocol_defs.h> +#include <src/common/slurm_protocol_api.h> #include <src/common/slurm_protocol_common.h> #include <src/common/slurm_protocol_util.h> #include <src/common/log.h> @@ -46,3 +47,124 @@ void init_io_stream_header ( slurm_io_stream_header_t * header , char * key , ui header -> task_id = task_id ; header -> type = type ; } + +int read_io_stream_header ( slurm_io_stream_header_t * header , int fd ) +{ + char buffer[sizeof ( slurm_io_stream_header_t )] ; + char * buf_ptr = buffer ; + int buf_size = sizeof ( slurm_io_stream_header_t ) ; + int size = sizeof ( slurm_io_stream_header_t ) ; + int read_size ; + + read_size = slurm_read_stream ( fd , buffer , sizeof ( slurm_io_stream_header_t ) ) ; + unpack_io_stream_header ( header , (void ** ) & buf_ptr , & size ) ; + return read_size ; +} + +int write_io_stream_header ( slurm_io_stream_header_t * header , int fd ) +{ + char buffer[sizeof ( slurm_io_stream_header_t )] ; + char * buf_ptr = buffer ; + int buf_size = sizeof ( slurm_io_stream_header_t ) ; + int size = sizeof ( slurm_io_stream_header_t ) ; + + pack_io_stream_header ( header , (void ** ) & buf_ptr , & size ) ; + return slurm_write_stream ( fd , buffer , buf_size - size ) ; /* buf_size - size is the number of bytes actually packed */ +} + +int read_io_stream_header2 ( slurm_io_stream_header_t * header , int fd ) +{ + int read_size ; + + if ( ( read_size = slurm_read_stream ( fd , ( char * ) & header -> version , sizeof ( header -> version ) ) ) != sizeof 
( header -> version ) ) + { + return read_size ; + } + header -> version = ntohs ( header -> version ) ; + + if ( ( read_size = slurm_read_stream ( fd , header -> key , sizeof ( header -> key ) ) ) != sizeof ( header -> key ) ) + { + return read_size ; + } + + if ( ( read_size = slurm_read_stream ( fd , ( char * ) & header -> task_id , sizeof ( header -> task_id ) ) ) != sizeof ( header -> task_id ) ) + { + return read_size ; + } + header -> task_id = ntohl ( header -> task_id ) ; + + if ( ( read_size = slurm_read_stream ( fd , ( char * ) & header -> type , sizeof ( header -> type ) ) ) != sizeof ( header -> type ) ) + { + return read_size ; + } + header -> type = ntohs ( header -> type ) ; + + return SLURM_SUCCESS ; +} + +int write_io_stream_header2 ( slurm_io_stream_header_t * header , int fd ) +{ + int write_size ; + slurm_io_stream_header_t header2 = *header ; /* network-order scratch copy; the caller's header is left untouched */ + + header2 . version = htons ( header -> version ) ; + if ( (write_size = slurm_write_stream ( fd , ( char * ) & header2 . version , sizeof ( header2 . version ) ) ) != sizeof ( header2 . version ) ) + { + return write_size; + } + + if ( ( write_size = slurm_write_stream ( fd , header2 . key , sizeof ( header2 . key ) ) ) != sizeof ( header2 . key ) ) + { + return write_size; + } + + header2 . task_id = htonl ( header -> task_id ) ; + if ( ( write_size = slurm_write_stream ( fd , ( char * ) & header2 . task_id , sizeof ( header2 . task_id ) ) ) != sizeof ( header2 . task_id ) ) + { + return write_size; + } + + header2 . type = htons ( header -> type ) ; + if ( ( write_size = slurm_write_stream ( fd , ( char * ) & header2 . type , sizeof ( header2 . type ) ) ) != sizeof ( header2 . 
type ) ) + { + return write_size; + } + + return SLURM_SUCCESS ; +} + +void slurm_print_job_credential ( FILE * stream , slurm_job_credential_t * credential ) +{ + debug3 ( "credential.job_id: %i" , credential -> job_id ); /* note: stream is unused here, output goes through debug3 */ + debug3 ( "credential.user_id: %i" , credential -> user_id ); + debug3 ( "credential.node_list: %s" , credential -> node_list ); + debug3 ( "credential.expiration_time: %lu" , credential -> expiration_time ); + debug3 ( "credential.signature: %s" , credential -> signature ); +} + +void slurm_print_launch_task_msg ( launch_tasks_request_msg_t * msg ) +{ + int i ; + debug3 ( "job_id: %i", msg->job_id); + debug3 ( "job_step_id: %i", msg->job_step_id); + debug3 ( "uid: %i", msg->uid); + slurm_print_job_credential ( stderr , msg-> credential ) ; + debug3 ( "tasks_to_launch: %i", msg->tasks_to_launch); + debug3 ( "envc: %i", msg->envc); + for ( i=0 ; i < msg->envc ; i++ ) + { + debug3 ( "env[%i]: %s", i , msg->env[i] ) ; + } + debug3 ( "cwd: %s", msg->cwd); + debug3 ( "argc: %i", msg->argc); + for ( i=0 ; i < msg->argc ; i++ ) + { + debug3 ( "argv[%i]: %s", i , msg->argv[i] ) ; + } + slurm_print_slurm_addr ( stderr , & msg -> response_addr ) ; + slurm_print_slurm_addr ( stderr , & msg -> streams ) ; + for ( i=0 ; i < msg->tasks_to_launch ; i++ ) + { + debug3 ( "global_task_id[%i]: %i ", i, msg->global_task_ids[i] ); + } +} diff --git a/src/common/slurm_protocol_util.h b/src/common/slurm_protocol_util.h index 7e318e0f9d9..d93edbc109a 100644 --- a/src/common/slurm_protocol_util.h +++ b/src/common/slurm_protocol_util.h @@ -14,12 +14,21 @@ # include <inttypes.h> #endif /* HAVE_CONFIG_H */ +#include <stdio.h> + #include <src/common/slurm_protocol_defs.h> +#include <src/common/slurm_protocol_pack.h> #include <src/common/slurm_protocol_common.h> +/* slurm protocol header functions */ uint32_t check_header_version( header_t * header) ; void init_header ( header_t * header , slurm_msg_type_t msg_type , uint16_t flags ) ; +/* io stream header functions */ 
uint32_t check_io_stream_header_version( slurm_io_stream_header_t * header) ; void init_io_stream_header ( slurm_io_stream_header_t * header , char * key , uint32_t task_id , uint16_t type ) ; + +/* debug print methods */ +void slurm_print_job_credential ( FILE * stream , slurm_job_credential_t * credential ) ; +void slurm_print_launch_task_msg ( launch_tasks_request_msg_t * msg ) ; #endif diff --git a/src/slurmd/shmem_struct.c b/src/slurmd/shmem_struct.c index 08621083cc9..2ec42b5ae51 100644 --- a/src/slurmd/shmem_struct.c +++ b/src/slurmd/shmem_struct.c @@ -5,6 +5,7 @@ #include <sys/shm.h> #include <string.h> +#include <src/common/slurm_errno.h> #include <src/common/log.h> #include <src/common/slurm_protocol_api.h> #include <src/slurmd/shmem_struct.h> @@ -29,7 +30,7 @@ void * get_shmem ( ) key = ftok ( ".", 'a' ); assert ( key != SLURM_ERROR ); shmem_gid = shmget ( key , sizeof ( slurmd_shmem_t ) , IPC_CREAT | OCTAL_RW_PERMISSIONS ); - info ( "shmget id = %i , errno = %i ", shmem_gid, errno ); + debug ( "shmget id = %i , errno = %i ", shmem_gid, errno ); assert ( shmem_gid != SLURM_ERROR ) ; shmem_addr = shmat ( shmem_gid , NULL , 0 ) ; assert ( shmem_addr != (void * ) SLURM_ERROR ) ; @@ -92,7 +93,9 @@ job_step_t * alloc_job_step ( slurmd_shmem_t * shmem , int job_id , int job_step } } pthread_mutex_unlock ( & shmem -> mutex ) ; - fatal ( "No available job_step slots in shmem segment"); + error ( "No available job_step slots in shmem segment"); + slurm_seterrno ( ESLURMD_NO_AVAILABLE_JOB_STEP_SLOTS_IN_SHMEM ) ; /* report via errno instead of calling fatal() */ + /*return ( NULL ) ;*/ return (void * ) SLURM_ERROR ; } @@ -120,7 +123,9 @@ task_t * alloc_task ( slurmd_shmem_t * shmem , job_step_t * job_step ) } } pthread_mutex_unlock ( & shmem -> mutex ) ; - fatal ( "No available task slots in shmem segment"); + error ( "No available task slots in shmem segment"); + slurm_seterrno ( ESLURMD_NO_AVAILABLE_TASK_SLOTS_IN_SHMEM ) ; /* report via errno instead of calling fatal() */ + /*return ( NULL ) ;*/ return (void * ) SLURM_ERROR ; } @@ -192,7 +197,7 @@ int 
find_job_id_for_session ( slurmd_shmem_t * shmem , int session_id ) } } pthread_mutex_unlock ( & shmem -> mutex ) ; - info ( "No job_id found for session_id %i", session_id ); + debug ( "No job_id found for session_id %i", session_id ); return SLURM_FAILURE ; } diff --git a/src/slurmd/slurmd.c b/src/slurmd/slurmd.c index da8a975bb76..912fedb01eb 100644 --- a/src/slurmd/slurmd.c +++ b/src/slurmd/slurmd.c @@ -54,7 +54,6 @@ time_t init_time; slurmd_shmem_t * shmem_seg ; /* function prototypes */ -static void * request_thread ( void * arg ) ; static void slurmd_req ( slurm_msg_t * msg ); static int slurmd_msg_engine ( void * args ) ; inline static int send_node_registration_status_msg ( ) ; @@ -260,6 +259,15 @@ void slurm_rpc_launch_tasks ( slurm_msg_t * msg ) int error_code; clock_t start_time; launch_tasks_request_msg_t * task_desc = ( launch_tasks_request_msg_t * ) msg->data ; + slurm_msg_t resp_msg ; + launch_tasks_response_msg_t task_resp ; + char node_name[MAX_NAME_LEN]; + + slurm_print_launch_task_msg ( task_desc ) ; /* dump the incoming request for debugging */ + + /* get hostname */ + if ( ( error_code = gethostname (node_name, MAX_NAME_LEN) ) ) + fatal ("slurmd: errno %d from gethostname", errno); start_time = clock (); info ("slurmd_req: launch tasks message received"); @@ -267,18 +275,25 @@ void slurm_rpc_launch_tasks ( slurm_msg_t * msg ) /* do RPC call */ error_code = launch_tasks ( task_desc ); + task_resp . return_code = error_code ; + task_resp . node_name = node_name ; + + resp_msg . address = task_desc -> response_addr ; /* reply goes to the address carried in the request */ + resp_msg . 
data = & task_resp ; /* NOTE(review): resp_msg.msg_type is never assigned in this hunk - confirm slurm_send_only_node_msg does not need it */ + + /* return result */ if (error_code) { error ("slurmd_req: launch tasks error %d, time=%ld", error_code, (long) (clock () - start_time)); - slurm_send_rc_msg ( msg , error_code ); + slurm_send_only_node_msg ( & resp_msg ); } else { info ("slurmd_req: launch tasks completed successfully, time=%ld", (long) (clock () - start_time)); - slurm_send_rc_msg ( msg , SLURM_SUCCESS ); + slurm_send_only_node_msg ( & resp_msg ); } } diff --git a/src/slurmd/task_mgr.c b/src/slurmd/task_mgr.c index 4c211f0662f..999d2377833 100644 --- a/src/slurmd/task_mgr.c +++ b/src/slurmd/task_mgr.c @@ -483,6 +483,9 @@ void * task_exec_thread ( void * arg ) /* create pipes to read child stdin, stdout, sterr */ init_parent_pipes ( task_start->pipes ) ; + + setup_parent_pipes ( task_start->pipes ) ; + forward_io ( arg ) ; /* NOTE(review): moved here from the parent branch of the switch below - confirm running it before the fork is intended */ #define FORK_ERROR -1 #define CHILD_PROCCESS 0 @@ -542,8 +545,6 @@ void * task_exec_thread ( void * arg ) default: /*parent proccess */ task_start->exec_pid = cpid ; - setup_parent_pipes ( task_start->pipes ) ; - forward_io ( arg ) ; waitpid ( cpid , NULL , 0 ) ; } return ( void * ) SLURM_SUCCESS ; @@ -658,3 +659,5 @@ int reattach_tasks_streams ( reattach_tasks_streams_msg_t * req_msg ) } return error_code ; } + + -- GitLab