diff --git a/src/slurmctld/controller.c b/src/slurmctld/controller.c index 7784fed53b0bbc15199f136b5acb33eb84416d42..ea5375359a0dffa661e8e6cecfebd3be1a091eff 100644 --- a/src/slurmctld/controller.c +++ b/src/slurmctld/controller.c @@ -55,7 +55,7 @@ log_options_t log_opts = LOG_OPTS_STDERR_ONLY ; slurm_ctl_conf_t slurmctld_conf; time_t shutdown_time = (time_t)0; -static pthread_mutex_t thread_count_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t thread_count_lock = PTHREAD_MUTEX_INITIALIZER; int server_thread_count = 0; pid_t slurmctld_pid; @@ -67,26 +67,28 @@ void init_ctld_conf ( slurm_ctl_conf_t * build_ptr ); void parse_commandline( int argc, char* argv[], slurm_ctl_conf_t * ); void *process_rpc ( void * req ); void *slurmctld_background ( void * no_data ); -void *slurmctld_rpc_mgr ( void * no_data ); +void *slurmctld_rpc_mgr( void * no_data ); int slurm_shutdown ( void ); +void * service_connection ( void * arg ); +void usage (char *prog_name); + +inline static void slurm_rpc_allocate_resources ( slurm_msg_t * msg , uint8_t immediate ) ; inline static void slurm_rpc_dump_build ( slurm_msg_t * msg ) ; inline static void slurm_rpc_dump_nodes ( slurm_msg_t * msg ) ; inline static void slurm_rpc_dump_partitions ( slurm_msg_t * msg ) ; inline static void slurm_rpc_dump_jobs ( slurm_msg_t * msg ) ; inline static void slurm_rpc_job_step_cancel ( slurm_msg_t * msg ) ; inline static void slurm_rpc_job_step_complete ( slurm_msg_t * msg ) ; -inline static void slurm_rpc_submit_batch_job ( slurm_msg_t * msg ) ; -inline static void slurm_rpc_shutdown_controller ( slurm_msg_t * msg ) ; -inline static void slurm_rpc_reconfigure_controller ( slurm_msg_t * msg ) ; +inline static void slurm_rpc_job_step_create( slurm_msg_t* msg ) ; +inline static void slurm_rpc_job_step_get_info ( slurm_msg_t * msg ) ; +inline static void slurm_rpc_job_will_run ( slurm_msg_t * msg ) ; inline static void slurm_rpc_node_registration ( slurm_msg_t * msg ) ; +inline static void slurm_rpc_reconfigure_controller ( slurm_msg_t * msg ) ; +inline static void slurm_rpc_shutdown_controller ( slurm_msg_t * msg ); +inline static void slurm_rpc_submit_batch_job ( slurm_msg_t * msg ) ; inline static void slurm_rpc_update_job ( slurm_msg_t * msg ) ; inline static void slurm_rpc_update_node ( slurm_msg_t * msg ) ; inline static void slurm_rpc_update_partition ( slurm_msg_t * msg ) ; -inline static void slurm_rpc_job_will_run ( slurm_msg_t * msg ) ; -inline static void slurm_rpc_job_step_create( slurm_msg_t* msg ) ; -inline static void slurm_rpc_allocate_resources ( slurm_msg_t * msg , uint8_t immediate ) ; -static void * service_connection ( void * arg ) ; -void usage (char *prog_name); typedef struct connection_arg { @@ -697,6 +699,81 @@ slurm_rpc_job_step_complete ( slurm_msg_t * msg ) schedule(); } +/* slurm_rpc_job_step_get_info - process RPC msg to get job_step information */ +void list_append_list( List to, List from ) +{ + ListIterator i_from = list_iterator_create( from ); + void *temp = NULL; + while ( (temp = list_next( i_from ) ) != NULL ) + list_append( to, temp ); + +} + +void +slurm_rpc_job_step_get_info ( slurm_msg_t * msg ) +{ + int error_code; + clock_t start_time; + List step_list = list_create( NULL ); + void* resp_buffer = NULL; + int resp_buffer_size = 0; + job_step_info_request_msg_t* request = ( job_step_info_request_msg_t * ) msg-> data ; + + start_time = clock (); + + if ( request->job_id == 0 ) + { + /* Return all steps */ + struct job_record *current_job = NULL; + ListIterator i_jobs = list_iterator_create( job_list ); + + while ( (current_job = list_next( i_jobs ) ) != NULL ) + list_append_list( step_list, current_job->step_list ); + + } + else if ( request->job_step_id == 0 ) + { + /* Return all steps for job_id */ + struct job_record* job_ptr = find_job_record( request->job_id ); + if ( job_ptr == NULL ) + error_code = ESLURM_INVALID_JOB_ID; + else + list_append_list( step_list, job_ptr->step_list ); + + } + else + { + /* Return step with give step_id/job_id */ + struct step_record* step = find_step_record( find_job_record( request->job_id ), request->job_step_id ); + if ( step == NULL ) + error_code = ESLURM_INVALID_JOB_ID; + else + list_append( step_list, step ); + } + + if ( error_code ) + { + error ("slurm_rpc_job_step_get_info error %d for job step %u.%u, time=%ld", + error_code, request->job_id, request->job_step_id, + (long) (clock () - start_time)); + slurm_send_rc_msg ( msg , error_code ); + } + else + { + slurm_msg_t response_msg ; + + pack_ctld_job_step_info_reponse_msg( step_list, &resp_buffer, &resp_buffer_size ); + response_msg . address = msg -> address ; + response_msg . msg_type = RESPONSE_JOB_STEP_INFO; + response_msg . data = resp_buffer ; + response_msg . data_size = resp_buffer_size ; + slurm_send_node_msg( msg->conn_fd , &response_msg ) ; + + } + + list_destroy( step_list ); +} + /* slurm_rpc_update_job - process RPC to update the configuration of a job (e.g. priority) */ void slurm_rpc_update_job ( slurm_msg_t * msg ) diff --git a/src/slurmctld/pack.c b/src/slurmctld/pack.c index d85c5f796beb324fcd7d5c3e59a5cf1d1dcd1b49..ba1aa4d13c582bbd9f4ebdc62f77e57e8977e4f0 100644 --- a/src/slurmctld/pack.c +++ b/src/slurmctld/pack.c @@ -38,9 +38,26 @@ #include <src/common/bitstring.h> #include <src/common/list.h> +#include <src/common/slurm_protocol_pack.h> #include <src/slurmctld/slurmctld.h> #define BUF_SIZE 1024 +#define REALLOC_MULTIPLIER 4 + + +/* buffer_realloc - reallocates the buffer if/when it gets smaller than BUF_SIZE */ +inline void buffer_realloc( void** buffer, void** current, int* size, int* len_left ) +{ + int current_offset = *current - *buffer; + + if ( *len_left < BUF_SIZE ) + { + *size += BUF_SIZE * REALLOC_MULTIPLIER ; + *len_left += BUF_SIZE * REALLOC_MULTIPLIER ; + *buffer = xrealloc( *buffer, *size ); + *current = buffer + current_offset; + } +} void @@ -66,7 +83,26 @@ pack_ctld_job_step_info( struct step_record* step, void **buf_ptr, int *buf_len } void -pack_job_step_info_reponse_msg( List steps ) +pack_ctld_job_step_info_reponse_msg( List steps, void** buffer_base, int* buffer_size ) { + ListIterator iterator = list_iterator_create( steps ); + struct step_record* current_step = NULL; + int current_size = 0; + void* current = buffer_base; + + *buffer_size = BUF_SIZE * REALLOC_MULTIPLIER; + *buffer_base = xmalloc( *buffer_size ); + + + pack32( (uint32_t)time(NULL), ¤t, ¤t_size ); /* FIXME What am I really suppose to put as the time?*/ + pack32( (uint32_t)list_count(steps), ¤t, ¤t_size ); + /* Pack the Steps */ + while( ( current_step = (struct step_record*)list_next( iterator ) ) != NULL ) + { + pack_ctld_job_step_info( current_step, ¤t, ¤t_size ); + buffer_realloc( buffer_base, ¤t, buffer_size, ¤t_size ); + } } + + diff --git a/src/slurmctld/slurmctld.h b/src/slurmctld/slurmctld.h index 09a089ea87d9f1382329c87d4eb1ae8607dddcf9..cfd1dc4e47fe34b0d95d426ae2e74f5831dd71dd 100644 --- a/src/slurmctld/slurmctld.h +++ b/src/slurmctld/slurmctld.h @@ -323,6 +323,9 @@ extern int job_create (job_desc_msg_t * job_specs, uint32_t *new_job_id, int all /* job_time_limit - enforce job time limits */ extern void job_time_limit (void); +/* list_append_list - Appends the elements of from list onto the to list */ +extern void list_append_list( List to, List from ); + /* list_compare_config - compare two entry from the config list based upon weight */ extern int list_compare_config (void *config_entry1, void *config_entry2); @@ -362,6 +365,16 @@ extern void pack_all_jobs (char **buffer_ptr, int *buffer_size, */ extern void pack_all_node (char **buffer_ptr, int *buffer_size, time_t * update_time); +/* pack_ctld_job_step_info_reponse_msg - packs a job_step_info_response message + * IN - steps - a list of steps to send + * NOTE: This xmallocs memory for the buffer, so the user must use xfree on it. + */ +extern void pack_ctld_job_step_info_reponse_msg( List steps, void** buffer_base, int* buffer_size ); + +/* pack_ctld_job_step_info - packs a job_step_info_t from a step_record + */ +extern void pack_ctld_job_step_info( struct step_record* step, void **buf_ptr, int *buf_len); + /* * pack_all_part - dump all partition information for all partitions in * machine independent form (for network transmission)