diff --git a/src/common/slurm_protocol_defs.c b/src/common/slurm_protocol_defs.c index 095fcabc132dc25cc53cbfbcfcf705743e1ae4f5..91c8d34b4d8709aaf53cc71e0c56b0cbdbb883f7 100644 --- a/src/common/slurm_protocol_defs.c +++ b/src/common/slurm_protocol_defs.c @@ -319,6 +319,20 @@ void slurm_free_launch_tasks_msg ( launch_tasks_msg_t * msg ) } } +void slurm_free_reattach_tasks_streams_msg ( reattach_tasks_streams_msg_t * msg ) +{ + if ( msg ) + { + if ( msg -> credentials ) + xfree ( msg -> credentials ); + if ( msg -> streams ) + xfree ( msg -> streams ); + if ( msg -> global_task_ids ) + xfree ( msg -> global_task_ids ); + xfree ( msg ) ; + } +} + void slurm_free_kill_tasks_msg ( kill_tasks_msg_t * msg ) { if ( msg ) diff --git a/src/common/slurm_protocol_defs.h b/src/common/slurm_protocol_defs.h index 6690a737d27878ac63721eb3915775344e9e4b57..2c1c4d3184a1b7dfae9b1fdbb0803621b3e91a00 100644 --- a/src/common/slurm_protocol_defs.h +++ b/src/common/slurm_protocol_defs.h @@ -139,7 +139,9 @@ typedef enum { RESPONSE_LAUNCH_TASKS, MESSAGE_TASK_EXIT, REQUEST_KILL_TASKS, - + REQUEST_REATTACH_TASKS_STREAMS, + RESPONSE_REATTACH_TASKS_STREAMS, + /*DPCS get key to sign submissions*/ REQUEST_GET_KEY = 7001, RESPONSE_GET_KEY, @@ -250,6 +252,17 @@ typedef struct launch_tasks_msg uint32_t * global_task_ids; } launch_tasks_msg_t ; +typedef struct reattach_tasks_streams_msg +{ + uint32_t job_id ; + uint32_t job_step_id ; + uint32_t uid ; + slurm_job_credential_t* credentials; + uint32_t tasks_to_reattach ; + slurm_addr * streams; + uint32_t * global_task_ids; +} reattach_tasks_streams_msg_t ; + typedef struct kill_tasks_msg { uint32_t job_id ; diff --git a/src/common/slurm_protocol_pack.c b/src/common/slurm_protocol_pack.c index 4c376b6a2074db6b6c00ee66cece88257f644765..7286835b9d56860dd44f2c8330d841dd46d9c902 100644 --- a/src/common/slurm_protocol_pack.c +++ b/src/common/slurm_protocol_pack.c @@ -131,6 +131,9 @@ int pack_msg ( slurm_msg_t const * msg , char ** buffer , uint32_t * buf_len ) pack_update_partition_msg ( ( update_part_msg_t * ) msg->data , ( void ** ) buffer , buf_len ) ; break ; + case REQUEST_REATTACH_TASKS_STREAMS : + pack_reattach_tasks_streams_msg ( ( reattach_tasks_streams_msg_t * ) msg->data , ( void ** ) buffer , buf_len ) ; + break ; case REQUEST_LAUNCH_TASKS : pack_launch_tasks_msg ( ( launch_tasks_msg_t * ) msg->data , ( void ** ) buffer , buf_len ) ; break ; @@ -264,6 +267,10 @@ int unpack_msg ( slurm_msg_t * msg , char ** buffer , uint32_t * buf_len ) unpack_launch_tasks_msg ( ( launch_tasks_msg_t ** ) & ( msg->data ) , ( void ** ) buffer , buf_len ) ; break ; + case REQUEST_REATTACH_TASKS_STREAMS : + unpack_reattach_tasks_streams_msg ( ( reattach_tasks_streams_msg_t ** ) & ( msg->data ) , + ( void ** ) buffer , buf_len ) ; + break ; case REQUEST_KILL_TASKS : unpack_cancel_tasks_msg ( ( kill_tasks_msg_t ** ) & ( msg->data ) , ( void ** ) buffer , buf_len ) ; @@ -1016,9 +1023,42 @@ int unpack_return_code ( return_code_msg_t ** msg , void ** buffer , uint32_t * return 0 ; } +void pack_reattach_tasks_streams_msg ( reattach_tasks_streams_msg_t * msg , void ** buffer , uint32_t * length ) +{ + pack32 ( msg -> job_id , buffer , length ) ; + pack32 ( msg -> job_step_id , buffer , length ) ; + pack32 ( msg -> uid , buffer , length ) ; + pack_job_credential ( msg -> credentials , buffer , length ) ; + pack32 ( msg -> tasks_to_reattach , buffer , length ) ; + pack_slurm_addr_array ( msg -> streams , ( uint16_t ) msg -> tasks_to_reattach, buffer , length ) ; + pack32_array ( msg -> global_task_ids , ( uint16_t ) msg -> tasks_to_reattach , buffer , length ) ; +} + +int unpack_reattach_tasks_streams_msg ( reattach_tasks_streams_msg_t ** msg_ptr , void ** buffer , uint32_t * length ) +{ + uint16_t uint16_tmp; + reattach_tasks_streams_msg_t * msg ; + + msg = xmalloc ( sizeof ( job_desc_msg_t ) ) ; + if (msg == NULL) + { + *msg_ptr = NULL ; + return ENOMEM ; + } + + unpack32 ( & msg -> job_id , buffer , length ) ; + unpack32 ( & msg -> job_step_id , buffer , length ) ; + unpack32 ( & msg -> uid , buffer , length ) ; + unpack_job_credential( & msg -> credentials , buffer , length ) ; + unpack32 ( & msg -> tasks_to_reattach , buffer , length ) ; + unpack_slurm_addr_array ( & msg -> streams , & uint16_tmp , buffer , length ) ; + unpack32_array ( & msg -> global_task_ids , & uint16_tmp , buffer , length ) ; + *msg_ptr = msg ; + return 0 ; +} + void pack_launch_tasks_msg ( launch_tasks_msg_t * msg , void ** buffer , uint32_t * length ) { - pack32 ( msg -> job_id , buffer , length ) ; pack32 ( msg -> job_step_id , buffer , length ) ; pack32 ( msg -> uid , buffer , length ) ; diff --git a/src/common/slurm_protocol_pack.h b/src/common/slurm_protocol_pack.h index bd91a49c5bdfebfe93287ed10ab81381428fe6ef..2020e3cc0f745f896626ecc2376fafd424015d44 100644 --- a/src/common/slurm_protocol_pack.h +++ b/src/common/slurm_protocol_pack.h @@ -92,4 +92,7 @@ int unpack_kill_tasks_msg ( kill_tasks_msg_t ** msg_ptr , void ** buffer , uint3 void pack_slurm_addr_array ( slurm_addr * slurm_address , uint16_t size_val, void ** buffer , int * length ); void unpack_slurm_addr_array ( slurm_addr ** slurm_address , uint16_t * size_val , void ** buffer , int * length ); + +void pack_reattach_tasks_streams_msg ( reattach_tasks_streams_msg_t * msg , void ** buffer , uint32_t * length ) ; +int unpack_reattach_tasks_streams_msg ( reattach_tasks_streams_msg_t ** msg_ptr , void ** buffer , uint32_t * length ) ; #endif