From bf0837c748d72dfd0b8d25aa4145a4b3b6e5480c Mon Sep 17 00:00:00 2001 From: Moe Jette <jette1@llnl.gov> Date: Thu, 13 Mar 2008 23:00:45 +0000 Subject: [PATCH] revert last set of checkins for accounting message traffic re-design and try again later --- src/common/slurm_accounting_storage.c | 13 +-- src/common/slurm_accounting_storage.h | 10 +- src/common/slurmdbd_defs.c | 82 +------------- src/common/slurmdbd_defs.h | 45 ++------ .../filetxt/accounting_storage_filetxt.c | 6 - .../gold/accounting_storage_gold.c | 5 - .../mysql/accounting_storage_mysql.c | 5 - .../none/accounting_storage_none.c | 5 - .../pgsql/accounting_storage_pgsql.c | 5 - .../slurmdbd/accounting_storage_slurmdbd.c | 30 ++--- src/slurmctld/assoc_mgr.c | 107 +----------------- src/slurmctld/assoc_mgr.h | 9 +- src/slurmctld/controller.c | 12 +- src/slurmdbd/proc_req.c | 29 ++--- src/slurmdbd/proc_req.h | 11 +- src/slurmdbd/rpc_mgr.c | 41 +------ 16 files changed, 49 insertions(+), 366 deletions(-) diff --git a/src/common/slurm_accounting_storage.c b/src/common/slurm_accounting_storage.c index b01ff52d40b..ea8239ca4af 100644 --- a/src/common/slurm_accounting_storage.c +++ b/src/common/slurm_accounting_storage.c @@ -1,5 +1,5 @@ /*****************************************************************************\ - * slurm_accounting_storage.c - account storage plugin wrapper. + * slurm_accounting_storage.c - account torage plugin wrapper. * * $Id: slurm_accounting_storage.c 10744 2007-01-11 20:09:18Z da $ ***************************************************************************** @@ -58,8 +58,7 @@ */ typedef struct slurm_acct_storage_ops { - int (*set_msg_port) (uint16_t port); - void *(*get_conn) (void); + void *(*get_conn) (); int (*close_conn) (void *db_conn); int (*add_users) (void *db_conn, List user_list); @@ -188,7 +187,6 @@ static slurm_acct_storage_ops_t * _acct_storage_get_ops( * Must be synchronized with slurm_acct_storage_ops_t above. */ static const char *syms[] = { - "acct_storage_p_set_msg_port", "acct_storage_p_get_connection", "acct_storage_p_close_connection", "acct_storage_p_add_users", @@ -1242,13 +1240,6 @@ extern int slurm_acct_storage_fini(void) return rc; } -extern int acct_storage_g_set_msg_port(uint16_t port) -{ - if (slurm_acct_storage_init(NULL) < 0) - return SLURM_ERROR; - return (*(g_acct_storage_context->ops.set_msg_port))(port); -} - extern void *acct_storage_g_get_connection() { if (slurm_acct_storage_init(NULL) < 0) diff --git a/src/common/slurm_accounting_storage.h b/src/common/slurm_accounting_storage.h index 5db408f06dc..92f778ca057 100644 --- a/src/common/slurm_accounting_storage.h +++ b/src/common/slurm_accounting_storage.h @@ -194,19 +194,11 @@ extern acct_admin_level_t str_2_acct_admin_level(char *level); extern int slurm_acct_storage_init(char *loc); /* load the plugin */ extern int slurm_acct_storage_fini(void); /* unload the plugin */ -/* - * If running on slurmctld, specify a port for messages from SlurmDBD - * to slurmctld for notification of database changes - * IN: port - socket port open for incoming message - * RET: SLURM_SUCCESS on success SLURM_ERROR else - */ -extern int acct_storage_g_set_msg_port(uint16_t port); - /* * get a new connection to the storage unit * RET: pointer used to access db */ -extern void *acct_storage_g_get_connection(void); +extern void *acct_storage_g_get_connection(); /* * release connection to the storage unit diff --git a/src/common/slurmdbd_defs.c b/src/common/slurmdbd_defs.c index ab385535859..fd26de7bcd8 100644 --- a/src/common/slurmdbd_defs.c +++ b/src/common/slurmdbd_defs.c @@ -83,9 +83,7 @@ static time_t agent_shutdown = 0; static pthread_mutex_t slurmdbd_lock = PTHREAD_MUTEX_INITIALIZER; static slurm_fd slurmdbd_fd = -1; -static uint16_t slurmctld_port = 0; static char * slurmdbd_auth_info = NULL; -static char * slurmctld_cluster_name = NULL; static void * _agent(void *x); static void _agent_queue_del(void *x); @@ -113,8 +111,7 @@ static int _tot_wait (struct timeval *start_time); ****************************************************************************/ /* Open a socket connection to SlurmDbd */ -extern int slurm_open_slurmdbd_conn(char *auth_info, - uint16_t port, char *cluster_name) +extern int slurm_open_slurmdbd_conn(char *auth_info) { slurm_mutex_lock(&agent_lock); if ((agent_tid == 0) || (agent_list == NULL)) @@ -125,9 +122,6 @@ extern int slurm_open_slurmdbd_conn(char *auth_info, xfree(slurmdbd_auth_info); if (auth_info) slurmdbd_auth_info = xstrdup(auth_info); - slurmctld_port = port; - if (cluster_name) - slurmctld_cluster_name = xstrdup(cluster_name); if (slurmdbd_fd < 0) _open_slurmdbd_fd(); slurm_mutex_unlock(&slurmdbd_lock); @@ -144,70 +138,11 @@ extern int slurm_close_slurmdbd_conn(void) slurm_mutex_lock(&slurmdbd_lock); _close_slurmdbd_fd(); xfree(slurmdbd_auth_info); - xfree(slurmctld_cluster_name); - slurmctld_port = 0; slurm_mutex_unlock(&slurmdbd_lock); return SLURM_SUCCESS; } -/* - * Receive a message from the SlurmDBD and authenticate it - * IN: fd - the open file to be read from - * OUT: msg the message from SlurmDBD, must be freed by the caller - * Returns SLURM_SUCCESS or an error code - */ -extern int slurm_recv_slurmdbd_msg(slurm_fd fd, slurmdbd_msg_t *msg) -{ - char *in_msg = NULL; - Buf buffer; - uint32_t nw_size, msg_size; - ssize_t msg_read, offset; - int rc = SLURM_ERROR; - - if (!_fd_readable(fd)) { - error("Premature close from slurmdbd"); - return rc; - } - msg_read = read(fd, &nw_size, sizeof(nw_size)); - if (msg_read == 0) { - error("Premature EOF from slurmdbd"); - return rc; - } - if (msg_read != sizeof(nw_size)) { - error("Could not read msg_size from slurmdbd"); - return rc; - } - msg_size = ntohl(nw_size); - if ((msg_size < 2) || (msg_size > 1000000)) { - error("Invalid msg_size (%u) from slurmdbd", - msg_size); - return SLURM_ERROR; - } - - buffer = init_buf(msg_size); - in_msg = get_buf_data(buffer);; - offset = 0; - while (msg_size > offset) { - if (!_fd_readable(fd)) - break; /* problem with this socket */ - msg_read = read(fd, (in_msg + offset), - (msg_size - offset)); - if (msg_read <= 0) { - error("read(%d): %m", fd); - break; - } - offset += msg_read; - } - if (msg_size != offset) { - error("Could not read full message from slurmdbd"); - } else { - rc = unpack_slurmdbd_msg(msg, buffer); - } - free_buf(buffer); - return rc; -} - /* Send an RPC to the SlurmDBD and wait for the return code reply. * The RPC will not be queued if an error occurs. * Returns SLURM_SUCCESS or an error code */ @@ -598,9 +533,7 @@ static int _send_init_msg(void) buffer = init_buf(1024); pack16((uint16_t) DBD_INIT, buffer); - req.version = SLURMDBD_VERSION; - req.slurmctld_port = slurmctld_port; - req.cluster_name = slurmctld_cluster_name; + req.version = SLURMDBD_VERSION; slurmdbd_pack_init_msg(&req, buffer, slurmdbd_auth_info); rc = _send_msg(buffer); @@ -1258,10 +1191,7 @@ void inline slurmdbd_free_get_jobs_msg(dbd_get_jobs_msg_t *msg) void inline slurmdbd_free_init_msg(dbd_init_msg_t *msg) { - if (msg) { - xfree(msg->cluster_name); - xfree(msg); - } + xfree(msg); } void inline slurmdbd_free_job_complete_msg(dbd_job_comp_msg_t *msg) @@ -1619,8 +1549,6 @@ slurmdbd_pack_init_msg(dbd_init_msg_t *msg, Buf buffer, char *auth_info) int rc; void *auth_cred; - packstr(msg->cluster_name, buffer); - pack16(msg->slurmctld_port, buffer); pack16(msg->version, buffer); auth_cred = g_slurm_auth_create(NULL, 2, auth_info); if (auth_cred == NULL) { @@ -1641,12 +1569,9 @@ int inline slurmdbd_unpack_init_msg(dbd_init_msg_t **msg, Buf buffer, char *auth_info) { void *auth_cred; - uint32_t uint32_tmp; dbd_init_msg_t *msg_ptr = xmalloc(sizeof(dbd_init_msg_t)); *msg = msg_ptr; - safe_unpackstr_xmalloc(&msg_ptr->cluster_name, &uint32_tmp, buffer); - safe_unpack16(&msg_ptr->slurmctld_port, buffer); safe_unpack16(&msg_ptr->version, buffer); auth_cred = g_slurm_auth_unpack(buffer); if (auth_cred == NULL) { @@ -1659,7 +1584,6 @@ slurmdbd_unpack_init_msg(dbd_init_msg_t **msg, Buf buffer, char *auth_info) return SLURM_SUCCESS; unpack_error: - xfree(msg_ptr->cluster_name); xfree(msg_ptr); *msg = NULL; return SLURM_ERROR; diff --git a/src/common/slurmdbd_defs.h b/src/common/slurmdbd_defs.h index 1292f03dcee..1b3afe5febd 100644 --- a/src/common/slurmdbd_defs.h +++ b/src/common/slurmdbd_defs.h @@ -107,7 +107,7 @@ typedef enum { * an account */ DBD_REMOVE_ASSOCS, /* Remove existing association */ DBD_REMOVE_CLUSTERS, /* Remove existing cluster */ - DBD_REMOVE_USERS, /* Remove existing user */ + DBD_REMOVE_USERS, /* Remove existing user */ DBD_STEP_COMPLETE, /* Record step completion */ DBD_STEP_START /* Record step starting */ } slurmdbd_msg_type_t; @@ -145,20 +145,17 @@ typedef struct { } dbd_usage_msg_t; typedef struct dbd_get_jobs_msg { - char *cluster_name; /* name of cluster to query */ - uint32_t gid; /* group id */ - List selected_steps; /* List of jobacct_selected_step_t *'s */ - List selected_parts; /* List of char *'s */ - char *user; /* user name */ + char *cluster_name; /* name of cluster to query */ + uint32_t gid; /* group id */ + List selected_steps; /* List of jobacct_selected_step_t *'s */ + List selected_parts; /* List of char *'s */ + char *user; /* user name */ } dbd_get_jobs_msg_t; typedef struct dbd_init_msg { - char *cluster_name; /* name of cluster to query */ - uint16_t slurmctld_port;/* port on slurmctld to process messages - * originating on slurmdbd */ + uint16_t version; /* protocol version */ uint32_t uid; /* UID originating connection, * filled by authtentication plugin*/ - uint16_t version; /* protocol version */ } dbd_init_msg_t; typedef struct dbd_job_comp_msg { @@ -210,8 +207,8 @@ typedef struct dbd_job_suspend_msg { } dbd_job_suspend_msg_t; typedef struct { - List my_list; /* this list could be of any type as long as it - * is handled correctly on both ends */ + List my_list; /* this list could be of any type as long as it + * is handled correctly on both ends */ } dbd_list_msg_t; typedef struct { @@ -264,32 +261,12 @@ typedef struct dbd_step_start_msg { * Slurm DBD message processing functions \*****************************************************************************/ -/* - * Open a socket connection to SlurmDbd using SlurmdbdAuthInfo specified - * IN: auth_info - If Munge is used for authentication, this will be the - * pathname to the named socket used if not the default - * socket. Typically used if one Munge key and credential - * is used within a cluster and a different for site-wide - * authentication. - * IN: port - socket port used for message originating in SlurmDdb and - * sent to slurmctld or zero if connect is not from slurmctld - * IN: cluster_name - cluster name or NULL if connect is not from slurmctld - * Returns SLURM_SUCCESS or an error code - */ -extern int slurm_open_slurmdbd_conn(char *auth_info, - uint16_t port, char *cluster_name); +/* Open a socket connection to SlurmDbd using SlurmdbdAuthInfo specified */ +extern int slurm_open_slurmdbd_conn(char *auth_info); /* Close the SlurmDBD socket connection */ extern int slurm_close_slurmdbd_conn(void); -/* - * Receive a message from the SlurmDBD and authenticate it - * IN: fd - the open file to be read from - * OUT: msg the message from SlurmDBD, must be freed by the caller - * Returns SLURM_SUCCESS or an error code - */ -extern int slurm_recv_slurmdbd_msg(slurm_fd fd, slurmdbd_msg_t *msg); - /* Send an RPC to the SlurmDBD. Do not wait for the reply. The RPC * will be queued and processed later if the SlurmDBD is not responding. * Returns SLURM_SUCCESS or an error code */ diff --git a/src/plugins/accounting_storage/filetxt/accounting_storage_filetxt.c b/src/plugins/accounting_storage/filetxt/accounting_storage_filetxt.c index f8bdf84e41d..5d5d116d0a6 100644 --- a/src/plugins/accounting_storage/filetxt/accounting_storage_filetxt.c +++ b/src/plugins/accounting_storage/filetxt/accounting_storage_filetxt.c @@ -228,12 +228,6 @@ extern int fini ( void ) return SLURM_SUCCESS; } -extern int acct_storage_p_set_msg_port(uint16_t port) -{ - /* unused */ - return SLURM_SUCCESS; -} - extern void * acct_storage_p_get_connection() { return NULL; diff --git a/src/plugins/accounting_storage/gold/accounting_storage_gold.c b/src/plugins/accounting_storage/gold/accounting_storage_gold.c index 2de6663d9ec..6bbb5124964 100644 --- a/src/plugins/accounting_storage/gold/accounting_storage_gold.c +++ b/src/plugins/accounting_storage/gold/accounting_storage_gold.c @@ -713,11 +713,6 @@ extern int fini ( void ) return SLURM_SUCCESS; } -extern int acct_storage_p_set_msg_port(uint16_t port) -{ - return SLURM_SUCCESS; -} - extern void * acct_storage_p_get_connection() { return NULL; diff --git a/src/plugins/accounting_storage/mysql/accounting_storage_mysql.c b/src/plugins/accounting_storage/mysql/accounting_storage_mysql.c index b683a15b517..1d7c62dca32 100644 --- a/src/plugins/accounting_storage/mysql/accounting_storage_mysql.c +++ b/src/plugins/accounting_storage/mysql/accounting_storage_mysql.c @@ -469,11 +469,6 @@ extern int fini ( void ) #endif } -extern int acct_storage_p_set_msg_port(uint16_t port) -{ - return SLURM_SUCCESS; -} - extern void *acct_storage_p_get_connection() { #ifdef HAVE_MYSQL diff --git a/src/plugins/accounting_storage/none/accounting_storage_none.c b/src/plugins/accounting_storage/none/accounting_storage_none.c index bc7133ccee4..9a1759077f6 100644 --- a/src/plugins/accounting_storage/none/accounting_storage_none.c +++ b/src/plugins/accounting_storage/none/accounting_storage_none.c @@ -87,11 +87,6 @@ extern int fini ( void ) return SLURM_SUCCESS; } -extern int acct_storage_p_set_msg_port(uint16_t port) -{ - return SLURM_SUCCESS; -} - extern void * acct_storage_p_get_connection() { return NULL; diff --git a/src/plugins/accounting_storage/pgsql/accounting_storage_pgsql.c b/src/plugins/accounting_storage/pgsql/accounting_storage_pgsql.c index 2ba66a42266..df3ca96a223 100644 --- a/src/plugins/accounting_storage/pgsql/accounting_storage_pgsql.c +++ b/src/plugins/accounting_storage/pgsql/accounting_storage_pgsql.c @@ -665,11 +665,6 @@ extern int fini ( void ) #endif } -extern int acct_storage_p_set_msg_port(uint16_t port) -{ - return SLURM_SUCCESS; -} - extern void *acct_storage_p_get_connection() { #ifdef HAVE_PGSQL diff --git a/src/plugins/accounting_storage/slurmdbd/accounting_storage_slurmdbd.c b/src/plugins/accounting_storage/slurmdbd/accounting_storage_slurmdbd.c index 06664221c1b..bd618d98b15 100644 --- a/src/plugins/accounting_storage/slurmdbd/accounting_storage_slurmdbd.c +++ b/src/plugins/accounting_storage/slurmdbd/accounting_storage_slurmdbd.c @@ -93,7 +93,6 @@ const char plugin_type[] = "accounting_storage/slurmdbd"; const uint32_t plugin_version = 100; static char *cluster_name = NULL; -static uint16_t slurmctld_port = 0; static char *slurmdbd_auth_info = NULL; /* @@ -106,11 +105,17 @@ extern int init ( void ) if (first) { /* since this can be loaded from many different places - * only tell us once. */ + only tell us once. */ + if (!(cluster_name = slurm_get_cluster_name())) + fatal("%s requires ClusterName in slurm.conf", + plugin_name); + slurmdbd_auth_info = slurm_get_accounting_storage_pass(); if(!slurmdbd_auth_info) + verbose("%s loaded AuthInfo=%s", plugin_name, slurmdbd_auth_info); + slurm_open_slurmdbd_conn(slurmdbd_auth_info); first = 0; } else { @@ -129,27 +134,8 @@ extern int fini ( void ) return SLURM_SUCCESS; } -extern int acct_storage_p_set_msg_port(uint16_t port) +extern void *acct_storage_p_get_connection() { - if (!(cluster_name = slurm_get_cluster_name())) { - error("%s requires ClusterName in slurm.conf", - plugin_name); - return SLURM_ERROR; - } - - slurmctld_port = port; - return SLURM_SUCCESS; -} - -extern void *acct_storage_p_get_connection(void) -{ - static int first = 1; - - if (first) { - slurm_open_slurmdbd_conn(slurmdbd_auth_info, slurmctld_port, - cluster_name); - first = 0; - } return NULL; } diff --git a/src/slurmctld/assoc_mgr.c b/src/slurmctld/assoc_mgr.c index 77883f590c2..48624d88d63 100644 --- a/src/slurmctld/assoc_mgr.c +++ b/src/slurmctld/assoc_mgr.c @@ -34,23 +34,17 @@ * with SLURM; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. \*****************************************************************************/ -#include <pwd.h> -#include <sys/types.h> #include "assoc_mgr.h" -#include "src/common/slurmdbd_defs.h" #include "src/common/xstring.h" - +#include <sys/types.h> +#include <pwd.h> static List local_association_list = NULL; static List local_user_list = NULL; static pthread_mutex_t local_association_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t local_user_lock = PTHREAD_MUTEX_INITIALIZER; -static slurm_fd assoc_server_fd = -1; -static pthread_t assoc_thread = 0; -static time_t assoc_shutdown = 0; - static int _get_local_association_list(void *db_conn) { acct_association_cond_t assoc_q; @@ -137,84 +131,6 @@ static int _get_local_user_list(void *db_conn) return SLURM_SUCCESS; } -static void _process_free_slurmdbd_msg(slurmdbd_msg_t msg) -{ - switch (msg.msg_type) { - case DBD_RC: - /* Sample code only, process the RPC and free msg */ - info("got from SlurmDBD DBD_RC: %u", - ((dbd_rc_msg_t *)msg.data)->return_code); - slurmdbd_free_rc_msg((dbd_rc_msg_t *)msg.data); - break; - default: - error("Invalid msg_type from slurmdbd: %u", - msg.msg_type); - } -} - -static void *_assoc_agent(void *args) -{ - slurm_fd newsockfd; - slurm_addr cli_addr; - slurmdbd_msg_t msg; - - while (assoc_shutdown == 0) { - if((newsockfd = slurm_accept_msg_conn(assoc_server_fd, - &cli_addr)) == - SLURM_SOCKET_ERROR) { - if(errno != EINTR) - error("slurm_accept_msg_conn: %m"); - continue; - } - if(assoc_shutdown) - break; - if(slurm_recv_slurmdbd_msg(newsockfd, &msg) - != SLURM_SUCCESS) { - error("slurm_recv_slurmdbd_msg: %m"); - } else { - info("Received some message from SlurmDBD"); - /* NOTE: authentication should be handle within the - * message un/pack for relevant messages */ - _process_free_slurmdbd_msg(msg); - } - slurm_shutdown_msg_conn(newsockfd); - } - - return NULL; -} - -extern uint16_t assoc_mgr_server(void) -{ - slurm_addr assoc_addr; - uint16_t assoc_port = 0; - pthread_attr_t attr; - - if (assoc_server_fd >= 0) { - error("Association server already spawned"); - return assoc_port; - } - - assoc_server_fd = slurm_init_msg_engine_port(0); - if (assoc_server_fd < 0) - fatal("slurm_init_msg_engine_port: %m"); - if (slurm_get_stream_addr(assoc_server_fd, &assoc_addr) < 0) - fatal("slurm_get_stream_addr: %m"); - assoc_port = ntohs(((struct sockaddr_in) assoc_addr).sin_port); - - slurm_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - if (pthread_create(&assoc_thread, &attr, _assoc_agent, NULL)) { - error("Unable to start association agent: %m"); - if(slurm_shutdown_msg_engine(assoc_server_fd)) - error("slurm_shutdown_msg_engine %m"); - assoc_server_fd = -1; - assoc_port = 0; - } - slurm_attr_destroy( &attr ); - - return assoc_port; -} - extern int assoc_mgr_init(void *db_conn) { if(!slurmctld_cluster_name) @@ -223,14 +139,14 @@ extern int assoc_mgr_init(void *db_conn) if(!local_association_list) if(_get_local_association_list(db_conn) == SLURM_ERROR) return SLURM_ERROR; - if(!local_user_list) + if(!local_user_list) if(_get_local_user_list(db_conn) == SLURM_ERROR) return SLURM_ERROR; return SLURM_SUCCESS; } -extern int assoc_mgr_fini(void) +extern int assoc_mgr_fini() { if(local_association_list) list_destroy(local_association_list); @@ -239,21 +155,6 @@ extern int assoc_mgr_fini(void) local_association_list = NULL; local_user_list = NULL; - if(assoc_server_fd >= 0) { - int i; - assoc_shutdown = time(NULL); - for(i=0; i<4; i++) { - if(pthread_cancel(assoc_thread)) - break;; - usleep(1000); - } - if(i >= 4) - error("Could not kill assoc_thread"); - if(slurm_shutdown_msg_engine(assoc_server_fd)) - error("slurm_shutdown_msg_engine %m"); - assoc_server_fd = -1; - } - return SLURM_SUCCESS; } diff --git a/src/slurmctld/assoc_mgr.h b/src/slurmctld/assoc_mgr.h index df4914f514c..79c00d667fa 100644 --- a/src/slurmctld/assoc_mgr.h +++ b/src/slurmctld/assoc_mgr.h @@ -69,14 +69,7 @@ extern int get_default_account(void *db_conn, acct_user_rec_t *user); extern int get_assoc_id(void *db_conn, acct_association_rec_t *assoc); extern int assoc_mgr_init(void *db_conn); -extern int assoc_mgr_fini(void); - -/* - * Open a socket and spawn a pthread to process request originating in - * the SlurmDBD. The agent will be killed by assoc_mgr_fini(). - * RET: The port that will be used or 0 on error - */ -extern uint16_t assoc_mgr_server(void); +extern int assoc_mgr_fini(); /* * remove association from local cache diff --git a/src/slurmctld/controller.c b/src/slurmctld/controller.c index d1fc290babd..c744d800816 100644 --- a/src/slurmctld/controller.c +++ b/src/slurmctld/controller.c @@ -306,6 +306,9 @@ int main(int argc, char *argv[]) fatal( "failed to initialize node selection plugin"); if (slurm_acct_storage_init(NULL) != SLURM_SUCCESS ) fatal( "failed to initialize accounting_storage plugin"); + + acct_db_conn = acct_storage_g_get_connection(); + assoc_mgr_init(acct_db_conn); if (slurm_jobacct_gather_init() != SLURM_SUCCESS ) fatal( "failed to initialize jobacct_gather plugin"); @@ -350,11 +353,6 @@ int main(int argc, char *argv[]) slurmctld_conf.backup_controller); exit(0); } - - acct_storage_g_set_msg_port(assoc_mgr_server()); - acct_db_conn = acct_storage_g_get_connection(); - assoc_mgr_init(acct_db_conn); - info("Running as primary controller"); _gold_cluster_ready(); if (slurm_sched_init() != SLURM_SUCCESS) @@ -368,7 +366,7 @@ int main(int argc, char *argv[]) slurm_mutex_unlock(&slurmctld_config.thread_count_lock); slurm_attr_init(&thread_attr); if (pthread_create(&slurmctld_config.thread_id_rpc, - &thread_attr, _slurmctld_rpc_mgr, NULL)) + &thread_attr,_slurmctld_rpc_mgr, NULL)) fatal("pthread_create error %m"); slurm_attr_destroy(&thread_attr); @@ -417,7 +415,6 @@ int main(int argc, char *argv[]) != SLURM_SUCCESS ) error("failed to save node selection state"); switch_save(slurmctld_conf.state_save_location); - assoc_mgr_fini(); if (slurmctld_config.resume_backup == false) break; recover = 2; @@ -432,6 +429,7 @@ int main(int argc, char *argv[]) acct_storage_g_close_connection(acct_db_conn); slurm_acct_storage_fini(); /* Save pending message traffic */ + assoc_mgr_fini(); xfree(slurmctld_cluster_name); #ifdef MEMORY_LEAK_DEBUG /* This should purge all allocated memory, *\ diff --git a/src/slurmdbd/proc_req.c b/src/slurmdbd/proc_req.c index 3255bbfadf7..8275da9fc38 100644 --- a/src/slurmdbd/proc_req.c +++ b/src/slurmdbd/proc_req.c @@ -52,8 +52,8 @@ static int _cluster_procs(void *db_conn, static int _get_assocs(void *db_conn, Buf in_buffer, Buf *out_buffer); static int _get_jobs(void *db_conn, Buf in_buffer, Buf *out_buffer); static int _get_users(void *db_conn, Buf in_buffer, Buf *out_buffer); -static int _init_conn(void *db_conn, Buf in_buffer, Buf *out_buffer, - uint32_t *uid, uint16_t *port, char **cluster_name); +static int _init_conn(void *db_conn, + Buf in_buffer, Buf *out_buffer, uint32_t *uid); static int _job_complete(void *db_conn, Buf in_buffer, Buf *out_buffer, uint32_t *uid); static int _job_start(void *db_conn, @@ -74,13 +74,10 @@ static int _step_start(void *db_conn, * first IN - set if first message received on the socket * buffer OUT - outgoing response, must be freed by caller * uid IN/OUT - user ID who initiated the RPC - * port OUT - slurmctld port to get update notifications, set for DBD_INIT only - * cluster_name OUT - cluster associated with message, set for DBD_INIT only - * RET SLURM_SUCCESS or error code - */ + * RET SLURM_SUCCESS or error code */ extern int -proc_req(void *db_conn, char *msg, uint32_t msg_size, bool first, - Buf *out_buffer, uint32_t *uid, uint16_t *port, char **cluster_name) +proc_req(void *db_conn, char *msg, uint32_t msg_size, + bool first, Buf *out_buffer, uint32_t *uid) { int rc = SLURM_SUCCESS; uint16_t msg_type; @@ -137,8 +134,7 @@ proc_req(void *db_conn, char *msg, uint32_t msg_size, bool first, case DBD_INIT: if (first) rc = _init_conn(db_conn, - in_buffer, out_buffer, uid, - port, cluster_name); + in_buffer, out_buffer, uid); else { error("DBD_INIT sent after connection " "established"); @@ -328,8 +324,8 @@ static int _get_users(void *db_conn, Buf in_buffer, Buf *out_buffer) return SLURM_SUCCESS; } -static int _init_conn(void *db_conn, Buf in_buffer, Buf *out_buffer, - uint32_t *uid, uint16_t *port, char **cluster_name) +static int _init_conn(void *db_conn, + Buf in_buffer, Buf *out_buffer, uint32_t *uid) { dbd_init_msg_t *init_msg; @@ -345,15 +341,8 @@ static int _init_conn(void *db_conn, Buf in_buffer, Buf *out_buffer, return SLURM_ERROR; } *uid = init_msg->uid; - *port = init_msg->slurmctld_port; - if (init_msg->cluster_name && init_msg->cluster_name[0]) - *cluster_name = xstrdup(init_msg->cluster_name); - else - *cluster_name = NULL; - info("DBD_INIT: VERSION:%u UID:%u CLUSTER:%s PORT:%u", - init_msg->version, init_msg->uid, - init_msg->cluster_name, init_msg->slurmctld_port); + info("DBD_INIT: VERSION:%u UID:%u", init_msg->version, init_msg->uid); slurmdbd_free_init_msg(init_msg); *out_buffer = make_dbd_rc_msg(SLURM_SUCCESS); return SLURM_SUCCESS; diff --git a/src/slurmdbd/proc_req.h b/src/slurmdbd/proc_req.h index f4005904303..3d795ec7719 100644 --- a/src/slurmdbd/proc_req.h +++ b/src/slurmdbd/proc_req.h @@ -44,16 +44,11 @@ /* Process an incoming RPC * msg IN - incoming message - * msg_size IN - size of msg in bytes + * msg_size IN - size of msg (bytes) * first IN - set if first message received on the socket * buffer OUT - outgoing response, must be freed by caller - * uid IN/OUT - user ID who initiated the RPC - * port OUT - slurmctld port to get update notifications, set for DBD_INIT only - * cluster_name OUT - cluster associated with message, set for DBD_INIT only - * RET SLURM_SUCCESS or error code - */ + * RET SLURM_SUCCESS or error code */ extern int proc_req(void *db_conn, char *msg, uint32_t msg_size, - bool first, Buf *buffer, uint32_t *uid, - uint16_t *port, char **cluster_name); + bool first, Buf *buffer, uint32_t *uid); #endif /* !_PROC_REQ */ diff --git a/src/slurmdbd/rpc_mgr.c b/src/slurmdbd/rpc_mgr.c index b020155bdea..d403b18cd61 100644 --- a/src/slurmdbd/rpc_mgr.c +++ b/src/slurmdbd/rpc_mgr.c @@ -48,8 +48,8 @@ #include "src/common/fd.h" #include "src/common/log.h" #include "src/common/macros.h" -#include "src/common/slurm_accounting_storage.h" #include "src/common/slurm_protocol_api.h" +#include "src/common/slurm_accounting_storage.h" #include "src/common/slurmdbd_defs.h" #include "src/common/xmalloc.h" #include "src/common/xsignal.h" @@ -79,7 +79,6 @@ static pthread_cond_t thread_count_cond = PTHREAD_COND_INITIALIZER; typedef struct connection_arg { slurm_fd newsockfd; - char hostname[512]; } connection_arg_t; @@ -222,44 +221,8 @@ static void * _service_connection(void *arg) offset += msg_read; } if (msg_size == offset) { - uint16_t port = 0; - char *cluster_name = NULL; rc = proc_req(db_conn, - msg, msg_size, first, &buffer, - &uid, &port, &cluster_name); - if (first && cluster_name && port) { - slurm_addr ctld_address; - slurm_get_stream_addr(conn->newsockfd, - &ctld_address); - ((struct sockaddr_in) ctld_address).sin_port = - htons(port); - /* FIXME: Add to recipient list: cluster_name, address */ -#if 0 -{ -/* test code only */ -slurm_fd fd; -fd = slurm_open_stream(&ctld_address); -if (fd < 0) - error("can not open socket back to slurmctld"); -else { - uint32_t msg_size, nw_size; - Buf buffer; - slurmdbd_msg_t req; - dbd_rc_msg_t msg; - msg.return_code = 5; - req.msg_type = DBD_RC; - req.data = &msg; - buffer = pack_slurmdbd_msg(&req); - msg_size = get_buf_offset(buffer); - nw_size = htonl(msg_size); - slurm_write_stream(fd, (char *)&nw_size, sizeof(nw_size)); - slurm_write_stream(fd, get_buf_data(buffer), msg_size); - free_buf(buffer); - slurm_close_stream(fd); -} -} -#endif - } + msg, msg_size, first, &buffer, &uid); first = false; if (rc != SLURM_SUCCESS) { error("Processing message from connection %d", -- GitLab