Skip to content
Snippets Groups Projects
Commit 1f2f8f00 authored by tewk's avatar tewk
Browse files

Implemented new comm layer

unpacks for client side info calls in progress
parent cf92a974
No related branches found
No related tags found
No related merge requests found
...@@ -25,6 +25,9 @@ ...@@ -25,6 +25,9 @@
#include "slurm.h" #include "slurm.h"
#include "nodelist.h" #include "nodelist.h"
#include <src/common/slurm_protocol_api.h>
void slurm_print_job_table (struct job_table * job_ptr );
#if DEBUG_MODULE #if DEBUG_MODULE
/* main is used here for testing purposes only */ /* main is used here for testing purposes only */
int int
...@@ -45,60 +48,62 @@ main (int argc, char *argv[]) ...@@ -45,60 +48,62 @@ main (int argc, char *argv[])
job_buffer_ptr->last_update, job_buffer_ptr->job_count); job_buffer_ptr->last_update, job_buffer_ptr->job_count);
job_ptr = job_buffer_ptr->job_table_ptr; job_ptr = job_buffer_ptr->job_table_ptr;
for (i = 0; i < job_buffer_ptr->job_count; i++) { slurm_print_job_info ( job_ptr ) ;
printf ("JobId=%u UserId=%u ",
job_ptr[i].job_id, job_ptr[i].user_id);
printf ("JobState=%u TimeLimit=%u ",
job_ptr[i].job_state, job_ptr[i].time_limit);
printf ("Priority=%u Partition=%s\n",
job_ptr[i].priority, job_ptr[i].partition);
printf (" Name=%s NodeList=%s ",
job_ptr[i].name, job_ptr[i].nodes);
printf ("StartTime=%x EndTime=%x\n",
(uint32_t) job_ptr[i].start_time,
(uint32_t) job_ptr[i].end_time);
printf (" NodeListIndecies=");
for (j = 0; job_ptr[i].node_inx; j++) {
if (j > 0)
printf(",%d", job_ptr[i].node_inx[j]);
else
printf("%d", job_ptr[i].node_inx[j]);
if (job_ptr[i].node_inx[j] == -1)
break;
}
printf("\n");
printf (" ReqProcs=%u ReqNodes=%u ",
job_ptr[i].num_procs, job_ptr[i].num_nodes);
printf ("Shared=%u Contiguous=%u ",
job_ptr[i].shared, job_ptr[i].contiguous);
printf ("MinProcs=%u MinMemory=%u ",
job_ptr[i].min_procs, job_ptr[i].min_memory);
printf ("MinTmpDisk=%u\n",
job_ptr[i].min_tmp_disk);
printf (" ReqNodeList=%s Features=%s ",
job_ptr[i].req_nodes, job_ptr[i].features);
printf ("JobScript=%s\n",
job_ptr[i].job_script);
printf (" ReqNodeListIndecies=");
for (j = 0; job_ptr[i].req_node_inx; j++) {
if (j > 0)
printf(",%d", job_ptr[i].req_node_inx[j]);
else
printf("%d", job_ptr[i].req_node_inx[j]);
if (job_ptr[i].req_node_inx[j] == -1)
break;
}
printf("\n\n");
}
slurm_free_job_info (job_buffer_ptr); slurm_free_job_info (job_buffer_ptr);
exit (0); exit (0);
} }
#endif #endif
void
slurm_print_job_info ( job_info_msg_t * job_info_msg_ptr )
{
int i;
job_table_t * job_ptr = job_info_msg_ptr -> job_array ;
for (i = 0; i < job_info_msg_ptr-> record_count; i++)
{
slurm_print_job_table ( & job_ptr[i] ) ;
}
}
void
slurm_print_job_table (struct job_table * job_ptr )
{
int j;
printf ("JobId=%u UserId=%u ", job_ptr->job_id, job_ptr->user_id);
printf ("JobState=%u TimeLimit=%u ", job_ptr->job_state, job_ptr->time_limit);
printf ("Priority=%u Partition=%s\n", job_ptr->priority, job_ptr->partition);
printf (" Name=%s NodeList=%s ", job_ptr->name, job_ptr->nodes);
printf ("StartTime=%x EndTime=%x\n", (uint32_t) job_ptr->start_time, (uint32_t) job_ptr->end_time);
printf (" NodeListIndecies=");
for (j = 0; job_ptr->node_inx; j++) {
if (j > 0)
printf(",%d", job_ptr->node_inx[j]);
else
printf("%d", job_ptr->node_inx[j]);
if (job_ptr->node_inx[j] == -1)
break;
}
printf("\n");
printf (" ReqProcs=%u ReqNodes=%u ", job_ptr->num_procs, job_ptr->num_nodes);
printf ("Shared=%u Contiguous=%u ", job_ptr->shared, job_ptr->contiguous);
printf ("MinProcs=%u MinMemory=%u ", job_ptr->min_procs, job_ptr->min_memory);
printf ("MinTmpDisk=%u\n", job_ptr->min_tmp_disk);
printf (" ReqNodeList=%s Features=%s ", job_ptr->req_nodes, job_ptr->features);
printf ("JobScript=%s\n", job_ptr->job_script);
printf (" ReqNodeListIndecies=");
for (j = 0; job_ptr->req_node_inx; j++) {
if (j > 0)
printf(",%d", job_ptr->req_node_inx[j]);
else
printf("%d", job_ptr->req_node_inx[j]);
if (job_ptr->req_node_inx[j] == -1)
break;
}
printf("\n\n");
}
/* /*
* slurm_free_job_info - free the job information buffer (if allocated) * slurm_free_job_info - free the job information buffer (if allocated)
...@@ -140,53 +145,75 @@ slurm_free_job_info (struct job_buffer *job_buffer_ptr) ...@@ -140,53 +145,75 @@ slurm_free_job_info (struct job_buffer *job_buffer_ptr)
int int
slurm_load_job (time_t update_time, struct job_buffer **job_buffer_ptr) slurm_load_job (time_t update_time, struct job_buffer **job_buffer_ptr)
{ {
int buffer_offset, buffer_size, in_size, i, sockfd; int msg_size ;
char request_msg[64], *buffer,*node_inx_str; int rc ;
slurm_fd sockfd ;
slurm_msg_t request_msg ;
slurm_msg_t response_msg ;
last_update_msg_t last_time_msg ;
/* init message connection for message communication with controller */
if ( ( sockfd = slurm_open_controller_conn ( SLURM_PORT ) ) == SLURM_SOCKET_ERROR )
return SLURM_SOCKET_ERROR ;
/* send request message */
/* pack32 ( update_time , &buf_ptr , &buffer_size ); */
last_time_msg . last_update = update_time ;
request_msg . msg_type = REQUEST_JOB_INFO ;
request_msg . data = &last_time_msg ;
if ( ( rc = slurm_send_controller_msg ( sockfd , & request_msg ) ) == SLURM_SOCKET_ERROR )
return SLURM_SOCKET_ERROR ;
/* receive message */
if ( ( msg_size = slurm_receive_msg ( sockfd , & response_msg ) ) == SLURM_SOCKET_ERROR )
return SLURM_SOCKET_ERROR ;
/* shutdown message connection */
if ( ( rc = slurm_shutdown_msg_conn ( sockfd ) ) == SLURM_SOCKET_ERROR )
return SLURM_SOCKET_ERROR ;
switch ( response_msg . msg_type )
{
case REQUEST_JOB_INFO:
/** *build_table_ptr = ( build_table_t * ) response_msg . data ; */
break ;
case RESPONSE_SLURM_RC:
break ;
default:
return SLURM_UNEXPECTED_MSG_ERROR ;
break ;
}
return SLURM_SUCCESS ;
}
int unpack_job_info_buffer ( job_info_msg_t * msg )
{
int buffer_offset ;
int buffer_size;
int in_size;
int uint32_record_count ;
int i;
char *buffer,*node_inx_str;
void *buf_ptr; void *buf_ptr;
struct sockaddr_in serv_addr;
uint16_t uint16_tmp; uint16_t uint16_tmp;
uint32_t uint32_tmp, uint32_time; uint32_t uint32_time;
uint32_t uint32_tmp;
struct job_table *job; struct job_table *job;
char ** job_buffer_ptr ;
*job_buffer_ptr = NULL; *job_buffer_ptr = NULL;
if ((sockfd = socket (AF_INET, SOCK_STREAM, 0)) < 0)
return EINVAL;
serv_addr.sin_family = PF_INET;
serv_addr.sin_addr.s_addr = inet_addr (SLURMCTLD_HOST);
serv_addr.sin_port = htons (SLURMCTLD_PORT);
if (connect
(sockfd, (struct sockaddr *) &serv_addr,
sizeof (serv_addr)) < 0) {
close (sockfd);
return EINVAL;
}
sprintf (request_msg, "DumpJob LastUpdate=%lu",
(long) (update_time));
if (send (sockfd, request_msg, strlen (request_msg) + 1, 0) <
strlen (request_msg)) {
close (sockfd);
return EINVAL;
}
buffer = NULL;
buffer_offset = 0;
buffer_size = 8 * 1024;
while (1) {
buffer = realloc (buffer, buffer_size);
if (buffer == NULL) {
close (sockfd);
return ENOMEM;
}
in_size =
recv (sockfd, &buffer[buffer_offset],
(buffer_size - buffer_offset), 0);
if (in_size <= 0) { /* end of input */
in_size = 0;
break;
}
buffer_offset += in_size;
buffer_size += in_size;
}
close (sockfd);
buffer_size = buffer_offset + in_size; buffer_size = buffer_offset + in_size;
buffer = realloc (buffer, buffer_size); buffer = realloc (buffer, buffer_size);
if (buffer == NULL) if (buffer == NULL)
...@@ -198,12 +225,8 @@ slurm_load_job (time_t update_time, struct job_buffer **job_buffer_ptr) ...@@ -198,12 +225,8 @@ slurm_load_job (time_t update_time, struct job_buffer **job_buffer_ptr)
/* load buffer's header (data structure version and time) */ /* load buffer's header (data structure version and time) */
buf_ptr = buffer; buf_ptr = buffer;
unpack32 (&uint32_tmp, &buf_ptr, &buffer_size);
if (uint32_tmp != JOB_STRUCT_VERSION) {
free (buffer);
return EINVAL;
}
unpack32 (&uint32_time, &buf_ptr, &buffer_size); unpack32 (&uint32_time, &buf_ptr, &buffer_size);
unpack32 (&uint32_record_count, &buf_ptr, &buffer_size);
/* load individual job info */ /* load individual job info */
job = NULL; job = NULL;
...@@ -224,20 +247,16 @@ slurm_load_job (time_t update_time, struct job_buffer **job_buffer_ptr) ...@@ -224,20 +247,16 @@ slurm_load_job (time_t update_time, struct job_buffer **job_buffer_ptr)
job[i].end_time = (time_t) uint32_tmp; job[i].end_time = (time_t) uint32_tmp;
unpack32 (&job[i].priority, &buf_ptr, &buffer_size); unpack32 (&job[i].priority, &buf_ptr, &buffer_size);
unpackstr_ptr (&job[i].nodes, &uint16_tmp, unpackstr_ptr (&job[i].nodes, &uint16_tmp, &buf_ptr, &buffer_size);
&buf_ptr, &buffer_size);
if (job[i].nodes == NULL) if (job[i].nodes == NULL)
job[i].nodes = ""; job[i].nodes = "";
unpackstr_ptr (&job[i].partition, &uint16_tmp, unpackstr_ptr (&job[i].partition, &uint16_tmp, &buf_ptr, &buffer_size);
&buf_ptr, &buffer_size);
if (job[i].partition == NULL) if (job[i].partition == NULL)
job[i].partition = ""; job[i].partition = "";
unpackstr_ptr (&job[i].name, &uint16_tmp, unpackstr_ptr (&job[i].name, &uint16_tmp, &buf_ptr, &buffer_size);
&buf_ptr, &buffer_size);
if (job[i].name == NULL) if (job[i].name == NULL)
job[i].name = ""; job[i].name = "";
unpackstr_ptr (&node_inx_str, &uint16_tmp, unpackstr_ptr (&node_inx_str, &uint16_tmp, &buf_ptr, &buffer_size);
&buf_ptr, &buffer_size);
if (node_inx_str == NULL) if (node_inx_str == NULL)
node_inx_str = ""; node_inx_str = "";
job[i].node_inx = bitfmt2int(node_inx_str); job[i].node_inx = bitfmt2int(node_inx_str);
...@@ -251,21 +270,17 @@ slurm_load_job (time_t update_time, struct job_buffer **job_buffer_ptr) ...@@ -251,21 +270,17 @@ slurm_load_job (time_t update_time, struct job_buffer **job_buffer_ptr)
unpack32 (&job[i].min_memory, &buf_ptr, &buffer_size); unpack32 (&job[i].min_memory, &buf_ptr, &buffer_size);
unpack32 (&job[i].min_tmp_disk, &buf_ptr, &buffer_size); unpack32 (&job[i].min_tmp_disk, &buf_ptr, &buffer_size);
unpackstr_ptr (&job[i].req_nodes, &uint16_tmp, unpackstr_ptr (&job[i].req_nodes, &uint16_tmp, &buf_ptr, &buffer_size);
&buf_ptr, &buffer_size);
if (job[i].req_nodes == NULL) if (job[i].req_nodes == NULL)
job[i].req_nodes = ""; job[i].req_nodes = "";
unpackstr_ptr (&node_inx_str, &uint16_tmp, unpackstr_ptr (&node_inx_str, &uint16_tmp, &buf_ptr, &buffer_size);
&buf_ptr, &buffer_size);
if (node_inx_str == NULL) if (node_inx_str == NULL)
node_inx_str = ""; node_inx_str = "";
job[i].req_node_inx = bitfmt2int(node_inx_str); job[i].req_node_inx = bitfmt2int(node_inx_str);
unpackstr_ptr (&job[i].features, &uint16_tmp, unpackstr_ptr (&job[i].features, &uint16_tmp, &buf_ptr, &buffer_size);
&buf_ptr, &buffer_size);
if (job[i].features == NULL) if (job[i].features == NULL)
job[i].features = ""; job[i].features = "";
unpackstr_ptr (&job[i].job_script, &uint16_tmp, unpackstr_ptr (&job[i].job_script, &uint16_tmp, &buf_ptr, &buffer_size);
&buf_ptr, &buffer_size);
if (job[i].job_script == NULL) if (job[i].job_script == NULL)
job[i].job_script = ""; job[i].job_script = "";
} }
...@@ -283,9 +298,11 @@ slurm_load_job (time_t update_time, struct job_buffer **job_buffer_ptr) ...@@ -283,9 +298,11 @@ slurm_load_job (time_t update_time, struct job_buffer **job_buffer_ptr)
} }
return ENOMEM; return ENOMEM;
} }
/*
(*job_buffer_ptr)->last_update = (time_t) uint32_time; (*job_buffer_ptr)->last_update = (time_t) uint32_time;
(*job_buffer_ptr)->job_count = i; (*job_buffer_ptr)->job_count = i;
(*job_buffer_ptr)->raw_buffer_ptr = buffer; (*job_buffer_ptr)->raw_buffer_ptr = buffer;
(*job_buffer_ptr)->job_table_ptr = job; (*job_buffer_ptr)->job_table_ptr = job;
*/
return 0; return 0;
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment