Skip to content
Snippets Groups Projects
Commit 9a540d61 authored by jce's avatar jce
Browse files

1- finished a pack method

2- finished adding the job_step message stuff...
parent 80b327fb
No related branches found
No related tags found
No related merge requests found
...@@ -55,7 +55,7 @@ ...@@ -55,7 +55,7 @@
log_options_t log_opts = LOG_OPTS_STDERR_ONLY ; log_options_t log_opts = LOG_OPTS_STDERR_ONLY ;
slurm_ctl_conf_t slurmctld_conf; slurm_ctl_conf_t slurmctld_conf;
time_t shutdown_time = (time_t)0; time_t shutdown_time = (time_t)0;
static pthread_mutex_t thread_count_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t thread_count_lock = PTHREAD_MUTEX_INITIALIZER;
int server_thread_count = 0; int server_thread_count = 0;
pid_t slurmctld_pid; pid_t slurmctld_pid;
...@@ -67,26 +67,28 @@ void init_ctld_conf ( slurm_ctl_conf_t * build_ptr ); ...@@ -67,26 +67,28 @@ void init_ctld_conf ( slurm_ctl_conf_t * build_ptr );
void parse_commandline( int argc, char* argv[], slurm_ctl_conf_t * ); void parse_commandline( int argc, char* argv[], slurm_ctl_conf_t * );
void *process_rpc ( void * req ); void *process_rpc ( void * req );
void *slurmctld_background ( void * no_data ); void *slurmctld_background ( void * no_data );
void *slurmctld_rpc_mgr ( void * no_data ); void *slurmctld_rpc_mgr( void * no_data );
int slurm_shutdown ( void ); int slurm_shutdown ( void );
void * service_connection ( void * arg );
void usage (char *prog_name);
inline static void slurm_rpc_allocate_resources ( slurm_msg_t * msg , uint8_t immediate ) ;
inline static void slurm_rpc_dump_build ( slurm_msg_t * msg ) ; inline static void slurm_rpc_dump_build ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_dump_nodes ( slurm_msg_t * msg ) ; inline static void slurm_rpc_dump_nodes ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_dump_partitions ( slurm_msg_t * msg ) ; inline static void slurm_rpc_dump_partitions ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_dump_jobs ( slurm_msg_t * msg ) ; inline static void slurm_rpc_dump_jobs ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_job_step_cancel ( slurm_msg_t * msg ) ; inline static void slurm_rpc_job_step_cancel ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_job_step_complete ( slurm_msg_t * msg ) ; inline static void slurm_rpc_job_step_complete ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_submit_batch_job ( slurm_msg_t * msg ) ; inline static void slurm_rpc_job_step_create( slurm_msg_t* msg ) ;
inline static void slurm_rpc_shutdown_controller ( slurm_msg_t * msg ) ; inline static void slurm_rpc_job_step_get_info ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_reconfigure_controller ( slurm_msg_t * msg ) ; inline static void slurm_rpc_job_will_run ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_node_registration ( slurm_msg_t * msg ) ; inline static void slurm_rpc_node_registration ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_reconfigure_controller ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_shutdown_controller ( slurm_msg_t * msg );
inline static void slurm_rpc_submit_batch_job ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_update_job ( slurm_msg_t * msg ) ; inline static void slurm_rpc_update_job ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_update_node ( slurm_msg_t * msg ) ; inline static void slurm_rpc_update_node ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_update_partition ( slurm_msg_t * msg ) ; inline static void slurm_rpc_update_partition ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_job_will_run ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_job_step_create( slurm_msg_t* msg ) ;
inline static void slurm_rpc_allocate_resources ( slurm_msg_t * msg , uint8_t immediate ) ;
static void * service_connection ( void * arg ) ;
void usage (char *prog_name);
typedef struct connection_arg typedef struct connection_arg
{ {
...@@ -697,6 +699,81 @@ slurm_rpc_job_step_complete ( slurm_msg_t * msg ) ...@@ -697,6 +699,81 @@ slurm_rpc_job_step_complete ( slurm_msg_t * msg )
schedule(); schedule();
} }
/* slurm_rpc_job_step_get_info - process RPC msg to get job_step information */
void list_append_list( List to, List from )
{
ListIterator i_from = list_iterator_create( from );
void *temp = NULL;
while ( (temp = list_next( i_from ) ) != NULL )
list_append( to, temp );
}
void
slurm_rpc_job_step_get_info ( slurm_msg_t * msg )
{
int error_code;
clock_t start_time;
List step_list = list_create( NULL );
void* resp_buffer = NULL;
int resp_buffer_size = 0;
job_step_info_request_msg_t* request = ( job_step_info_request_msg_t * ) msg-> data ;
start_time = clock ();
if ( request->job_id == 0 )
{
/* Return all steps */
struct job_record *current_job = NULL;
ListIterator i_jobs = list_iterator_create( job_list );
while ( (current_job = list_next( i_jobs ) ) != NULL )
list_append_list( step_list, current_job->step_list );
}
else if ( request->job_step_id == 0 )
{
/* Return all steps for job_id */
struct job_record* job_ptr = find_job_record( request->job_id );
if ( job_ptr == NULL )
error_code = ESLURM_INVALID_JOB_ID;
else
list_append_list( step_list, job_ptr->step_list );
}
else
{
/* Return step with give step_id/job_id */
struct step_record* step = find_step_record( find_job_record( request->job_id ), request->job_step_id );
if ( step == NULL )
error_code = ESLURM_INVALID_JOB_ID;
else
list_append( step_list, step );
}
if ( error_code )
{
error ("slurm_rpc_job_step_get_info error %d for job step %u.%u, time=%ld",
error_code, request->job_id, request->job_step_id,
(long) (clock () - start_time));
slurm_send_rc_msg ( msg , error_code );
}
else
{
slurm_msg_t response_msg ;
pack_ctld_job_step_info_reponse_msg( step_list, &resp_buffer, &resp_buffer_size );
response_msg . address = msg -> address ;
response_msg . msg_type = RESPONSE_JOB_STEP_INFO;
response_msg . data = resp_buffer ;
response_msg . data_size = resp_buffer_size ;
slurm_send_node_msg( msg->conn_fd , &response_msg ) ;
}
list_destroy( step_list );
}
/* slurm_rpc_update_job - process RPC to update the configuration of a job (e.g. priority) */ /* slurm_rpc_update_job - process RPC to update the configuration of a job (e.g. priority) */
void void
slurm_rpc_update_job ( slurm_msg_t * msg ) slurm_rpc_update_job ( slurm_msg_t * msg )
......
...@@ -38,9 +38,26 @@ ...@@ -38,9 +38,26 @@
#include <src/common/bitstring.h> #include <src/common/bitstring.h>
#include <src/common/list.h> #include <src/common/list.h>
#include <src/common/slurm_protocol_pack.h>
#include <src/slurmctld/slurmctld.h> #include <src/slurmctld/slurmctld.h>
#define BUF_SIZE 1024 #define BUF_SIZE 1024
#define REALLOC_MULTIPLIER 4
/* buffer_realloc - reallocates the buffer if/when it gets smaller than BUF_SIZE */
inline void buffer_realloc( void** buffer, void** current, int* size, int* len_left )
{
int current_offset = *current - *buffer;
if ( *len_left < BUF_SIZE )
{
*size += BUF_SIZE * REALLOC_MULTIPLIER ;
*len_left += BUF_SIZE * REALLOC_MULTIPLIER ;
*buffer = xrealloc( *buffer, *size );
*current = buffer + current_offset;
}
}
void void
...@@ -66,7 +83,26 @@ pack_ctld_job_step_info( struct step_record* step, void **buf_ptr, int *buf_len ...@@ -66,7 +83,26 @@ pack_ctld_job_step_info( struct step_record* step, void **buf_ptr, int *buf_len
} }
void void
pack_job_step_info_reponse_msg( List steps ) pack_ctld_job_step_info_reponse_msg( List steps, void** buffer_base, int* buffer_size )
{ {
ListIterator iterator = list_iterator_create( steps );
struct step_record* current_step = NULL;
int current_size = 0;
void* current = buffer_base;
*buffer_size = BUF_SIZE * REALLOC_MULTIPLIER;
*buffer_base = xmalloc( *buffer_size );
pack32( (uint32_t)time(NULL), &current, &current_size ); /* FIXME What am I really suppose to put as the time?*/
pack32( (uint32_t)list_count(steps), &current, &current_size );
/* Pack the Steps */
while( ( current_step = (struct step_record*)list_next( iterator ) ) != NULL )
{
pack_ctld_job_step_info( current_step, &current, &current_size );
buffer_realloc( buffer_base, &current, buffer_size, &current_size );
}
} }
...@@ -323,6 +323,9 @@ extern int job_create (job_desc_msg_t * job_specs, uint32_t *new_job_id, int all ...@@ -323,6 +323,9 @@ extern int job_create (job_desc_msg_t * job_specs, uint32_t *new_job_id, int all
/* job_time_limit - enforce job time limits */ /* job_time_limit - enforce job time limits */
extern void job_time_limit (void); extern void job_time_limit (void);
/* list_append_list - Appends the elements of from list onto the to list */
extern void list_append_list( List to, List from );
/* list_compare_config - compare two entry from the config list based upon weight */ /* list_compare_config - compare two entry from the config list based upon weight */
extern int list_compare_config (void *config_entry1, void *config_entry2); extern int list_compare_config (void *config_entry1, void *config_entry2);
...@@ -362,6 +365,16 @@ extern void pack_all_jobs (char **buffer_ptr, int *buffer_size, ...@@ -362,6 +365,16 @@ extern void pack_all_jobs (char **buffer_ptr, int *buffer_size,
*/ */
extern void pack_all_node (char **buffer_ptr, int *buffer_size, time_t * update_time); extern void pack_all_node (char **buffer_ptr, int *buffer_size, time_t * update_time);
/* pack_ctld_job_step_info_reponse_msg - packs a job_step_info_response message
* IN - steps - a list of steps to send
* NOTE: This xmallocs memory for the buffer, so the user must use xfree on it.
*/
extern void pack_ctld_job_step_info_reponse_msg( List steps, void** buffer_base, int* buffer_size );
/* pack_ctld_job_step_info - packs a job_step_info_t from a step_record
*/
extern void pack_ctld_job_step_info( struct step_record* step, void **buf_ptr, int *buf_len);
/* /*
* pack_all_part - dump all partition information for all partitions in * pack_all_part - dump all partition information for all partitions in
* machine independent form (for network transmission) * machine independent form (for network transmission)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment