diff --git a/src/common/slurm_accounting_storage.c b/src/common/slurm_accounting_storage.c index ea8239ca4afd810f2df8a3e849801a704bbe7a2c..b01ff52d40bab3885e6afee5cfc1ce79fd8341cd 100644 --- a/src/common/slurm_accounting_storage.c +++ b/src/common/slurm_accounting_storage.c @@ -1,5 +1,5 @@ /*****************************************************************************\ - * slurm_accounting_storage.c - account torage plugin wrapper. + * slurm_accounting_storage.c - account storage plugin wrapper. * * $Id: slurm_accounting_storage.c 10744 2007-01-11 20:09:18Z da $ ***************************************************************************** @@ -58,7 +58,8 @@ */ typedef struct slurm_acct_storage_ops { - void *(*get_conn) (); + int (*set_msg_port) (uint16_t port); + void *(*get_conn) (void); int (*close_conn) (void *db_conn); int (*add_users) (void *db_conn, List user_list); @@ -187,6 +188,7 @@ static slurm_acct_storage_ops_t * _acct_storage_get_ops( * Must be synchronized with slurm_acct_storage_ops_t above. */ static const char *syms[] = { + "acct_storage_p_set_msg_port", "acct_storage_p_get_connection", "acct_storage_p_close_connection", "acct_storage_p_add_users", @@ -1240,6 +1242,13 @@ extern int slurm_acct_storage_fini(void) return rc; } +extern int acct_storage_g_set_msg_port(uint16_t port) +{ + if (slurm_acct_storage_init(NULL) < 0) + return SLURM_ERROR; + return (*(g_acct_storage_context->ops.set_msg_port))(port); +} + extern void *acct_storage_g_get_connection() { if (slurm_acct_storage_init(NULL) < 0) diff --git a/src/common/slurm_accounting_storage.h b/src/common/slurm_accounting_storage.h index 92f778ca0571ebda82299843b3a43cd0c9c952f4..5db408f06dc69dd3693866fe5b319f4019890463 100644 --- a/src/common/slurm_accounting_storage.h +++ b/src/common/slurm_accounting_storage.h @@ -194,11 +194,19 @@ extern acct_admin_level_t str_2_acct_admin_level(char *level); extern int slurm_acct_storage_init(char *loc); /* load the plugin */ extern int slurm_acct_storage_fini(void); /* unload the plugin */ +/* + * If running on slurmctld, specify a port for messages from SlurmDBD + * to slurmctld for notification of database changes + * IN: port - socket port open for incoming message + * RET: SLURM_SUCCESS on success SLURM_ERROR else + */ +extern int acct_storage_g_set_msg_port(uint16_t port); + /* * get a new connection to the storage unit * RET: pointer used to access db */ -extern void *acct_storage_g_get_connection(); +extern void *acct_storage_g_get_connection(void); /* * release connection to the storage unit diff --git a/src/common/slurmdbd_defs.c b/src/common/slurmdbd_defs.c index fd26de7bcd8c5b408be7a30c6ee1039e97de0f0b..ab385535859b2ef3b378038c908468cf05e7984d 100644 --- a/src/common/slurmdbd_defs.c +++ b/src/common/slurmdbd_defs.c @@ -83,7 +83,9 @@ static time_t agent_shutdown = 0; static pthread_mutex_t slurmdbd_lock = PTHREAD_MUTEX_INITIALIZER; static slurm_fd slurmdbd_fd = -1; +static uint16_t slurmctld_port = 0; static char * slurmdbd_auth_info = NULL; +static char * slurmctld_cluster_name = NULL; static void * _agent(void *x); static void _agent_queue_del(void *x); @@ -111,7 +113,8 @@ static int _tot_wait (struct timeval *start_time); ****************************************************************************/ /* Open a socket connection to SlurmDbd */ -extern int slurm_open_slurmdbd_conn(char *auth_info) +extern int slurm_open_slurmdbd_conn(char *auth_info, + uint16_t port, char *cluster_name) { slurm_mutex_lock(&agent_lock); if ((agent_tid == 0) || (agent_list == NULL)) @@ -122,6 +125,9 @@ extern int slurm_open_slurmdbd_conn(char *auth_info) xfree(slurmdbd_auth_info); if (auth_info) slurmdbd_auth_info = xstrdup(auth_info); + slurmctld_port = port; + if (cluster_name) + slurmctld_cluster_name = xstrdup(cluster_name); if (slurmdbd_fd < 0) _open_slurmdbd_fd(); slurm_mutex_unlock(&slurmdbd_lock); @@ -138,11 +144,70 @@ extern int slurm_close_slurmdbd_conn(void) slurm_mutex_lock(&slurmdbd_lock); _close_slurmdbd_fd(); xfree(slurmdbd_auth_info); + xfree(slurmctld_cluster_name); + slurmctld_port = 0; slurm_mutex_unlock(&slurmdbd_lock); return SLURM_SUCCESS; } +/* + * Receive a message from the SlurmDBD and authenticate it + * IN: fd - the open file to be read from + * OUT: msg the message from SlurmDBD, must be freed by the caller + * Returns SLURM_SUCCESS or an error code + */ +extern int slurm_recv_slurmdbd_msg(slurm_fd fd, slurmdbd_msg_t *msg) +{ + char *in_msg = NULL; + Buf buffer; + uint32_t nw_size, msg_size; + ssize_t msg_read, offset; + int rc = SLURM_ERROR; + + if (!_fd_readable(fd)) { + error("Premature close from slurmdbd"); + return rc; + } + msg_read = read(fd, &nw_size, sizeof(nw_size)); + if (msg_read == 0) { + error("Premature EOF from slurmdbd"); + return rc; + } + if (msg_read != sizeof(nw_size)) { + error("Could not read msg_size from slurmdbd"); + return rc; + } + msg_size = ntohl(nw_size); + if ((msg_size < 2) || (msg_size > 1000000)) { + error("Invalid msg_size (%u) from slurmdbd", + msg_size); + return SLURM_ERROR; + } + + buffer = init_buf(msg_size); + in_msg = get_buf_data(buffer);; + offset = 0; + while (msg_size > offset) { + if (!_fd_readable(fd)) + break; /* problem with this socket */ + msg_read = read(fd, (in_msg + offset), + (msg_size - offset)); + if (msg_read <= 0) { + error("read(%d): %m", fd); + break; + } + offset += msg_read; + } + if (msg_size != offset) { + error("Could not read full message from slurmdbd"); + } else { + rc = unpack_slurmdbd_msg(msg, buffer); + } + free_buf(buffer); + return rc; +} + /* Send an RPC to the SlurmDBD and wait for the return code reply. * The RPC will not be queued if an error occurs. * Returns SLURM_SUCCESS or an error code */ @@ -533,7 +598,9 @@ static int _send_init_msg(void) buffer = init_buf(1024); pack16((uint16_t) DBD_INIT, buffer); - req.version = SLURMDBD_VERSION; + req.version = SLURMDBD_VERSION; + req.slurmctld_port = slurmctld_port; + req.cluster_name = slurmctld_cluster_name; slurmdbd_pack_init_msg(&req, buffer, slurmdbd_auth_info); rc = _send_msg(buffer); @@ -1191,7 +1258,10 @@ void inline slurmdbd_free_get_jobs_msg(dbd_get_jobs_msg_t *msg) void inline slurmdbd_free_init_msg(dbd_init_msg_t *msg) { - xfree(msg); + if (msg) { + xfree(msg->cluster_name); + xfree(msg); + } } void inline slurmdbd_free_job_complete_msg(dbd_job_comp_msg_t *msg) @@ -1549,6 +1619,8 @@ slurmdbd_pack_init_msg(dbd_init_msg_t *msg, Buf buffer, char *auth_info) int rc; void *auth_cred; + packstr(msg->cluster_name, buffer); + pack16(msg->slurmctld_port, buffer); pack16(msg->version, buffer); auth_cred = g_slurm_auth_create(NULL, 2, auth_info); if (auth_cred == NULL) { @@ -1569,9 +1641,12 @@ int inline slurmdbd_unpack_init_msg(dbd_init_msg_t **msg, Buf buffer, char *auth_info) { void *auth_cred; + uint32_t uint32_tmp; dbd_init_msg_t *msg_ptr = xmalloc(sizeof(dbd_init_msg_t)); *msg = msg_ptr; + safe_unpackstr_xmalloc(&msg_ptr->cluster_name, &uint32_tmp, buffer); + safe_unpack16(&msg_ptr->slurmctld_port, buffer); safe_unpack16(&msg_ptr->version, buffer); auth_cred = g_slurm_auth_unpack(buffer); if (auth_cred == NULL) { @@ -1584,6 +1659,7 @@ slurmdbd_unpack_init_msg(dbd_init_msg_t **msg, Buf buffer, char *auth_info) return SLURM_SUCCESS; unpack_error: + xfree(msg_ptr->cluster_name); xfree(msg_ptr); *msg = NULL; return SLURM_ERROR; diff --git a/src/common/slurmdbd_defs.h b/src/common/slurmdbd_defs.h index 1b3afe5febdc2bc3cb890a866191f74ca8a8a0c1..1292f03dceee8b187ae2f76ac8e514f219df7f4a 100644 --- a/src/common/slurmdbd_defs.h +++ b/src/common/slurmdbd_defs.h @@ -107,7 +107,7 @@ typedef enum { * an account */ DBD_REMOVE_ASSOCS, /* Remove existing association */ DBD_REMOVE_CLUSTERS, /* Remove existing cluster */ - DBD_REMOVE_USERS, /* Remove existing user */ + DBD_REMOVE_USERS, /* Remove existing user */ DBD_STEP_COMPLETE, /* Record step completion */ DBD_STEP_START /* Record step starting */ } slurmdbd_msg_type_t; @@ -145,17 +145,20 @@ typedef struct { } dbd_usage_msg_t; typedef struct dbd_get_jobs_msg { - char *cluster_name; /* name of cluster to query */ - uint32_t gid; /* group id */ - List selected_steps; /* List of jobacct_selected_step_t *'s */ - List selected_parts; /* List of char *'s */ - char *user; /* user name */ + char *cluster_name; /* name of cluster to query */ + uint32_t gid; /* group id */ + List selected_steps; /* List of jobacct_selected_step_t *'s */ + List selected_parts; /* List of char *'s */ + char *user; /* user name */ } dbd_get_jobs_msg_t; typedef struct dbd_init_msg { - uint16_t version; /* protocol version */ + char *cluster_name; /* name of cluster to query */ + uint16_t slurmctld_port;/* port on slurmctld to process messages + * originating on slurmdbd */ uint32_t uid; /* UID originating connection, * filled by authtentication plugin*/ + uint16_t version; /* protocol version */ } dbd_init_msg_t; typedef struct dbd_job_comp_msg { @@ -207,8 +210,8 @@ typedef struct dbd_job_suspend_msg { } dbd_job_suspend_msg_t; typedef struct { - List my_list; /* this list could be of any type as long as it - * is handled correctly on both ends */ + List my_list; /* this list could be of any type as long as it + * is handled correctly on both ends */ } dbd_list_msg_t; typedef struct { @@ -261,12 +264,32 @@ typedef struct dbd_step_start_msg { * Slurm DBD message processing functions \*****************************************************************************/ -/* Open a socket connection to SlurmDbd using SlurmdbdAuthInfo specified */ -extern int slurm_open_slurmdbd_conn(char *auth_info); +/* + * Open a socket connection to SlurmDbd using SlurmdbdAuthInfo specified + * IN: auth_info - If Munge is used for authentication, this will be the + * pathname to the named socket used if not the default + * socket. Typically used if one Munge key and credential + * is used within a cluster and a different for site-wide + * authentication. + * IN: port - socket port used for message originating in SlurmDdb and + * sent to slurmctld or zero if connect is not from slurmctld + * IN: cluster_name - cluster name or NULL if connect is not from slurmctld + * Returns SLURM_SUCCESS or an error code + */ +extern int slurm_open_slurmdbd_conn(char *auth_info, + uint16_t port, char *cluster_name); /* Close the SlurmDBD socket connection */ extern int slurm_close_slurmdbd_conn(void); +/* + * Receive a message from the SlurmDBD and authenticate it + * IN: fd - the open file to be read from + * OUT: msg the message from SlurmDBD, must be freed by the caller + * Returns SLURM_SUCCESS or an error code + */ +extern int slurm_recv_slurmdbd_msg(slurm_fd fd, slurmdbd_msg_t *msg); + /* Send an RPC to the SlurmDBD. Do not wait for the reply. The RPC * will be queued and processed later if the SlurmDBD is not responding. * Returns SLURM_SUCCESS or an error code */ diff --git a/src/plugins/accounting_storage/filetxt/accounting_storage_filetxt.c b/src/plugins/accounting_storage/filetxt/accounting_storage_filetxt.c index 5d5d116d0a64414ba17527fdd546e0bd457c1296..f8bdf84e41d4912f3037df6e2de0d49de5f3efd8 100644 --- a/src/plugins/accounting_storage/filetxt/accounting_storage_filetxt.c +++ b/src/plugins/accounting_storage/filetxt/accounting_storage_filetxt.c @@ -228,6 +228,12 @@ extern int fini ( void ) return SLURM_SUCCESS; } +extern int acct_storage_p_set_msg_port(uint16_t port) +{ + /* unused */ + return SLURM_SUCCESS; +} + extern void * acct_storage_p_get_connection() { return NULL; diff --git a/src/plugins/accounting_storage/gold/accounting_storage_gold.c b/src/plugins/accounting_storage/gold/accounting_storage_gold.c index 6bbb5124964a4de5406ea5a0e90536239b29f16d..2de6663d9ec24c6681f2fe3ddd08eeeec5dc4517 100644 --- a/src/plugins/accounting_storage/gold/accounting_storage_gold.c +++ b/src/plugins/accounting_storage/gold/accounting_storage_gold.c @@ -713,6 +713,11 @@ extern int fini ( void ) return SLURM_SUCCESS; } +extern int acct_storage_p_set_msg_port(uint16_t port) +{ + return SLURM_SUCCESS; +} + extern void * acct_storage_p_get_connection() { return NULL; diff --git a/src/plugins/accounting_storage/mysql/accounting_storage_mysql.c b/src/plugins/accounting_storage/mysql/accounting_storage_mysql.c index 1d7c62dca32135409b03541203ddb2ff838bf350..b683a15b517d1bd9643ea4f95d0e1cd77d6e9abf 100644 --- a/src/plugins/accounting_storage/mysql/accounting_storage_mysql.c +++ b/src/plugins/accounting_storage/mysql/accounting_storage_mysql.c @@ -469,6 +469,11 @@ extern int fini ( void ) #endif } +extern int acct_storage_p_set_msg_port(uint16_t port) +{ + return SLURM_SUCCESS; +} + extern void *acct_storage_p_get_connection() { #ifdef HAVE_MYSQL diff --git a/src/plugins/accounting_storage/none/accounting_storage_none.c b/src/plugins/accounting_storage/none/accounting_storage_none.c index 9a1759077f6cdb34a68a9274d14630eb27450798..bc7133ccee421183f1d9b9a45c11585b226b45c2 100644 --- a/src/plugins/accounting_storage/none/accounting_storage_none.c +++ b/src/plugins/accounting_storage/none/accounting_storage_none.c @@ -87,6 +87,11 @@ extern int fini ( void ) return SLURM_SUCCESS; } +extern int acct_storage_p_set_msg_port(uint16_t port) +{ + return SLURM_SUCCESS; +} + extern void * acct_storage_p_get_connection() { return NULL; diff --git a/src/plugins/accounting_storage/pgsql/accounting_storage_pgsql.c b/src/plugins/accounting_storage/pgsql/accounting_storage_pgsql.c index df3ca96a22382fdcdedd122527d9cba367068d38..2ba66a42266d956ef54b6a86b10ff375201bf997 100644 --- a/src/plugins/accounting_storage/pgsql/accounting_storage_pgsql.c +++ b/src/plugins/accounting_storage/pgsql/accounting_storage_pgsql.c @@ -665,6 +665,11 @@ extern int fini ( void ) #endif } +extern int acct_storage_p_set_msg_port(uint16_t port) +{ + return SLURM_SUCCESS; +} + extern void *acct_storage_p_get_connection() { #ifdef HAVE_PGSQL diff --git a/src/plugins/accounting_storage/slurmdbd/accounting_storage_slurmdbd.c b/src/plugins/accounting_storage/slurmdbd/accounting_storage_slurmdbd.c index bd618d98b155023ffd8b22e6bc7408fa709736e9..06664221c1b6effe898da4137cd4baff73821015 100644 --- a/src/plugins/accounting_storage/slurmdbd/accounting_storage_slurmdbd.c +++ b/src/plugins/accounting_storage/slurmdbd/accounting_storage_slurmdbd.c @@ -93,6 +93,7 @@ const char plugin_type[] = "accounting_storage/slurmdbd"; const uint32_t plugin_version = 100; static char *cluster_name = NULL; +static uint16_t slurmctld_port = 0; static char *slurmdbd_auth_info = NULL; /* @@ -105,17 +106,11 @@ extern int init ( void ) if (first) { /* since this can be loaded from many different places - only tell us once. */ - if (!(cluster_name = slurm_get_cluster_name())) - fatal("%s requires ClusterName in slurm.conf", - plugin_name); - + * only tell us once. */ slurmdbd_auth_info = slurm_get_accounting_storage_pass(); if(!slurmdbd_auth_info) - verbose("%s loaded AuthInfo=%s", plugin_name, slurmdbd_auth_info); - slurm_open_slurmdbd_conn(slurmdbd_auth_info); first = 0; } else { @@ -134,8 +129,27 @@ extern int fini ( void ) return SLURM_SUCCESS; } -extern void *acct_storage_p_get_connection() +extern int acct_storage_p_set_msg_port(uint16_t port) { + if (!(cluster_name = slurm_get_cluster_name())) { + error("%s requires ClusterName in slurm.conf", + plugin_name); + return SLURM_ERROR; + } + + slurmctld_port = port; + return SLURM_SUCCESS; +} + +extern void *acct_storage_p_get_connection(void) +{ + static int first = 1; + + if (first) { + slurm_open_slurmdbd_conn(slurmdbd_auth_info, slurmctld_port, + cluster_name); + first = 0; + } return NULL; } diff --git a/src/slurmctld/assoc_mgr.c b/src/slurmctld/assoc_mgr.c index 48624d88d639fc98eb2c2729391b2ebb3b4e5bb5..77883f590c24e6ab0f59a291cb1789acba52ce5c 100644 --- a/src/slurmctld/assoc_mgr.c +++ b/src/slurmctld/assoc_mgr.c @@ -34,17 +34,23 @@ * with SLURM; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. \*****************************************************************************/ +#include <pwd.h> +#include <sys/types.h> #include "assoc_mgr.h" +#include "src/common/slurmdbd_defs.h" #include "src/common/xstring.h" -#include <sys/types.h> -#include <pwd.h> + static List local_association_list = NULL; static List local_user_list = NULL; static pthread_mutex_t local_association_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t local_user_lock = PTHREAD_MUTEX_INITIALIZER; +static slurm_fd assoc_server_fd = -1; +static pthread_t assoc_thread = 0; +static time_t assoc_shutdown = 0; + static int _get_local_association_list(void *db_conn) { acct_association_cond_t assoc_q; @@ -131,6 +137,84 @@ static int _get_local_user_list(void *db_conn) return SLURM_SUCCESS; } +static void _process_free_slurmdbd_msg(slurmdbd_msg_t msg) +{ + switch (msg.msg_type) { + case DBD_RC: + /* Sample code only, process the RPC and free msg */ + info("got from SlurmDBD DBD_RC: %u", + ((dbd_rc_msg_t *)msg.data)->return_code); + slurmdbd_free_rc_msg((dbd_rc_msg_t *)msg.data); + break; + default: + error("Invalid msg_type from slurmdbd: %u", + msg.msg_type); + } +} + +static void *_assoc_agent(void *args) +{ + slurm_fd newsockfd; + slurm_addr cli_addr; + slurmdbd_msg_t msg; + + while (assoc_shutdown == 0) { + if((newsockfd = slurm_accept_msg_conn(assoc_server_fd, + &cli_addr)) == + SLURM_SOCKET_ERROR) { + if(errno != EINTR) + error("slurm_accept_msg_conn: %m"); + continue; + } + if(assoc_shutdown) + break; + if(slurm_recv_slurmdbd_msg(newsockfd, &msg) + != SLURM_SUCCESS) { + error("slurm_recv_slurmdbd_msg: %m"); + } else { + info("Received some message from SlurmDBD"); + /* NOTE: authentication should be handle within the + * message un/pack for relevant messages */ + _process_free_slurmdbd_msg(msg); + } + slurm_shutdown_msg_conn(newsockfd); + } + + return NULL; +} + +extern uint16_t assoc_mgr_server(void) +{ + slurm_addr assoc_addr; + uint16_t assoc_port = 0; + pthread_attr_t attr; + + if (assoc_server_fd >= 0) { + error("Association server already spawned"); + return assoc_port; + } + + assoc_server_fd = slurm_init_msg_engine_port(0); + if (assoc_server_fd < 0) + fatal("slurm_init_msg_engine_port: %m"); + if (slurm_get_stream_addr(assoc_server_fd, &assoc_addr) < 0) + fatal("slurm_get_stream_addr: %m"); + assoc_port = ntohs(((struct sockaddr_in) assoc_addr).sin_port); + + slurm_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + if (pthread_create(&assoc_thread, &attr, _assoc_agent, NULL)) { + error("Unable to start association agent: %m"); + if(slurm_shutdown_msg_engine(assoc_server_fd)) + error("slurm_shutdown_msg_engine %m"); + assoc_server_fd = -1; + assoc_port = 0; + } + slurm_attr_destroy( &attr ); + + return assoc_port; +} + extern int assoc_mgr_init(void *db_conn) { if(!slurmctld_cluster_name) @@ -139,14 +223,14 @@ extern int assoc_mgr_init(void *db_conn) if(!local_association_list) if(_get_local_association_list(db_conn) == SLURM_ERROR) return SLURM_ERROR; - if(!local_user_list) + if(!local_user_list) if(_get_local_user_list(db_conn) == SLURM_ERROR) return SLURM_ERROR; return SLURM_SUCCESS; } -extern int assoc_mgr_fini() +extern int assoc_mgr_fini(void) { if(local_association_list) list_destroy(local_association_list); @@ -155,6 +239,21 @@ extern int assoc_mgr_fini() local_association_list = NULL; local_user_list = NULL; + if(assoc_server_fd >= 0) { + int i; + assoc_shutdown = time(NULL); + for(i=0; i<4; i++) { + if(pthread_cancel(assoc_thread)) + break;; + usleep(1000); + } + if(i >= 4) + error("Could not kill assoc_thread"); + if(slurm_shutdown_msg_engine(assoc_server_fd)) + error("slurm_shutdown_msg_engine %m"); + assoc_server_fd = -1; + } + return SLURM_SUCCESS; } diff --git a/src/slurmctld/assoc_mgr.h b/src/slurmctld/assoc_mgr.h index 79c00d667fa5fef1ebf620d8343d5b9b6c5567c1..df4914f514caf6e32a4edcb6962613e0d30aa756 100644 --- a/src/slurmctld/assoc_mgr.h +++ b/src/slurmctld/assoc_mgr.h @@ -69,7 +69,14 @@ extern int get_default_account(void *db_conn, acct_user_rec_t *user); extern int get_assoc_id(void *db_conn, acct_association_rec_t *assoc); extern int assoc_mgr_init(void *db_conn); -extern int assoc_mgr_fini(); +extern int assoc_mgr_fini(void); + +/* + * Open a socket and spawn a pthread to process request originating in + * the SlurmDBD. The agent will be killed by assoc_mgr_fini(). + * RET: The port that will be used or 0 on error + */ +extern uint16_t assoc_mgr_server(void); /* * remove association from local cache diff --git a/src/slurmctld/controller.c b/src/slurmctld/controller.c index c744d8008161bfe33b65e67b9b9769604a95f901..d1fc290babdd80ef101c907b8e016c84d081a913 100644 --- a/src/slurmctld/controller.c +++ b/src/slurmctld/controller.c @@ -306,9 +306,6 @@ int main(int argc, char *argv[]) fatal( "failed to initialize node selection plugin"); if (slurm_acct_storage_init(NULL) != SLURM_SUCCESS ) fatal( "failed to initialize accounting_storage plugin"); - - acct_db_conn = acct_storage_g_get_connection(); - assoc_mgr_init(acct_db_conn); if (slurm_jobacct_gather_init() != SLURM_SUCCESS ) fatal( "failed to initialize jobacct_gather plugin"); @@ -353,6 +350,11 @@ int main(int argc, char *argv[]) slurmctld_conf.backup_controller); exit(0); } + + acct_storage_g_set_msg_port(assoc_mgr_server()); + acct_db_conn = acct_storage_g_get_connection(); + assoc_mgr_init(acct_db_conn); + info("Running as primary controller"); _gold_cluster_ready(); if (slurm_sched_init() != SLURM_SUCCESS) @@ -366,7 +368,7 @@ int main(int argc, char *argv[]) slurm_mutex_unlock(&slurmctld_config.thread_count_lock); slurm_attr_init(&thread_attr); if (pthread_create(&slurmctld_config.thread_id_rpc, - &thread_attr,_slurmctld_rpc_mgr, NULL)) + &thread_attr, _slurmctld_rpc_mgr, NULL)) fatal("pthread_create error %m"); slurm_attr_destroy(&thread_attr); @@ -415,6 +417,7 @@ int main(int argc, char *argv[]) != SLURM_SUCCESS ) error("failed to save node selection state"); switch_save(slurmctld_conf.state_save_location); + assoc_mgr_fini(); if (slurmctld_config.resume_backup == false) break; recover = 2; @@ -429,7 +432,6 @@ int main(int argc, char *argv[]) acct_storage_g_close_connection(acct_db_conn); slurm_acct_storage_fini(); /* Save pending message traffic */ - assoc_mgr_fini(); xfree(slurmctld_cluster_name); #ifdef MEMORY_LEAK_DEBUG /* This should purge all allocated memory, *\ diff --git a/src/slurmdbd/proc_req.c b/src/slurmdbd/proc_req.c index 8275da9fc3838c0ab6d5ec42dd83ee557fe4a917..3255bbfadf77f240f1c256f36e7c8a47926ce9d9 100644 --- a/src/slurmdbd/proc_req.c +++ b/src/slurmdbd/proc_req.c @@ -52,8 +52,8 @@ static int _cluster_procs(void *db_conn, static int _get_assocs(void *db_conn, Buf in_buffer, Buf *out_buffer); static int _get_jobs(void *db_conn, Buf in_buffer, Buf *out_buffer); static int _get_users(void *db_conn, Buf in_buffer, Buf *out_buffer); -static int _init_conn(void *db_conn, - Buf in_buffer, Buf *out_buffer, uint32_t *uid); +static int _init_conn(void *db_conn, Buf in_buffer, Buf *out_buffer, + uint32_t *uid, uint16_t *port, char **cluster_name); static int _job_complete(void *db_conn, Buf in_buffer, Buf *out_buffer, uint32_t *uid); static int _job_start(void *db_conn, @@ -74,10 +74,13 @@ static int _step_start(void *db_conn, * first IN - set if first message received on the socket * buffer OUT - outgoing response, must be freed by caller * uid IN/OUT - user ID who initiated the RPC - * RET SLURM_SUCCESS or error code */ + * port OUT - slurmctld port to get update notifications, set for DBD_INIT only + * cluster_name OUT - cluster associated with message, set for DBD_INIT only + * RET SLURM_SUCCESS or error code + */ extern int -proc_req(void *db_conn, char *msg, uint32_t msg_size, - bool first, Buf *out_buffer, uint32_t *uid) +proc_req(void *db_conn, char *msg, uint32_t msg_size, bool first, + Buf *out_buffer, uint32_t *uid, uint16_t *port, char **cluster_name) { int rc = SLURM_SUCCESS; uint16_t msg_type; @@ -134,7 +137,8 @@ proc_req(void *db_conn, char *msg, uint32_t msg_size, case DBD_INIT: if (first) rc = _init_conn(db_conn, - in_buffer, out_buffer, uid); + in_buffer, out_buffer, uid, + port, cluster_name); else { error("DBD_INIT sent after connection " "established"); @@ -324,8 +328,8 @@ static int _get_users(void *db_conn, Buf in_buffer, Buf *out_buffer) return SLURM_SUCCESS; } -static int _init_conn(void *db_conn, - Buf in_buffer, Buf *out_buffer, uint32_t *uid) +static int _init_conn(void *db_conn, Buf in_buffer, Buf *out_buffer, + uint32_t *uid, uint16_t *port, char **cluster_name) { dbd_init_msg_t *init_msg; @@ -341,8 +345,15 @@ static int _init_conn(void *db_conn, return SLURM_ERROR; } *uid = init_msg->uid; + *port = init_msg->slurmctld_port; + if (init_msg->cluster_name && init_msg->cluster_name[0]) + *cluster_name = xstrdup(init_msg->cluster_name); + else + *cluster_name = NULL; - info("DBD_INIT: VERSION:%u UID:%u", init_msg->version, init_msg->uid); + info("DBD_INIT: VERSION:%u UID:%u CLUSTER:%s PORT:%u", + init_msg->version, init_msg->uid, + init_msg->cluster_name, init_msg->slurmctld_port); slurmdbd_free_init_msg(init_msg); *out_buffer = make_dbd_rc_msg(SLURM_SUCCESS); return SLURM_SUCCESS; diff --git a/src/slurmdbd/proc_req.h b/src/slurmdbd/proc_req.h index 3d795ec7719e9246cf79dba30abcc8a3ec2a2c2c..f4005904303b0a26ec0148fe874bc9b005c1ff83 100644 --- a/src/slurmdbd/proc_req.h +++ b/src/slurmdbd/proc_req.h @@ -44,11 +44,16 @@ /* Process an incoming RPC * msg IN - incoming message - * msg_size IN - size of msg (bytes) + * msg_size IN - size of msg in bytes * first IN - set if first message received on the socket * buffer OUT - outgoing response, must be freed by caller - * RET SLURM_SUCCESS or error code */ + * uid IN/OUT - user ID who initiated the RPC + * port OUT - slurmctld port to get update notifications, set for DBD_INIT only + * cluster_name OUT - cluster associated with message, set for DBD_INIT only + * RET SLURM_SUCCESS or error code + */ extern int proc_req(void *db_conn, char *msg, uint32_t msg_size, - bool first, Buf *buffer, uint32_t *uid); + bool first, Buf *buffer, uint32_t *uid, + uint16_t *port, char **cluster_name); #endif /* !_PROC_REQ */ diff --git a/src/slurmdbd/rpc_mgr.c b/src/slurmdbd/rpc_mgr.c index d403b18cd6172710f39f00cdad4900a371a8f17f..b020155bdea43c766cf68b32e7c5eeb62006b3db 100644 --- a/src/slurmdbd/rpc_mgr.c +++ b/src/slurmdbd/rpc_mgr.c @@ -48,8 +48,8 @@ #include "src/common/fd.h" #include "src/common/log.h" #include "src/common/macros.h" -#include "src/common/slurm_protocol_api.h" #include "src/common/slurm_accounting_storage.h" +#include "src/common/slurm_protocol_api.h" #include "src/common/slurmdbd_defs.h" #include "src/common/xmalloc.h" #include "src/common/xsignal.h" @@ -79,6 +79,7 @@ static pthread_cond_t thread_count_cond = PTHREAD_COND_INITIALIZER; typedef struct connection_arg { slurm_fd newsockfd; + char hostname[512]; } connection_arg_t; @@ -221,8 +222,44 @@ static void * _service_connection(void *arg) offset += msg_read; } if (msg_size == offset) { + uint16_t port = 0; + char *cluster_name = NULL; rc = proc_req(db_conn, - msg, msg_size, first, &buffer, &uid); + msg, msg_size, first, &buffer, + &uid, &port, &cluster_name); + if (first && cluster_name && port) { + slurm_addr ctld_address; + slurm_get_stream_addr(conn->newsockfd, + &ctld_address); + ((struct sockaddr_in) ctld_address).sin_port = + htons(port); + /* FIXME: Add to recipient list: cluster_name, address */ +#if 0 +{ +/* test code only */ +slurm_fd fd; +fd = slurm_open_stream(&ctld_address); +if (fd < 0) + error("can not open socket back to slurmctld"); +else { + uint32_t msg_size, nw_size; + Buf buffer; + slurmdbd_msg_t req; + dbd_rc_msg_t msg; + msg.return_code = 5; + req.msg_type = DBD_RC; + req.data = &msg; + buffer = pack_slurmdbd_msg(&req); + msg_size = get_buf_offset(buffer); + nw_size = htonl(msg_size); + slurm_write_stream(fd, (char *)&nw_size, sizeof(nw_size)); + slurm_write_stream(fd, get_buf_data(buffer), msg_size); + free_buf(buffer); + slurm_close_stream(fd); +} +} +#endif + } first = false; if (rc != SLURM_SUCCESS) { error("Processing message from connection %d",