-
Danny Auble authored
Fixed forward logic to report the correct response for all children and the parent. Fixed jobacct logic to always be able to work off submit time. Changed the shutdown logic to wait around to make sure the shutdown message is sent to all slurmds.
Danny Auble authored: fixed forward logic to report the correct response for all children and the parent. Fixed jobacct logic to always be able to work off submit time. Changed the shutdown logic to wait around to make sure the shutdown message is sent to all slurmds.
slurm_protocol_api.c 47.13 KiB
/*****************************************************************************\
* slurm_protocol_api.c - high-level slurm communication functions
* $Id$
*****************************************************************************
* Copyright (C) 2002-2006 The Regents of the University of California.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by Kevin Tew <tew1@llnl.gov>, et. al.
* UCRL-CODE-217948.
*
* This file is part of SLURM, a resource management program.
* For details, see <http://www.llnl.gov/linux/slurm/>.
*
* SLURM is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with SLURM; if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
\*****************************************************************************/
#if HAVE_CONFIG_H
# include "config.h"
#endif
/* GLOBAL INCLUDES */
#ifdef WITH_PTHREADS
# include <pthread.h>
#endif /* WITH_PTHREADS */
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
/* PROJECT INCLUDES */
#include "src/common/macros.h"
#include "src/common/pack.h"
#include "src/common/parse_spec.h"
#include "src/common/read_config.h"
#include "src/common/slurm_auth.h"
#include "src/common/slurm_protocol_interface.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_protocol_common.h"
#include "src/common/slurm_protocol_pack.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/common/log.h"
#include "src/common/forward.h"
/* EXTERNAL VARIABLES */
/* #DEFINES */
/* Set _DEBUG to 1 to compile in the _print_data() dumps guarded by
 * "#if _DEBUG" in the receive and send paths below. */
#define _DEBUG 0
/* Retry limits; the code that consumes them is not in this part of the file. */
#define MAX_SHUTDOWN_RETRY 5
#define MAX_RETRIES 3
/* STATIC VARIABLES */
/* static pthread_mutex_t config_lock = PTHREAD_MUTEX_INITIALIZER; */
/* Built-in protocol configuration; its controller addresses are filled in
 * by slurm_api_set_default_config(). */
static slurm_protocol_config_t proto_conf_default;
/* Protocol configuration currently in use. Starts out pointing at the
 * built-in default and can be replaced via slurm_set_api_config(). */
static slurm_protocol_config_t *proto_conf = &proto_conf_default;
/* static slurm_ctl_conf_t slurmctld_conf; */
/* STATIC FUNCTIONS */
/* Defined below: maps generic communication errnos to slurmctld ones. */
static void _remap_slurmctld_errno(void);
/**********************************************************************\
* protocol configuration functions
\**********************************************************************/
/* slurm_set_api_config
 * install a caller-supplied slurm_protocol_config object as the active one
 * NOT THREAD SAFE: the global pointer is replaced without any locking
 * IN protocol_conf - slurm_protocol_config object to use from now on
 * RET int - always SLURM_SUCCESS
 *
 * XXX: Why isn't the "config_lock" mutex used here?
 */
int slurm_set_api_config(slurm_protocol_config_t * protocol_conf)
{
	int status = SLURM_SUCCESS;

	proto_conf = protocol_conf;
	return status;
}
/* slurm_get_api_config
 * fetch the slurm_protocol_config object currently in use
 * RET slurm_protocol_config_t * - the active configuration (the stored
 *	pointer itself, not a copy)
 */
slurm_protocol_config_t *slurm_get_api_config()
{
	slurm_protocol_config_t *active = proto_conf;

	return active;
}
/* slurm_api_set_conf_file
 * switch to a non-default slurm configuration file by handing the
 * pathname to slurm_conf_reinit()
 * IN pathname - pathname of slurm configuration file to be used
 */
extern void slurm_api_set_conf_file(char *pathname)
{
	slurm_conf_reinit(pathname);
}
/* slurm_api_set_default_config
 * make sure the compiled-in default slurm_protocol_config object is
 * initialized from the slurm configuration and make it the active one.
 * The configuration lock is held for the whole update and is released
 * on every path.
 * RET int - SLURM_SUCCESS, or SLURM_ERROR if the controller address or
 *	port cannot be established
 */
int slurm_api_set_default_config()
{
	int status = SLURM_ERROR;
	slurm_ctl_conf_t *cf = slurm_conf_lock();

	if (cf->control_addr == NULL) {
		error("Unable to establish controller machine");
	} else if (cf->slurmctld_port == 0) {
		error("Unable to establish controller port");
	} else {
		slurm_set_addr(&proto_conf_default.primary_controller,
			       cf->slurmctld_port,
			       cf->control_addr);
		if (proto_conf_default.primary_controller.sin_port == 0) {
			error("Unable to establish control machine address");
		} else {
			/* the backup controller is optional */
			if (cf->backup_addr) {
				slurm_set_addr(
					&proto_conf_default.secondary_controller,
					cf->slurmctld_port,
					cf->backup_addr);
			}
			proto_conf = &proto_conf_default;
			status = SLURM_SUCCESS;
		}
	}

	slurm_conf_unlock();
	return status;
}
/* slurm_api_clear_config
 * release the configuration via slurm_conf_destroy(); execute this only
 * at program termination to free all memory
 */
void slurm_api_clear_config(void)
{
	(void) slurm_conf_destroy();
}
/* update internal configuration data structure as needed.
* exit with lock set */
/* static inline void _lock_update_config() */
/* { */
/* slurm_api_set_default_config(); */
/* slurm_mutex_lock(&config_lock); */
/* } */
/* slurm_get_mpi_default
 * copy the mpi_default value out of the slurm configuration while the
 * configuration lock is held
 * RET char * - xstrdup() copy of mpi_default, MUST be xfreed by caller
 */
char *slurm_get_mpi_default(void)
{
	slurm_ctl_conf_t *cf = slurm_conf_lock();
	char *copy = xstrdup(cf->mpi_default);

	slurm_conf_unlock();
	return copy;
}
/* slurm_get_plugin_dir
 * copy the plugin directory out of the slurm configuration while the
 * configuration lock is held
 * RET char * - xstrdup() copy of plugindir, MUST be xfreed by caller
 */
char *slurm_get_plugin_dir(void)
{
	slurm_ctl_conf_t *cf = slurm_conf_lock();
	char *copy = xstrdup(cf->plugindir);

	slurm_conf_unlock();
	return copy;
}
/* slurm_get_auth_type
 * copy the authentication type out of the slurm configuration while the
 * configuration lock is held
 * RET char * - xstrdup() copy of authtype, MUST be xfreed by caller
 */
char *slurm_get_auth_type(void)
{
	slurm_ctl_conf_t *cf = slurm_conf_lock();
	char *copy = xstrdup(cf->authtype);

	slurm_conf_unlock();
	return copy;
}
/* slurm_get_propagate_prio_process
 * read the PropagatePrioProcess flag from the slurm configuration while
 * the configuration lock is held
 * RET uint16_t - value of propagate_prio_process
 */
extern uint16_t slurm_get_propagate_prio_process(void)
{
	slurm_ctl_conf_t *cf = slurm_conf_lock();
	uint16_t flag = cf->propagate_prio_process;

	slurm_conf_unlock();
	return flag;
}
/* slurm_get_fast_schedule
 * read fast_schedule from the slurm configuration while the
 * configuration lock is held
 * RET uint16_t - value of fast_schedule
 */
extern uint16_t slurm_get_fast_schedule(void)
{
	slurm_ctl_conf_t *cf = slurm_conf_lock();
	uint16_t value = cf->fast_schedule;

	slurm_conf_unlock();
	return value;
}
/* slurm_set_tree_width
 * sets the value of tree_width in slurmctld_conf object
 * IN tree_width - new span count, must be non-zero
 * RET SLURM_SUCCESS, or SLURM_ERROR if tree_width is 0 (the configuration
 *	is left untouched in that case)
 */
extern int slurm_set_tree_width(uint16_t tree_width)
{
	slurm_ctl_conf_t *conf;

	/* Reject bad input before taking the configuration lock so that the
	 * error return cannot leave the lock held. */
	if (tree_width == 0) {
		error("can't have span count of 0");
		return SLURM_ERROR;
	}

	conf = slurm_conf_lock();
	conf->tree_width = tree_width;
	slurm_conf_unlock();
	return SLURM_SUCCESS;
}
/* slurm_get_tree_width
 * read tree_width from the slurm configuration while the configuration
 * lock is held
 * RET uint16_t - value of tree_width
 */
extern uint16_t slurm_get_tree_width(void)
{
	slurm_ctl_conf_t *cf = slurm_conf_lock();
	uint16_t width = cf->tree_width;

	slurm_conf_unlock();
	return width;
}
/* slurm_set_auth_type
 * replace the authentication type stored in the slurm configuration with
 * a private copy of auth_type (the previous value is xfreed)
 * used for security testing purposes
 * IN auth_type - new authentication type string
 * RET 0 (always)
 */
extern int slurm_set_auth_type(char *auth_type)
{
	slurm_ctl_conf_t *cf = slurm_conf_lock();

	xfree(cf->authtype);
	cf->authtype = xstrdup(auth_type);
	slurm_conf_unlock();

	return 0;
}
/* slurm_get_jobacct_loc
 * copy the job accounting location (job_acct_logfile) out of the slurm
 * configuration while the configuration lock is held
 * RET char * - xstrdup() copy of job_acct_logfile, MUST be xfreed by caller
 */
char *slurm_get_jobacct_loc(void)
{
	slurm_ctl_conf_t *cf = slurm_conf_lock();
	char *copy = xstrdup(cf->job_acct_logfile);

	slurm_conf_unlock();
	return copy;
}
/* slurm_get_jobacct_freq
 * read the job accounting poll frequency from the slurm configuration
 * while the configuration lock is held
 * RET uint16_t - value of job_acct_freq
 */
uint16_t slurm_get_jobacct_freq(void)
{
	slurm_ctl_conf_t *cf = slurm_conf_lock();
	uint16_t poll_freq = cf->job_acct_freq;

	slurm_conf_unlock();
	return poll_freq;
}
/* slurm_get_jobacct_type
 * copy the job accounting type out of the slurm configuration while the
 * configuration lock is held
 * RET char * - xstrdup() copy of job_acct_type, MUST be xfreed by caller
 */
char *slurm_get_jobacct_type(void)
{
	slurm_ctl_conf_t *cf = slurm_conf_lock();
	char *copy = xstrdup(cf->job_acct_type);

	slurm_conf_unlock();
	return copy;
}
/* slurm_get_jobcomp_type
 * copy the job completion logger type out of the slurm configuration
 * while the configuration lock is held
 * RET char * - xstrdup() copy of job_comp_type, MUST be xfreed by caller
 */
char *slurm_get_jobcomp_type(void)
{
	slurm_ctl_conf_t *cf = slurm_conf_lock();
	char *copy = xstrdup(cf->job_comp_type);

	slurm_conf_unlock();
	return copy;
}
/* slurm_get_proctrack_type
 * copy ProctrackType out of the slurm configuration while the
 * configuration lock is held
 * RET char * - xstrdup() copy of proctrack_type, MUST be xfreed by caller
 */
char *slurm_get_proctrack_type(void)
{
	slurm_ctl_conf_t *cf = slurm_conf_lock();
	char *copy = xstrdup(cf->proctrack_type);

	slurm_conf_unlock();
	return copy;
}
/* slurm_get_slurmd_port
 * read the slurmd port from the slurm configuration while the
 * configuration lock is held
 * RET uint16_t - slurmd port
 */
uint16_t slurm_get_slurmd_port(void)
{
	slurm_ctl_conf_t *cf = slurm_conf_lock();
	uint16_t port = cf->slurmd_port;

	slurm_conf_unlock();
	return port;
}
/* slurm_get_slurm_user_id
 * read slurm_user_id from the slurm configuration while the
 * configuration lock is held
 * RET uint32_t - slurm user id
 */
uint32_t slurm_get_slurm_user_id(void)
{
	slurm_ctl_conf_t *cf = slurm_conf_lock();
	uint32_t uid = cf->slurm_user_id;

	slurm_conf_unlock();
	return uid;
}
/* slurm_get_sched_type
 * copy the scheduler type out of the slurm configuration while the
 * configuration lock is held
 * RET char * - xstrdup() copy of schedtype, MUST be xfreed by caller
 */
char *slurm_get_sched_type(void)
{
	slurm_ctl_conf_t *cf = slurm_conf_lock();
	char *copy = xstrdup(cf->schedtype);

	slurm_conf_unlock();
	return copy;
}
/* slurm_get_select_type
 * copy select_type out of the slurm configuration while the
 * configuration lock is held
 * RET char * - xstrdup() copy of select_type, MUST be xfreed by caller
 */
char *slurm_get_select_type(void)
{
	slurm_ctl_conf_t *cf = slurm_conf_lock();
	char *copy = xstrdup(cf->select_type);

	slurm_conf_unlock();
	return copy;
}
/* slurm_get_switch_type
 * copy the switch type out of the slurm configuration while the
 * configuration lock is held
 * RET char * - xstrdup() copy of switch_type, MUST be xfreed by caller
 */
char *slurm_get_switch_type(void)
{
	slurm_ctl_conf_t *cf = slurm_conf_lock();
	char *copy = xstrdup(cf->switch_type);

	slurm_conf_unlock();
	return copy;
}
/* slurm_get_wait_time
 * read wait_time from the slurm configuration while the configuration
 * lock is held
 * RET uint16_t - wait_time
 */
uint16_t slurm_get_wait_time(void)
{
	slurm_ctl_conf_t *cf = slurm_conf_lock();
	uint16_t wait = cf->wait_time;

	slurm_conf_unlock();
	return wait;
}
/* slurm_get_srun_prolog
 * copy the name of the srun prolog program out of the slurm
 * configuration while the configuration lock is held
 * RET char * - xstrdup() copy of srun_prolog, must be xfreed by caller
 */
char *slurm_get_srun_prolog(void)
{
	slurm_ctl_conf_t *cf = slurm_conf_lock();
	char *copy = xstrdup(cf->srun_prolog);

	slurm_conf_unlock();
	return copy;
}
/* slurm_get_srun_epilog
 * copy the name of the srun epilog program out of the slurm
 * configuration while the configuration lock is held
 * RET char * - xstrdup() copy of srun_epilog, must be xfreed by caller
 */
char *slurm_get_srun_epilog(void)
{
	slurm_ctl_conf_t *cf = slurm_conf_lock();
	char *copy = xstrdup(cf->srun_epilog);

	slurm_conf_unlock();
	return copy;
}
/* slurm_get_task_epilog
 * copy task_epilog out of the slurm configuration while the
 * configuration lock is held
 * RET char * - xstrdup() copy of task_epilog, must be xfreed by caller
 */
char *slurm_get_task_epilog(void)
{
	slurm_ctl_conf_t *cf = slurm_conf_lock();
	char *copy = xstrdup(cf->task_epilog);

	slurm_conf_unlock();
	return copy;
}
/* slurm_get_task_prolog
 * copy task_prolog out of the slurm configuration while the
 * configuration lock is held
 * RET char * - xstrdup() copy of task_prolog, must be xfreed by caller
 */
char *slurm_get_task_prolog(void)
{
	slurm_ctl_conf_t *cf = slurm_conf_lock();
	char *copy = xstrdup(cf->task_prolog);

	slurm_conf_unlock();
	return copy;
}
/* slurm_get_task_plugin
 * copy task_plugin out of the slurm configuration while the
 * configuration lock is held
 * RET char * - xstrdup() copy of task_plugin, must be xfreed by caller
 */
char *slurm_get_task_plugin(void)
{
	slurm_ctl_conf_t *cf = slurm_conf_lock();
	char *copy = xstrdup(cf->task_plugin);

	slurm_conf_unlock();
	return copy;
}
/* Translate a general slurm communication error currently stored in the
 * slurm errno into its slurmctld specific counterpart. Any other errno
 * value is left as it is. */
static void _remap_slurmctld_errno(void)
{
	switch (slurm_get_errno()) {
	case SLURM_COMMUNICATIONS_CONNECTION_ERROR:
		slurm_seterrno(SLURMCTLD_COMMUNICATIONS_CONNECTION_ERROR);
		break;
	case SLURM_COMMUNICATIONS_SEND_ERROR:
		slurm_seterrno(SLURMCTLD_COMMUNICATIONS_SEND_ERROR);
		break;
	case SLURM_COMMUNICATIONS_RECEIVE_ERROR:
		slurm_seterrno(SLURMCTLD_COMMUNICATIONS_RECEIVE_ERROR);
		break;
	case SLURM_COMMUNICATIONS_SHUTDOWN_ERROR:
		slurm_seterrno(SLURMCTLD_COMMUNICATIONS_SHUTDOWN_ERROR);
		break;
	default:
		/* not a generic communication error: nothing to remap */
		break;
	}
}
/**********************************************************************\
* general message management functions used by slurmctld, slurmd
\**********************************************************************/
/*
 * Start a slurm message server on the given port, bound via
 * slurm_set_addr_any() (i.e. not tied to one specific address).
 *
 * IN port	- port to bind the msg server to
 * RET slurm_fd	- file descriptor of the connection created
 */
slurm_fd slurm_init_msg_engine_port(uint16_t port)
{
	slurm_addr any_addr;
	slurm_fd listen_fd;

	slurm_set_addr_any(&any_addr, port);
	listen_fd = _slurm_init_msg_engine(&any_addr);
	return listen_fd;
}
/*
 * Start a slurm message server bound to the slurm address "addr".
 *
 * IN addr	- slurm_addr to bind the msg server to
 * RET slurm_fd	- file descriptor of the connection created
 */
slurm_fd slurm_init_msg_engine(slurm_addr *addr)
{
	slurm_fd listen_fd = _slurm_init_msg_engine(addr);

	return listen_fd;
}
/*
 * Close an established message engine.
 * When the close fails, the slurm errno is set to
 * SLURM_COMMUNICATIONS_SHUTDOWN_ERROR.
 *
 * IN fd	- an open file descriptor to close
 * RET int	- the return code of _slurm_close() (non-zero on failure)
 */
int slurm_shutdown_msg_engine(slurm_fd fd)
{
	int close_rc = _slurm_close(fd);

	if (!close_rc)
		return close_rc;

	slurm_seterrno(SLURM_COMMUNICATIONS_SHUTDOWN_ERROR);
	return close_rc;
}
/*
 * Close an established message connection.
 *
 * IN fd	- an open file descriptor to close
 * RET int	- the return code of _slurm_close()
 */
int slurm_shutdown_msg_conn(slurm_fd fd)
{
	int close_rc = _slurm_close(fd);

	return close_rc;
}
/**********************************************************************\
* msg connection establishment functions used by msg clients
\**********************************************************************/
/* slurm_open_msg_conn
 * open a message connection to the given destination; the work is done
 * by the implementation specific _slurm_open_msg_conn()
 * IN slurm_address	- slurm_addr of the connection destination
 * RET slurm_fd		- file descriptor of the connection created
 */
slurm_fd slurm_open_msg_conn(slurm_addr * slurm_address)
{
	slurm_fd conn = _slurm_open_msg_conn(slurm_address);

	return conn;
}
/* slurm_open_controller_conn
 * open a message connection to the slurmctld message engine: the primary
 * controller is tried first and, only if that fails and a backup
 * controller is configured, the secondary controller is tried next
 * RET slurm_fd	- file descriptor of the connection created;
 *		  SLURM_FAILURE if the default config cannot be set up;
 *		  otherwise the result of slurm_seterrno_ret() with
 *		  SLURMCTLD_COMMUNICATIONS_CONNECTION_ERROR
 */
slurm_fd slurm_open_controller_conn()
{
	slurm_fd conn;
	slurm_ctl_conf_t *cf;
	int have_backup;

	if (slurm_api_set_default_config() < 0)
		return SLURM_FAILURE;

	conn = slurm_open_msg_conn(&proto_conf->primary_controller);
	if (conn >= 0)
		return conn;
	debug("Failed to contact primary controller: %m");

	/* only look at the configuration while holding its lock */
	cf = slurm_conf_lock();
	have_backup = cf->backup_controller ? 1 : 0;
	slurm_conf_unlock();

	if (have_backup) {
		conn = slurm_open_msg_conn(&proto_conf->secondary_controller);
		if (conn >= 0)
			return conn;
		debug("Failed to contact secondary controller: %m");
	}

	slurm_seterrno_ret(SLURMCTLD_COMMUNICATIONS_CONNECTION_ERROR);
}
/* calls connect to make a connection-less datagram connection to the
* primary or secondary slurmctld message engine
* RET slurm_fd - file descriptor of the connection created
* IN dest - controller to contact, primary or secondary
*/
slurm_fd slurm_open_controller_conn_spec(enum controller_id dest)
{
slurm_addr *addr;
slurm_fd rc;
if (slurm_api_set_default_config() < 0) {
debug3("Error: Unable to set default config");
return SLURM_ERROR;
}
addr = (dest == PRIMARY_CONTROLLER) ?
&proto_conf->primary_controller :
&proto_conf->secondary_controller;
if (!addr) return SLURM_ERROR;
rc = slurm_open_msg_conn(addr);
if (rc == -1)
_remap_slurmctld_errno();
return rc;
}
/* slurm_accept_msg_conn
 * accept a message connection; delegates to the implementation
 * specific _slurm_accept_msg_conn()
 * IN open_fd		- file descriptor to accept connection on
 * OUT slurm_address	- slurm_addr of the accepted connection
 * RET slurm_fd		- file descriptor of the connection created
 */
slurm_fd slurm_accept_msg_conn(slurm_fd open_fd,
			       slurm_addr * slurm_address)
{
	slurm_fd accepted = _slurm_accept_msg_conn(open_fd, slurm_address);

	return accepted;
}
/* slurm_close_accepted_conn
 * close a connection that was previously accepted; delegates to the
 * implementation specific _slurm_close_accepted_conn()
 * IN open_fd	- an open file descriptor to close
 * RET int	- the return code
 */
int slurm_close_accepted_conn(slurm_fd open_fd)
{
	int close_rc = _slurm_close_accepted_conn(open_fd);

	return close_rc;
}
/**********************************************************************\
* receive message functions
\**********************************************************************/
/*
 * slurm_receive_msg - read one message from fd, kick off any forwarding
 *	requested in its header, verify its credential and unpack its body
 *	into msg
 * NOTE: memory is allocated for the returned msg and must be freed at
 *	some point using the slurm_free_functions
 * IN fd	- file descriptor to receive msg on
 * OUT msg	- a slurm_msg struct to be filled in by the function
 * IN timeout	- multiplied by 1000 before being used as a millisecond
 *		  timeout (so presumably seconds); 0 selects
 *		  SLURM_MESSAGE_TIMEOUT_MSEC_STATIC
 * RET List	- list created with destroy_ret_types holding any entries
 *		  carried in header.ret_list. The list is returned on every
 *		  path; success or failure is reported through errno, which
 *		  is set to SLURM_SUCCESS or to the error code on return.
 */
List slurm_receive_msg(slurm_fd fd, slurm_msg_t *msg, int timeout)
{
	char *buf = NULL;
	size_t buflen = 0;
	header_t header;
	int rc;
	void *auth_cred = NULL;
	Buf buffer;
	ret_types_t *ret_type = NULL;
	List ret_list = list_create(destroy_ret_types);

	msg->forward_struct = NULL;
	msg->forward_struct_init = 0;

	xassert(fd >= 0);

	if ((timeout*=1000) == 0)
		timeout = SLURM_MESSAGE_TIMEOUT_MSEC_STATIC;

	/*
	 * Receive a msg. slurm_msg_recvfrom() will read the message
	 *  length and allocate space on the heap for a buffer containing
	 *  the message.
	 */
	if (_slurm_msg_recvfrom_timeout(fd, &buf, &buflen, 0, timeout) < 0) {
		/* header was never unpacked: initialize header.forward so
		 * the destroy_forward() at total_return is safe */
		forward_init(&header.forward, NULL);
		/* no need to init forward_struct_init here */
		rc = errno;
		goto total_return;
	}

#if _DEBUG
	/* dump the raw bytes that were just received */
	_print_data (buf, (int) buflen);
#endif
	buffer = create_buf(buf, buflen);

	if(unpack_header(&header, buffer) == SLURM_ERROR) {
		free_buf(buffer);
		rc = SLURM_COMMUNICATIONS_RECEIVE_ERROR;
		goto total_return;
	}

	if (check_header_version(&header) < 0) {
		free_buf(buffer);
		rc = SLURM_PROTOCOL_VERSION_ERROR;
		goto total_return;
	}
	//info("ret_cnt = %d",header.ret_cnt);
	/* move the entries carried in the header onto the list we return */
	if(header.ret_cnt > 0) {
		while((ret_type = list_pop(header.ret_list)) != NULL)
			list_push(ret_list, ret_type);
		header.ret_cnt = 0;
		list_destroy(header.ret_list);
		header.ret_list = NULL;
	}
	/* a non-zero orig_addr in the header wins; otherwise the header
	 * inherits the address already stored in msg */
	if(header.orig_addr.sin_addr.s_addr != 0) {
		memcpy(&msg->orig_addr, &header.orig_addr, sizeof(slurm_addr));
	} else {
		memcpy(&header.orig_addr, &msg->orig_addr, sizeof(slurm_addr));
	}

	/* Forward message to other nodes */
	if(header.forward.cnt > 0) {
		debug("forwarding to %d", header.forward.cnt);
		msg->forward_struct = xmalloc(sizeof(forward_struct_t));
		msg->forward_struct_init = FORWARD_INIT;
		/* keep a private copy of everything after the header */
		msg->forward_struct->buf_len = remaining_buf(buffer);
		msg->forward_struct->buf =
			xmalloc(sizeof(char) * msg->forward_struct->buf_len);
		memcpy(msg->forward_struct->buf,
		       &buffer->head[buffer->processed],
		       msg->forward_struct->buf_len);
		msg->forward_struct->ret_list = ret_list;

		msg->forward_struct->timeout = timeout-header.forward.timeout;
		msg->forward_struct->fwd_cnt = header.forward.cnt;

		debug3("forwarding messages to %d nodes!!!!",
		       msg->forward_struct->fwd_cnt);

		if(forward_msg(msg->forward_struct, &header) == SLURM_ERROR) {
			error("problem with forward msg");
		}
	}

	if ((auth_cred = g_slurm_auth_unpack(buffer)) == NULL) {
		error( "authentication: %s ",
		       g_slurm_auth_errstr(g_slurm_auth_errno(NULL)));
		free_buf(buffer);
		rc = ESLURM_PROTOCOL_INCOMPLETE_PACKET;
		goto total_return;
	}
	rc = g_slurm_auth_verify( auth_cred, NULL, 2 );

	if (rc != SLURM_SUCCESS) {
		error( "authentication: %s ",
		       g_slurm_auth_errstr(g_slurm_auth_errno(auth_cred)));
		(void) g_slurm_auth_destroy(auth_cred);
		free_buf(buffer);
		rc = SLURM_PROTOCOL_AUTHENTICATION_ERROR;
		goto total_return;
	}

	/*
	 * Unpack message body
	 */
	msg->msg_type = header.msg_type;
	msg->srun_node_id = header.srun_node_id;

	if ( (header.body_length > remaining_buf(buffer)) ||
	     (unpack_msg(msg, buffer) != SLURM_SUCCESS) ) {
		(void) g_slurm_auth_destroy(auth_cred);
		free_buf(buffer);
		rc = ESLURM_PROTOCOL_INCOMPLETE_PACKET;
		goto total_return;
	}

	/* on success the credential is handed over to msg */
	msg->auth_cred = (void *) auth_cred;

	free_buf(buffer);
	rc = SLURM_SUCCESS;

total_return:
	destroy_forward(&header.forward);

	if(rc != SLURM_SUCCESS) {
		error("slurm_receive_msg: %s", slurm_strerror(rc));
	}
	errno = rc;
	return ret_list;
}
/**********************************************************************\
* send message functions
\**********************************************************************/
/*
 * Append the body of msg to buffer, record the body length in hdr, then
 * re-pack hdr at offset 0 of buffer so the header already sitting there
 * carries the updated length. The buffer offset is left at the end of
 * the packed body.
 */
static void
_pack_msg(slurm_msg_t *msg, header_t *hdr, Buf buffer)
{
	unsigned int body_start, end_off;

	body_start = get_buf_offset(buffer);
	pack_msg(msg, buffer);
	end_off = get_buf_offset(buffer);

	/* update header with correct cred and msg lengths */
	update_header(hdr, end_off - body_start);

	/* rewind, overwrite the old header, then restore the offset */
	set_buf_offset(buffer, 0);
	pack_header(hdr, buffer);
	set_buf_offset(buffer, end_off);
}
/*
 * Adds header to buffer and sends message over an open file descriptor `fd'
 * The already packed body is taken from msg->buffer; a header and a fresh
 * auth credential are packed in front of it.
 * IN fd	- open file descriptor to send on
 * IN msg	- message whose msg->buffer holds the packed body
 * RET int	- the value returned by _slurm_msg_sendto(), or -1
 *		  (SLURM_ERROR) on failure. Authentication failures set errno
 *		  to SLURM_PROTOCOL_AUTHENTICATION_ERROR.
 */
int slurm_add_header_and_send(slurm_fd fd, slurm_msg_t *msg)
{
	Buf send_buf = NULL;
	header_t header;
	int buf_len = get_buf_offset(msg->buffer);
	int rc = SLURM_SUCCESS;
	void *auth_cred = NULL;

	init_header(&header, msg, SLURM_PROTOCOL_NO_FLAGS);
	header.body_length = buf_len;

	send_buf = init_buf(BUF_SIZE+buf_len);
	/*
	 * Pack header into buffer for transmission
	 */
	pack_header(&header, send_buf);

	/*
	 * Initialize header with Auth credential and message type.
	 */
	auth_cred = g_slurm_auth_create(NULL, 2);
	if (auth_cred == NULL) {
		error("authentication: %s",
		      g_slurm_auth_errstr(g_slurm_auth_errno(NULL)) );
		errno = SLURM_PROTOCOL_AUTHENTICATION_ERROR;
		/* report the failure instead of returning SLURM_SUCCESS */
		rc = SLURM_ERROR;
		goto ret_error;
	}

	/*
	 * Pack auth credential
	 */
	rc = g_slurm_auth_pack(auth_cred, send_buf);
	if (rc) {
		/* read the error out of the credential before destroying it */
		error("authentication: %s",
		      g_slurm_auth_errstr(g_slurm_auth_errno(auth_cred)));
		(void) g_slurm_auth_destroy(auth_cred);
		errno = SLURM_PROTOCOL_AUTHENTICATION_ERROR;
		rc = SLURM_ERROR;
		/* send_buf is released exactly once, at ret_error */
		goto ret_error;
	}
	(void) g_slurm_auth_destroy(auth_cred);

	/* add to send_buf */
	if (remaining_buf(send_buf) < buf_len) {
		send_buf->size += (buf_len + BUF_SIZE);
		xrealloc(send_buf->head, send_buf->size);
	}
	if (buf_len) {
		memcpy(&send_buf->head[send_buf->processed],
		       msg->buffer->head, buf_len);
		send_buf->processed += buf_len;
	}

	/*
	 * send message
	 */
	rc = _slurm_msg_sendto(fd,
			       get_buf_data(send_buf),
			       get_buf_offset(send_buf),
			       SLURM_PROTOCOL_NO_SEND_RECV_FLAGS);

ret_error:
	if (rc < 0)
		error("slurm_add_header_and_send: %m");
	free_buf(send_buf);

	return rc;
}
/*
 * Send a slurm message over an open file descriptor `fd'
 * A header, a fresh auth credential and the packed body of msg are
 * written into one buffer which is then sent. If msg->forward has not
 * been initialized it is initialized here (and msg->ret_list cleared)
 * before forward_wait() is called on the message.
 * IN fd	- open file descriptor to send on
 * IN msg	- message to pack and send
 * RET int	- the size of the message sent in bytes, or -1 on failure.
 *		  Authentication failures set the slurm errno to
 *		  SLURM_PROTOCOL_AUTHENTICATION_ERROR.
 */
int slurm_send_node_msg(slurm_fd fd, slurm_msg_t * msg)
{
	header_t header;
	Buf buffer;
	int rc;
	void * auth_cred;

	/*
	 * Initialize header with Auth credential and message type.
	 */
	auth_cred = g_slurm_auth_create(NULL, 2);
	if (auth_cred == NULL) {
		error("authentication: %s",
		      g_slurm_auth_errstr(g_slurm_auth_errno(NULL)) );
		slurm_seterrno_ret(SLURM_PROTOCOL_AUTHENTICATION_ERROR);
	}

	if(msg->forward.init != FORWARD_INIT) {
		forward_init(&msg->forward, NULL);
		/* no need to init forward_struct_init here */
		msg->ret_list = NULL;
	}
	forward_wait(msg);

	init_header(&header, msg, SLURM_PROTOCOL_NO_FLAGS);

	/*
	 * Pack header into buffer for transmission
	 */
	buffer = init_buf(BUF_SIZE);
	pack_header(&header, buffer);

	/*
	 * Pack auth credential
	 */
	rc = g_slurm_auth_pack(auth_cred, buffer);
	if (rc) {
		/* read the error out of the credential before destroying it */
		error("authentication: %s",
		      g_slurm_auth_errstr(g_slurm_auth_errno(auth_cred)));
		(void) g_slurm_auth_destroy(auth_cred);
		free_buf(buffer);
		slurm_seterrno_ret(SLURM_PROTOCOL_AUTHENTICATION_ERROR);
	}
	(void) g_slurm_auth_destroy(auth_cred);

	/*
	 * Pack message into buffer
	 */
	_pack_msg(msg, &header, buffer);

#if _DEBUG
	_print_data (get_buf_data(buffer),get_buf_offset(buffer));
#endif
	/*
	 * Send message
	 */
	rc = _slurm_msg_sendto( fd, get_buf_data(buffer),
				get_buf_offset(buffer),
				SLURM_PROTOCOL_NO_SEND_RECV_FLAGS );

	if (rc < 0)
		error("slurm_msg_sendto: %m");

	free_buf(buffer);
	return rc;
}
/**********************************************************************\
* stream functions
\**********************************************************************/
/* slurm_listen_stream
 * open a stream server and listen on it; delegates to
 * _slurm_listen_stream()
 * IN slurm_address	- slurm_addr to bind the server stream to
 * RET slurm_fd		- file descriptor of the stream created
 */
slurm_fd slurm_listen_stream(slurm_addr * slurm_address)
{
	slurm_fd listen_fd = _slurm_listen_stream(slurm_address);

	return listen_fd;
}
/* slurm_accept_stream
 * accept an incoming stream connection on a stream server slurm_fd;
 * delegates to _slurm_accept_stream()
 * IN open_fd		- file descriptor to accept connection on
 * OUT slurm_address	- slurm_addr of the accepted connection
 * RET slurm_fd		- file descriptor of the accepted connection
 */
slurm_fd slurm_accept_stream(slurm_fd open_fd, slurm_addr * slurm_address)
{
	slurm_fd accepted = _slurm_accept_stream(open_fd, slurm_address);

	return accepted;
}
/* slurm_open_stream
 * open a client connection to a stream server; calls
 * _slurm_open_stream() with its second argument set to true
 * IN slurm_address	- slurm_addr of the connection destination
 * RET slurm_fd		- file descriptor of the connection created
 * NOTE: Retry with various ports as needed if connection is refused
 */
slurm_fd slurm_open_stream(slurm_addr * slurm_address)
{
	slurm_fd conn = _slurm_open_stream(slurm_address, true);

	return conn;
}
/* slurm_write_stream
 * write a buffer out a stream file descriptor using the fixed timeout
 * SLURM_MESSAGE_TIMEOUT_MSEC_STATIC
 * IN open_fd	- file descriptor to write on
 * IN buffer	- buffer to send
 * IN size	- size of buffer send
 * RET size_t	- bytes sent , or -1 on error
 */
size_t slurm_write_stream(slurm_fd open_fd, char *buffer, size_t size)
{
	size_t sent = _slurm_send_timeout(open_fd, buffer, size,
					  SLURM_PROTOCOL_NO_SEND_RECV_FLAGS,
					  SLURM_MESSAGE_TIMEOUT_MSEC_STATIC);

	return sent;
}
/* slurm_write_stream_timeout
 * write a buffer out a stream file descriptor with a caller-supplied
 * timeout, which is passed unchanged to _slurm_send_timeout()
 * IN open_fd	- file descriptor to write on
 * IN buffer	- buffer to send
 * IN size	- size of buffer send
 * IN timeout	- how long to wait
 * RET size_t	- result of _slurm_send_timeout()
 */
size_t slurm_write_stream_timeout(slurm_fd open_fd, char *buffer,
				  size_t size, int timeout)
{
	size_t sent = _slurm_send_timeout(open_fd, buffer, size,
					  SLURM_PROTOCOL_NO_SEND_RECV_FLAGS,
					  timeout);

	return sent;
}
/* slurm_read_stream
 * read into buffer from a stream file descriptor using the fixed timeout
 * SLURM_MESSAGE_TIMEOUT_MSEC_STATIC
 * IN open_fd	- file descriptor to read from
 * OUT buffer	- buffer to receive into
 * IN size	- size of buffer
 * RET size_t	- bytes read , or -1 on error
 */
size_t slurm_read_stream(slurm_fd open_fd, char *buffer, size_t size)
{
	size_t got = _slurm_recv_timeout(open_fd, buffer, size,
					 SLURM_PROTOCOL_NO_SEND_RECV_FLAGS,
					 SLURM_MESSAGE_TIMEOUT_MSEC_STATIC);

	return got;
}
/* slurm_read_stream_timeout
 * read into buffer from a stream file descriptor with a caller-supplied
 * timeout, which is passed unchanged to _slurm_recv_timeout()
 * IN open_fd	- file descriptor to read from
 * OUT buffer	- buffer to receive into
 * IN size	- size of buffer
 * IN timeout	- how long to wait
 * RET size_t	- result of _slurm_recv_timeout()
 */
size_t slurm_read_stream_timeout(slurm_fd open_fd, char *buffer,
				 size_t size, int timeout)
{
	size_t got = _slurm_recv_timeout(open_fd, buffer, size,
					 SLURM_PROTOCOL_NO_SEND_RECV_FLAGS,
					 timeout);

	return got;
}
/* slurm_get_stream_addr
 * essentially an encapsulated get_sockname; delegates to
 * _slurm_get_stream_addr()
 * IN open_fd	- file descriptor to retrieve slurm_addr for
 * OUT address	- address that open_fd is bound to
 * RET int	- the return code
 */
int slurm_get_stream_addr(slurm_fd open_fd, slurm_addr * address)
{
	int rc = _slurm_get_stream_addr(open_fd, address);

	return rc;
}
/* slurm_close_stream
 * close either a server or client stream file descriptor; delegates to
 * _slurm_close_stream()
 * IN open_fd	- an open file descriptor to close
 * RET int	- the return code
 */
int slurm_close_stream(slurm_fd open_fd)
{
	int close_rc = _slurm_close_stream(open_fd);

	return close_rc;
}
/* slurm_set_stream_non_blocking
 * make an open slurm connection non-blocking
 * (i.e. do not wait for i/o completion)
 * IN open_fd	- an open file descriptor to change the effect
 * RET int	- the return code
 */
int slurm_set_stream_non_blocking(slurm_fd open_fd)
{
	int rc = _slurm_set_stream_non_blocking(open_fd);

	return rc;
}
/* slurm_set_stream_blocking
 * make an open slurm connection blocking
 * (i.e. wait for i/o completion)
 * IN open_fd	- an open file descriptor to change the effect
 * RET int	- the return code
 */
int slurm_set_stream_blocking(slurm_fd open_fd)
{
	int rc = _slurm_set_stream_blocking(open_fd);

	return rc;
}
/**********************************************************************\
* address conversion and management functions
\**********************************************************************/
/* slurm_set_addr_uint
 * fill in slurm_address from a numeric port and ip address; delegates to
 * _slurm_set_addr_uint()
 * OUT slurm_address	- slurm_addr to be filled in
 * IN port		- port in host order
 * IN ip_address	- ipv4 address in uint32 host order form
 */
void slurm_set_addr_uint(slurm_addr * slurm_address, uint16_t port,
			 uint32_t ip_address)
{
	_slurm_set_addr_uint(slurm_address, port, ip_address);
	return;
}
/* slurm_set_addr_any
 * fill in slurm_address with the supplied port and SLURM_INADDR_ANY as
 * the address
 * OUT slurm_address	- slurm_addr to be filled in
 * IN port		- port in host order
 */
void slurm_set_addr_any(slurm_addr * slurm_address, uint16_t port)
{
	_slurm_set_addr_uint(slurm_address, port, SLURM_INADDR_ANY);
	return;
}
/* slurm_set_addr
 * fill in slurm_address from a port and a host name; delegates to
 * _slurm_set_addr_char()
 * OUT slurm_address	- slurm_addr to be filled in
 * IN port		- port in host order
 * IN host		- hostname or dns name
 */
void slurm_set_addr(slurm_addr * slurm_address, uint16_t port, char *host)
{
	_slurm_set_addr_char(slurm_address, port, host);
	return;
}
/* reset_slurm_addr
 * reset the address field of a slurm_addr, port and family unchanged;
 * delegates to _reset_slurm_addr()
 * OUT slurm_address	- slurm_addr to be reset
 * IN new_address	- source of address to write into slurm_address
 */
void reset_slurm_addr(slurm_addr * slurm_address, slurm_addr new_address)
{
	_reset_slurm_addr(slurm_address, new_address);
	return;
}
/* slurm_set_addr_char
 * fill in slurm_address from a port and a host; delegates to
 * _slurm_set_addr_char() exactly as slurm_set_addr() does
 * OUT slurm_address	- slurm_addr to be filled in
 * IN port		- port in host order
 * IN host		- hostname or dns name
 */
void slurm_set_addr_char(slurm_addr * slurm_address, uint16_t port,
			 char *host)
{
	_slurm_set_addr_char(slurm_address, port, host);
	return;
}
/* slurm_get_addr
 * report the port and hostname of a slurm_address; delegates to
 * _slurm_get_addr()
 * IN slurm_address	- slurm_addr to be queried
 * OUT port		- port number
 * OUT host		- hostname
 * IN buf_len		- length of hostname buffer
 */
void slurm_get_addr(slurm_addr * slurm_address, uint16_t * port,
		    char *host, unsigned int buf_len)
{
	_slurm_get_addr(slurm_address, port, host, buf_len);
	return;
}
/* slurm_get_ip_str
 * report the port and the ip address string of a slurm_address
 * IN slurm_address	- slurm_addr to be queried
 * OUT port		- port number, copied from sin_port as stored (no
 *			  byte-order conversion is done in this function)
 * OUT ip		- ip address in dotted-quad string form, built from
 *			  the four bytes of sin_addr.s_addr in memory order
 * IN buf_len		- length of ip buffer
 */
void slurm_get_ip_str(slurm_addr * slurm_address, uint16_t * port,
		      char *ip, unsigned int buf_len)
{
	unsigned char *octet;

	octet = (unsigned char *) &slurm_address->sin_addr.s_addr;
	*port = slurm_address->sin_port;
	snprintf(ip, buf_len, "%u.%u.%u.%u",
		 octet[0], octet[1], octet[2], octet[3]);
}
/* slurm_get_peer_addr
 * get the slurm address of the peer connection, similar to getpeername
 * IN fd		- an open connection
 * OUT slurm_address	- place to park the peer's slurm_addr; only written
 *			  when the lookup succeeds
 * RET int		- 0 on success, otherwise the non-zero return code of
 *			  _slurm_getpeername()
 */
int slurm_get_peer_addr(slurm_fd fd, slurm_addr * slurm_address)
{
	struct sockaddr peer;
	socklen_t peer_len = (socklen_t) sizeof(struct sockaddr);
	int rc = _slurm_getpeername((int) fd, &peer, &peer_len);

	if (rc != 0)
		return rc;

	/* copy sizeof(slurm_addr) bytes of the generic sockaddr to the caller */
	memcpy(slurm_address, &peer, sizeof(slurm_addr));
	return 0;
}
/* slurm_print_slurm_addr
 * print a slurm_addr into a buf; delegates to _slurm_print_slurm_addr()
 * IN address	- slurm_addr to print
 * IN buf	- space for string representation of slurm_addr
 * IN n		- max number of bytes to write (including NUL)
 */
void slurm_print_slurm_addr(slurm_addr * address, char *buf, size_t n)
{
	_slurm_print_slurm_addr(address, buf, n);
	return;
}
/**********************************************************************\
* slurm_addr pack routines
\**********************************************************************/
/* slurm_pack_msg_no_header
 * packs only the message (no header) into a freshly created buffer
 * IN msg - message handed to pack_msg()
 * RET the buffer obtained from init_buf(0) after packing; presumably
 *	the caller is responsible for releasing it
 */
Buf slurm_pack_msg_no_header(slurm_msg_t *msg)
{
	Buf packed = init_buf(0);

	pack_msg(msg, packed);
	return packed;
}
/* slurm_pack_slurm_addr
 * packs a slurm_addr into a buffer for serialized transport by handing
 * the arguments to _slurm_pack_slurm_addr()
 * IN slurm_address - slurm_addr to pack
 * IN/OUT buffer - buffer to pack the slurm_addr into
 */
void slurm_pack_slurm_addr(slurm_addr *slurm_address, Buf buffer)
{
	_slurm_pack_slurm_addr(slurm_address, buffer);
}
/* slurm_unpack_slurm_addr_no_alloc
 * unpacks a buffer into an existing slurm_addr after serialized
 * transport by handing the arguments to
 * _slurm_unpack_slurm_addr_no_alloc()
 * OUT slurm_address - slurm_addr to unpack to
 * IN/OUT buffer - buffer to unpack the slurm_addr from
 * RET SLURM error code
 */
int slurm_unpack_slurm_addr_no_alloc(slurm_addr *slurm_address, Buf buffer)
{
	return _slurm_unpack_slurm_addr_no_alloc(slurm_address, buffer);
}
/**********************************************************************\
* simplified communication routines
* They open a connection, do the work, then close the connection, all
* within the function
\**********************************************************************/
/* slurm_send_rc_msg
 * given the original request message this function sends a
 * RESPONSE_SLURM_RC message back over the connection the request
 * arrived on
 * IN msg - the request message; its conn_fd, address and forwarding
 *	fields are reused for the response
 * IN rc - the return_code to send back to the client
 * RET ENOTCONN if msg->conn_fd is negative, otherwise the value
 *	returned by slurm_send_node_msg()
 */
int slurm_send_rc_msg(slurm_msg_t *msg, int rc)
{
	return_code_msg_t rc_payload;
	slurm_msg_t reply;

	if (msg->conn_fd < 0)
		return (ENOTCONN);

	/* the payload lives on this stack frame; it only has to stay
	 * valid until slurm_send_node_msg() returns */
	rc_payload.return_code = rc;
	reply.msg_type = RESPONSE_SLURM_RC;
	reply.data = &rc_payload;

	/* addressing and forwarding state is inherited from the request */
	reply.address = msg->address;
	reply.orig_addr = msg->orig_addr;
	reply.srun_node_id = msg->srun_node_id;
	reply.forward = msg->forward;
	reply.forward_struct = msg->forward_struct;
	reply.forward_struct_init = msg->forward_struct_init;
	reply.ret_list = msg->ret_list;

	return slurm_send_node_msg(msg->conn_fd, &reply);
}
/*
 * _send_and_recv_msg
 * Send a slurm request and receive its response on an already open
 * slurm descriptor, then shut the connection down.
 * IN fd - open connection; it is shut down before this function returns
 * IN req - request handed to slurm_send_node_msg()
 * OUT resp - handed to slurm_receive_msg()
 * IN timeout - multiplied by 1000 before use (presumably seconds in,
 *	milliseconds out); 0 selects SLURM_MESSAGE_TIMEOUT_MSEC_STATIC
 * RET the List returned by slurm_receive_msg(), or NULL when the send
 *	failed and no receive was attempted
 *
 * errno contract: when an error was recorded here (send failure, or
 * giving up on the shutdown) errno is set to it. Otherwise errno is
 * NOT reset; it is left as the calls made here set it. Callers in this
 * file test errno right after this function returns.
 */
static List
_send_and_recv_msg(slurm_fd fd, slurm_msg_t *req,
slurm_msg_t *resp, int timeout)
{
int err = SLURM_SUCCESS;
int retry = 0;
List ret_list = NULL;
int steps = 0;
/* a negative return from the send is the only failure recorded in err
 * before the receive */
if (slurm_send_node_msg(fd, req) < 0)
err = errno;
if(err == SLURM_SUCCESS) {
if ((timeout*=1000) == 0)
timeout = SLURM_MESSAGE_TIMEOUT_MSEC_STATIC;
/* when the request is forwarded, extend the timeout by
 * forward.timeout for each of (cnt / tree_width) + 1 steps */
if(req->forward.cnt>0) {
steps = req->forward.cnt/slurm_get_tree_width();
steps += 1;
timeout += (req->forward.timeout*steps);
}
/* note: the outcome of the receive is not copied into err */
ret_list = slurm_receive_msg(fd, resp, timeout);
}
/* if(!ret_list || list_count(ret_list) == 0) { */
/* no_resp_forwards(&req->forward, &ret_list, errno); */
/* } */
/*
 * Attempt to close an open connection; the shutdown is retried only
 * while it fails with EINTR, and abandoned once the retry counter
 * exceeds MAX_SHUTDOWN_RETRY
 */
while ((slurm_shutdown_msg_conn(fd) < 0) && (errno == EINTR) ) {
if (retry++ > MAX_SHUTDOWN_RETRY) {
err = errno;
break;
}
}
/* only a recorded error is written back; a zero err leaves errno
 * untouched */
if (err)
errno = err;
return ret_list;
}
/*
 * slurm_send_recv_controller_msg
 * opens a connection to the controller, sends the controller a message,
 * listens for the response, then closes the connection
 *
 * If the reply is a RESPONSE_SLURM_RC carrying ESLURM_IN_STANDBY_MODE, a
 * backup controller is configured, the request is not a
 * MESSAGE_NODE_REGISTRATION_STATUS and less than 1.5 * slurmctld_timeout
 * has elapsed since entry, the reply is discarded and the request is
 * re-sent on a new connection after a 30 second sleep.
 *
 * IN req - slurm_msg request; its forwarding fields are reset here
 * OUT resp - slurm_msg response
 * RET int - SLURM_SUCCESS, SLURM_SOCKET_ERROR if the controller
 *	connection cannot be opened, or the errno value left by the
 *	send/receive
 */
int slurm_send_recv_controller_msg(slurm_msg_t *req, slurm_msg_t *resp)
{
	slurm_fd fd = -1;
	int rc = SLURM_SUCCESS;
	time_t start_time = time(NULL);
	List ret_list = NULL;
	int retry = 1;
	slurm_ctl_conf_t *conf;
	bool backup_controller_flag;
	uint16_t slurmctld_timeout;

	/* reset the forwarding state of the request before sending it */
	forward_init(&req->forward, NULL);
	req->ret_list = NULL;
	req->orig_addr.sin_addr.s_addr = 0;
	req->forward_struct_init = 0;

	if ((fd = slurm_open_controller_conn()) < 0) {
		rc = SLURM_SOCKET_ERROR;
		goto cleanup;
	}

	/* copy what is needed out of the configuration while it is locked */
	conf = slurm_conf_lock();
	backup_controller_flag = conf->backup_controller ? true : false;
	slurmctld_timeout = conf->slurmctld_timeout;
	slurm_conf_unlock();

	while (retry) {
		retry = 0;

		/* _send_and_recv_msg() also shuts fd down */
		ret_list = _send_and_recv_msg(fd, req, resp, 0);

		/* Save errno before any other call gets a chance to
		 * overwrite it; it is the only status reported by
		 * _send_and_recv_msg(). */
		rc = errno;
		if (rc == SLURM_SUCCESS)
			g_slurm_auth_destroy(resp->auth_cred);

		if (ret_list) {
			/* a message to the controller is not forwarded,
			 * so no additional responses are expected */
			if (list_count(ret_list) > 0) {
				error("We didn't do things correctly "
				      "missed %d responses",
				      list_count(ret_list));
			}
			list_destroy(ret_list);
		}

		if (rc != SLURM_SUCCESS) {
			/* the list/log calls above may have changed errno;
			 * put the saved value back for the cleanup step
			 * (presumably _remap_slurmctld_errno() reads errno) */
			errno = rc;
			break;
		}

		/* If the backup controller is in the process of assuming
		 * control, we sleep and retry later */
		if ((resp->msg_type == RESPONSE_SLURM_RC) &&
		    ((((return_code_msg_t *) resp->data)->return_code)
		     == ESLURM_IN_STANDBY_MODE) &&
		    (req->msg_type != MESSAGE_NODE_REGISTRATION_STATUS) &&
		    (backup_controller_flag) &&
		    (difftime(time(NULL), start_time) <
		     (slurmctld_timeout + (slurmctld_timeout / 2)))) {
			debug("Neither primary nor backup controller "
			      "responding, sleep and retry");
			slurm_free_return_code_msg(resp->data);
			/* do not leave a dangling pointer behind in case
			 * the reconnect below fails */
			resp->data = NULL;
			sleep(30);
			if ((fd = slurm_open_controller_conn()) < 0)
				rc = SLURM_SOCKET_ERROR;
			else
				retry = 1;
		}
	}

cleanup:
	if (rc != SLURM_SUCCESS)
		_remap_slurmctld_errno();
	return rc;
}
/* slurm_send_recv_node_msg
 * opens a connection to the node named by req->address, sends the
 * request, listens for the response, then closes the connection
 * IN req - slurm_msg request
 * OUT resp - slurm_msg response
 * IN timeout - passed through to _send_and_recv_msg()
 * RET List - return list from multiple nodes, or NULL if the connection
 *	could not be opened
 */
List slurm_send_recv_node_msg(slurm_msg_t *req, slurm_msg_t *resp, int timeout)
{
	slurm_fd fd = slurm_open_msg_conn(&req->address);

	if (fd < 0)
		return NULL;

	return _send_and_recv_msg(fd, req, resp, timeout);
}
/* slurm_send_only_controller_msg
 * opens a connection to the controller, sends the controller a
 * message, then closes the connection without waiting for a reply
 * IN req - slurm_msg request
 * RET int - SLURM_SOCKET_ERROR if the connection cannot be opened or
 *	the shutdown keeps being interrupted, otherwise the value
 *	returned by slurm_send_node_msg()
 *
 * NOTE(review): every value other than SLURM_SUCCESS, including a
 * positive one, triggers the errno remap and is returned unchanged -
 * confirm what slurm_send_node_msg() returns on success.
 */
int slurm_send_only_controller_msg(slurm_msg_t *req)
{
	int rc;
	int attempts = 0;
	slurm_fd fd = slurm_open_controller_conn();

	if (fd < 0) {
		rc = SLURM_SOCKET_ERROR;
	} else {
		rc = slurm_send_node_msg(fd, req);

		/* close the connection, retrying only while the shutdown
		 * is interrupted (EINTR) */
		while ((slurm_shutdown_msg_conn(fd) < 0) &&
		       (errno == EINTR)) {
			if (attempts++ > MAX_SHUTDOWN_RETRY) {
				rc = SLURM_SOCKET_ERROR;
				break;
			}
		}
	}

	if (rc != SLURM_SUCCESS)
		_remap_slurmctld_errno();
	return rc;
}
/* slurm_send_only_node_msg
 * opens a connection to the "address" specified in the slurm msg `req',
 * sends the message, then immediately closes the connection without
 * waiting for a reply
 * IN req - slurm_msg request
 * RET int - SLURM_SOCKET_ERROR if the connection cannot be opened or
 *	the shutdown keeps being interrupted, otherwise the value
 *	returned by slurm_send_node_msg()
 */
int slurm_send_only_node_msg(slurm_msg_t *req)
{
	int send_rc;
	int attempts = 0;
	slurm_fd fd = slurm_open_msg_conn(&req->address);

	if (fd < 0)
		return SLURM_SOCKET_ERROR;

	send_rc = slurm_send_node_msg(fd, req);

	/* close the connection, retrying only while the shutdown is
	 * interrupted (EINTR) */
	while ((slurm_shutdown_msg_conn(fd) < 0) && (errno == EINTR)) {
		if (attempts++ > MAX_SHUTDOWN_RETRY)
			return SLURM_SOCKET_ERROR;
	}

	return send_rc;
}
/*
 * _send_recv_rc_msg
 * Send message and recv "return code" message on an already open
 * slurm file descriptor, then record the direct reply in the returned
 * list under the node name "localhost".
 * IN fd - open connection (shut down by _send_and_recv_msg())
 * IN req - request to send
 * IN timeout - passed through to _send_and_recv_msg()
 * RET the list returned by _send_and_recv_msg() with the direct reply
 *	added, or NULL if that call returned no list. On a non-NULL
 *	return errno holds the status left by _send_and_recv_msg().
 */
static List _send_recv_rc_msg(slurm_fd fd, slurm_msg_t *req, int timeout)
{
	slurm_msg_t msg;
	List ret_list = NULL;
	ListIterator itr = NULL;
	ret_types_t *ret_type = NULL;
	ret_data_info_t *ret_data_info = NULL;
	int msg_rc;
	int set = 0;
	int err;

	ret_list = _send_and_recv_msg(fd, req, &msg, timeout);
	if (!ret_list)
		return ret_list;

	/* errno is the only status channel of _send_and_recv_msg(); save
	 * it now and use the saved copy from here on, because the
	 * allocation and list calls below may overwrite errno */
	err = errno;
	if (err != SLURM_SUCCESS) {
		msg_rc = SLURM_ERROR;
	} else {
		/* the reply is treated as a return_code_msg_t */
		msg_rc = ((return_code_msg_t *)msg.data)->return_code;
		slurm_free_return_code_msg(msg.data);
		g_slurm_auth_destroy(msg.auth_cred);
	}

	ret_data_info = xmalloc(sizeof(ret_data_info_t));
	ret_data_info->node_name = xstrdup("localhost");
	ret_data_info->data = NULL;
	debug3("got reply for %s rc %d %d",
	       ret_data_info->node_name,
	       msg_rc,
	       err);

	/* file the reply under an existing entry with the same return
	 * code, if there is one */
	itr = list_iterator_create(ret_list);
	while ((ret_type = list_next(itr)) != NULL) {
		if (ret_type->msg_rc == msg_rc) {
			list_push(ret_type->ret_data_list, ret_data_info);
			set = 1;
			break;
		}
	}
	list_iterator_destroy(itr);

	/* otherwise start a new entry for this return code */
	if (!set) {
		ret_type = xmalloc(sizeof(ret_types_t));
		list_push(ret_list, ret_type);
		ret_type->type = msg.msg_type;
		ret_type->msg_rc = msg_rc;
		ret_type->err = err;
		ret_type->ret_data_list = list_create(destroy_data_info);
		list_push(ret_type->ret_data_list, ret_data_info);
	}

	/* hand the saved status back to the caller */
	errno = err;
	return ret_list;
}
/* slurm_send_recv_rc_packed_msg
 * opens a connection to msg->address, adds a header to the already
 * packed msg->buffer and sends it, reads back a "return code" message,
 * closes the connection and records the direct reply in the returned
 * list under the node name "localhost"
 * IN msg - message whose buffer has already been packed
 * IN timeout - multiplied by 1000 before use (presumably seconds in,
 *	milliseconds out); 0 selects SLURM_MESSAGE_TIMEOUT_MSEC_STATIC
 * RET list of ret_types_t entries, or NULL (with errno set) when no
 *	list could be obtained at all
 *
 * NOTE(review): the shutdown loop is bounded by MAX_RETRIES while the
 * sibling routines in this file use MAX_SHUTDOWN_RETRY - confirm this
 * is intended.
 */
List slurm_send_recv_rc_packed_msg(slurm_msg_t *msg, int timeout)
{
	slurm_fd fd = -1;
	int err = SLURM_SUCCESS;
	int steps = 0;
	slurm_msg_t resp;
	List ret_list = NULL;
	ListIterator itr;
	ret_types_t *ret_type = NULL;
	ret_data_info_t *ret_data_info = NULL;
	int msg_rc;
	int set = 0;
	int retry = 0;

	/* resp.msg_type is read below even on paths where nothing was
	 * ever received; give it a defined value up front. The entry it
	 * ends up in always carries a return code. */
	resp.msg_type = RESPONSE_SLURM_RC;

	if (!msg->buffer) {
		err = SLURMCTLD_COMMUNICATIONS_SEND_ERROR;
		goto failed;
	}

	if ((fd = slurm_open_msg_conn(&msg->address)) < 0) {
		err = SLURM_SOCKET_ERROR;
		goto failed;
	}

	if (slurm_add_header_and_send(fd, msg) >= 0) {
		if ((timeout *= 1000) == 0)
			timeout = SLURM_MESSAGE_TIMEOUT_MSEC_STATIC;
		/* when the message is forwarded, extend the timeout by
		 * forward.timeout for each of (cnt / tree_width) + 1
		 * steps */
		if (msg->forward.cnt > 0) {
			steps = msg->forward.cnt / slurm_get_tree_width();
			steps += 1;
			timeout += (msg->forward.timeout * steps);
		}
		ret_list = slurm_receive_msg(fd, &resp, timeout);
	}
	/* status of the send/receive, taken before the shutdown below */
	err = errno;

	/*
	 * Attempt to close an open connection
	 */
	while ((slurm_shutdown_msg_conn(fd) < 0) && (errno == EINTR)) {
		if (retry++ > MAX_RETRIES) {
			err = errno;
			break;
		}
	}

failed:
	if (!ret_list || list_count(ret_list) == 0) {
		no_resp_forwards(&msg->forward, &ret_list, err);
	}

	if (err != SLURM_SUCCESS) {
		msg_rc = SLURM_ERROR;
	} else {
		/* the reply is treated as a return_code_msg_t */
		msg_rc = ((return_code_msg_t *)resp.data)->return_code;
		slurm_free_return_code_msg(resp.data);
		g_slurm_auth_destroy(resp.auth_cred);
	}

	/* Nothing visible here guarantees that no_resp_forwards() creates
	 * a list, and nothing here shows that the list routines accept
	 * NULL, so report the failure instead of passing NULL on. */
	if (!ret_list) {
		errno = (err != SLURM_SUCCESS) ? err : SLURM_ERROR;
		return NULL;
	}

	ret_data_info = xmalloc(sizeof(ret_data_info_t));
	ret_data_info->node_name = xstrdup("localhost");
	ret_data_info->data = NULL;

	/* file the reply under an existing entry with the same return
	 * code, if there is one */
	itr = list_iterator_create(ret_list);
	while ((ret_type = list_next(itr)) != NULL) {
		if (ret_type->msg_rc == msg_rc) {
			list_push(ret_type->ret_data_list, ret_data_info);
			set = 1;
			break;
		}
	}
	list_iterator_destroy(itr);

	/* otherwise start a new entry for this return code */
	if (!set) {
		ret_type = xmalloc(sizeof(ret_types_t));
		list_push(ret_list, ret_type);
		ret_type->type = resp.msg_type;
		ret_type->msg_rc = msg_rc;
		ret_type->err = err;
		ret_type->ret_data_list = list_create(destroy_data_info);
		list_push(ret_type->ret_data_list, ret_data_info);
	}
	return ret_list;
}
/* slurm_send_recv_rc_msg
 * Open a connection to the "address" specified in the slurm msg "req",
 * send the request and read back the "rc" message(s).
 * IN req - slurm_msg request
 * IN timeout - passed through to _send_recv_rc_msg()
 * RET the list built by _send_recv_rc_msg(), or NULL with errno set to
 *	SLURM_SOCKET_ERROR if the connection could not be opened
 */
List slurm_send_recv_rc_msg(slurm_msg_t *req, int timeout)
{
	slurm_fd fd = slurm_open_msg_conn(&req->address);

	if (fd >= 0)
		return _send_recv_rc_msg(fd, req, timeout);

	errno = SLURM_SOCKET_ERROR;
	return NULL;
}
/* slurm_send_recv_rc_msg_only_one
 * Open a connection to the "address" specified in the slurm msg "req",
 * send the request with its forwarding state reset, then read back an
 * "rc" message and hand its "return_code" back through "rc".
 * IN req - slurm_msg request; its forwarding fields are reset here
 * OUT rc - return code of the first entry popped from the reply list;
 *	left untouched when no entry is available
 * IN timeout - passed through to _send_recv_rc_msg()
 * RET SLURM_SOCKET_ERROR if the connection cannot be opened,
 *	SLURM_ERROR if no reply list came back, otherwise the err value
 *	stored in the popped entry
 */
int slurm_send_recv_rc_msg_only_one(slurm_msg_t *req, int *rc, int timeout)
{
	slurm_fd fd = -1;
	List replies = NULL;
	ret_types_t *first = NULL;
	int status = SLURM_SUCCESS;

	forward_init(&req->forward, NULL);
	req->ret_list = NULL;
	req->orig_addr.sin_addr.s_addr = 0;
	/* no need to init forward_struct_init here */

	fd = slurm_open_msg_conn(&req->address);
	if (fd < 0)
		return SLURM_SOCKET_ERROR;

	replies = _send_recv_rc_msg(fd, req, timeout);
	if (!replies)
		return SLURM_ERROR;

	/* exactly one entry is expected; extra ones are only reported */
	if (list_count(replies) > 1)
		error("Got %d, expecting 1 from message receiving",
		      list_count(replies));

	first = list_pop(replies);
	if (first) {
		*rc = first->msg_rc;
		status = first->err;
		destroy_ret_types(first);
	}
	list_destroy(replies);

	return status;
}
/* slurm_send_recv_controller_rc_msg
 * Same as slurm_send_recv_rc_msg_only_one(), but the message is sent
 * to the controller and the default timeout (0) is used.
 * IN req - slurm_msg request; its forwarding fields are reset here
 * OUT rc - return code of the first entry popped from the reply list;
 *	left untouched when no entry is available
 * RET SLURM_SOCKET_ERROR if the controller connection cannot be opened,
 *	SLURM_ERROR if no reply list came back, otherwise the err value
 *	stored in the popped entry
 */
int slurm_send_recv_controller_rc_msg(slurm_msg_t *req, int *rc)
{
	slurm_fd fd = -1;
	List replies = NULL;
	ret_types_t *first = NULL;
	int status = SLURM_SUCCESS;

	forward_init(&req->forward, NULL);
	req->ret_list = NULL;
	req->orig_addr.sin_addr.s_addr = 0;
	/* no need to init forward_struct_init here */

	fd = slurm_open_controller_conn();
	if (fd < 0)
		return SLURM_SOCKET_ERROR;

	replies = _send_recv_rc_msg(fd, req, 0);
	if (!replies)
		return SLURM_ERROR;

	/* exactly one entry is expected; extra ones are only reported */
	if (list_count(replies) > 1)
		error("controller_rc_msg: Got %d instead of 1 back",
		      list_count(replies));

	first = list_pop(replies);
	if (first) {
		*rc = first->msg_rc;
		status = first->err;
		destroy_ret_types(first);
	}
	list_destroy(replies);

	return status;
}
/* set_span
 * distributes "total" items over "tree_width" slots
 * IN total - number of items to distribute
 * IN tree_width - number of slots; 0 selects slurm_get_tree_width()
 * RET array of tree_width ints obtained from xmalloc() (presumably to
 *	be released by the caller with xfree()). Every slot is 0 when
 *	total <= tree_width. Otherwise the slots are filled round-robin
 *	in chunks of tree_width; a remainder that is no larger than the
 *	number of slots from the current one onward is added to the
 *	current slot only if that slot is already non-zero, else it is
 *	not recorded in any slot.
 */
extern int *set_span(int total, uint16_t tree_width)
{
	int *span = NULL;
	int left = total;
	int i = 0;

	if (tree_width == 0)
		tree_width = slurm_get_tree_width();

	span = xmalloc(sizeof(int) * tree_width);
	/* clear the whole array: the length is in bytes, not elements */
	memset(span, 0, sizeof(int) * tree_width);

	if (total <= tree_width)
		return span;

	while (left > 0) {
		for (i = 0; i < tree_width; i++) {
			if ((tree_width - i) >= left) {
				/* the remainder fits in the slots from i on:
				 * only an already used slot absorbs it */
				if (span[i] != 0)
					span[i] += left;
				left = 0;
				break;
			} else if (left <= tree_width) {
				/* last partial chunk */
				span[i] += left;
				left = 0;
				break;
			}
			/* full chunk for this slot, move on to the next */
			span[i] += tree_width;
			left -= tree_width;
		}
	}
	return span;
}
/* slurm_free_msg
 * Free a slurm message: destroys its credential, destroys its ret_list
 * if one is attached, then releases the message itself with xfree()
 * IN msg - message to free; it is dereferenced unconditionally
 */
void slurm_free_msg(slurm_msg_t *msg)
{
	/* the result of the credential destroy is deliberately ignored */
	(void) g_slurm_auth_destroy(msg->auth_cred);

	if (msg->ret_list != NULL) {
		list_destroy(msg->ret_list);
		msg->ret_list = NULL;
	}

	xfree(msg);
}
/* slurm_auth_cred_destroy
 * Free just the credential of a message.
 * Needed by the api.
 * IN auth_cred - credential handed to g_slurm_auth_destroy(); the
 *	result of that call is ignored
 */
void slurm_auth_cred_destroy(void *auth_cred)
{
	(void) g_slurm_auth_destroy(auth_cred);
}
/* convert_to_kilo
 * formats a number, using a "k" suffix for multiples of 1024
 * IN number - value to format
 * OUT tmp - receives the text; written with sprintf(), so the caller
 *	must supply a buffer large enough for the result
 *	  number < 1024           -> "<number>"
 *	  exact multiple of 1024  -> "<number/1024>k"
 *	  otherwise               -> "<number/1024>.<d>k", where d is the
 *	                             remainder in tenths, truncated
 * RET always SLURM_SUCCESS
 */
int convert_to_kilo(int number, char *tmp)
{
	int whole;
	int tenths;

	if (number < 1024) {
		sprintf(tmp, "%d", number);
		return SLURM_SUCCESS;
	}

	whole = number / 1024;
	tenths = number % 1024;
	if (tenths > 0) {
		/* scale the remainder to a single decimal digit */
		tenths = (tenths * 10) / 1024;
		sprintf(tmp, "%d.%dk", whole, tenths);
	} else {
		sprintf(tmp, "%dk", whole);
	}
	return SLURM_SUCCESS;
}
#if _DEBUG
/* _print_data
 * debugging aid: dumps the bytes of a buffer to stdout as two-digit
 * hex values, ten per line, followed by a blank line
 * IN data - buffer to dump
 * IN len - number of bytes in data; at most the first 201 are printed
 */
static void _print_data(char *data, int len)
{
	/* index 200 is the last byte that is ever printed */
	int count = (len > 201) ? 201 : len;
	int pos;

	for (pos = 0; pos < count; pos++) {
		if ((pos != 0) && (pos % 10 == 0))
			printf("\n");
		printf("%2.2x ", ((int) data[pos] & 0xff));
	}
	printf("\n\n");
}
#endif
/*
* vi: shiftwidth=8 tabstop=8 expandtab
*/