Skip to content
Snippets Groups Projects
Commit 4faba127 authored by tewk's avatar tewk
Browse files

Added packmem_array function to pack

fixed launch_task_response msg
added slurm_protocol and slurmd errnos to slurm_errno.h
added print_slurm_addr to src/common/slurm_protocol_api.c
added print_slurm_credential to src/common/slurm_protocol_util.c
cleaned up debug printing and error reporting for src/slurmd/shmem_struct.c
more cleanup
etc
parent 5e28c5a9
No related branches found
No related tags found
No related merge requests found
...@@ -334,3 +334,34 @@ _unpackstrarray (char ***valp, uint16_t *size_valp, void **bufp, int *lenp) ...@@ -334,3 +334,34 @@ _unpackstrarray (char ***valp, uint16_t *size_valp, void **bufp, int *lenp)
*valp = NULL; *valp = NULL;
} }
/*
* Given a pointer to memory (valp) and a size (size_val),
* store the data at valp. Advance bufp and decrement lenp by (size of size_val)
*/
void
_packmem_array(char *valp, uint16_t size_val, void **bufp, int *lenp)
{
memcpy(*bufp, valp, size_val);
(size_t)*bufp += size_val;
*lenp -= size_val;
}
/*
* Given 'bufp' pointing to a network byte order 32-bit integer
* (size) and an arbitrary data string, return a pointer to the
* data string in 'valp'. Also return the sizes of 'valp' in bytes.
* Advance bufp and decrement lenp by 4 bytes (size of memory
* size records) plus the actual buffer size.
*/
void
_unpackmem_array(char *valp, uint16_t size_valp, void **bufp, int *lenp)
{
if (size_valp > 0) {
memcpy ( valp, *bufp, size_valp);
(size_t)*bufp += size_valp;
*lenp -= size_valp;
}
else
*valp = 0 ;
}
...@@ -34,6 +34,9 @@ void _unpack32array( uint32_t **valp, uint16_t* size_val, void **bufp, int *lenp ...@@ -34,6 +34,9 @@ void _unpack32array( uint32_t **valp, uint16_t* size_val, void **bufp, int *lenp
void _packstrarray(char **valp, uint16_t size_val, void **bufp, int *lenp); void _packstrarray(char **valp, uint16_t size_val, void **bufp, int *lenp);
void _unpackstrarray(char ***valp, uint16_t* size_val, void **bufp, int *lenp); void _unpackstrarray(char ***valp, uint16_t* size_val, void **bufp, int *lenp);
void _packmem_array(char *valp, uint16_t size_val, void **bufp, int *lenp);
void _unpackmem_array(char *valp, uint16_t size_valp, void **bufp, int *lenp);
void _packmem(char *valp, uint16_t size_val, void **bufp, int *lenp); void _packmem(char *valp, uint16_t size_val, void **bufp, int *lenp);
void _unpackmem(char *valp, uint16_t *size_valp, void **bufp, int *lenp); void _unpackmem(char *valp, uint16_t *size_valp, void **bufp, int *lenp);
void _unpackmem_ptr(char **valp, uint16_t *size_valp, void **bufp, int *lenp); void _unpackmem_ptr(char **valp, uint16_t *size_valp, void **bufp, int *lenp);
...@@ -194,5 +197,22 @@ void _unpackmem_malloc(char **valp, uint16_t *size_valp, void **bufp, int *lenp) ...@@ -194,5 +197,22 @@ void _unpackmem_malloc(char **valp, uint16_t *size_valp, void **bufp, int *lenp)
_unpackmem_xmalloc(valp,(uint16_t *)size_valp,bufp,lenp);\ _unpackmem_xmalloc(valp,(uint16_t *)size_valp,bufp,lenp);\
} while (0) } while (0)
#define packmem_array(valp,size,bufp,lenp) do { \
assert(size == 0 || valp != NULL); \
assert((bufp) != NULL && *(bufp) != NULL); \
assert((lenp) != NULL); \
assert(*(lenp) >= (size)); \
_packmem_array(valp,(uint16_t)size,bufp,lenp); \
} while (0)
#define unpackmem_array(valp,size,bufp,lenp) do { \
assert(valp != NULL); \
assert(sizeof(size) == sizeof(uint16_t)); \
assert((bufp) != NULL && *(bufp) != NULL); \
assert((lenp) != NULL); \
assert(*(lenp) >= sizeof(uint16_t)); \
_unpackmem_array(valp,size,bufp,lenp); \
} while (0)
#endif /* _PACK_INCLUDED */ #endif /* _PACK_INCLUDED */
...@@ -8,4 +8,6 @@ ...@@ -8,4 +8,6 @@
#define ESLRUMD_PIPE_ERROR_ON_TASK_SPAWN 90000 #define ESLRUMD_PIPE_ERROR_ON_TASK_SPAWN 90000
#define ESLURMD_KILL_TASK_FAILED 90001 #define ESLURMD_KILL_TASK_FAILED 90001
#define ESLURMD_OPENSSL_ERROR 90002 #define ESLURMD_OPENSSL_ERROR 90002
#define ESLURMD_NO_AVAILABLE_JOB_STEP_SLOTS_IN_SHMEM 90003
#define ESLURMD_NO_AVAILABLE_TASK_SLOTS_IN_SHMEM 90004
#endif #endif
...@@ -275,7 +275,7 @@ int slurm_receive_msg ( slurm_fd open_fd , slurm_msg_t * msg ) ...@@ -275,7 +275,7 @@ int slurm_receive_msg ( slurm_fd open_fd , slurm_msg_t * msg )
if ( ( rc = _slurm_msg_recvfrom ( open_fd , buffer , receive_len, SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , & (msg)->address ) ) == SLURM_SOCKET_ERROR ) if ( ( rc = _slurm_msg_recvfrom ( open_fd , buffer , receive_len, SLURM_PROTOCOL_NO_SEND_RECV_FLAGS , & (msg)->address ) ) == SLURM_SOCKET_ERROR )
{ {
int local_errno = errno ; int local_errno = errno ;
debug ( "Error receiving msg socket: errno %i", errno ) ; debug ( "Error receiving msg socket: errno %i", local_errno ) ;
return rc ; return rc ;
} }
...@@ -600,6 +600,11 @@ void slurm_unpack_slurm_addr_no_alloc ( slurm_addr * slurm_address , void ** buf ...@@ -600,6 +600,11 @@ void slurm_unpack_slurm_addr_no_alloc ( slurm_addr * slurm_address , void ** buf
_slurm_unpack_slurm_addr_no_alloc ( slurm_address , buffer , length ) ; _slurm_unpack_slurm_addr_no_alloc ( slurm_address , buffer , length ) ;
} }
void slurm_print_slurm_addr ( FILE * stream , slurm_addr * address )
{
_slurm_print_slurm_addr ( stream , address ) ;
}
/*******************************************/ /*******************************************/
/***** slurm send highlevel msg functions */ /***** slurm send highlevel msg functions */
/*******************************************/ /*******************************************/
......
...@@ -292,6 +292,12 @@ void inline slurm_set_addr_char ( slurm_addr * slurm_address , uint16_t port , c ...@@ -292,6 +292,12 @@ void inline slurm_set_addr_char ( slurm_addr * slurm_address , uint16_t port , c
* IN buf_len - length of hostname buffer * IN buf_len - length of hostname buffer
*/ */
void inline slurm_get_addr ( slurm_addr * slurm_address , uint16_t * port , char * host , uint32_t buf_len ) ; void inline slurm_get_addr ( slurm_addr * slurm_address , uint16_t * port , char * host , uint32_t buf_len ) ;
/* slurm_print_slurm_addr
* prints a slurm_addr to a stream
* IN stream - FILE * stream
* IN address - slurm_addr to print
*/
void inline slurm_print_slurm_addr ( FILE * stream , slurm_addr * address ) ;
/**********************************************************************/ /**********************************************************************/
/* slurm_addr pack routines*/ /* slurm_addr pack routines*/
......
...@@ -90,6 +90,7 @@ extern void _slurm_set_addr_uint ( slurm_addr * slurm_address , uint16_t port , ...@@ -90,6 +90,7 @@ extern void _slurm_set_addr_uint ( slurm_addr * slurm_address , uint16_t port ,
extern void _slurm_set_addr ( slurm_addr * slurm_address , uint16_t port , char * host ) ; extern void _slurm_set_addr ( slurm_addr * slurm_address , uint16_t port , char * host ) ;
extern void _slurm_set_addr_char ( slurm_addr * slurm_address , uint16_t port , char * host ) ; extern void _slurm_set_addr_char ( slurm_addr * slurm_address , uint16_t port , char * host ) ;
extern void _slurm_get_addr ( slurm_addr * slurm_address , uint16_t * port , char * host , uint32_t buf_len ) ; extern void _slurm_get_addr ( slurm_addr * slurm_address , uint16_t * port , char * host , uint32_t buf_len ) ;
extern void _slurm_print_slurm_addr ( FILE * stream , slurm_addr * address ) ;
/*****************************/ /*****************************/
/* slurm addr pack functions */ /* slurm addr pack functions */
......
...@@ -68,20 +68,22 @@ void unpack_header ( header_t * header , char ** buffer , uint32_t * length ) ...@@ -68,20 +68,22 @@ void unpack_header ( header_t * header , char ** buffer , uint32_t * length )
void pack_io_stream_header ( slurm_io_stream_header_t * msg , void ** buffer , uint32_t * length ) void pack_io_stream_header ( slurm_io_stream_header_t * msg , void ** buffer , uint32_t * length )
{ {
uint16_t tmp=SLURM_SSL_SIGNATURE_LENGTH;
assert ( msg != NULL ); assert ( msg != NULL );
pack16( msg->version, buffer, length ) ; pack16( msg->version, buffer, length ) ;
packmem( msg->key, SLURM_SSL_SIGNATURE_LENGTH , buffer, length ) ; packmem_array( msg->key, tmp, buffer, length ) ;
pack32( msg->task_id, buffer, length ) ; pack32( msg->task_id, buffer, length ) ;
pack16( msg->type, buffer, length ) ; pack16( msg->type, buffer, length ) ;
} }
void unpack_io_stream_header ( slurm_io_stream_header_t * msg , void ** buffer , uint32_t * length ) void unpack_io_stream_header ( slurm_io_stream_header_t * msg , void ** buffer , uint32_t * length )
{ {
uint16_t uint16_tmp; uint16_t tmp=SLURM_SSL_SIGNATURE_LENGTH;
unpack16( & msg->version, buffer, length ) ; unpack16( & msg->version, buffer, length ) ;
unpackmem( msg->key, & uint16_tmp , buffer , length ) ; unpackmem_array( msg->key, tmp , buffer , length ) ;
unpack32( & msg->task_id, buffer, length ) ; unpack32( & msg->task_id, buffer, length ) ;
unpack16( & msg->type, buffer, length ) ; unpack16( & msg->type, buffer, length ) ;
} }
......
...@@ -122,4 +122,7 @@ int unpack_reattach_tasks_streams_msg ( reattach_tasks_streams_msg_t ** msg_ptr ...@@ -122,4 +122,7 @@ int unpack_reattach_tasks_streams_msg ( reattach_tasks_streams_msg_t ** msg_ptr
void pack_revoke_credential_msg ( revoke_credential_msg_t* msg , void ** buffer , uint32_t * length ) ; void pack_revoke_credential_msg ( revoke_credential_msg_t* msg , void ** buffer , uint32_t * length ) ;
int unpack_revoke_credential_msg ( revoke_credential_msg_t** msg , void ** buffer , uint32_t * length ) ; int unpack_revoke_credential_msg ( revoke_credential_msg_t** msg , void ** buffer , uint32_t * length ) ;
void pack_io_stream_header ( slurm_io_stream_header_t * msg , void ** buffer , uint32_t * length ) ;
void unpack_io_stream_header ( slurm_io_stream_header_t * msg , void ** buffer , uint32_t * length ) ;
#endif #endif
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
/* LINUX SPECIFIC */ /* LINUX SPECIFIC */
/* this is the slurm equivalent of the operating system file descriptor, which in linux is just an int */ /* this is the slurm equivalent of the operating system file descriptor, which in linux is just an int */
typedef uint32_t slurm_fd ; typedef int32_t slurm_fd ;
/* this is the slurm equivalent of the BSD sockets sockaddr */ /* this is the slurm equivalent of the BSD sockets sockaddr */
typedef struct sockaddr_in slurm_addr ; typedef struct sockaddr_in slurm_addr ;
......
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
#include <sys/time.h> #include <sys/time.h>
#include <sys/types.h> #include <sys/types.h>
#include <signal.h> #include <signal.h>
#include <stdio.h>
#if HAVE_SYS_SOCKET_H #if HAVE_SYS_SOCKET_H
# include <sys/socket.h> # include <sys/socket.h>
...@@ -512,6 +513,14 @@ void _slurm_get_addr ( slurm_addr * slurm_address , uint16_t * port , char * hos ...@@ -512,6 +513,14 @@ void _slurm_get_addr ( slurm_addr * slurm_address , uint16_t * port , char * hos
strncpy ( host , host_info -> h_name , buf_len ) ; strncpy ( host , host_info -> h_name , buf_len ) ;
} }
void _slurm_print_slurm_addr ( FILE * stream , slurm_addr * address )
{
fprintf ( stream , "family %x\n", ntohl ( address -> sin_family ) ) ;
fprintf ( stream , "addr %x\n", ntohl ( address -> sin_addr.s_addr ) ) ;
fprintf ( stream , "port %x\n", ntohs ( address -> sin_port ) ) ;
}
void _slurm_pack_slurm_addr ( slurm_addr * slurm_address , void ** buffer , int * length ) void _slurm_pack_slurm_addr ( slurm_addr * slurm_address , void ** buffer , int * length )
{ {
pack32 ( ntohl ( slurm_address -> sin_addr.s_addr ) , ( void ** ) buffer , length ) ; pack32 ( ntohl ( slurm_address -> sin_addr.s_addr ) , ( void ** ) buffer , length ) ;
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
#include <assert.h> #include <assert.h>
#include <src/common/slurm_protocol_defs.h> #include <src/common/slurm_protocol_defs.h>
#include <src/common/slurm_protocol_api.h>
#include <src/common/slurm_protocol_common.h> #include <src/common/slurm_protocol_common.h>
#include <src/common/slurm_protocol_util.h> #include <src/common/slurm_protocol_util.h>
#include <src/common/log.h> #include <src/common/log.h>
...@@ -46,3 +47,124 @@ void init_io_stream_header ( slurm_io_stream_header_t * header , char * key , ui ...@@ -46,3 +47,124 @@ void init_io_stream_header ( slurm_io_stream_header_t * header , char * key , ui
header -> task_id = task_id ; header -> task_id = task_id ;
header -> type = type ; header -> type = type ;
} }
int read_io_stream_header ( slurm_io_stream_header_t * header , int fd )
{
char buffer[sizeof ( slurm_io_stream_header_t )] ;
char * buf_ptr = buffer ;
int buf_size = sizeof ( slurm_io_stream_header_t ) ;
int size = sizeof ( slurm_io_stream_header_t ) ;
int read_size ;
read_size = slurm_read_stream ( fd , buffer , sizeof ( slurm_io_stream_header_t ) ) ;
unpack_io_stream_header ( header , (void ** ) & buf_ptr , & size ) ;
return read_size ;
}
int write_io_stream_header ( slurm_io_stream_header_t * header , int fd )
{
char buffer[sizeof ( slurm_io_stream_header_t )] ;
char * buf_ptr = buffer ;
int buf_size = sizeof ( slurm_io_stream_header_t ) ;
int size = sizeof ( slurm_io_stream_header_t ) ;
pack_io_stream_header ( header , (void ** ) & buf_ptr , & size ) ;
return slurm_write_stream ( fd , buffer , buf_size - size ) ;
}
int read_io_stream_header2 ( slurm_io_stream_header_t * header , int fd )
{
int read_size ;
if ( ( read_size = slurm_read_stream ( fd , ( char * ) & header -> version , sizeof ( header -> version ) ) ) != sizeof ( header -> version ) )
{
return read_size ;
}
header -> version = ntohs ( header -> version ) ;
if ( ( read_size = slurm_read_stream ( fd , header -> key , sizeof ( header -> key ) ) ) != sizeof ( header -> key ) )
{
return read_size ;
}
if ( ( read_size = slurm_read_stream ( fd , ( char * ) & header -> version , sizeof ( header -> task_id ) ) ) != sizeof ( header -> task_id ) )
{
return read_size ;
}
header -> task_id = ntohl ( header -> task_id ) ;
if ( ( read_size = slurm_read_stream ( fd , ( char * ) & header -> version , sizeof ( header -> type ) ) ) != sizeof ( header -> type ) )
{
return read_size ;
}
header -> type = ntohs ( header -> type ) ;
return SLURM_SUCCESS ;
}
int write_io_stream_header2 ( slurm_io_stream_header_t * header , int fd )
{
int write_size ;
slurm_io_stream_header_t header2 = *header ;
header -> version = htons ( header2 . version ) ;
if ( (write_size = slurm_write_stream ( fd , ( char * ) & header2 . version , sizeof ( header2 . version ) ) ) != sizeof ( header2 . version ) )
{
return write_size;
}
if ( ( write_size = slurm_write_stream ( fd , header2 . key , sizeof ( header2 . key ) ) ) != sizeof ( header2 . key ) )
{
return write_size;
}
header -> task_id = htonl ( header2 . task_id ) ;
if ( ( write_size = slurm_write_stream ( fd , ( char * ) & header2 . version , sizeof ( header2 . task_id ) ) ) != sizeof ( header2 . task_id ) )
{
return write_size;
}
header -> type = htons ( header2 . type ) ;
if ( ( write_size = slurm_write_stream ( fd , ( char * ) & header2 . version , sizeof ( header2 . type ) ) ) != sizeof ( header2 . type ) )
{
return write_size;
}
return SLURM_SUCCESS ;
}
void slurm_print_job_credential ( FILE * stream , slurm_job_credential_t * credential )
{
debug3 ( "credential.job_id: %i" , credential -> job_id );
debug3 ( "credential.user_id: %i" , credential -> user_id );
debug3 ( "credential.node_list: %s" , credential -> node_list );
debug3 ( "credential.expiration_time: %lu" , credential -> expiration_time );
debug3 ( "credential.signature: %s" , credential -> signature );
}
void slurm_print_launch_task_msg ( launch_tasks_request_msg_t * msg )
{
int i ;
debug3 ( "job_id: %i", msg->job_id);
debug3 ( "job_step_id: %i", msg->job_step_id);
debug3 ( "uid: %i", msg->uid);
slurm_print_job_credential ( stderr , msg-> credential ) ;
debug3 ( "tasks_to_launch: %i", msg->tasks_to_launch);
debug3 ( "envc: %i", msg->envc);
for ( i=0 ; i < msg->envc ; i++ )
{
debug3 ( "env[%i]: %s", i , msg->env[i] ) ;
}
debug3 ( "cwd: %s", msg->cwd);
debug3 ( "argc: %i", msg->argc);
for ( i=0 ; i < msg->argc ; i++ )
{
debug3 ( "argv[%i]: %s", i , msg->argv[i] ) ;
}
slurm_print_slurm_addr ( stderr , & msg -> response_addr ) ;
slurm_print_slurm_addr ( stderr , & msg -> streams ) ;
for ( i=0 ; i < msg->tasks_to_launch ; i++ )
{
debug3 ( "global_task_id[%i]: %i ", i, msg->global_task_ids[i] );
}
}
...@@ -14,12 +14,21 @@ ...@@ -14,12 +14,21 @@
# include <inttypes.h> # include <inttypes.h>
#endif /* HAVE_CONFIG_H */ #endif /* HAVE_CONFIG_H */
#include <stdio.h>
#include <src/common/slurm_protocol_defs.h> #include <src/common/slurm_protocol_defs.h>
#include <src/common/slurm_protocol_pack.h>
#include <src/common/slurm_protocol_common.h> #include <src/common/slurm_protocol_common.h>
/* slurm protocol header functions */
uint32_t check_header_version( header_t * header) ; uint32_t check_header_version( header_t * header) ;
void init_header ( header_t * header , slurm_msg_type_t msg_type , uint16_t flags ) ; void init_header ( header_t * header , slurm_msg_type_t msg_type , uint16_t flags ) ;
/* io stream header functions */
uint32_t check_io_stream_header_version( slurm_io_stream_header_t * header) ; uint32_t check_io_stream_header_version( slurm_io_stream_header_t * header) ;
void init_io_stream_header ( slurm_io_stream_header_t * header , char * key , uint32_t task_id , uint16_t type ) ; void init_io_stream_header ( slurm_io_stream_header_t * header , char * key , uint32_t task_id , uint16_t type ) ;
/* debug print methods */
void slurm_print_job_credential ( FILE * stream , slurm_job_credential_t * credential ) ;
void slurm_print_launch_task_msg ( launch_tasks_request_msg_t * msg ) ;
#endif #endif
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include <sys/shm.h> #include <sys/shm.h>
#include <string.h> #include <string.h>
#include <src/common/slurm_errno.h>
#include <src/common/log.h> #include <src/common/log.h>
#include <src/common/slurm_protocol_api.h> #include <src/common/slurm_protocol_api.h>
#include <src/slurmd/shmem_struct.h> #include <src/slurmd/shmem_struct.h>
...@@ -29,7 +30,7 @@ void * get_shmem ( ) ...@@ -29,7 +30,7 @@ void * get_shmem ( )
key = ftok ( ".", 'a' ); key = ftok ( ".", 'a' );
assert ( key != SLURM_ERROR ); assert ( key != SLURM_ERROR );
shmem_gid = shmget ( key , sizeof ( slurmd_shmem_t ) , IPC_CREAT | OCTAL_RW_PERMISSIONS ); shmem_gid = shmget ( key , sizeof ( slurmd_shmem_t ) , IPC_CREAT | OCTAL_RW_PERMISSIONS );
info ( "shmget id = %i , errno = %i ", shmem_gid, errno ); debug ( "shmget id = %i , errno = %i ", shmem_gid, errno );
assert ( shmem_gid != SLURM_ERROR ) ; assert ( shmem_gid != SLURM_ERROR ) ;
shmem_addr = shmat ( shmem_gid , NULL , 0 ) ; shmem_addr = shmat ( shmem_gid , NULL , 0 ) ;
assert ( shmem_addr != (void * ) SLURM_ERROR ) ; assert ( shmem_addr != (void * ) SLURM_ERROR ) ;
...@@ -92,7 +93,9 @@ job_step_t * alloc_job_step ( slurmd_shmem_t * shmem , int job_id , int job_step ...@@ -92,7 +93,9 @@ job_step_t * alloc_job_step ( slurmd_shmem_t * shmem , int job_id , int job_step
} }
} }
pthread_mutex_unlock ( & shmem -> mutex ) ; pthread_mutex_unlock ( & shmem -> mutex ) ;
fatal ( "No available job_step slots in shmem segment"); error ( "No available job_step slots in shmem segment");
slurm_seterrno ( ESLURMD_NO_AVAILABLE_JOB_STEP_SLOTS_IN_SHMEM ) ;
/*return ( NULL ) ;*/
return (void * ) SLURM_ERROR ; return (void * ) SLURM_ERROR ;
} }
...@@ -120,7 +123,9 @@ task_t * alloc_task ( slurmd_shmem_t * shmem , job_step_t * job_step ) ...@@ -120,7 +123,9 @@ task_t * alloc_task ( slurmd_shmem_t * shmem , job_step_t * job_step )
} }
} }
pthread_mutex_unlock ( & shmem -> mutex ) ; pthread_mutex_unlock ( & shmem -> mutex ) ;
fatal ( "No available task slots in shmem segment"); error ( "No available task slots in shmem segment");
slurm_seterrno ( ESLURMD_NO_AVAILABLE_TASK_SLOTS_IN_SHMEM ) ;
/*return ( NULL ) ;*/
return (void * ) SLURM_ERROR ; return (void * ) SLURM_ERROR ;
} }
...@@ -192,7 +197,7 @@ int find_job_id_for_session ( slurmd_shmem_t * shmem , int session_id ) ...@@ -192,7 +197,7 @@ int find_job_id_for_session ( slurmd_shmem_t * shmem , int session_id )
} }
} }
pthread_mutex_unlock ( & shmem -> mutex ) ; pthread_mutex_unlock ( & shmem -> mutex ) ;
info ( "No job_id found for session_id %i", session_id ); debug ( "No job_id found for session_id %i", session_id );
return SLURM_FAILURE ; return SLURM_FAILURE ;
} }
......
...@@ -54,7 +54,6 @@ time_t init_time; ...@@ -54,7 +54,6 @@ time_t init_time;
slurmd_shmem_t * shmem_seg ; slurmd_shmem_t * shmem_seg ;
/* function prototypes */ /* function prototypes */
static void * request_thread ( void * arg ) ;
static void slurmd_req ( slurm_msg_t * msg ); static void slurmd_req ( slurm_msg_t * msg );
static int slurmd_msg_engine ( void * args ) ; static int slurmd_msg_engine ( void * args ) ;
inline static int send_node_registration_status_msg ( ) ; inline static int send_node_registration_status_msg ( ) ;
...@@ -260,6 +259,15 @@ void slurm_rpc_launch_tasks ( slurm_msg_t * msg ) ...@@ -260,6 +259,15 @@ void slurm_rpc_launch_tasks ( slurm_msg_t * msg )
int error_code; int error_code;
clock_t start_time; clock_t start_time;
launch_tasks_request_msg_t * task_desc = ( launch_tasks_request_msg_t * ) msg->data ; launch_tasks_request_msg_t * task_desc = ( launch_tasks_request_msg_t * ) msg->data ;
slurm_msg_t resp_msg ;
launch_tasks_response_msg_t task_resp ;
char node_name[MAX_NAME_LEN];
slurm_print_launch_task_msg ( task_desc ) ;
/* get hostname */
if ( ( error_code = gethostname (node_name, MAX_NAME_LEN) ) )
fatal ("slurmd: errno %d from gethostname", errno);
start_time = clock (); start_time = clock ();
info ("slurmd_req: launch tasks message received"); info ("slurmd_req: launch tasks message received");
...@@ -267,18 +275,25 @@ void slurm_rpc_launch_tasks ( slurm_msg_t * msg ) ...@@ -267,18 +275,25 @@ void slurm_rpc_launch_tasks ( slurm_msg_t * msg )
/* do RPC call */ /* do RPC call */
error_code = launch_tasks ( task_desc ); error_code = launch_tasks ( task_desc );
task_resp . return_code = error_code ;
task_resp . node_name = node_name ;
resp_msg . address = task_desc -> response_addr ;
resp_msg . data = & task_resp ;
/* return result */ /* return result */
if (error_code) if (error_code)
{ {
error ("slurmd_req: launch tasks error %d, time=%ld", error ("slurmd_req: launch tasks error %d, time=%ld",
error_code, (long) (clock () - start_time)); error_code, (long) (clock () - start_time));
slurm_send_rc_msg ( msg , error_code ); slurm_send_only_node_msg ( & resp_msg );
} }
else else
{ {
info ("slurmd_req: launch tasks completed successfully, time=%ld", info ("slurmd_req: launch tasks completed successfully, time=%ld",
(long) (clock () - start_time)); (long) (clock () - start_time));
slurm_send_rc_msg ( msg , SLURM_SUCCESS ); slurm_send_only_node_msg ( & resp_msg );
} }
} }
......
...@@ -483,6 +483,9 @@ void * task_exec_thread ( void * arg ) ...@@ -483,6 +483,9 @@ void * task_exec_thread ( void * arg )
/* create pipes to read child stdin, stdout, sterr */ /* create pipes to read child stdin, stdout, sterr */
init_parent_pipes ( task_start->pipes ) ; init_parent_pipes ( task_start->pipes ) ;
setup_parent_pipes ( task_start->pipes ) ;
forward_io ( arg ) ;
#define FORK_ERROR -1 #define FORK_ERROR -1
#define CHILD_PROCCESS 0 #define CHILD_PROCCESS 0
...@@ -542,8 +545,6 @@ void * task_exec_thread ( void * arg ) ...@@ -542,8 +545,6 @@ void * task_exec_thread ( void * arg )
default: /*parent proccess */ default: /*parent proccess */
task_start->exec_pid = cpid ; task_start->exec_pid = cpid ;
setup_parent_pipes ( task_start->pipes ) ;
forward_io ( arg ) ;
waitpid ( cpid , NULL , 0 ) ; waitpid ( cpid , NULL , 0 ) ;
} }
return ( void * ) SLURM_SUCCESS ; return ( void * ) SLURM_SUCCESS ;
...@@ -658,3 +659,5 @@ int reattach_tasks_streams ( reattach_tasks_streams_msg_t * req_msg ) ...@@ -658,3 +659,5 @@ int reattach_tasks_streams ( reattach_tasks_streams_msg_t * req_msg )
} }
return error_code ; return error_code ;
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment