Skip to content
Snippets Groups Projects
Commit ca0d89cc authored by Moe Jette's avatar Moe Jette
Browse files

Remove just started test code slurm_protocol_message_server_daemon.c,

Get test code pair slurm_protocol_message_server.c and
slurm_protocol_message_client.c working.
parent c056e660
No related branches found
No related tags found
No related merge requests found
...@@ -6,6 +6,6 @@ else ...@@ -6,6 +6,6 @@ else
elan_testprogs = elan_testprogs =
endif endif
noinst_PROGRAMS = $(elan_testprogs) pack-test log-test stream_server stream_client bitstring-test slurm_protocol_message_client slurm_protocol_message_server slurm_protocol_message_server_daemon noinst_PROGRAMS = $(elan_testprogs) pack-test log-test stream_server stream_client bitstring-test slurm_protocol_message_client slurm_protocol_message_server
LDADD = $(top_srcdir)/src/common/libcommon.la LDADD = $(top_srcdir)/src/common/libcommon.la
...@@ -6,32 +6,47 @@ ...@@ -6,32 +6,47 @@
int main ( int argc , char * argv[] ) int main ( int argc , char * argv[] )
{ {
/* declare file descriptors */
slurm_fd worker_socket ; slurm_fd worker_socket ;
/* declare address structures */
slurm_addr worker_address ; slurm_addr worker_address ;
slurm_addr peer_address ; slurm_msg_t msg;
slurm_msg_type_t msg_type ; slurm_msg_t resp;
int16_t port;
unsigned int buffer_len = 1024 ; update_node_msg_t *in_msg, out_msg;
char buf_temp [ buffer_len ] ;
char * buffer = buf_temp ;
char * test_send = "This is a test of simple socket communication" ;
unsigned int test_send_len = strlen ( test_send ) ;
unsigned int length_io ;
/* init address sturctures */ /* init address sturctures */
slurm_set_addr_uint ( & worker_address , 7001 , SLURM_INADDR_ANY ) ; if (argc > 1)
/* open and listen on socket */ port = atoi( argv[1] ) ;
if ((argc < 2) || (port < 1)) {
printf("Usage: %s <port_number>\n", argv[0] );
exit( 1 );
}
slurm_set_addr_uint ( & worker_address , port , SLURM_INADDR_ANY ) ;
worker_socket = slurm_open_msg_conn ( & worker_address ) ; worker_socket = slurm_open_msg_conn ( & worker_address ) ;
length_io = slurm_receive_buffer ( worker_socket , & peer_address, & msg_type , buffer , buffer_len ) ; msg.address = worker_address;
printf ( "Bytes Recieved %i\n", length_io ) ; msg.msg_type = REQUEST_UPDATE_NODE;
out_msg.node_state = 0x1234;
out_msg.node_names = "Test message";
msg.data = &out_msg;
slurm_send_node_msg( worker_socket , &msg ) ;
printf("Sending message=%s\n", out_msg.node_names);
if (slurm_receive_msg (worker_socket, &resp) < 0) {
printf("Error reading slurm_receive_msg %m\n");
exit(1);
}
if (resp.msg_type != REQUEST_UPDATE_NODE) {
printf("Got wrong message type: %u\n", resp.msg_type);
exit(1);
}
in_msg = (update_node_msg_t *) resp.data;
printf("Message received=%s\n", in_msg->node_names);
msg_type = 1 ; msg.address = worker_address;
length_io = slurm_send_node_buffer ( worker_socket , & peer_address, msg_type , test_send , test_send_len ) ; msg.msg_type = REQUEST_SHUTDOWN_IMMEDIATE;
printf ( "Bytes Sent %i\n", length_io ) ; printf("Sending server shutdown request\n");
slurm_send_node_msg( worker_socket , &msg ) ;
slurm_shutdown_msg_conn ( worker_socket ) ; slurm_shutdown_msg_conn ( worker_socket ) ;
......
...@@ -6,40 +6,54 @@ ...@@ -6,40 +6,54 @@
int main ( int argc , char * argv[] ) int main ( int argc , char * argv[] )
{ {
/* declare file descriptors */
slurm_fd listen_socket ; slurm_fd listen_socket ;
slurm_fd worker_socket ; slurm_fd worker_socket ;
/* declare address structures */
slurm_addr listen_address ;
slurm_addr peer_address ; slurm_addr peer_address ;
slurm_msg_t msg;
slurm_msg_type_t msg_type; slurm_msg_t resp;
int16_t port;
unsigned int buffer_len = 1024 ; update_node_msg_t *in_msg, out_msg;
char buf_temp [ buffer_len ] ;
char * buffer = buf_temp ;
char * test_send = "This is a test of simple socket communication" ;
unsigned int test_send_len = strlen ( test_send ) ;
unsigned int length_io ;
/* init address sturctures */ /* init address sturctures */
slurm_set_addr_uint ( & listen_address , 7001 , SLURM_INADDR_ANY ) ; if (argc > 1)
/* open and listen on socket */ port = atoi( argv[1] ) ;
listen_socket = slurm_init_msg_engine ( & listen_address ) ;
if ((argc < 2) || (port < 1)) {
printf("Usage: %s <port_number>\n", argv[0] );
exit( 1 );
}
listen_socket = slurm_init_msg_engine_port (port);
printf ( "listen socket %i\n", listen_socket ) ; printf ( "listen socket %i\n", listen_socket ) ;
worker_socket = slurm_accept_msg_conn ( listen_socket , & peer_address ) ; worker_socket = slurm_accept_msg_conn ( listen_socket , & peer_address ) ;
printf ( "worker socket %i\n", worker_socket ) ; printf ( "worker socket %i\n", worker_socket ) ;
while (1) {
msg_type = 1 ; if (slurm_receive_msg (worker_socket, &msg) == SLURM_SOCKET_ERROR ) {
length_io = slurm_send_node_buffer ( worker_socket , & peer_address, msg_type , test_send , test_send_len ) ; printf ("slurm_receive_msg error\n");
printf ( "Bytes Sent %i\n", length_io ) ; break;
}
length_io = slurm_receive_buffer ( worker_socket , & peer_address, & msg_type , buffer , buffer_len ) ;
printf ( "Bytes Recieved %i\n", length_io ) ; if (msg.msg_type == REQUEST_SHUTDOWN_IMMEDIATE) {
printf ("processing shutdown request\n");
break;
}
if (msg.msg_type == REQUEST_UPDATE_NODE) {
in_msg = (update_node_msg_t *) msg.data;
if (msg.data_size > 0)
printf ("Message received=%s\n",in_msg->node_names);
}
resp.address = msg.address;
resp.msg_type = REQUEST_UPDATE_NODE;
out_msg.node_state = 0x1234;
out_msg.node_names = "Message received";
resp.data = &out_msg;
printf("Sending message=%s\n", out_msg.node_names);
slurm_send_node_msg( worker_socket , &resp ) ;
}
slurm_shutdown_msg_engine ( worker_socket ) ; slurm_shutdown_msg_engine ( worker_socket ) ;
return 0 ; return 0 ;
} }
/*****************************************************************************\
* slurm_protocol_message_server_daemon.c -
*****************************************************************************
* Copyright (C) 2002 The Regents of the University of California.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Kevin Tew <tew1@llnl.gov> et. al.
* UCRL-CODE-2002-040.
*
* This file is part of SLURM, a resource management program.
* For details, see <http://www.llnl.gov/linux/slurm/>.
*
* SLURM is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with ConMan; if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
\*****************************************************************************/
#ifdef have_config_h
# include <config.h>
#endif
#include <errno.h>
#include <stdio.h>
#include "slurmctld.h"
#include "pack.h"
#include <src/common/slurm_protocol_api.h>
#define BUF_SIZE 1024
time_t init_time;
void slurmex_req ( slurm_msg_t * msg );
/*
inline static void slurm_rpc_dump_build ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_dump_nodes ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_dump_partitions ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_dump_jobs ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_job_cancel ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_submit_batch_job ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_reconfigure_controller ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_node_registration ( slurm_msg_t * msg ) ;
*/
int
main (int argc, char *argv[])
{
int error_code ;
slurm_fd newsockfd;
slurm_fd sockfd;
slurm_msg_t * msg = NULL ;
slurm_addr cli_addr ;
char node_name[MAX_NAME_LEN];
log_options_t opts = LOG_OPTS_STDERR_ONLY ;
init_time = time (NULL);
log_init(argv[0], opts, SYSLOG_FACILITY_DAEMON, NULL);
if ( ( error_code = init_slurm_conf () ) )
fatal ("slurmd: init_slurm_conf error %d", error_code);
if ( ( error_code = read_slurm_conf ( ) ) )
fatal ("slurmd: error %d from read_slurm_conf reading %s", error_code, SLURM_CONFIG_FILE);
if ( ( error_code = gethostname (node_name, MAX_NAME_LEN) ) )
fatal ("slurmd: errno %d from gethostname", errno);
if ( ( sockfd = slurm_init_msg_engine_port ( atoi (SLURMCTLD_PORT) ) )
== SLURM_SOCKET_ERROR )
fatal ("slurmctld: error starting message engine \n", errno);
while (1)
{
/* accept needed for stream implementation
* is a no-op in message implementation that just passes sockfd to newsockfd
*/
if ( ( newsockfd = slurm_accept_msg_conn ( sockfd , & cli_addr ) ) == SLURM_SOCKET_ERROR )
{
error ("slurmctld: error %d from connect", errno) ;
break ;
}
/* receive message call that must occur before thread spawn because in message
* implementation their is no connection and the message is the sign of a new connection */
msg = xmalloc ( sizeof ( slurm_msg_t ) ) ;
if (msg == NULL)
return ENOMEM;
if ( ( error_code = slurm_receive_msg ( newsockfd , msg ) ) == SLURM_SOCKET_ERROR )
{
error ("slurmctld: error %d from accept", errno);
break ;
}
msg -> conn_fd = newsockfd ;
/*************************
* convert to pthread, tbd
*************************/
slurmex_req ( msg ); /* process the request */
/* close should only be called when the stream implementation is being used
* the following call will be a no-op in the message implementation */
slurm_close_accepted_conn ( newsockfd ); /* close the new socket */
}
return 0 ;
}
void
slurmex_req ( slurm_msg_t * msg )
{
switch ( msg->msg_type )
{
/*
case REQUEST_BUILD_INFO:
slurm_rpc_dump_build ( msg ) ;
slurm_free_last_update_msg ( msg -> data ) ;
break;
case REQUEST_NODE_INFO:
slurm_rpc_dump_nodes ( msg ) ;
slurm_free_last_update_msg ( msg -> data ) ;
break ;
case REQUEST_JOB_INFO:
slurm_rpc_dump_jobs ( msg ) ;
slurm_free_last_update_msg ( msg -> data ) ;
break;
case REQUEST_PARTITION_INFO:
slurm_rpc_dump_partitions ( msg ) ;
slurm_free_last_update_msg ( msg -> data ) ;
break;
case REQUEST_RESOURCE_ALLOCATION:
break;
case REQUEST_CANCEL_JOB:
slurm_rpc_job_cancel ( msg ) ;
slurm_free_job_id_msg ( msg -> data ) ;
break;
case REQUEST_SUBMIT_BATCH_JOB:
slurm_rpc_submit_batch_job ( msg ) ;
slurm_free_job_desc_msg ( msg -> data ) ;
break;
case REQUEST_NODE_REGISRATION_STATUS:
break;
case REQUEST_RECONFIGURE:
slurm_rpc_reconfigure_controller ( msg ) ;
break;
*/
default:
error ("slurmctld_req: invalid request msg type %d\n", msg-> msg_type);
slurm_send_rc_msg ( msg , EINVAL );
break;
}
slurm_free_msg ( msg ) ;
}
/* Reconfigure - re-initialized from configuration files */
void
slurm_rpc_ex_example ( slurm_msg_t * msg )
{
/* init */
int error_code;
clock_t start_time;
start_time = clock ();
/* do RPC call */
/*error_code = init_slurm_conf ();
if (error_code == 0)
error_code = read_slurm_conf ( );
reset_job_bitmaps ();
*/
/* return result */
if (error_code)
{
error ("slurmctld_req: reconfigure error %d, time=%ld",
error_code, (long) (clock () - start_time));
slurm_send_rc_msg ( msg , error_code );
}
else
{
info ("slurmctld_req: reconfigure completed successfully, time=%ld",
(long) (clock () - start_time));
slurm_send_rc_msg ( msg , SLURM_SUCCESS );
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment