From 31128b27a05060283ed2c782e2bdad454685ab07 Mon Sep 17 00:00:00 2001 From: Danny Auble <da@llnl.gov> Date: Thu, 15 Apr 2010 00:01:18 +0000 Subject: [PATCH] Speed up job start when using the slurmdbd. --- src/common/assoc_mgr.c | 24 +- src/common/slurm_protocol_api.c | 4 +- src/common/slurmdbd_defs.c | 437 ++++++++------ src/common/slurmdbd_defs.h | 200 ++++--- .../slurmdbd/accounting_storage_slurmdbd.c | 293 +++++++-- .../multifactor/priority_multifactor.c | 6 +- src/slurmctld/controller.c | 10 +- src/slurmctld/job_mgr.c | 11 +- src/slurmctld/job_scheduler.c | 3 +- src/slurmctld/node_scheduler.c | 5 +- src/slurmctld/slurmctld.h | 1 + src/slurmdbd/proc_req.c | 560 ++++++++++-------- src/slurmdbd/rpc_mgr.c | 2 +- 13 files changed, 935 insertions(+), 621 deletions(-) diff --git a/src/common/assoc_mgr.c b/src/common/assoc_mgr.c index a272e1a75e1..41d9a20d732 100644 --- a/src/common/assoc_mgr.c +++ b/src/common/assoc_mgr.c @@ -2498,8 +2498,8 @@ extern int dump_assoc_mgr_state(char *state_save_location) msg.my_list = assoc_mgr_user_list; /* let us know what to unpack */ pack16(DBD_ADD_USERS, buffer); - slurmdbd_pack_list_msg(SLURMDBD_VERSION, - DBD_ADD_USERS, &msg, buffer); + slurmdbd_pack_list_msg(&msg, SLURMDBD_VERSION, + DBD_ADD_USERS, buffer); slurm_mutex_unlock(&assoc_mgr_user_lock); } @@ -2509,8 +2509,8 @@ extern int dump_assoc_mgr_state(char *state_save_location) msg.my_list = assoc_mgr_qos_list; /* let us know what to unpack */ pack16(DBD_ADD_QOS, buffer); - slurmdbd_pack_list_msg(SLURMDBD_VERSION, - DBD_ADD_QOS, &msg, buffer); + slurmdbd_pack_list_msg(&msg, SLURMDBD_VERSION, + DBD_ADD_QOS, buffer); slurm_mutex_unlock(&assoc_mgr_qos_lock); } @@ -2520,8 +2520,8 @@ extern int dump_assoc_mgr_state(char *state_save_location) msg.my_list = assoc_mgr_wckey_list; /* let us know what to unpack */ pack16(DBD_ADD_WCKEYS, buffer); - slurmdbd_pack_list_msg(SLURMDBD_VERSION, - DBD_ADD_WCKEYS, &msg, buffer); + slurmdbd_pack_list_msg(&msg, SLURMDBD_VERSION, + DBD_ADD_WCKEYS, buffer); slurm_mutex_unlock(&assoc_mgr_wckey_lock); } /* this needs to be done last so qos is set up @@ -2532,8 +2532,8 @@ extern int dump_assoc_mgr_state(char *state_save_location) msg.my_list = assoc_mgr_association_list; /* let us know what to unpack */ pack16(DBD_ADD_ASSOCS, buffer); - slurmdbd_pack_list_msg(SLURMDBD_VERSION, - DBD_ADD_ASSOCS, &msg, buffer); + slurmdbd_pack_list_msg(&msg, SLURMDBD_VERSION, + DBD_ADD_ASSOCS, buffer); slurm_mutex_unlock(&assoc_mgr_association_lock); } @@ -2998,7 +2998,7 @@ extern int load_assoc_mgr_state(char *state_save_location) switch(type) { case DBD_ADD_ASSOCS: error_code = slurmdbd_unpack_list_msg( - SLURMDBD_VERSION, DBD_ADD_ASSOCS, &msg, buffer); + &msg, SLURMDBD_VERSION, DBD_ADD_ASSOCS, buffer); if (error_code != SLURM_SUCCESS) goto unpack_error; else if(!msg->my_list) { @@ -3019,7 +3019,7 @@ extern int load_assoc_mgr_state(char *state_save_location) break; case DBD_ADD_USERS: error_code = slurmdbd_unpack_list_msg( - SLURMDBD_VERSION, DBD_ADD_USERS, &msg, buffer); + &msg, SLURMDBD_VERSION, DBD_ADD_USERS, buffer); if (error_code != SLURM_SUCCESS) goto unpack_error; else if(!msg->my_list) { @@ -3039,7 +3039,7 @@ extern int load_assoc_mgr_state(char *state_save_location) break; case DBD_ADD_QOS: error_code = slurmdbd_unpack_list_msg( - SLURMDBD_VERSION, DBD_ADD_QOS, &msg, buffer); + &msg, SLURMDBD_VERSION, DBD_ADD_QOS, buffer); if (error_code != SLURM_SUCCESS) goto unpack_error; else if(!msg->my_list) { @@ -3059,7 +3059,7 @@ extern int load_assoc_mgr_state(char *state_save_location) break; case DBD_ADD_WCKEYS: error_code = slurmdbd_unpack_list_msg( - SLURMDBD_VERSION, DBD_ADD_WCKEYS, &msg, buffer); + &msg, SLURMDBD_VERSION, DBD_ADD_WCKEYS, buffer); if (error_code != SLURM_SUCCESS) goto unpack_error; else if(!msg->my_list) { diff --git a/src/common/slurm_protocol_api.c b/src/common/slurm_protocol_api.c index 1110b0f51af..e9e1384b189 100644 --- a/src/common/slurm_protocol_api.c +++ b/src/common/slurm_protocol_api.c @@ -1155,7 +1155,9 @@ int slurm_get_is_association_based_accounting(void) if(!strcasecmp(conf->accounting_storage_type, "accounting_storage/slurmdbd") || !strcasecmp(conf->accounting_storage_type, - "accounting_storage/mysql")) + "accounting_storage/mysql") + || !strcasecmp(conf->accounting_storage_type, + "accounting_storage/pgsql")) enforce = 1; slurm_conf_unlock(); } diff --git a/src/common/slurmdbd_defs.c b/src/common/slurmdbd_defs.c index 433ba9dc6b6..ae77a06306d 100644 --- a/src/common/slurmdbd_defs.c +++ b/src/common/slurmdbd_defs.c @@ -206,7 +206,8 @@ extern int slurm_close_slurmdbd_conn(void) * The RPC will not be queued if an error occurs. * Returns SLURM_SUCCESS or an error code */ extern int slurm_send_slurmdbd_recv_rc_msg(uint16_t rpc_version, - slurmdbd_msg_t *req, int *resp_code) + slurmdbd_msg_t *req, + int *resp_code) { int rc; slurmdbd_msg_t *resp; @@ -278,7 +279,7 @@ extern int slurm_send_recv_slurmdbd_msg(uint16_t rpc_version, } } - if(!(buffer = pack_slurmdbd_msg(rpc_version, req))) { + if(!(buffer = pack_slurmdbd_msg(req, rpc_version))) { rc = SLURM_ERROR; goto end_it; } @@ -299,7 +300,7 @@ extern int slurm_send_recv_slurmdbd_msg(uint16_t rpc_version, goto end_it; } - rc = unpack_slurmdbd_msg(rpc_version, resp, buffer); + rc = unpack_slurmdbd_msg(resp, rpc_version, buffer); /* check for the rc of the start job message */ if (rc == SLURM_SUCCESS && resp->msg_type == DBD_ID_RC) rc = ((dbd_id_rc_msg_t *)resp->data)->return_code; @@ -324,7 +325,7 @@ extern int slurm_send_slurmdbd_msg(uint16_t rpc_version, slurmdbd_msg_t *req) static time_t syslog_time = 0; - buffer = pack_slurmdbd_msg(rpc_version, req); + buffer = pack_slurmdbd_msg(req, rpc_version); slurm_mutex_lock(&agent_lock); if ((agent_tid == 0) || (agent_list == NULL)) { @@ -421,7 +422,7 @@ again: xfree(slurmdbd_host); } -extern Buf pack_slurmdbd_msg(uint16_t rpc_version, slurmdbd_msg_t *req) +extern Buf pack_slurmdbd_msg(slurmdbd_msg_t *req, uint16_t rpc_version) { Buf buffer = init_buf(MAX_DBD_MSG_LEN); pack16(req->msg_type, buffer); @@ -446,15 +447,16 @@ extern Buf pack_slurmdbd_msg(uint16_t rpc_version, slurmdbd_msg_t *req) case DBD_GOT_TXN: case DBD_GOT_USERS: case DBD_GOT_CONFIG: + case DBD_SEND_MULT_JOB_START: + case DBD_GOT_MULT_JOB_START: slurmdbd_pack_list_msg( - rpc_version, req->msg_type, - (dbd_list_msg_t *)req->data, buffer); + (dbd_list_msg_t *)req->data, rpc_version, + req->msg_type, buffer); break; case DBD_ADD_ACCOUNT_COORDS: case DBD_REMOVE_ACCOUNT_COORDS: slurmdbd_pack_acct_coord_msg( - rpc_version, - (dbd_acct_coord_msg_t *)req->data, + (dbd_acct_coord_msg_t *)req->data, rpc_version, buffer); break; case DBD_ARCHIVE_LOAD: @@ -463,8 +465,8 @@ extern Buf pack_slurmdbd_msg(uint16_t rpc_version, slurmdbd_msg_t *req) case DBD_CLUSTER_CPUS: case DBD_FLUSH_JOBS: slurmdbd_pack_cluster_cpus_msg( - rpc_version, - (dbd_cluster_cpus_msg_t *)req->data, buffer); + (dbd_cluster_cpus_msg_t *)req->data, rpc_version, + buffer); break; case DBD_GET_ACCOUNTS: case DBD_GET_ASSOCS: @@ -485,8 +487,8 @@ extern Buf pack_slurmdbd_msg(uint16_t rpc_version, slurmdbd_msg_t *req) case DBD_REMOVE_USERS: case DBD_ARCHIVE_DUMP: slurmdbd_pack_cond_msg( - rpc_version, req->msg_type, - (dbd_cond_msg_t *)req->data, buffer); + (dbd_cond_msg_t *)req->data, rpc_version, req->msg_type, + buffer); break; case DBD_GET_ASSOC_USAGE: case DBD_GOT_ASSOC_USAGE: @@ -495,42 +497,37 @@ extern Buf pack_slurmdbd_msg(uint16_t rpc_version, slurmdbd_msg_t *req) case DBD_GET_WCKEY_USAGE: case DBD_GOT_WCKEY_USAGE: slurmdbd_pack_usage_msg( - rpc_version, req->msg_type, - (dbd_usage_msg_t *)req->data, buffer); + (dbd_usage_msg_t *)req->data, rpc_version, + req->msg_type, buffer); break; case DBD_GET_JOBS: - slurmdbd_pack_get_jobs_msg( - rpc_version, - (dbd_get_jobs_msg_t *)req->data, buffer); + slurmdbd_pack_get_jobs_msg((dbd_get_jobs_msg_t *)req->data, + rpc_version, + buffer); break; case DBD_INIT: - slurmdbd_pack_init_msg(rpc_version, - (dbd_init_msg_t *)req->data, buffer, - slurmdbd_auth_info); + slurmdbd_pack_init_msg((dbd_init_msg_t *)req->data, rpc_version, + buffer, slurmdbd_auth_info); break; case DBD_FINI: - slurmdbd_pack_fini_msg(rpc_version, - (dbd_fini_msg_t *)req->data, buffer); + slurmdbd_pack_fini_msg((dbd_fini_msg_t *)req->data, + rpc_version, buffer); break; case DBD_JOB_COMPLETE: - slurmdbd_pack_job_complete_msg(rpc_version, - (dbd_job_comp_msg_t *)req->data, + slurmdbd_pack_job_complete_msg((dbd_job_comp_msg_t *)req->data, + rpc_version, buffer); break; case DBD_JOB_START: - slurmdbd_pack_job_start_msg(rpc_version, - (dbd_job_start_msg_t *)req->data, - buffer); + slurmdbd_pack_job_start_msg(req->data, rpc_version, buffer); break; case DBD_ID_RC: - slurmdbd_pack_id_rc_msg( - rpc_version, - (dbd_id_rc_msg_t *)req->data, buffer); + slurmdbd_pack_id_rc_msg(req->data, rpc_version, buffer); break; case DBD_JOB_SUSPEND: slurmdbd_pack_job_suspend_msg( - rpc_version, - (dbd_job_suspend_msg_t *)req->data, buffer); + (dbd_job_suspend_msg_t *)req->data, rpc_version, + buffer); break; case DBD_MODIFY_ACCOUNTS: case DBD_MODIFY_ASSOCS: @@ -538,44 +535,44 @@ extern Buf pack_slurmdbd_msg(uint16_t rpc_version, slurmdbd_msg_t *req) case DBD_MODIFY_QOS: case DBD_MODIFY_USERS: slurmdbd_pack_modify_msg( - rpc_version, req->msg_type, - (dbd_modify_msg_t *)req->data, buffer); + (dbd_modify_msg_t *)req->data, rpc_version, + req->msg_type, buffer); break; case DBD_NODE_STATE: slurmdbd_pack_node_state_msg( - rpc_version, - (dbd_node_state_msg_t *)req->data, buffer); + (dbd_node_state_msg_t *)req->data, rpc_version, + buffer); break; case DBD_RC: - slurmdbd_pack_rc_msg(rpc_version, - (dbd_rc_msg_t *)req->data, buffer); + slurmdbd_pack_rc_msg((dbd_rc_msg_t *)req->data, + rpc_version, buffer); break; case DBD_STEP_COMPLETE: slurmdbd_pack_step_complete_msg( - rpc_version, - (dbd_step_comp_msg_t *)req->data, buffer); + (dbd_step_comp_msg_t *)req->data, rpc_version, + buffer); break; case DBD_STEP_START: - slurmdbd_pack_step_start_msg(rpc_version, - (dbd_step_start_msg_t *)req->data, + slurmdbd_pack_step_start_msg((dbd_step_start_msg_t *)req->data, + rpc_version, buffer); break; case DBD_REGISTER_CTLD: slurmdbd_pack_register_ctld_msg( - rpc_version, - (dbd_register_ctld_msg_t *)req->data, buffer); + (dbd_register_ctld_msg_t *)req->data, rpc_version, + buffer); break; case DBD_ROLL_USAGE: - slurmdbd_pack_roll_usage_msg(rpc_version, - (dbd_roll_usage_msg_t *) - req->data, buffer); + slurmdbd_pack_roll_usage_msg((dbd_roll_usage_msg_t *)req->data, + rpc_version, + buffer); break; case DBD_ADD_RESV: case DBD_REMOVE_RESV: case DBD_MODIFY_RESV: slurmdbd_pack_rec_msg( - rpc_version, req->msg_type, - (dbd_rec_msg_t *)req->data, buffer); + (dbd_rec_msg_t *)req->data, rpc_version, req->msg_type, + buffer); break; case DBD_GET_CONFIG: /* No message to pack */ @@ -591,8 +588,8 @@ extern Buf pack_slurmdbd_msg(uint16_t rpc_version, slurmdbd_msg_t *req) return buffer; } -extern int unpack_slurmdbd_msg(uint16_t rpc_version, - slurmdbd_msg_t *resp, Buf buffer) +extern int unpack_slurmdbd_msg(slurmdbd_msg_t *resp, + uint16_t rpc_version, Buf buffer) { int rc = SLURM_SUCCESS; @@ -618,24 +615,27 @@ extern int unpack_slurmdbd_msg(uint16_t rpc_version, case DBD_GOT_TXN: case DBD_GOT_USERS: case DBD_GOT_CONFIG: + case DBD_SEND_MULT_JOB_START: + case DBD_GOT_MULT_JOB_START: rc = slurmdbd_unpack_list_msg( - rpc_version, resp->msg_type, - (dbd_list_msg_t **)&resp->data, buffer); + (dbd_list_msg_t **)&resp->data, rpc_version, + resp->msg_type, buffer); break; case DBD_ADD_ACCOUNT_COORDS: case DBD_REMOVE_ACCOUNT_COORDS: rc = slurmdbd_unpack_acct_coord_msg( - rpc_version, - (dbd_acct_coord_msg_t **)&resp->data, buffer); + (dbd_acct_coord_msg_t **)&resp->data, + rpc_version, buffer); break; case DBD_ARCHIVE_LOAD: - rc = slurmdb_unpack_archive_rec(&resp->data, rpc_version, buffer); + rc = slurmdb_unpack_archive_rec( + &resp->data, rpc_version, buffer); break; case DBD_CLUSTER_CPUS: case DBD_FLUSH_JOBS: rc = slurmdbd_unpack_cluster_cpus_msg( - rpc_version, - (dbd_cluster_cpus_msg_t **)&resp->data, buffer); + (dbd_cluster_cpus_msg_t **)&resp->data, + rpc_version, buffer); break; case DBD_GET_ACCOUNTS: case DBD_GET_ASSOCS: @@ -656,8 +656,8 @@ extern int unpack_slurmdbd_msg(uint16_t rpc_version, case DBD_REMOVE_USERS: case DBD_ARCHIVE_DUMP: rc = slurmdbd_unpack_cond_msg( - rpc_version, resp->msg_type, - (dbd_cond_msg_t **)&resp->data, buffer); + (dbd_cond_msg_t **)&resp->data, rpc_version, + resp->msg_type, buffer); break; case DBD_GET_ASSOC_USAGE: case DBD_GOT_ASSOC_USAGE: @@ -666,14 +666,13 @@ extern int unpack_slurmdbd_msg(uint16_t rpc_version, case DBD_GET_WCKEY_USAGE: case DBD_GOT_WCKEY_USAGE: rc = slurmdbd_unpack_usage_msg( - rpc_version, - resp->msg_type, (dbd_usage_msg_t **)&resp->data, - buffer); + (dbd_usage_msg_t **)&resp->data, rpc_version, + resp->msg_type, buffer); break; case DBD_GET_JOBS: rc = slurmdbd_unpack_get_jobs_msg( - rpc_version, - (dbd_get_jobs_msg_t **)&resp->data, buffer); + (dbd_get_jobs_msg_t **)&resp->data, + rpc_version, buffer); break; case DBD_INIT: rc = slurmdbd_unpack_init_msg((dbd_init_msg_t **)&resp->data, @@ -681,29 +680,27 @@ extern int unpack_slurmdbd_msg(uint16_t rpc_version, slurmdbd_auth_info); break; case DBD_FINI: - rc = slurmdbd_unpack_fini_msg(rpc_version, - (dbd_fini_msg_t **)&resp->data, + rc = slurmdbd_unpack_fini_msg((dbd_fini_msg_t **)&resp->data, + rpc_version, buffer); break; case DBD_JOB_COMPLETE: rc = slurmdbd_unpack_job_complete_msg( - rpc_version, - (dbd_job_comp_msg_t **)&resp->data, buffer); + (dbd_job_comp_msg_t **)&resp->data, + rpc_version, buffer); break; case DBD_JOB_START: rc = slurmdbd_unpack_job_start_msg( - rpc_version, - (dbd_job_start_msg_t **)&resp->data, buffer); + &resp->data, rpc_version, buffer); break; case DBD_ID_RC: rc = slurmdbd_unpack_id_rc_msg( - rpc_version, - (dbd_id_rc_msg_t **)&resp->data, buffer); + &resp->data, rpc_version, buffer); break; case DBD_JOB_SUSPEND: rc = slurmdbd_unpack_job_suspend_msg( - rpc_version, - (dbd_job_suspend_msg_t **)&resp->data, buffer); + (dbd_job_suspend_msg_t **)&resp->data, rpc_version, + buffer); break; case DBD_MODIFY_ACCOUNTS: case DBD_MODIFY_ASSOCS: @@ -711,46 +708,47 @@ extern int unpack_slurmdbd_msg(uint16_t rpc_version, case DBD_MODIFY_QOS: case DBD_MODIFY_USERS: rc = slurmdbd_unpack_modify_msg( + (dbd_modify_msg_t **)&resp->data, rpc_version, - resp->msg_type, (dbd_modify_msg_t **)&resp->data, + resp->msg_type, buffer); break; case DBD_NODE_STATE: rc = slurmdbd_unpack_node_state_msg( - rpc_version, - (dbd_node_state_msg_t **)&resp->data, buffer); + (dbd_node_state_msg_t **)&resp->data, rpc_version, + buffer); break; case DBD_RC: - rc = slurmdbd_unpack_rc_msg(rpc_version, - (dbd_rc_msg_t **)&resp->data, + rc = slurmdbd_unpack_rc_msg((dbd_rc_msg_t **)&resp->data, + rpc_version, buffer); break; case DBD_STEP_COMPLETE: rc = slurmdbd_unpack_step_complete_msg( - rpc_version, - (dbd_step_comp_msg_t **)&resp->data, buffer); + (dbd_step_comp_msg_t **)&resp->data, + rpc_version, buffer); break; case DBD_STEP_START: rc = slurmdbd_unpack_step_start_msg( - rpc_version, - (dbd_step_start_msg_t **)&resp->data, buffer); + (dbd_step_start_msg_t **)&resp->data, + rpc_version, buffer); break; case DBD_REGISTER_CTLD: rc = slurmdbd_unpack_register_ctld_msg( - rpc_version, - (dbd_register_ctld_msg_t **)&resp->data, buffer); + (dbd_register_ctld_msg_t **)&resp->data, + rpc_version, buffer); break; case DBD_ROLL_USAGE: rc = slurmdbd_unpack_roll_usage_msg( - rpc_version, - (dbd_roll_usage_msg_t **)&resp->data, buffer); + (dbd_roll_usage_msg_t **)&resp->data, rpc_version, + buffer); break; case DBD_ADD_RESV: case DBD_REMOVE_RESV: case DBD_MODIFY_RESV: rc = slurmdbd_unpack_rec_msg( - rpc_version, resp->msg_type, - (dbd_rec_msg_t **)&resp->data, buffer); + (dbd_rec_msg_t **)&resp->data, rpc_version, + resp->msg_type, buffer); break; case DBD_GET_CONFIG: /* No message to unpack */ @@ -912,6 +910,10 @@ extern slurmdbd_msg_type_t str_2_slurmdbd_msg_type(char *msg_type) return DBD_GET_CONFIG; } else if(!strcasecmp(msg_type, "Got Config")) { return DBD_GOT_CONFIG; + } else if(!strcasecmp(msg_type, "Send Multiple Job Starts")) { + return DBD_SEND_MULT_JOB_START; + } else if(!strcasecmp(msg_type, "Got Multiple Job Starts")) { + return DBD_GOT_MULT_JOB_START; } else { return NO_VAL; } @@ -1342,6 +1344,18 @@ extern char *slurmdbd_msg_type_2_str(slurmdbd_msg_type_t msg_type, int get_enum) } else return "Got Config"; break; + case DBD_SEND_MULT_JOB_START: + if(get_enum) { + return "DBD_SEND_MULT_JOB_START"; + } else + return "Send Multiple Job Starts"; + break; + case DBD_GOT_MULT_JOB_START: + if(get_enum) { + return "DBD_GOT_MULT_JOB_START"; + } else + return "Got Multiple Job Starts"; + break; default: return "Unknown"; break; @@ -1364,7 +1378,7 @@ static int _send_init_msg() req.cluster_name = slurmdbd_cluster; req.rollback = rollback_started; req.version = SLURMDBD_VERSION; - slurmdbd_pack_init_msg(SLURMDBD_VERSION, &req, buffer, + slurmdbd_pack_init_msg(&req, SLURMDBD_VERSION, buffer, slurmdbd_auth_info); /* if we have an issue with the pack we want to log the errno, but send anyway so we get it logged on the slurmdbd also */ @@ -1395,7 +1409,7 @@ static int _send_fini_msg(void) pack16((uint16_t) DBD_FINI, buffer); req.commit = 0; req.close_conn = 1; - slurmdbd_pack_fini_msg(SLURMDBD_VERSION, &req, buffer); + slurmdbd_pack_fini_msg(&req, SLURMDBD_VERSION, buffer); _send_msg(buffer); free_buf(buffer); @@ -1483,7 +1497,8 @@ static int _get_return_code(uint16_t rpc_version, int read_timeout) safe_unpack16(&msg_type, buffer); switch(msg_type) { case DBD_ID_RC: - if (slurmdbd_unpack_id_rc_msg(rpc_version, &id_msg, buffer) + if (slurmdbd_unpack_id_rc_msg( + (void **)&id_msg, rpc_version, buffer) == SLURM_SUCCESS) { rc = id_msg->return_code; slurmdbd_free_id_rc_msg(id_msg); @@ -1493,8 +1508,8 @@ static int _get_return_code(uint16_t rpc_version, int read_timeout) error("slurmdbd: unpack message error"); break; case DBD_RC: - if (slurmdbd_unpack_rc_msg(rpc_version, - &msg, buffer) == SLURM_SUCCESS) { + if (slurmdbd_unpack_rc_msg(&msg, rpc_version, buffer) + == SLURM_SUCCESS) { rc = msg->return_code; if (rc != SLURM_SUCCESS) { if(msg->sent_type == DBD_REGISTER_CTLD && @@ -2109,8 +2124,8 @@ void inline slurmdbd_free_cluster_cpus_msg(dbd_cluster_cpus_msg_t *msg) } } -void inline slurmdbd_free_rec_msg(slurmdbd_msg_type_t type, - dbd_rec_msg_t *msg) +void inline slurmdbd_free_rec_msg(dbd_rec_msg_t *msg, + slurmdbd_msg_type_t type) { void (*my_destroy) (void *object); @@ -2131,8 +2146,8 @@ void inline slurmdbd_free_rec_msg(slurmdbd_msg_type_t type, } } -void inline slurmdbd_free_cond_msg(slurmdbd_msg_type_t type, - dbd_cond_msg_t *msg) +void inline slurmdbd_free_cond_msg(dbd_cond_msg_t *msg, + slurmdbd_msg_type_t type) { void (*my_destroy) (void *object); @@ -2222,8 +2237,9 @@ void inline slurmdbd_free_job_complete_msg(dbd_job_comp_msg_t *msg) } } -void inline slurmdbd_free_job_start_msg(dbd_job_start_msg_t *msg) +void inline slurmdbd_free_job_start_msg(void *in) { + dbd_job_start_msg_t *msg = (dbd_job_start_msg_t *)in; if (msg) { xfree(msg->account); xfree(msg->block_id); @@ -2236,8 +2252,9 @@ void inline slurmdbd_free_job_start_msg(dbd_job_start_msg_t *msg) } } -void inline slurmdbd_free_id_rc_msg(dbd_id_rc_msg_t *msg) +void inline slurmdbd_free_id_rc_msg(void *in) { + dbd_id_rc_msg_t *msg = (dbd_id_rc_msg_t *)in; xfree(msg); } @@ -2255,8 +2272,8 @@ void inline slurmdbd_free_list_msg(dbd_list_msg_t *msg) } } -void inline slurmdbd_free_modify_msg(slurmdbd_msg_type_t type, - dbd_modify_msg_t *msg) +void inline slurmdbd_free_modify_msg(dbd_modify_msg_t *msg, + slurmdbd_msg_type_t type) { void (*destroy_cond) (void *object); void (*destroy_rec) (void *object); @@ -2341,8 +2358,8 @@ void inline slurmdbd_free_step_start_msg(dbd_step_start_msg_t *msg) } } -void inline slurmdbd_free_usage_msg(slurmdbd_msg_type_t type, - dbd_usage_msg_t *msg) +void inline slurmdbd_free_usage_msg(dbd_usage_msg_t *msg, + slurmdbd_msg_type_t type) { void (*destroy_rec) (void *object); if (msg) { @@ -2374,8 +2391,8 @@ void inline slurmdbd_free_usage_msg(slurmdbd_msg_type_t type, * Pack and unpack data structures \****************************************************************************/ void inline -slurmdbd_pack_acct_coord_msg(uint16_t rpc_version, - dbd_acct_coord_msg_t *msg, Buf buffer) +slurmdbd_pack_acct_coord_msg(dbd_acct_coord_msg_t *msg, + uint16_t rpc_version, Buf buffer) { char *acct = NULL; ListIterator itr = NULL; @@ -2398,8 +2415,8 @@ slurmdbd_pack_acct_coord_msg(uint16_t rpc_version, } int inline -slurmdbd_unpack_acct_coord_msg(uint16_t rpc_version, - dbd_acct_coord_msg_t **msg, Buf buffer) +slurmdbd_unpack_acct_coord_msg(dbd_acct_coord_msg_t **msg, + uint16_t rpc_version, Buf buffer) { uint32_t uint32_tmp; int i; @@ -2429,8 +2446,8 @@ unpack_error: } void inline -slurmdbd_pack_cluster_cpus_msg(uint16_t rpc_version, - dbd_cluster_cpus_msg_t *msg, Buf buffer) +slurmdbd_pack_cluster_cpus_msg(dbd_cluster_cpus_msg_t *msg, + uint16_t rpc_version, Buf buffer) { if(rpc_version >= 8) { packstr(msg->cluster_nodes, buffer); @@ -2445,8 +2462,8 @@ slurmdbd_pack_cluster_cpus_msg(uint16_t rpc_version, } int inline -slurmdbd_unpack_cluster_cpus_msg(uint16_t rpc_version, - dbd_cluster_cpus_msg_t **msg, Buf buffer) +slurmdbd_unpack_cluster_cpus_msg(dbd_cluster_cpus_msg_t **msg, + uint16_t rpc_version, Buf buffer) { dbd_cluster_cpus_msg_t *msg_ptr; uint32_t uint32_tmp; @@ -2476,9 +2493,9 @@ unpack_error: return SLURM_ERROR; } -void inline slurmdbd_pack_rec_msg(uint16_t rpc_version, - slurmdbd_msg_type_t type, - dbd_rec_msg_t *msg, Buf buffer) +void inline slurmdbd_pack_rec_msg(dbd_rec_msg_t *msg, + uint16_t rpc_version, + slurmdbd_msg_type_t type, Buf buffer) { void (*my_function) (void *object, uint16_t rpc_version, Buf buffer); @@ -2496,9 +2513,9 @@ void inline slurmdbd_pack_rec_msg(uint16_t rpc_version, (*(my_function))(msg->rec, rpc_version, buffer); } -int inline slurmdbd_unpack_rec_msg(uint16_t rpc_version, - slurmdbd_msg_type_t type, - dbd_rec_msg_t **msg, Buf buffer) +int inline slurmdbd_unpack_rec_msg(dbd_rec_msg_t **msg, + uint16_t rpc_version, + slurmdbd_msg_type_t type, Buf buffer) { dbd_rec_msg_t *msg_ptr = NULL; int (*my_function) (void **object, uint16_t rpc_version, Buf buffer); @@ -2523,14 +2540,14 @@ int inline slurmdbd_unpack_rec_msg(uint16_t rpc_version, return SLURM_SUCCESS; unpack_error: - slurmdbd_free_rec_msg(type, msg_ptr); + slurmdbd_free_rec_msg(msg_ptr, type); *msg = NULL; return SLURM_ERROR; } -void inline slurmdbd_pack_cond_msg(uint16_t rpc_version, - slurmdbd_msg_type_t type, - dbd_cond_msg_t *msg, Buf buffer) +void inline slurmdbd_pack_cond_msg(dbd_cond_msg_t *msg, + uint16_t rpc_version, + slurmdbd_msg_type_t type, Buf buffer) { void (*my_function) (void *object, uint16_t rpc_version, Buf buffer); @@ -2583,9 +2600,9 @@ void inline slurmdbd_pack_cond_msg(uint16_t rpc_version, (*(my_function))(msg->cond, rpc_version, buffer); } -int inline slurmdbd_unpack_cond_msg(uint16_t rpc_version, - slurmdbd_msg_type_t type, - dbd_cond_msg_t **msg, Buf buffer) +int inline slurmdbd_unpack_cond_msg(dbd_cond_msg_t **msg, + uint16_t rpc_version, + slurmdbd_msg_type_t type, Buf buffer) { dbd_cond_msg_t *msg_ptr = NULL; int (*my_function) (void **object, uint16_t rpc_version, Buf buffer); @@ -2645,13 +2662,13 @@ int inline slurmdbd_unpack_cond_msg(uint16_t rpc_version, return SLURM_SUCCESS; unpack_error: - slurmdbd_free_cond_msg(type, msg_ptr); + slurmdbd_free_cond_msg(msg_ptr, type); *msg = NULL; return SLURM_ERROR; } -void inline slurmdbd_pack_get_jobs_msg(uint16_t rpc_version, - dbd_get_jobs_msg_t *msg, Buf buffer) +void inline slurmdbd_pack_get_jobs_msg(dbd_get_jobs_msg_t *msg, + uint16_t rpc_version, Buf buffer) { uint32_t i = 0; ListIterator itr = NULL; @@ -2693,8 +2710,8 @@ void inline slurmdbd_pack_get_jobs_msg(uint16_t rpc_version, packstr(msg->user, buffer); } -int inline slurmdbd_unpack_get_jobs_msg(uint16_t rpc_version, - dbd_get_jobs_msg_t **msg, Buf buffer) +int inline slurmdbd_unpack_get_jobs_msg(dbd_get_jobs_msg_t **msg, + uint16_t rpc_version, Buf buffer) { int i; uint32_t count = 0; @@ -2743,7 +2760,7 @@ unpack_error: } void inline -slurmdbd_pack_init_msg(uint16_t rpc_version, dbd_init_msg_t *msg, +slurmdbd_pack_init_msg(dbd_init_msg_t *msg, uint16_t rpc_version, Buf buffer, char *auth_info) { int rc; @@ -2822,14 +2839,14 @@ unpack_error: } void inline -slurmdbd_pack_fini_msg(uint16_t rpc_version, dbd_fini_msg_t *msg, Buf buffer) +slurmdbd_pack_fini_msg(dbd_fini_msg_t *msg, uint16_t rpc_version, Buf buffer) { pack16(msg->close_conn, buffer); pack16(msg->commit, buffer); } int inline -slurmdbd_unpack_fini_msg(uint16_t rpc_version, dbd_fini_msg_t **msg, Buf buffer) +slurmdbd_unpack_fini_msg(dbd_fini_msg_t **msg, uint16_t rpc_version, Buf buffer) { dbd_fini_msg_t *msg_ptr = xmalloc(sizeof(dbd_fini_msg_t)); *msg = msg_ptr; @@ -2846,8 +2863,8 @@ unpack_error: } void inline -slurmdbd_pack_job_complete_msg(uint16_t rpc_version, - dbd_job_comp_msg_t *msg, Buf buffer) +slurmdbd_pack_job_complete_msg(dbd_job_comp_msg_t *msg, + uint16_t rpc_version, Buf buffer) { if(rpc_version >= 6) { pack32(msg->assoc_id, buffer); @@ -2864,8 +2881,8 @@ slurmdbd_pack_job_complete_msg(uint16_t rpc_version, } int inline -slurmdbd_unpack_job_complete_msg(uint16_t rpc_version, - dbd_job_comp_msg_t **msg, Buf buffer) +slurmdbd_unpack_job_complete_msg(dbd_job_comp_msg_t **msg, + uint16_t rpc_version, Buf buffer) { uint32_t uint32_tmp; dbd_job_comp_msg_t *msg_ptr = xmalloc(sizeof(dbd_job_comp_msg_t)); @@ -2892,9 +2909,10 @@ unpack_error: } void inline -slurmdbd_pack_job_start_msg(uint16_t rpc_version, - dbd_job_start_msg_t *msg, Buf buffer) +slurmdbd_pack_job_start_msg(void *in, + uint16_t rpc_version, Buf buffer) { + dbd_job_start_msg_t *msg = (dbd_job_start_msg_t *)in; if(rpc_version >= 8) { packstr(msg->account, buffer); pack32(msg->alloc_cpus, buffer); @@ -2946,8 +2964,8 @@ slurmdbd_pack_job_start_msg(uint16_t rpc_version, } int inline -slurmdbd_unpack_job_start_msg(uint16_t rpc_version, - dbd_job_start_msg_t **msg, Buf buffer) +slurmdbd_unpack_job_start_msg(void **msg, + uint16_t rpc_version, Buf buffer) { uint32_t uint32_tmp; char *tmp_char; @@ -3014,21 +3032,36 @@ unpack_error: } void inline -slurmdbd_pack_id_rc_msg(uint16_t rpc_version, - dbd_id_rc_msg_t *msg, Buf buffer) +slurmdbd_pack_id_rc_msg(void *in, + uint16_t rpc_version, Buf buffer) { - pack32(msg->id, buffer); - pack32(msg->return_code, buffer); + dbd_id_rc_msg_t *msg = (dbd_id_rc_msg_t *)in; + + if(rpc_version >= 8) { + pack32(msg->job_id, buffer); + pack32(msg->id, buffer); + pack32(msg->return_code, buffer); + } else { + pack32(msg->id, buffer); + pack32(msg->return_code, buffer); + } } int inline -slurmdbd_unpack_id_rc_msg(uint16_t rpc_version, - dbd_id_rc_msg_t **msg, Buf buffer) +slurmdbd_unpack_id_rc_msg(void **msg, + uint16_t rpc_version, Buf buffer) { dbd_id_rc_msg_t *msg_ptr = xmalloc(sizeof(dbd_id_rc_msg_t)); + *msg = msg_ptr; - safe_unpack32(&msg_ptr->id, buffer); - safe_unpack32(&msg_ptr->return_code, buffer); + if(rpc_version >= 8) { + safe_unpack32(&msg_ptr->job_id, buffer); + safe_unpack32(&msg_ptr->id, buffer); + safe_unpack32(&msg_ptr->return_code, buffer); + } else { + safe_unpack32(&msg_ptr->id, buffer); + safe_unpack32(&msg_ptr->return_code, buffer); + } return SLURM_SUCCESS; unpack_error: @@ -3038,8 +3071,8 @@ unpack_error: } void inline -slurmdbd_pack_job_suspend_msg(uint16_t rpc_version, - dbd_job_suspend_msg_t *msg, Buf buffer) +slurmdbd_pack_job_suspend_msg(dbd_job_suspend_msg_t *msg, + uint16_t rpc_version, Buf buffer) { pack32(msg->assoc_id, buffer); pack32(msg->db_index, buffer); @@ -3050,8 +3083,8 @@ slurmdbd_pack_job_suspend_msg(uint16_t rpc_version, } int inline -slurmdbd_unpack_job_suspend_msg(uint16_t rpc_version, - dbd_job_suspend_msg_t **msg, Buf buffer) +slurmdbd_unpack_job_suspend_msg(dbd_job_suspend_msg_t **msg, + uint16_t rpc_version, Buf buffer) { dbd_job_suspend_msg_t *msg_ptr = xmalloc(sizeof(dbd_job_suspend_msg_t)); *msg = msg_ptr; @@ -3069,9 +3102,10 @@ unpack_error: return SLURM_ERROR; } -void inline slurmdbd_pack_list_msg(uint16_t rpc_version, +void inline slurmdbd_pack_list_msg(dbd_list_msg_t *msg, + uint16_t rpc_version, slurmdbd_msg_type_t type, - dbd_list_msg_t *msg, Buf buffer) + Buf buffer) { uint32_t count = 0; ListIterator itr = NULL; @@ -3122,6 +3156,12 @@ void inline slurmdbd_pack_list_msg(uint16_t rpc_version, case DBD_GOT_EVENTS: my_function = slurmdb_pack_event_rec; break; + case DBD_SEND_MULT_JOB_START: + my_function = slurmdbd_pack_job_start_msg; + break; + case DBD_GOT_MULT_JOB_START: + my_function = slurmdbd_pack_id_rc_msg; + break; default: fatal("Unknown pack type"); return; @@ -3142,9 +3182,8 @@ void inline slurmdbd_pack_list_msg(uint16_t rpc_version, } } -int inline slurmdbd_unpack_list_msg(uint16_t rpc_version, - slurmdbd_msg_type_t type, - dbd_list_msg_t **msg, Buf buffer) +int inline slurmdbd_unpack_list_msg(dbd_list_msg_t **msg, uint16_t rpc_version, + slurmdbd_msg_type_t type, Buf buffer) { int i; uint32_t count; @@ -3209,6 +3248,14 @@ int inline slurmdbd_unpack_list_msg(uint16_t rpc_version, my_function = slurmdb_unpack_event_rec; my_destroy = slurmdb_destroy_event_rec; break; + case DBD_SEND_MULT_JOB_START: + my_function = slurmdbd_unpack_job_start_msg; + my_destroy = slurmdbd_free_job_start_msg; + break; + case DBD_GOT_MULT_JOB_START: + my_function = slurmdbd_unpack_id_rc_msg; + my_destroy = slurmdbd_free_id_rc_msg; + break; default: fatal("Unknown unpack type"); return SLURM_ERROR; @@ -3238,9 +3285,10 @@ unpack_error: return SLURM_ERROR; } -void inline slurmdbd_pack_modify_msg(uint16_t rpc_version, +void inline slurmdbd_pack_modify_msg(dbd_modify_msg_t *msg, + uint16_t rpc_version, slurmdbd_msg_type_t type, - dbd_modify_msg_t *msg, Buf buffer) + Buf buffer) { void (*my_cond) (void *object, uint16_t rpc_version, Buf buffer); void (*my_rec) (void *object, uint16_t rpc_version, Buf buffer); @@ -3274,9 +3322,10 @@ void inline slurmdbd_pack_modify_msg(uint16_t rpc_version, (*(my_rec))(msg->rec, rpc_version, buffer); } -int inline slurmdbd_unpack_modify_msg(uint16_t rpc_version, +int inline slurmdbd_unpack_modify_msg(dbd_modify_msg_t **msg, + uint16_t rpc_version, slurmdbd_msg_type_t type, - dbd_modify_msg_t **msg, Buf buffer) + Buf buffer) { dbd_modify_msg_t *msg_ptr = NULL; int (*my_cond) (void **object, uint16_t rpc_version, Buf buffer); @@ -3319,14 +3368,14 @@ int inline slurmdbd_unpack_modify_msg(uint16_t rpc_version, return SLURM_SUCCESS; unpack_error: - slurmdbd_free_modify_msg(type, msg_ptr); + slurmdbd_free_modify_msg(msg_ptr, type); *msg = NULL; return SLURM_ERROR; } void inline -slurmdbd_pack_node_state_msg(uint16_t rpc_version, - dbd_node_state_msg_t *msg, Buf buffer) +slurmdbd_pack_node_state_msg(dbd_node_state_msg_t *msg, + uint16_t rpc_version, Buf buffer) { if(rpc_version >= 8) { pack32(msg->cpu_count, buffer); @@ -3348,8 +3397,8 @@ slurmdbd_pack_node_state_msg(uint16_t rpc_version, } int inline -slurmdbd_unpack_node_state_msg(uint16_t rpc_version, - dbd_node_state_msg_t **msg, Buf buffer) +slurmdbd_unpack_node_state_msg(dbd_node_state_msg_t **msg, + uint16_t rpc_version, Buf buffer) { dbd_node_state_msg_t *msg_ptr; uint32_t uint32_tmp; @@ -3387,8 +3436,8 @@ unpack_error: } void inline -slurmdbd_pack_rc_msg(uint16_t rpc_version, - dbd_rc_msg_t *msg, Buf buffer) +slurmdbd_pack_rc_msg(dbd_rc_msg_t *msg, + uint16_t rpc_version, Buf buffer) { packstr(msg->comment, buffer); pack32(msg->return_code, buffer); @@ -3396,8 +3445,8 @@ slurmdbd_pack_rc_msg(uint16_t rpc_version, } int inline -slurmdbd_unpack_rc_msg(uint16_t rpc_version, - dbd_rc_msg_t **msg, Buf buffer) +slurmdbd_unpack_rc_msg(dbd_rc_msg_t **msg, + uint16_t rpc_version, Buf buffer) { uint32_t uint32_tmp; dbd_rc_msg_t *msg_ptr = xmalloc(sizeof(dbd_rc_msg_t)); @@ -3414,8 +3463,8 @@ unpack_error: } void inline -slurmdbd_pack_register_ctld_msg(uint16_t rpc_version, - dbd_register_ctld_msg_t *msg, Buf buffer) +slurmdbd_pack_register_ctld_msg(dbd_register_ctld_msg_t *msg, + uint16_t rpc_version, Buf buffer) { if(rpc_version >= 8) { pack16(msg->port, buffer); @@ -3426,8 +3475,8 @@ slurmdbd_pack_register_ctld_msg(uint16_t rpc_version, } int inline -slurmdbd_unpack_register_ctld_msg(uint16_t rpc_version, - dbd_register_ctld_msg_t **msg, Buf buffer) +slurmdbd_unpack_register_ctld_msg(dbd_register_ctld_msg_t **msg, + uint16_t rpc_version, Buf buffer) { uint32_t uint32_tmp; dbd_register_ctld_msg_t *msg_ptr = xmalloc( @@ -3449,8 +3498,8 @@ unpack_error: } void inline -slurmdbd_pack_roll_usage_msg(uint16_t rpc_version, - dbd_roll_usage_msg_t *msg, Buf buffer) +slurmdbd_pack_roll_usage_msg(dbd_roll_usage_msg_t *msg, + uint16_t rpc_version, Buf buffer) { if(rpc_version >= 5) { pack16(msg->archive_data, buffer); @@ -3460,8 +3509,8 @@ slurmdbd_pack_roll_usage_msg(uint16_t rpc_version, } int inline -slurmdbd_unpack_roll_usage_msg(uint16_t rpc_version, - dbd_roll_usage_msg_t **msg, Buf buffer) +slurmdbd_unpack_roll_usage_msg(dbd_roll_usage_msg_t **msg, + uint16_t rpc_version, Buf buffer) { dbd_roll_usage_msg_t *msg_ptr = xmalloc(sizeof(dbd_roll_usage_msg_t)); @@ -3481,8 +3530,8 @@ unpack_error: } void inline -slurmdbd_pack_step_complete_msg(uint16_t rpc_version, - dbd_step_comp_msg_t *msg, Buf buffer) +slurmdbd_pack_step_complete_msg(dbd_step_comp_msg_t *msg, + uint16_t rpc_version, Buf buffer) { pack32(msg->assoc_id, buffer); pack32(msg->db_index, buffer); @@ -3499,8 +3548,8 @@ slurmdbd_pack_step_complete_msg(uint16_t rpc_version, } int inline -slurmdbd_unpack_step_complete_msg(uint16_t rpc_version, - dbd_step_comp_msg_t **msg, Buf buffer) +slurmdbd_unpack_step_complete_msg(dbd_step_comp_msg_t **msg, + uint16_t rpc_version, Buf buffer) { dbd_step_comp_msg_t *msg_ptr = xmalloc(sizeof(dbd_step_comp_msg_t)); *msg = msg_ptr; @@ -3525,7 +3574,7 @@ unpack_error: } void inline -slurmdbd_pack_step_start_msg(uint16_t rpc_version, dbd_step_start_msg_t *msg, +slurmdbd_pack_step_start_msg(dbd_step_start_msg_t *msg, uint16_t rpc_version, Buf buffer) { if(rpc_version >= 5) { @@ -3546,8 +3595,8 @@ slurmdbd_pack_step_start_msg(uint16_t rpc_version, dbd_step_start_msg_t *msg, } int inline -slurmdbd_unpack_step_start_msg(uint16_t rpc_version, - dbd_step_start_msg_t **msg, Buf buffer) +slurmdbd_unpack_step_start_msg(dbd_step_start_msg_t **msg, + uint16_t rpc_version, Buf buffer) { uint32_t uint32_tmp; dbd_step_start_msg_t *msg_ptr = xmalloc(sizeof(dbd_step_start_msg_t)); @@ -3576,9 +3625,10 @@ unpack_error: return SLURM_ERROR; } -void inline slurmdbd_pack_usage_msg(uint16_t rpc_version, +void inline slurmdbd_pack_usage_msg(dbd_usage_msg_t *msg, + uint16_t rpc_version, slurmdbd_msg_type_t type, - dbd_usage_msg_t *msg, Buf buffer) + Buf buffer) { void (*my_rec) (void *object, uint16_t rpc_version, Buf buffer); @@ -3605,9 +3655,10 @@ void inline slurmdbd_pack_usage_msg(uint16_t rpc_version, pack_time(msg->end, buffer); } -int inline slurmdbd_unpack_usage_msg(uint16_t rpc_version, +int inline slurmdbd_unpack_usage_msg(dbd_usage_msg_t **msg, + uint16_t rpc_version, slurmdbd_msg_type_t type, - dbd_usage_msg_t **msg, Buf buffer) + Buf buffer) { dbd_usage_msg_t *msg_ptr = NULL; int (*my_rec) (void **object, uint16_t rpc_version, Buf buffer); @@ -3643,7 +3694,7 @@ int inline slurmdbd_unpack_usage_msg(uint16_t rpc_version, return SLURM_SUCCESS; unpack_error: - slurmdbd_free_usage_msg(type, msg_ptr); + slurmdbd_free_usage_msg(msg_ptr, type); *msg = NULL; return SLURM_ERROR; } diff --git a/src/common/slurmdbd_defs.h b/src/common/slurmdbd_defs.h index 4dd87d2acf7..72fdf33a705 100644 --- a/src/common/slurmdbd_defs.h +++ b/src/common/slurmdbd_defs.h @@ -161,6 +161,8 @@ typedef enum { DBD_GOT_PROBS, /* Response to DBD_GET_PROBS */ DBD_GET_EVENTS, /* Get event information */ DBD_GOT_EVENTS, /* Response to DBD_GET_EVENTS */ + DBD_SEND_MULT_JOB_START,/* Send multiple job starts */ + DBD_GOT_MULT_JOB_START /* Get response to DBD_SEND_MULT_JOB_START */ } slurmdbd_msg_type_t; /*****************************************************************************\ @@ -276,6 +278,7 @@ typedef struct dbd_job_start_msg { /* returns a uint32_t along with a return code */ typedef struct dbd_id_rc_msg { + uint32_t job_id; uint32_t id; uint32_t return_code; } dbd_id_rc_msg_t; @@ -403,11 +406,12 @@ extern int slurm_send_recv_slurmdbd_msg(uint16_t rpc_version, * The RPC will not be queued if an error occurs. * Returns SLURM_SUCCESS or an error code */ extern int slurm_send_slurmdbd_recv_rc_msg(uint16_t rpc_version, - slurmdbd_msg_t *req, int *rc); + slurmdbd_msg_t *req, + int *rc); -extern Buf pack_slurmdbd_msg(uint16_t rpc_version, slurmdbd_msg_t *req); -extern int unpack_slurmdbd_msg(uint16_t rpc_version, - slurmdbd_msg_t *resp, Buf buffer); +extern Buf pack_slurmdbd_msg(slurmdbd_msg_t *req, uint16_t rpc_version); +extern int unpack_slurmdbd_msg(slurmdbd_msg_t *resp, + uint16_t rpc_version, Buf buffer); extern slurmdbd_msg_type_t str_2_slurmdbd_msg_type(char *msg_type); extern char *slurmdbd_msg_type_2_str(slurmdbd_msg_type_t msg_type, @@ -418,148 +422,152 @@ extern char *slurmdbd_msg_type_2_str(slurmdbd_msg_type_t msg_type, \*****************************************************************************/ void inline slurmdbd_free_acct_coord_msg(dbd_acct_coord_msg_t *msg); void inline slurmdbd_free_cluster_cpus_msg(dbd_cluster_cpus_msg_t *msg); -void inline slurmdbd_free_rec_msg(slurmdbd_msg_type_t type, - dbd_rec_msg_t *msg); -void inline slurmdbd_free_cond_msg(slurmdbd_msg_type_t type, - dbd_cond_msg_t *msg); +void inline slurmdbd_free_rec_msg(dbd_rec_msg_t *msg, slurmdbd_msg_type_t type); +void inline slurmdbd_free_cond_msg(dbd_cond_msg_t *msg, + slurmdbd_msg_type_t type); void inline slurmdbd_free_get_jobs_msg(dbd_get_jobs_msg_t *msg); void inline slurmdbd_free_init_msg(dbd_init_msg_t *msg); void inline slurmdbd_free_fini_msg(dbd_fini_msg_t *msg); void inline slurmdbd_free_job_complete_msg(dbd_job_comp_msg_t *msg); -void inline slurmdbd_free_job_start_msg(dbd_job_start_msg_t *msg); -void inline slurmdbd_free_id_rc_msg(dbd_id_rc_msg_t *msg); +void inline slurmdbd_free_job_start_msg(void *in); +void inline slurmdbd_free_id_rc_msg(void *in); void inline slurmdbd_free_job_suspend_msg(dbd_job_suspend_msg_t *msg); void inline slurmdbd_free_list_msg(dbd_list_msg_t *msg); -void inline slurmdbd_free_modify_msg(slurmdbd_msg_type_t type, - dbd_modify_msg_t *msg); +void inline slurmdbd_free_modify_msg(dbd_modify_msg_t *msg, + slurmdbd_msg_type_t type); void inline slurmdbd_free_node_state_msg(dbd_node_state_msg_t *msg); void inline slurmdbd_free_rc_msg(dbd_rc_msg_t *msg); void inline slurmdbd_free_register_ctld_msg(dbd_register_ctld_msg_t *msg); void inline slurmdbd_free_roll_usage_msg(dbd_roll_usage_msg_t *msg); void inline slurmdbd_free_step_complete_msg(dbd_step_comp_msg_t *msg); void inline slurmdbd_free_step_start_msg(dbd_step_start_msg_t *msg); -void inline slurmdbd_free_usage_msg(slurmdbd_msg_type_t type, - dbd_usage_msg_t *msg); +void inline slurmdbd_free_usage_msg(dbd_usage_msg_t *msg, + slurmdbd_msg_type_t type); /*****************************************************************************\ * Pack various SlurmDBD message structures into a buffer \*****************************************************************************/ -void inline slurmdbd_pack_acct_coord_msg(uint16_t rpc_version, - dbd_acct_coord_msg_t *msg, +void inline slurmdbd_pack_acct_coord_msg(dbd_acct_coord_msg_t *msg, + uint16_t rpc_version, Buf buffer); -void inline slurmdbd_pack_cluster_cpus_msg(uint16_t rpc_version, - dbd_cluster_cpus_msg_t *msg, +void inline slurmdbd_pack_cluster_cpus_msg(dbd_cluster_cpus_msg_t *msg, + uint16_t rpc_version, Buf buffer); -void inline slurmdbd_pack_rec_msg(uint16_t rpc_version, - slurmdbd_msg_type_t type, - dbd_rec_msg_t *msg, Buf buffer); -void inline slurmdbd_pack_cond_msg(uint16_t rpc_version, - slurmdbd_msg_type_t type, - dbd_cond_msg_t *msg, Buf buffer); -void inline slurmdbd_pack_get_jobs_msg(uint16_t rpc_version, - dbd_get_jobs_msg_t *msg, Buf buffer); -void inline slurmdbd_pack_init_msg(uint16_t rpc_version, - dbd_init_msg_t *msg, Buf buffer, - char *auth_info); -void inline slurmdbd_pack_fini_msg(uint16_t rpc_version, - dbd_fini_msg_t *msg, Buf buffer); -void inline slurmdbd_pack_job_complete_msg(uint16_t rpc_version, - dbd_job_comp_msg_t *msg, +void inline slurmdbd_pack_rec_msg(dbd_rec_msg_t *msg, + uint16_t rpc_version, + slurmdbd_msg_type_t type, Buf buffer); +void inline slurmdbd_pack_cond_msg(dbd_cond_msg_t *msg, + uint16_t rpc_version, + slurmdbd_msg_type_t type, Buf buffer); +void inline slurmdbd_pack_get_jobs_msg(dbd_get_jobs_msg_t *msg, + uint16_t rpc_version, Buf buffer); +void inline slurmdbd_pack_init_msg(dbd_init_msg_t *msg, uint16_t rpc_version, + Buf buffer, char *auth_info); +void inline slurmdbd_pack_fini_msg(dbd_fini_msg_t *msg, + uint16_t rpc_version, Buf buffer); +void inline slurmdbd_pack_job_complete_msg(dbd_job_comp_msg_t *msg, + uint16_t rpc_version, Buf buffer); -void inline slurmdbd_pack_job_start_msg(uint16_t rpc_version, - dbd_job_start_msg_t *msg, +void inline slurmdbd_pack_job_start_msg(void *in, + uint16_t rpc_version, Buf buffer); -void inline slurmdbd_pack_id_rc_msg(uint16_t rpc_version, - dbd_id_rc_msg_t *msg, +void inline slurmdbd_pack_id_rc_msg(void *in, + uint16_t rpc_version, Buf buffer); -void inline slurmdbd_pack_job_suspend_msg(uint16_t rpc_version, - dbd_job_suspend_msg_t *msg, +void inline slurmdbd_pack_job_suspend_msg(dbd_job_suspend_msg_t *msg, + uint16_t rpc_version, Buf buffer); -void inline slurmdbd_pack_list_msg(uint16_t rpc_version, +void inline slurmdbd_pack_list_msg(dbd_list_msg_t *msg, + uint16_t rpc_version, slurmdbd_msg_type_t type, - dbd_list_msg_t *msg, Buf buffer); -void inline slurmdbd_pack_modify_msg(uint16_t rpc_version, + Buf buffer); +void inline slurmdbd_pack_modify_msg(dbd_modify_msg_t *msg, + uint16_t rpc_version, slurmdbd_msg_type_t type, - dbd_modify_msg_t *msg, Buf buffer); -void inline slurmdbd_pack_node_state_msg(uint16_t rpc_version, - dbd_node_state_msg_t *msg, + Buf buffer); +void inline slurmdbd_pack_node_state_msg(dbd_node_state_msg_t *msg, + uint16_t rpc_version, Buf buffer); -void inline slurmdbd_pack_rc_msg(uint16_t rpc_version, - dbd_rc_msg_t *msg, Buf buffer); -void inline slurmdbd_pack_register_ctld_msg(uint16_t rpc_version, - dbd_register_ctld_msg_t *msg, +void inline slurmdbd_pack_rc_msg(dbd_rc_msg_t *msg, + uint16_t rpc_version, Buf buffer); +void inline slurmdbd_pack_register_ctld_msg(dbd_register_ctld_msg_t *msg, + uint16_t rpc_version, Buf buffer); -void inline slurmdbd_pack_roll_usage_msg(uint16_t rpc_version, - dbd_roll_usage_msg_t *msg, Buf buffer); -void inline slurmdbd_pack_step_complete_msg(uint16_t rpc_version, - dbd_step_comp_msg_t *msg, +void inline slurmdbd_pack_roll_usage_msg(dbd_roll_usage_msg_t *msg, + uint16_t rpc_version, Buf buffer); +void inline slurmdbd_pack_step_complete_msg(dbd_step_comp_msg_t *msg, + uint16_t rpc_version, Buf buffer); -void inline slurmdbd_pack_step_start_msg(uint16_t rpc_version, - dbd_step_start_msg_t *msg, +void inline slurmdbd_pack_step_start_msg(dbd_step_start_msg_t *msg, + uint16_t rpc_version, Buf buffer); -void inline slurmdbd_pack_usage_msg(uint16_t rpc_version, +void inline slurmdbd_pack_usage_msg(dbd_usage_msg_t *msg, + uint16_t rpc_version, slurmdbd_msg_type_t type, - dbd_usage_msg_t *msg, Buf buffer); + Buf buffer); /*****************************************************************************\ * Unpack various SlurmDBD message structures from a buffer \*****************************************************************************/ -int inline slurmdbd_unpack_acct_coord_msg(uint16_t rpc_version, - dbd_acct_coord_msg_t **msg, +int inline slurmdbd_unpack_acct_coord_msg(dbd_acct_coord_msg_t **msg, + uint16_t rpc_version, Buf buffer); -int inline slurmdbd_unpack_cluster_cpus_msg(uint16_t rpc_version, - dbd_cluster_cpus_msg_t **msg, +int inline slurmdbd_unpack_cluster_cpus_msg(dbd_cluster_cpus_msg_t **msg, + uint16_t rpc_version, Buf buffer); -int inline slurmdbd_unpack_rec_msg(uint16_t rpc_version, +int inline slurmdbd_unpack_rec_msg(dbd_rec_msg_t **msg, + uint16_t rpc_version, slurmdbd_msg_type_t type, - dbd_rec_msg_t **msg, Buf buffer); -int inline slurmdbd_unpack_cond_msg(uint16_t rpc_version, - slurmdbd_msg_type_t type, - dbd_cond_msg_t **msg, Buf buffer); -int inline slurmdbd_unpack_get_jobs_msg(uint16_t rpc_version, - dbd_get_jobs_msg_t **msg, Buf buffer); + Buf buffer); +int inline slurmdbd_unpack_cond_msg(dbd_cond_msg_t **msg, + uint16_t rpc_version, + slurmdbd_msg_type_t type, Buf buffer); +int inline slurmdbd_unpack_get_jobs_msg(dbd_get_jobs_msg_t **msg, + uint16_t rpc_version, Buf buffer); int inline slurmdbd_unpack_init_msg(dbd_init_msg_t **msg, Buf buffer, char *auth_info); -int inline slurmdbd_unpack_fini_msg(uint16_t rpc_version, - dbd_fini_msg_t **msg, Buf buffer); -int inline slurmdbd_unpack_job_complete_msg(uint16_t rpc_version, - dbd_job_comp_msg_t **msg, +int inline slurmdbd_unpack_fini_msg(dbd_fini_msg_t **msg, + uint16_t rpc_version, Buf buffer); +int inline slurmdbd_unpack_job_complete_msg(dbd_job_comp_msg_t **msg, + uint16_t rpc_version, Buf buffer); -int inline slurmdbd_unpack_job_start_msg(uint16_t rpc_version, - dbd_job_start_msg_t **msg, +int inline slurmdbd_unpack_job_start_msg(void **msg, + uint16_t rpc_version, Buf buffer); -int inline slurmdbd_unpack_id_rc_msg(uint16_t rpc_version, - dbd_id_rc_msg_t **msg, +int inline slurmdbd_unpack_id_rc_msg(void **msg, + uint16_t rpc_version, Buf buffer); -int inline slurmdbd_unpack_job_suspend_msg(uint16_t rpc_version, - dbd_job_suspend_msg_t **msg, +int inline slurmdbd_unpack_job_suspend_msg(dbd_job_suspend_msg_t **msg, + uint16_t rpc_version, Buf buffer); -int inline slurmdbd_unpack_list_msg(uint16_t rpc_version, +int inline slurmdbd_unpack_list_msg(dbd_list_msg_t **msg, + uint16_t rpc_version, slurmdbd_msg_type_t type, - dbd_list_msg_t **msg, Buf buffer); -int inline slurmdbd_unpack_modify_msg(uint16_t rpc_version, + Buf buffer); +int inline slurmdbd_unpack_modify_msg(dbd_modify_msg_t **msg, + uint16_t rpc_version, slurmdbd_msg_type_t type, - dbd_modify_msg_t **msg, Buf buffer); -int inline slurmdbd_unpack_node_state_msg(uint16_t rpc_version, - dbd_node_state_msg_t **msg, + Buf buffer); +int inline slurmdbd_unpack_node_state_msg(dbd_node_state_msg_t **msg, + uint16_t rpc_version, Buf buffer); -int inline slurmdbd_unpack_rc_msg(uint16_t rpc_version, - dbd_rc_msg_t **msg, Buf buffer); -int inline slurmdbd_unpack_register_ctld_msg(uint16_t rpc_version, - dbd_register_ctld_msg_t **msg, +int inline slurmdbd_unpack_rc_msg(dbd_rc_msg_t **msg, + uint16_t rpc_version, Buf buffer); +int inline slurmdbd_unpack_register_ctld_msg(dbd_register_ctld_msg_t **msg, + uint16_t rpc_version, Buf buffer); -int inline slurmdbd_unpack_roll_usage_msg(uint16_t rpc_version, - dbd_roll_usage_msg_t **msg, +int inline slurmdbd_unpack_roll_usage_msg(dbd_roll_usage_msg_t **msg, + uint16_t rpc_version, Buf buffer); -int inline slurmdbd_unpack_step_complete_msg(uint16_t rpc_version, - dbd_step_comp_msg_t **msg, +int inline slurmdbd_unpack_step_complete_msg(dbd_step_comp_msg_t **msg, + uint16_t rpc_version, Buf buffer); -int inline slurmdbd_unpack_step_start_msg(uint16_t rpc_version, - dbd_step_start_msg_t **msg, +int inline slurmdbd_unpack_step_start_msg(dbd_step_start_msg_t **msg, + uint16_t rpc_version, Buf buffer); -int inline slurmdbd_unpack_usage_msg(uint16_t rpc_version, +int inline slurmdbd_unpack_usage_msg(dbd_usage_msg_t **msg, + uint16_t rpc_version, slurmdbd_msg_type_t type, - dbd_usage_msg_t **msg, Buf buffer); #endif /* !_SLURMDBD_DEFS_H */ diff --git a/src/plugins/accounting_storage/slurmdbd/accounting_storage_slurmdbd.c b/src/plugins/accounting_storage/slurmdbd/accounting_storage_slurmdbd.c index 460766cd5d6..902d4cb5057 100644 --- a/src/plugins/accounting_storage/slurmdbd/accounting_storage_slurmdbd.c +++ b/src/plugins/accounting_storage/slurmdbd/accounting_storage_slurmdbd.c @@ -61,12 +61,14 @@ #include "src/common/uid.h" #include "src/common/xstring.h" #include "src/slurmctld/slurmctld.h" +#include "src/slurmctld/locks.h" /* These are defined here so when we link with something other than * the slurmctld we will have these symbols defined. They will get * overwritten when linking with the slurmctld. */ slurm_ctl_conf_t slurmctld_conf; +List job_list = NULL; /* * These variables are required by the generic plugin interface. If they @@ -103,6 +105,196 @@ const uint32_t plugin_version = 100; static char *slurmdbd_auth_info = NULL; +static pthread_t db_inx_handler_thread; +static pthread_t cleanup_handler_thread; +static pthread_mutex_t db_inx_lock = PTHREAD_MUTEX_INITIALIZER; +static bool running_db_inx = 0; + +extern int jobacct_storage_p_job_start(void *db_conn, + struct job_record *job_ptr); + +static int _setup_job_start_msg(dbd_job_start_msg_t *req, + struct job_record *job_ptr) +{ + char *block_id = NULL; + char temp_bit[BUF_SIZE]; + + if (!job_ptr->details || !job_ptr->details->submit_time) { + error("jobacct_storage_p_job_start: " + "Not inputing this job %u, it has no submit time.", + job_ptr->job_id); + return SLURM_ERROR; + } + memset(req, 0, sizeof(dbd_job_start_msg_t)); + + req->alloc_cpus = job_ptr->total_cpus; + req->account = job_ptr->account; + req->assoc_id = job_ptr->assoc_id; +#ifdef HAVE_BG + select_g_select_jobinfo_get(job_ptr->select_jobinfo, + SELECT_JOBDATA_BLOCK_ID, + &block_id); + select_g_select_jobinfo_get(job_ptr->select_jobinfo, + SELECT_JOBDATA_NODE_CNT, + &req->alloc_nodes); +#else + req->alloc_nodes = job_ptr->node_cnt; +#endif + req->block_id = block_id; + if (job_ptr->resize_time) { + req->eligible_time = job_ptr->resize_time; + req->submit_time = job_ptr->resize_time; + } else if (job_ptr->details) { + req->eligible_time = job_ptr->details->begin_time; + req->submit_time = job_ptr->details->submit_time; + } + + req->start_time = job_ptr->start_time; + req->gid = job_ptr->group_id; + req->job_id = job_ptr->job_id; + + req->db_index = job_ptr->db_index; + + req->job_state = job_ptr->job_state; + req->name = job_ptr->name; + req->nodes = job_ptr->nodes; + if(job_ptr->node_bitmap) { + req->node_inx = bit_fmt(temp_bit, sizeof(temp_bit), + job_ptr->node_bitmap); + } + + req->partition = job_ptr->partition; + if (job_ptr->details) + req->req_cpus = job_ptr->details->min_cpus; + req->resv_id = job_ptr->resv_id; + req->priority = job_ptr->priority; + req->timelimit = job_ptr->time_limit; + req->wckey = job_ptr->wckey; + req->uid = job_ptr->user_id; + + return SLURM_SUCCESS; +} + + +static void *_set_db_inx_thread(void *no_data) +{ + struct job_record *job_ptr = NULL; + ListIterator itr; + /* Read lock on jobs */ + slurmctld_lock_t job_read_lock = + { NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK }; + /* Write lock on jobs */ + slurmctld_lock_t job_write_lock = + { NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK }; + /* DEF_TIMERS; */ + + (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + (void) pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); + + while(1) { + List local_job_list = NULL; + /* START_TIMER; */ + /* info("starting db_thread"); */ + slurm_mutex_lock(&db_inx_lock); + /* info("in lock db_thread"); */ + running_db_inx = 1; + if(!job_list) { + error("No job list, exitting"); + break; + } + /* Here we have off loaded starting + * jobs in the database out of band + * from the job submission. This + * is can make submitting jobs much + * faster and not lock up the + * controller waiting for the db inx + * back from the database. */ + lock_slurmctld(job_read_lock); + itr = list_iterator_create(job_list); + while((job_ptr = list_next(itr))) { + if(!job_ptr->db_index) { + dbd_job_start_msg_t *req = + xmalloc(sizeof(dbd_job_start_msg_t)); + if(_setup_job_start_msg(req, job_ptr) + != SLURM_SUCCESS) { + xfree(req); + continue; + } + /* we only want to destory the pointer + here not the contents so just do an + xfree on it. + */ + if(!local_job_list) + local_job_list = list_create( + slurm_destroy_char); + list_append(local_job_list, req); + } + } + list_iterator_destroy(itr); + unlock_slurmctld(job_read_lock); + + if(local_job_list) { + slurmdbd_msg_t req, resp; + dbd_list_msg_t send_msg, *got_msg; + int rc = SLURM_SUCCESS; + + memset(&send_msg, 0, sizeof(dbd_list_msg_t)); + + send_msg.my_list = local_job_list; + + req.msg_type = DBD_SEND_MULT_JOB_START; + req.data = &send_msg; + rc = slurm_send_recv_slurmdbd_msg( + SLURMDBD_VERSION, &req, &resp); + list_destroy(local_job_list); + if (rc != SLURM_SUCCESS) + error("slurmdbd: DBD_SEND_MULT_JOB_START " + "failure: %m"); + else if (resp.msg_type == DBD_RC) { + dbd_rc_msg_t *msg = resp.data; + if(msg->return_code == SLURM_SUCCESS) { + info("%s", msg->comment); + } else + error("%s", msg->comment); + slurmdbd_free_rc_msg(msg); + } else if (resp.msg_type != DBD_GOT_MULT_JOB_START) { + error("slurmdbd: response type not " + "DBD_GOT_MULT_JOB_START: %u", + resp.msg_type); + } else { + dbd_id_rc_msg_t *id_ptr = NULL; + got_msg = (dbd_list_msg_t *) resp.data; + + lock_slurmctld(job_write_lock); + itr = list_iterator_create(got_msg->my_list); + while((id_ptr = list_next(itr))) { + if((job_ptr = find_job_record( + id_ptr->job_id))) + job_ptr->db_index = id_ptr->id; + } + list_iterator_destroy(itr); + unlock_slurmctld(job_write_lock); + + slurmdbd_free_list_msg(got_msg); + } + } + + running_db_inx = 0; + slurm_mutex_unlock(&db_inx_lock); + /* END_TIMER; */ + /* info("set all db_inx's in %s", TIME_STR); */ + sleep(30); + } + + return NULL; +} + +static void *_cleanup_thread(void *no_data) +{ + pthread_join(db_inx_handler_thread, NULL); + return NULL; +} + /* * init() is called when the plugin is loaded, before any other functions * are called. Put global initialization here. @@ -122,6 +314,29 @@ extern int init ( void ) slurmdbd_auth_info = slurm_get_accounting_storage_pass(); verbose("%s loaded with AuthInfo=%s", plugin_name, slurmdbd_auth_info); + + if(job_list) { + /* only do this when job_list is defined + (in the slurmctld) + */ + pthread_attr_t thread_attr; + slurm_attr_init(&thread_attr); + if (pthread_create(&db_inx_handler_thread, &thread_attr, + _set_db_inx_thread, NULL)) + fatal("pthread_create error %m"); + + /* This is here to join the db inx thread so + we don't core dump if in the sleep, since + there is no other place to join we have to + create another thread to do it. + */ + slurm_attr_init(&thread_attr); + if (pthread_create(&cleanup_handler_thread, + &thread_attr, + _cleanup_thread, NULL)) + fatal("pthread_create error %m"); + slurm_attr_destroy(&thread_attr); + } first = 0; } else { debug4("%s loaded", plugin_name); @@ -132,6 +347,19 @@ extern int init ( void ) extern int fini ( void ) { + if(running_db_inx) + debug("Waiting for db_inx thread to finish."); + + slurm_mutex_lock(&db_inx_lock); + + /* cancel the db_inx thread and then join the cleanup thread */ + if(db_inx_handler_thread) + pthread_cancel(db_inx_handler_thread); + if(cleanup_handler_thread) + pthread_join(cleanup_handler_thread, NULL); + + slurm_mutex_unlock(&db_inx_lock); + xfree(slurmdbd_auth_info); return SLURM_SUCCESS; @@ -1455,7 +1683,7 @@ extern int acct_storage_p_get_usage(void *db_conn, uid_t uid, break; } - slurmdbd_free_usage_msg(resp.msg_type, got_msg); + slurmdbd_free_usage_msg(got_msg, resp.msg_type); } return rc; @@ -1595,61 +1823,10 @@ extern int jobacct_storage_p_job_start(void *db_conn, slurmdbd_msg_t msg, msg_rc; dbd_job_start_msg_t req; dbd_id_rc_msg_t *resp; - char *block_id = NULL; int rc = SLURM_SUCCESS; - char temp_bit[BUF_SIZE]; - if (!job_ptr->details || !job_ptr->details->submit_time) { - error("jobacct_storage_p_job_start: " - "Not inputing this job, it has no submit time."); - return SLURM_ERROR; - } - memset(&req, 0, sizeof(dbd_job_start_msg_t)); - - req.alloc_cpus = job_ptr->total_cpus; - req.account = job_ptr->account; - req.assoc_id = job_ptr->assoc_id; -#ifdef HAVE_BG - select_g_select_jobinfo_get(job_ptr->select_jobinfo, - SELECT_JOBDATA_BLOCK_ID, - &block_id); - select_g_select_jobinfo_get(job_ptr->select_jobinfo, - SELECT_JOBDATA_NODE_CNT, - &req.alloc_nodes); -#else - req.alloc_nodes = job_ptr->node_cnt; -#endif - req.block_id = block_id; - if (job_ptr->resize_time) { - req.eligible_time = job_ptr->resize_time; - req.submit_time = job_ptr->resize_time; - } else if (job_ptr->details) { - req.eligible_time = job_ptr->details->begin_time; - req.submit_time = job_ptr->details->submit_time; - } - - req.start_time = job_ptr->start_time; - req.gid = job_ptr->group_id; - req.job_id = job_ptr->job_id; - - req.db_index = job_ptr->db_index; - - req.job_state = job_ptr->job_state; - req.name = job_ptr->name; - req.nodes = job_ptr->nodes; - if(job_ptr->node_bitmap) { - req.node_inx = bit_fmt(temp_bit, sizeof(temp_bit), - job_ptr->node_bitmap); - } - - req.partition = job_ptr->partition; - if (job_ptr->details) - req.req_cpus = job_ptr->details->min_cpus; - req.resv_id = job_ptr->resv_id; - req.priority = job_ptr->priority; - req.timelimit = job_ptr->time_limit; - req.wckey = job_ptr->wckey; - req.uid = job_ptr->user_id; + if((rc = _setup_job_start_msg(&req, job_ptr)) != SLURM_SUCCESS) + return rc; msg.msg_type = DBD_JOB_START; msg.data = &req; @@ -1659,10 +1836,10 @@ extern int jobacct_storage_p_job_start(void *db_conn, */ if(req.db_index && !IS_JOB_RESIZING(job_ptr)) { if (slurm_send_slurmdbd_msg(SLURMDBD_VERSION, &msg) < 0) { - xfree(block_id); + xfree(req.block_id); return SLURM_ERROR; } - xfree(block_id); + xfree(req.block_id); return SLURM_SUCCESS; } @@ -1672,7 +1849,7 @@ extern int jobacct_storage_p_job_start(void *db_conn, rc = slurm_send_recv_slurmdbd_msg(SLURMDBD_VERSION, &msg, &msg_rc); if (rc != SLURM_SUCCESS) { if (slurm_send_slurmdbd_msg(SLURMDBD_VERSION, &msg) < 0) { - xfree(block_id); + xfree(req.block_id); return SLURM_ERROR; } } else if (msg_rc.msg_type != DBD_ID_RC) { @@ -1685,7 +1862,7 @@ extern int jobacct_storage_p_job_start(void *db_conn, //info("here got %d for return code", resp->return_code); slurmdbd_free_id_rc_msg(resp); } - xfree(block_id); + xfree(req.block_id); return rc; } diff --git a/src/plugins/priority/multifactor/priority_multifactor.c b/src/plugins/priority/multifactor/priority_multifactor.c index 7c21fa8dc8d..739ad5ab30d 100644 --- a/src/plugins/priority/multifactor/priority_multifactor.c +++ b/src/plugins/priority/multifactor/priority_multifactor.c @@ -629,13 +629,13 @@ static void *_decay_thread(void *no_data) double decay_factor = 1; uint16_t reset_period = slurm_get_priority_reset_period(); - if(decay_hl > 0) - decay_factor = 1 - (0.693 / decay_hl); - /* Write lock on jobs, read lock on nodes and partitions */ slurmctld_lock_t job_write_lock = { NO_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK }; + if(decay_hl > 0) + decay_factor = 1 - (0.693 / decay_hl); + (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); (void) pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); diff --git a/src/slurmctld/controller.c b/src/slurmctld/controller.c index 8cb58c8ccfd..4f512ab2814 100644 --- a/src/slurmctld/controller.c +++ b/src/slurmctld/controller.c @@ -157,6 +157,7 @@ int accounting_enforce = 0; int association_based_accounting = 0; bool ping_nodes_now = false; int cluster_cpus = 0; +int with_slurmdbd = 0; /* Local variables */ static int daemonize = DEFAULT_DAEMONIZE; @@ -317,6 +318,13 @@ int main(int argc, char *argv[]) association_based_accounting = slurm_get_is_association_based_accounting(); accounting_enforce = slurmctld_conf.accounting_storage_enforce; + if(!strcasecmp(slurmctld_conf.accounting_storage_type, + "accounting_storage/slurmdbd")) { + with_slurmdbd = 1; + /* we need job_list not to be NULL */ + init_job_conf(); + + } if (accounting_enforce && !association_based_accounting) { slurm_ctl_conf_t *conf = slurm_conf_lock(); @@ -556,6 +564,7 @@ int main(int argc, char *argv[]) /* Save any pending state save RPCs */ acct_storage_g_close_connection(&acct_db_conn); + slurm_acct_storage_fini(); /* join the power save thread after saving all state * since it could wait a while waiting for spawned @@ -618,7 +627,6 @@ int main(int argc, char *argv[]) job_submit_plugin_fini(); slurm_preempt_fini(); g_slurm_jobcomp_fini(); - slurm_acct_storage_fini(); slurm_jobacct_gather_fini(); slurm_select_fini(); slurm_topo_fini(); diff --git a/src/slurmctld/job_mgr.c b/src/slurmctld/job_mgr.c index d7059a40a7c..fd65a2d9724 100644 --- a/src/slurmctld/job_mgr.c +++ b/src/slurmctld/job_mgr.c @@ -383,7 +383,6 @@ int dump_all_job_state(void) /* write header: version, time */ packstr(JOB_STATE_VERSION, buffer); pack_time(time(NULL), buffer); - /* * write header: job id * This is needed so that the job id remains persistent even after @@ -1229,8 +1228,7 @@ static int _load_job_state(Buf buffer, uint16_t protocol_version) if(job_ptr->assoc_id && !job_ptr->db_index && job_ptr->nodes) { debug("starting job %u in accounting", job_ptr->job_id); - jobacct_storage_g_job_start(acct_db_conn, - job_ptr); + jobacct_storage_g_job_start(acct_db_conn, job_ptr); if (IS_JOB_SUSPENDED(job_ptr)) { jobacct_storage_g_job_suspend(acct_db_conn, job_ptr); @@ -7123,13 +7121,6 @@ extern bool job_independent(struct job_record *job_ptr, int will_run) xfree(job_ptr->state_desc); send_acct_rec = true; } - if (send_acct_rec && !will_run) { - /* We want to record when a job becomes eligible in - * order to calculate reserved time (a measure of - * system over-subscription), job really is not - * starting now */ - jobacct_storage_g_job_start(acct_db_conn, job_ptr); - } return true; } else if (rc == 1) { job_ptr->state_reason = WAIT_DEPENDENCY; diff --git a/src/slurmctld/job_scheduler.c b/src/slurmctld/job_scheduler.c index 34fbefe478f..3551e95c1cb 100644 --- a/src/slurmctld/job_scheduler.c +++ b/src/slurmctld/job_scheduler.c @@ -344,6 +344,7 @@ extern int schedule(void) debug3("sched: Processing job queue..."); for (i = 0; i < job_queue_size; i++) { job_ptr = job_queue[i].job_ptr; + if (job_ptr->priority == 0) { /* held */ debug3("sched: JobId=%u. State=%s. Reason=%s. " "Priority=%u.", @@ -400,7 +401,7 @@ extern int schedule(void) * disabled between when the job was submitted and * the time we consider running it. It should be * very rare. */ - info("sched: JobId=%u has invalid account", + info("sched: JobId=%u has invalid account", job_ptr->job_id); last_job_update = time(NULL); job_ptr->job_state = JOB_FAILED; diff --git a/src/slurmctld/node_scheduler.c b/src/slurmctld/node_scheduler.c index ad9ac24317b..c8152b4f2f2 100644 --- a/src/slurmctld/node_scheduler.c +++ b/src/slurmctld/node_scheduler.c @@ -1247,7 +1247,10 @@ extern int select_nodes(struct job_record *job_ptr, bool test_only, acct_policy_job_begin(job_ptr); - jobacct_storage_g_job_start(acct_db_conn, job_ptr); + /* If ran with slurmdbd this is handled out of band in the job */ + if(!with_slurmdbd) + jobacct_storage_g_job_start(acct_db_conn, job_ptr); + prolog_slurmctld(job_ptr); slurm_sched_newalloc(job_ptr); diff --git a/src/slurmctld/slurmctld.h b/src/slurmctld/slurmctld.h index 43ff280d9e3..2402686a6cd 100644 --- a/src/slurmctld/slurmctld.h +++ b/src/slurmctld/slurmctld.h @@ -157,6 +157,7 @@ extern void *acct_db_conn; extern int accounting_enforce; extern int association_based_accounting; extern int cluster_cpus; +extern int with_slurmdbd; /*****************************************************************************\ * NODE parameters and data structures, mostly in src/common/node_conf.h diff --git a/src/slurmdbd/proc_req.c b/src/slurmdbd/proc_req.c index 0f6c2b96eb9..c9972cbb188 100644 --- a/src/slurmdbd/proc_req.c +++ b/src/slurmdbd/proc_req.c @@ -129,6 +129,9 @@ static int _modify_reservation(slurmdbd_conn_t *slurmdbd_conn, static int _node_state(slurmdbd_conn_t *slurmdbd_conn, Buf in_buffer, Buf *out_buffer, uint32_t *uid); static char *_node_state_string(uint16_t node_state); +static void _process_job_start(slurmdbd_conn_t *slurmdbd_conn, + dbd_job_start_msg_t *job_start_msg, + dbd_id_rc_msg_t *id_rc_msg); static int _register_ctld(slurmdbd_conn_t *slurmdbd_conn, Buf in_buffer, Buf *out_buffer, uint32_t *uid); static int _remove_accounts(slurmdbd_conn_t *slurmdbd_conn, @@ -150,6 +153,9 @@ static int _remove_reservation(slurmdbd_conn_t *slurmdbd_conn, Buf in_buffer, Buf *out_buffer, uint32_t *uid); static int _roll_usage(slurmdbd_conn_t *slurmdbd_conn, Buf in_buffer, Buf *out_buffer, uint32_t *uid); +static int _send_mult_job_start(slurmdbd_conn_t *slurmdbd_conn, + Buf in_buffer, Buf *out_buffer, + uint32_t *uid); static int _step_complete(slurmdbd_conn_t *slurmdbd_conn, Buf in_buffer, Buf *out_buffer, uint32_t *uid); static int _step_start(slurmdbd_conn_t *slurmdbd_conn, @@ -390,6 +396,10 @@ proc_req(slurmdbd_conn_t *slurmdbd_conn, rc = _roll_usage(slurmdbd_conn, in_buffer, out_buffer, uid); break; + case DBD_SEND_MULT_JOB_START: + rc = _send_mult_job_start(slurmdbd_conn, + in_buffer, out_buffer, uid); + break; case DBD_STEP_COMPLETE: rc = _step_complete(slurmdbd_conn, in_buffer, out_buffer, uid); @@ -474,8 +484,8 @@ static int _add_accounts(slurmdbd_conn_t *slurmdbd_conn, */ } - if (slurmdbd_unpack_list_msg(slurmdbd_conn->rpc_version, - DBD_ADD_ACCOUNTS, &get_msg, in_buffer) != + if (slurmdbd_unpack_list_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_ADD_ACCOUNTS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_ADD_ACCOUNTS message"; error("%s", comment); @@ -498,8 +508,8 @@ static int _add_account_coords(slurmdbd_conn_t *slurmdbd_conn, dbd_acct_coord_msg_t *get_msg = NULL; char *comment = NULL; - if (slurmdbd_unpack_acct_coord_msg(slurmdbd_conn->rpc_version, - &get_msg, in_buffer) != + if (slurmdbd_unpack_acct_coord_msg(&get_msg, + slurmdbd_conn->rpc_version, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_ADD_ACCOUNT_COORDS message"; error("%s", comment); @@ -576,8 +586,8 @@ static int _add_assocs(slurmdbd_conn_t *slurmdbd_conn, debug2("DBD_ADD_ASSOCS: called"); - if (slurmdbd_unpack_list_msg(slurmdbd_conn->rpc_version, - DBD_ADD_ASSOCS, &get_msg, in_buffer) != + if (slurmdbd_unpack_list_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_ADD_ASSOCS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_ADD_ASSOCS message"; error("%s", comment); @@ -662,8 +672,8 @@ static int _add_clusters(slurmdbd_conn_t *slurmdbd_conn, goto end_it; } - if (slurmdbd_unpack_list_msg(slurmdbd_conn->rpc_version, - DBD_ADD_CLUSTERS, &get_msg, in_buffer) != + if (slurmdbd_unpack_list_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_ADD_CLUSTERS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_ADD_CLUSTERS message"; error("%s", comment); @@ -700,8 +710,8 @@ static int _add_qos(slurmdbd_conn_t *slurmdbd_conn, goto end_it; } - if (slurmdbd_unpack_list_msg(slurmdbd_conn->rpc_version, - DBD_ADD_QOS, &get_msg, in_buffer) != + if (slurmdbd_unpack_list_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_ADD_QOS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_ADD_QOS message"; error("%s", comment); @@ -756,8 +766,8 @@ static int _add_users(slurmdbd_conn_t *slurmdbd_conn, */ } - if (slurmdbd_unpack_list_msg(slurmdbd_conn->rpc_version, - DBD_ADD_USERS, &get_msg, in_buffer) != + if (slurmdbd_unpack_list_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_ADD_USERS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_ADD_USERS message"; error("%s", comment); @@ -792,8 +802,8 @@ static int _add_wckeys(slurmdbd_conn_t *slurmdbd_conn, goto end_it; } - if (slurmdbd_unpack_list_msg(slurmdbd_conn->rpc_version, - DBD_ADD_WCKEYS, &get_msg, in_buffer) != + if (slurmdbd_unpack_list_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_ADD_WCKEYS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_ADD_WCKEYS message"; error("%s", comment); @@ -824,8 +834,8 @@ static int _add_reservation(slurmdbd_conn_t *slurmdbd_conn, rc = ESLURM_ACCESS_DENIED; goto end_it; } - if (slurmdbd_unpack_rec_msg(slurmdbd_conn->rpc_version, DBD_ADD_RESV, - &rec_msg, in_buffer) != SLURM_SUCCESS) { + if (slurmdbd_unpack_rec_msg(&rec_msg, slurmdbd_conn->rpc_version, + DBD_ADD_RESV, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_ADD_RESV message"; error("%s", comment); rc = SLURM_ERROR; @@ -837,7 +847,7 @@ static int _add_reservation(slurmdbd_conn_t *slurmdbd_conn, rec_msg->rec); end_it: - slurmdbd_free_rec_msg(DBD_ADD_RESV, rec_msg); + slurmdbd_free_rec_msg(rec_msg, DBD_ADD_RESV); *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, rc, comment, DBD_ADD_RESV); return rc; @@ -861,8 +871,8 @@ static int _archive_dump(slurmdbd_conn_t *slurmdbd_conn, goto end_it; } - if (slurmdbd_unpack_cond_msg(slurmdbd_conn->rpc_version, - DBD_ARCHIVE_DUMP, &get_msg, in_buffer) != + if (slurmdbd_unpack_cond_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_ARCHIVE_DUMP, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_ARCHIVE_DUMP message"; error("%s", comment); @@ -890,7 +900,7 @@ static int _archive_dump(slurmdbd_conn_t *slurmdbd_conn, comment = "Error with request."; } end_it: - slurmdbd_free_cond_msg(DBD_ARCHIVE_DUMP, get_msg); + slurmdbd_free_cond_msg(get_msg, DBD_ARCHIVE_DUMP); *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, rc, comment, DBD_ARCHIVE_DUMP); return rc; @@ -949,8 +959,8 @@ static int _cluster_cpus(slurmdbd_conn_t *slurmdbd_conn, rc = ESLURM_ACCESS_DENIED; goto end_it; } - if (slurmdbd_unpack_cluster_cpus_msg(slurmdbd_conn->rpc_version, - &cluster_cpus_msg, in_buffer) != + if (slurmdbd_unpack_cluster_cpus_msg(&cluster_cpus_msg, + slurmdbd_conn->rpc_version, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_CLUSTER_CPUS message"; error("%s", comment); @@ -982,8 +992,8 @@ static int _get_accounts(slurmdbd_conn_t *slurmdbd_conn, int rc = SLURM_SUCCESS; debug2("DBD_GET_ACCOUNTS: called"); - if (slurmdbd_unpack_cond_msg(slurmdbd_conn->rpc_version, - DBD_GET_ACCOUNTS, &get_msg, in_buffer) != + if (slurmdbd_unpack_cond_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_GET_ACCOUNTS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_GET_ACCOUNTS message"; error("%s", comment); @@ -1000,8 +1010,8 @@ static int _get_accounts(slurmdbd_conn_t *slurmdbd_conn, list_msg.my_list = list_create(NULL); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_ACCOUNTS, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_ACCOUNTS, &list_msg, + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_ACCOUNTS, *out_buffer); } else { *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, @@ -1010,7 +1020,7 @@ static int _get_accounts(slurmdbd_conn_t *slurmdbd_conn, rc = SLURM_ERROR; } - slurmdbd_free_cond_msg(DBD_GET_ACCOUNTS, get_msg); + slurmdbd_free_cond_msg(get_msg, DBD_GET_ACCOUNTS); if(list_msg.my_list) list_destroy(list_msg.my_list); @@ -1026,8 +1036,8 @@ static int _get_assocs(slurmdbd_conn_t *slurmdbd_conn, int rc = SLURM_SUCCESS; debug2("DBD_GET_ASSOCS: called"); - if (slurmdbd_unpack_cond_msg(slurmdbd_conn->rpc_version, - DBD_GET_ASSOCS, &get_msg, in_buffer) != + if (slurmdbd_unpack_cond_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_GET_ASSOCS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_GET_ASSOCS message"; error("%s", comment); @@ -1045,8 +1055,8 @@ static int _get_assocs(slurmdbd_conn_t *slurmdbd_conn, list_msg.my_list = list_create(NULL); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_ASSOCS, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_ASSOCS, &list_msg, *out_buffer); + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_ASSOCS, *out_buffer); } else { *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, errno, slurm_strerror(errno), @@ -1054,7 +1064,7 @@ static int _get_assocs(slurmdbd_conn_t *slurmdbd_conn, rc = SLURM_ERROR; } - slurmdbd_free_cond_msg(DBD_GET_ASSOCS, get_msg); + slurmdbd_free_cond_msg(get_msg, DBD_GET_ASSOCS); if(list_msg.my_list) list_destroy(list_msg.my_list); @@ -1071,8 +1081,8 @@ static int _get_clusters(slurmdbd_conn_t *slurmdbd_conn, int rc = SLURM_SUCCESS; debug2("DBD_GET_CLUSTERS: called"); - if (slurmdbd_unpack_cond_msg(slurmdbd_conn->rpc_version, - DBD_GET_CLUSTERS, &get_msg, in_buffer) != + if (slurmdbd_unpack_cond_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_GET_CLUSTERS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_GET_CLUSTERS message"; error("%s", comment); @@ -1090,8 +1100,8 @@ static int _get_clusters(slurmdbd_conn_t *slurmdbd_conn, list_msg.my_list = list_create(NULL); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_CLUSTERS, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_CLUSTERS, &list_msg, + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_CLUSTERS, *out_buffer); } else { *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, @@ -1100,7 +1110,7 @@ static int _get_clusters(slurmdbd_conn_t *slurmdbd_conn, rc = SLURM_ERROR; } - slurmdbd_free_cond_msg(DBD_GET_CLUSTERS, get_msg); + slurmdbd_free_cond_msg(get_msg, DBD_GET_CLUSTERS); if(list_msg.my_list) list_destroy(list_msg.my_list); @@ -1119,8 +1129,8 @@ static int _get_config(slurmdbd_conn_t *slurmdbd_conn, list_msg.my_list = dump_config(); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_CONFIG, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_CONFIG, &list_msg, *out_buffer); + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_CONFIG, *out_buffer); if(list_msg.my_list) list_destroy(list_msg.my_list); @@ -1136,8 +1146,8 @@ static int _get_events(slurmdbd_conn_t *slurmdbd_conn, int rc = SLURM_SUCCESS; debug2("DBD_GET_EVENTS: called"); - if (slurmdbd_unpack_cond_msg(slurmdbd_conn->rpc_version, - DBD_GET_EVENTS, &get_msg, in_buffer) != + if (slurmdbd_unpack_cond_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_GET_EVENTS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_GET_EVENTS message"; error("%s", comment); @@ -1155,8 +1165,8 @@ static int _get_events(slurmdbd_conn_t *slurmdbd_conn, list_msg.my_list = list_create(NULL); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_EVENTS, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_EVENTS, &list_msg, + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_EVENTS, *out_buffer); } else { *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, @@ -1165,7 +1175,7 @@ static int _get_events(slurmdbd_conn_t *slurmdbd_conn, rc = SLURM_ERROR; } - slurmdbd_free_cond_msg(DBD_GET_EVENTS, get_msg); + slurmdbd_free_cond_msg(get_msg, DBD_GET_EVENTS); if(list_msg.my_list) list_destroy(list_msg.my_list); @@ -1183,8 +1193,8 @@ static int _get_jobs(slurmdbd_conn_t *slurmdbd_conn, int rc = SLURM_SUCCESS; debug2("DBD_GET_JOBS: called"); - if (slurmdbd_unpack_get_jobs_msg(slurmdbd_conn->rpc_version, - &get_jobs_msg, in_buffer) != + if (slurmdbd_unpack_get_jobs_msg(&get_jobs_msg, + slurmdbd_conn->rpc_version, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_GET_JOBS message"; error("%s", comment); @@ -1228,8 +1238,8 @@ static int _get_jobs(slurmdbd_conn_t *slurmdbd_conn, list_msg.my_list = list_create(NULL); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_JOBS, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_JOBS, &list_msg, *out_buffer); + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_JOBS, *out_buffer); } else { *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, errno, slurm_strerror(errno), @@ -1261,8 +1271,8 @@ static int _get_jobs_cond(slurmdbd_conn_t *slurmdbd_conn, int rc = SLURM_SUCCESS; debug2("DBD_GET_JOBS_COND: called"); - if (slurmdbd_unpack_cond_msg(slurmdbd_conn->rpc_version, - DBD_GET_JOBS_COND, &cond_msg, in_buffer) != + if (slurmdbd_unpack_cond_msg(&cond_msg, slurmdbd_conn->rpc_version, + DBD_GET_JOBS_COND, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_GET_JOBS_COND message"; error("%s", comment); @@ -1280,8 +1290,8 @@ static int _get_jobs_cond(slurmdbd_conn_t *slurmdbd_conn, list_msg.my_list = list_create(NULL); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_JOBS, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_JOBS, &list_msg, *out_buffer); + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_JOBS, *out_buffer); } else { *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, errno, slurm_strerror(errno), @@ -1289,7 +1299,7 @@ static int _get_jobs_cond(slurmdbd_conn_t *slurmdbd_conn, rc = SLURM_ERROR; } - slurmdbd_free_cond_msg(DBD_GET_JOBS_COND, cond_msg); + slurmdbd_free_cond_msg(cond_msg, DBD_GET_JOBS_COND); if(list_msg.my_list) list_destroy(list_msg.my_list); @@ -1318,8 +1328,8 @@ static int _get_probs(slurmdbd_conn_t *slurmdbd_conn, return ESLURM_ACCESS_DENIED; } - if (slurmdbd_unpack_cond_msg(slurmdbd_conn->rpc_version, - DBD_GET_PROBS, &get_msg, in_buffer) != + if (slurmdbd_unpack_cond_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_GET_PROBS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_GET_PROBS message"; error("%s", comment); @@ -1337,8 +1347,8 @@ static int _get_probs(slurmdbd_conn_t *slurmdbd_conn, list_msg.my_list = list_create(NULL); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_PROBS, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_PROBS, &list_msg, *out_buffer); + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_PROBS, *out_buffer); } else { *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, errno, slurm_strerror(errno), @@ -1346,7 +1356,7 @@ static int _get_probs(slurmdbd_conn_t *slurmdbd_conn, rc = SLURM_ERROR; } - slurmdbd_free_cond_msg(DBD_GET_PROBS, get_msg); + slurmdbd_free_cond_msg(get_msg, DBD_GET_PROBS); if(list_msg.my_list) list_destroy(list_msg.my_list); @@ -1363,8 +1373,8 @@ static int _get_qos(slurmdbd_conn_t *slurmdbd_conn, int rc = SLURM_SUCCESS; debug2("DBD_GET_QOS: called"); - if (slurmdbd_unpack_cond_msg(slurmdbd_conn->rpc_version, - DBD_GET_QOS, &cond_msg, in_buffer) != + if (slurmdbd_unpack_cond_msg(&cond_msg, slurmdbd_conn->rpc_version, + DBD_GET_QOS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_GET_QOS message"; error("%s", comment); @@ -1385,8 +1395,8 @@ static int _get_qos(slurmdbd_conn_t *slurmdbd_conn, list_msg.my_list = list_create(NULL); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_QOS, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_QOS, &list_msg, *out_buffer); + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_QOS, *out_buffer); } else { *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, errno, slurm_strerror(errno), @@ -1394,7 +1404,7 @@ static int _get_qos(slurmdbd_conn_t *slurmdbd_conn, rc = SLURM_ERROR; } - slurmdbd_free_cond_msg(DBD_GET_QOS, cond_msg); + slurmdbd_free_cond_msg(cond_msg, DBD_GET_QOS); if(list_msg.my_list) list_destroy(list_msg.my_list); @@ -1410,8 +1420,8 @@ static int _get_txn(slurmdbd_conn_t *slurmdbd_conn, int rc = SLURM_SUCCESS; debug2("DBD_GET_TXN: called"); - if (slurmdbd_unpack_cond_msg(slurmdbd_conn->rpc_version, - DBD_GET_TXN, &cond_msg, in_buffer) != + if (slurmdbd_unpack_cond_msg(&cond_msg, slurmdbd_conn->rpc_version, + DBD_GET_TXN, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_GET_TXN message"; error("%s", comment); @@ -1429,8 +1439,8 @@ static int _get_txn(slurmdbd_conn_t *slurmdbd_conn, list_msg.my_list = list_create(NULL); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_TXN, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_TXN, &list_msg, *out_buffer); + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_TXN, *out_buffer); } else { *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, errno, slurm_strerror(errno), @@ -1438,7 +1448,7 @@ static int _get_txn(slurmdbd_conn_t *slurmdbd_conn, rc = SLURM_ERROR; } - slurmdbd_free_cond_msg(DBD_GET_TXN, cond_msg); + slurmdbd_free_cond_msg(cond_msg, DBD_GET_TXN); if(list_msg.my_list) list_destroy(list_msg.my_list); @@ -1457,8 +1467,8 @@ static int _get_usage(uint16_t type, slurmdbd_conn_t *slurmdbd_conn, info("DBD_GET_USAGE: called type is %s", slurmdbd_msg_type_2_str(type, 1)); - if (slurmdbd_unpack_usage_msg(slurmdbd_conn->rpc_version, - type, &get_msg, in_buffer) != + if (slurmdbd_unpack_usage_msg(&get_msg, slurmdbd_conn->rpc_version, + type, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_GET_USAGE message"; error("%s", comment); @@ -1487,7 +1497,7 @@ static int _get_usage(uint16_t type, slurmdbd_conn_t *slurmdbd_conn, rc = acct_storage_g_get_usage(slurmdbd_conn->db_conn, *uid, get_msg->rec, type, get_msg->start, get_msg->end); - slurmdbd_free_usage_msg(type, get_msg); + slurmdbd_free_usage_msg(get_msg, type); if(rc != SLURM_SUCCESS) { comment = "Problem getting usage info"; @@ -1502,8 +1512,8 @@ static int _get_usage(uint16_t type, slurmdbd_conn_t *slurmdbd_conn, get_msg->rec = NULL; *out_buffer = init_buf(1024); pack16((uint16_t) ret_type, *out_buffer); - slurmdbd_pack_usage_msg(slurmdbd_conn->rpc_version, - ret_type, &got_msg, *out_buffer); + slurmdbd_pack_usage_msg(&got_msg, slurmdbd_conn->rpc_version, + ret_type, *out_buffer); return SLURM_SUCCESS; } @@ -1518,8 +1528,8 @@ static int _get_users(slurmdbd_conn_t *slurmdbd_conn, debug2("DBD_GET_USERS: called"); - if (slurmdbd_unpack_cond_msg(slurmdbd_conn->rpc_version, - DBD_GET_USERS, &get_msg, in_buffer) != + if (slurmdbd_unpack_cond_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_GET_USERS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_GET_USERS message"; error("%s", comment); @@ -1537,8 +1547,8 @@ static int _get_users(slurmdbd_conn_t *slurmdbd_conn, list_msg.my_list = list_create(NULL); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_USERS, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_USERS, &list_msg, *out_buffer); + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_USERS, *out_buffer); } else { *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, errno, slurm_strerror(errno), @@ -1546,7 +1556,7 @@ static int _get_users(slurmdbd_conn_t *slurmdbd_conn, rc = SLURM_ERROR; } - slurmdbd_free_cond_msg(DBD_GET_USERS, get_msg); + slurmdbd_free_cond_msg(get_msg, DBD_GET_USERS); if(list_msg.my_list) list_destroy(list_msg.my_list); @@ -1574,8 +1584,8 @@ static int _get_wckeys(slurmdbd_conn_t *slurmdbd_conn, return ESLURM_ACCESS_DENIED; } - if (slurmdbd_unpack_cond_msg(slurmdbd_conn->rpc_version, - DBD_GET_WCKEYS, &get_msg, in_buffer) != + if (slurmdbd_unpack_cond_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_GET_WCKEYS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_GET_WCKEYS message"; error("%s", comment); @@ -1593,8 +1603,8 @@ static int _get_wckeys(slurmdbd_conn_t *slurmdbd_conn, list_msg.my_list = list_create(NULL); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_WCKEYS, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_WCKEYS, &list_msg, *out_buffer); + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_WCKEYS, *out_buffer); } else { *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, errno, slurm_strerror(errno), @@ -1602,7 +1612,7 @@ static int _get_wckeys(slurmdbd_conn_t *slurmdbd_conn, rc = SLURM_ERROR; } - slurmdbd_free_cond_msg(DBD_GET_WCKEYS, get_msg); + slurmdbd_free_cond_msg(get_msg, DBD_GET_WCKEYS); if(list_msg.my_list) list_destroy(list_msg.my_list); @@ -1619,8 +1629,8 @@ static int _get_reservations(slurmdbd_conn_t *slurmdbd_conn, debug2("DBD_GET_RESVS: called"); - if (slurmdbd_unpack_cond_msg(slurmdbd_conn->rpc_version, - DBD_GET_RESVS, &get_msg, in_buffer) != + if (slurmdbd_unpack_cond_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_GET_RESVS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_GET_RESVS message"; error("%s", comment); @@ -1638,8 +1648,8 @@ static int _get_reservations(slurmdbd_conn_t *slurmdbd_conn, list_msg.my_list = list_create(NULL); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_RESVS, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_RESVS, &list_msg, *out_buffer); + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_RESVS, *out_buffer); } else { *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, errno, slurm_strerror(errno), @@ -1647,7 +1657,7 @@ static int _get_reservations(slurmdbd_conn_t *slurmdbd_conn, rc = SLURM_ERROR; } - slurmdbd_free_cond_msg(DBD_GET_RESVS, get_msg); + slurmdbd_free_cond_msg(get_msg, DBD_GET_RESVS); if(list_msg.my_list) list_destroy(list_msg.my_list); @@ -1667,8 +1677,8 @@ static int _flush_jobs(slurmdbd_conn_t *slurmdbd_conn, rc = ESLURM_ACCESS_DENIED; goto end_it; } - if (slurmdbd_unpack_cluster_cpus_msg(slurmdbd_conn->rpc_version, - &cluster_cpus_msg, in_buffer) != + if (slurmdbd_unpack_cluster_cpus_msg(&cluster_cpus_msg, + slurmdbd_conn->rpc_version, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_FLUSH_JOBS message"; error("%s", comment); @@ -1741,8 +1751,8 @@ static int _fini_conn(slurmdbd_conn_t *slurmdbd_conn, Buf in_buffer, char *comment = NULL; int rc = SLURM_SUCCESS; - if (slurmdbd_unpack_fini_msg(slurmdbd_conn->rpc_version, - &fini_msg, in_buffer) != SLURM_SUCCESS) { + if (slurmdbd_unpack_fini_msg(&fini_msg, + slurmdbd_conn->rpc_version, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_FINI message"; error("%s", comment); rc = SLURM_ERROR; @@ -1780,8 +1790,8 @@ static int _job_complete(slurmdbd_conn_t *slurmdbd_conn, rc = ESLURM_ACCESS_DENIED; goto end_it; } - if (slurmdbd_unpack_job_complete_msg(slurmdbd_conn->rpc_version, - &job_comp_msg, in_buffer) != + if (slurmdbd_unpack_job_complete_msg(&job_comp_msg, + slurmdbd_conn->rpc_version, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_JOB_COMPLETE message"; error("%s", comment); @@ -1830,8 +1840,6 @@ static int _job_start(slurmdbd_conn_t *slurmdbd_conn, { dbd_job_start_msg_t *job_start_msg = NULL; dbd_id_rc_msg_t id_rc_msg; - struct job_record job; - struct job_details details; char *comment = NULL; if (*uid != slurmdbd_conf->slurm_user_id) { @@ -1842,8 +1850,9 @@ static int _job_start(slurmdbd_conn_t *slurmdbd_conn, DBD_JOB_START); return SLURM_ERROR; } - if (slurmdbd_unpack_job_start_msg(slurmdbd_conn->rpc_version, - &job_start_msg, in_buffer) != + if (slurmdbd_unpack_job_start_msg((void **)&job_start_msg, + slurmdbd_conn->rpc_version, + in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_JOB_START message"; error("%s", comment); @@ -1852,61 +1861,14 @@ static int _job_start(slurmdbd_conn_t *slurmdbd_conn, DBD_JOB_START); return SLURM_ERROR; } - memset(&job, 0, sizeof(struct job_record)); - memset(&details, 0, sizeof(struct job_details)); - memset(&id_rc_msg, 0, sizeof(dbd_id_rc_msg_t)); - - job.total_cpus = job_start_msg->alloc_cpus; - job.node_cnt = job_start_msg->alloc_nodes; - job.account = _replace_double_quotes(job_start_msg->account); - job.assoc_id = job_start_msg->assoc_id; - job.comment = job_start_msg->block_id; - job.db_index = job_start_msg->db_index; - details.begin_time = job_start_msg->eligible_time; - job.user_id = job_start_msg->uid; - job.group_id = job_start_msg->gid; - job.job_id = job_start_msg->job_id; - job.job_state = job_start_msg->job_state; - job.name = _replace_double_quotes(job_start_msg->name); - job.nodes = job_start_msg->nodes; - job.network = job_start_msg->node_inx; - job.partition = job_start_msg->partition; - details.min_cpus = job_start_msg->req_cpus; - job.resv_id = job_start_msg->resv_id; - job.priority = job_start_msg->priority; - job.start_time = job_start_msg->start_time; - job.time_limit = job_start_msg->timelimit; - job.wckey = _replace_double_quotes(job_start_msg->wckey); - details.submit_time = job_start_msg->submit_time; - - job.details = &details; - - if(job.job_state & JOB_RESIZING) { - job.resize_time = job_start_msg->eligible_time; - debug2("DBD_JOB_START: RESIZE CALL ID:%u NAME:%s INX:%u", - job_start_msg->job_id, job_start_msg->name, - job.db_index); - } else if(job.start_time) { - debug2("DBD_JOB_START: START CALL ID:%u NAME:%s INX:%u", - job_start_msg->job_id, job_start_msg->name, - job.db_index); - } else { - debug2("DBD_JOB_START: ELIGIBLE CALL ID:%u NAME:%s", - job_start_msg->job_id, job_start_msg->name); - } - id_rc_msg.return_code = jobacct_storage_g_job_start( - slurmdbd_conn->db_conn, &job); - id_rc_msg.id = job.db_index; - /* just incase job.wckey was set because we didn't send one */ - if(!job_start_msg->wckey) - xfree(job.wckey); + _process_job_start(slurmdbd_conn, job_start_msg, &id_rc_msg); slurmdbd_free_job_start_msg(job_start_msg); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_ID_RC, *out_buffer); - slurmdbd_pack_id_rc_msg(slurmdbd_conn->rpc_version, - &id_rc_msg, *out_buffer); + slurmdbd_pack_id_rc_msg(&id_rc_msg, + slurmdbd_conn->rpc_version, *out_buffer); return SLURM_SUCCESS; } @@ -1925,9 +1887,9 @@ static int _job_suspend(slurmdbd_conn_t *slurmdbd_conn, rc = ESLURM_ACCESS_DENIED; goto end_it; } - if (slurmdbd_unpack_job_suspend_msg(slurmdbd_conn->rpc_version, - &job_suspend_msg, in_buffer) != - SLURM_SUCCESS) { + if (slurmdbd_unpack_job_suspend_msg(&job_suspend_msg, + slurmdbd_conn->rpc_version, + in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_JOB_SUSPEND message"; error("%s", comment); rc = SLURM_ERROR; @@ -1984,8 +1946,8 @@ static int _modify_accounts(slurmdbd_conn_t *slurmdbd_conn, return ESLURM_ACCESS_DENIED; } - if (slurmdbd_unpack_modify_msg(slurmdbd_conn->rpc_version, - DBD_MODIFY_ACCOUNTS, &get_msg, + if (slurmdbd_unpack_modify_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_MODIFY_ACCOUNTS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_MODIFY_ACCOUNTS message"; error("%s", comment); @@ -2017,17 +1979,17 @@ static int _modify_accounts(slurmdbd_conn_t *slurmdbd_conn, comment = "Unknown issue"; } error("%s", comment); - slurmdbd_free_modify_msg(DBD_MODIFY_ACCOUNTS, get_msg); + slurmdbd_free_modify_msg(get_msg, DBD_MODIFY_ACCOUNTS); *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, rc, comment, DBD_MODIFY_ACCOUNTS); return rc; } - slurmdbd_free_modify_msg(DBD_MODIFY_ACCOUNTS, get_msg); + slurmdbd_free_modify_msg(get_msg, DBD_MODIFY_ACCOUNTS); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_LIST, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_LIST, &list_msg, *out_buffer); + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_LIST, *out_buffer); if(list_msg.my_list) list_destroy(list_msg.my_list); return rc; @@ -2043,8 +2005,8 @@ static int _modify_assocs(slurmdbd_conn_t *slurmdbd_conn, debug2("DBD_MODIFY_ASSOCS: called"); - if (slurmdbd_unpack_modify_msg(slurmdbd_conn->rpc_version, - DBD_MODIFY_ASSOCS, &get_msg, + if (slurmdbd_unpack_modify_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_MODIFY_ASSOCS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_MODIFY_ASSOCS message"; error("%s", comment); @@ -2081,17 +2043,17 @@ static int _modify_assocs(slurmdbd_conn_t *slurmdbd_conn, comment = "Unknown issue"; } error("%s", comment); - slurmdbd_free_modify_msg(DBD_MODIFY_ASSOCS, get_msg); + slurmdbd_free_modify_msg(get_msg, DBD_MODIFY_ASSOCS); *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, rc, comment, DBD_MODIFY_ASSOCS); return rc; } - slurmdbd_free_modify_msg(DBD_MODIFY_ASSOCS, get_msg); + slurmdbd_free_modify_msg(get_msg, DBD_MODIFY_ASSOCS); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_LIST, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_LIST, &list_msg, *out_buffer); + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_LIST, *out_buffer); if(list_msg.my_list) list_destroy(list_msg.my_list); @@ -2118,8 +2080,8 @@ static int _modify_clusters(slurmdbd_conn_t *slurmdbd_conn, return ESLURM_ACCESS_DENIED; } - if (slurmdbd_unpack_modify_msg(slurmdbd_conn->rpc_version, - DBD_MODIFY_CLUSTERS, &get_msg, + if (slurmdbd_unpack_modify_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_MODIFY_CLUSTERS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_MODIFY_CLUSTERS message"; error("%s", comment); @@ -2152,17 +2114,17 @@ static int _modify_clusters(slurmdbd_conn_t *slurmdbd_conn, comment = "Unknown issue"; } error("%s", comment); - slurmdbd_free_modify_msg(DBD_MODIFY_CLUSTERS, get_msg); + slurmdbd_free_modify_msg(get_msg, DBD_MODIFY_CLUSTERS); *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, rc, comment, DBD_MODIFY_CLUSTERS); return rc; } - slurmdbd_free_modify_msg(DBD_MODIFY_CLUSTERS, get_msg); + slurmdbd_free_modify_msg(get_msg, DBD_MODIFY_CLUSTERS); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_LIST, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_LIST, &list_msg, *out_buffer); + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_LIST, *out_buffer); if(list_msg.my_list) list_destroy(list_msg.my_list); @@ -2189,8 +2151,8 @@ static int _modify_qos(slurmdbd_conn_t *slurmdbd_conn, return ESLURM_ACCESS_DENIED; } - if (slurmdbd_unpack_modify_msg(slurmdbd_conn->rpc_version, - DBD_MODIFY_QOS, &get_msg, + if (slurmdbd_unpack_modify_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_MODIFY_QOS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_MODIFY_QOS message"; error("%s", comment); @@ -2226,17 +2188,17 @@ static int _modify_qos(slurmdbd_conn_t *slurmdbd_conn, comment = "Unknown issue"; } error("%s", comment); - slurmdbd_free_modify_msg(DBD_MODIFY_QOS, get_msg); + slurmdbd_free_modify_msg(get_msg, DBD_MODIFY_QOS); *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, rc, comment, DBD_MODIFY_QOS); return rc; } - slurmdbd_free_modify_msg(DBD_MODIFY_QOS, get_msg); + slurmdbd_free_modify_msg(get_msg, DBD_MODIFY_QOS); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_LIST, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_LIST, &list_msg, *out_buffer); + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_LIST, *out_buffer); if(list_msg.my_list) list_destroy(list_msg.my_list); @@ -2258,8 +2220,8 @@ static int _modify_users(slurmdbd_conn_t *slurmdbd_conn, debug2("DBD_MODIFY_USERS: called"); - if (slurmdbd_unpack_modify_msg(slurmdbd_conn->rpc_version, - DBD_MODIFY_USERS, &get_msg, in_buffer) != + if (slurmdbd_unpack_modify_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_MODIFY_USERS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_MODIFY_USERS message"; error("%s", comment); @@ -2342,17 +2304,17 @@ is_same_user: comment = "Unknown issue"; } error("%s", comment); - slurmdbd_free_modify_msg(DBD_MODIFY_USERS, get_msg); + slurmdbd_free_modify_msg(get_msg, DBD_MODIFY_USERS); *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, rc, comment, DBD_MODIFY_USERS); return rc; } - slurmdbd_free_modify_msg(DBD_MODIFY_USERS, get_msg); + slurmdbd_free_modify_msg(get_msg, DBD_MODIFY_USERS); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_LIST, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_LIST, &list_msg, *out_buffer); + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_LIST, *out_buffer); if(list_msg.my_list) list_destroy(list_msg.my_list); @@ -2379,8 +2341,8 @@ static int _modify_wckeys(slurmdbd_conn_t *slurmdbd_conn, return ESLURM_ACCESS_DENIED; } - if (slurmdbd_unpack_modify_msg(slurmdbd_conn->rpc_version, - DBD_MODIFY_WCKEYS, &get_msg, + if (slurmdbd_unpack_modify_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_MODIFY_WCKEYS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_MODIFY_WCKEYS message"; error("%s", comment); @@ -2413,17 +2375,17 @@ static int _modify_wckeys(slurmdbd_conn_t *slurmdbd_conn, comment = "Unknown issue"; } error("%s", comment); - slurmdbd_free_modify_msg(DBD_MODIFY_WCKEYS, get_msg); + slurmdbd_free_modify_msg(get_msg, DBD_MODIFY_WCKEYS); *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, rc, comment, DBD_MODIFY_WCKEYS); return rc; } - slurmdbd_free_modify_msg(DBD_MODIFY_WCKEYS, get_msg); + slurmdbd_free_modify_msg(get_msg, DBD_MODIFY_WCKEYS); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_LIST, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_LIST, &list_msg, *out_buffer); + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_LIST, *out_buffer); if(list_msg.my_list) list_destroy(list_msg.my_list); @@ -2443,8 +2405,8 @@ static int _modify_reservation(slurmdbd_conn_t *slurmdbd_conn, rc = ESLURM_ACCESS_DENIED; goto end_it; } - if (slurmdbd_unpack_rec_msg(slurmdbd_conn->rpc_version, DBD_MODIFY_RESV, - &rec_msg, in_buffer) != SLURM_SUCCESS) { + if (slurmdbd_unpack_rec_msg(&rec_msg, slurmdbd_conn->rpc_version, + DBD_MODIFY_RESV, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_MODIFY_RESV message"; error("%s", comment); rc = SLURM_ERROR; @@ -2456,7 +2418,7 @@ static int _modify_reservation(slurmdbd_conn_t *slurmdbd_conn, rec_msg->rec); end_it: - slurmdbd_free_rec_msg(DBD_MODIFY_RESV, rec_msg); + slurmdbd_free_rec_msg(rec_msg, DBD_MODIFY_RESV); *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, rc, comment, DBD_MODIFY_RESV); return rc; @@ -2477,8 +2439,8 @@ static int _node_state(slurmdbd_conn_t *slurmdbd_conn, rc = ESLURM_ACCESS_DENIED; goto end_it; } - if (slurmdbd_unpack_node_state_msg(slurmdbd_conn->rpc_version, - &node_state_msg, in_buffer) != + if (slurmdbd_unpack_node_state_msg(&node_state_msg, + slurmdbd_conn->rpc_version, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_NODE_STATE message"; error("%s", comment); @@ -2542,6 +2504,65 @@ static char *_node_state_string(uint16_t node_state) return "UNKNOWN"; } +static void _process_job_start(slurmdbd_conn_t *slurmdbd_conn, + dbd_job_start_msg_t *job_start_msg, + dbd_id_rc_msg_t *id_rc_msg) +{ + struct job_record job; + struct job_details details; + + memset(&job, 0, sizeof(struct job_record)); + memset(&details, 0, sizeof(struct job_details)); + memset(id_rc_msg, 0, sizeof(dbd_id_rc_msg_t)); + + job.total_cpus = job_start_msg->alloc_cpus; + job.node_cnt = job_start_msg->alloc_nodes; + job.account = _replace_double_quotes(job_start_msg->account); + job.assoc_id = job_start_msg->assoc_id; + job.comment = job_start_msg->block_id; + job.db_index = job_start_msg->db_index; + details.begin_time = job_start_msg->eligible_time; + job.user_id = job_start_msg->uid; + job.group_id = job_start_msg->gid; + job.job_id = job_start_msg->job_id; + job.job_state = job_start_msg->job_state; + job.name = _replace_double_quotes(job_start_msg->name); + job.nodes = job_start_msg->nodes; + job.network = job_start_msg->node_inx; + job.partition = job_start_msg->partition; + details.min_cpus = job_start_msg->req_cpus; + job.resv_id = job_start_msg->resv_id; + job.priority = job_start_msg->priority; + job.start_time = job_start_msg->start_time; + job.time_limit = job_start_msg->timelimit; + job.wckey = _replace_double_quotes(job_start_msg->wckey); + details.submit_time = job_start_msg->submit_time; + + job.details = &details; + + if(job.job_state & JOB_RESIZING) { + job.resize_time = job_start_msg->eligible_time; + debug2("DBD_JOB_START: RESIZE CALL ID:%u NAME:%s INX:%u", + job_start_msg->job_id, job_start_msg->name, + job.db_index); + } else if(job.start_time) { + debug2("DBD_JOB_START: START CALL ID:%u NAME:%s INX:%u", + job_start_msg->job_id, job_start_msg->name, + job.db_index); + } else { + debug2("DBD_JOB_START: ELIGIBLE CALL ID:%u NAME:%s", + job_start_msg->job_id, job_start_msg->name); + } + id_rc_msg->return_code = jobacct_storage_g_job_start( + slurmdbd_conn->db_conn, &job); + id_rc_msg->job_id = job.job_id; + id_rc_msg->id = job.db_index; + + /* just incase job.wckey was set because we didn't send one */ + if(!job_start_msg->wckey) + xfree(job.wckey); +} + static int _register_ctld(slurmdbd_conn_t *slurmdbd_conn, Buf in_buffer, Buf *out_buffer, uint32_t *uid) { @@ -2558,8 +2579,8 @@ static int _register_ctld(slurmdbd_conn_t *slurmdbd_conn, rc = ESLURM_ACCESS_DENIED; goto end_it; } - if (slurmdbd_unpack_register_ctld_msg(slurmdbd_conn->rpc_version, - ®ister_ctld_msg, in_buffer) != + if (slurmdbd_unpack_register_ctld_msg(®ister_ctld_msg, + slurmdbd_conn->rpc_version, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_REGISTER_CTLD message"; error("%s", comment); @@ -2656,8 +2677,8 @@ static int _remove_accounts(slurmdbd_conn_t *slurmdbd_conn, return ESLURM_ACCESS_DENIED; } - if (slurmdbd_unpack_cond_msg(slurmdbd_conn->rpc_version, - DBD_REMOVE_ACCOUNTS, &get_msg, + if (slurmdbd_unpack_cond_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_REMOVE_ACCOUNTS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_REMOVE_ACCOUNTS message"; error("%s", comment); @@ -2687,17 +2708,17 @@ static int _remove_accounts(slurmdbd_conn_t *slurmdbd_conn, comment = "Unknown issue"; } error("%s", comment); - slurmdbd_free_cond_msg(DBD_REMOVE_ACCOUNTS, get_msg); + slurmdbd_free_cond_msg(get_msg, DBD_REMOVE_ACCOUNTS); *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, rc, comment, DBD_REMOVE_ACCOUNTS); return rc; } - slurmdbd_free_cond_msg(DBD_REMOVE_ACCOUNTS, get_msg); + slurmdbd_free_cond_msg(get_msg, DBD_REMOVE_ACCOUNTS); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_LIST, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_LIST, &list_msg, *out_buffer); + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_LIST, *out_buffer); if(list_msg.my_list) list_destroy(list_msg.my_list); @@ -2715,8 +2736,8 @@ static int _remove_account_coords(slurmdbd_conn_t *slurmdbd_conn, debug2("DBD_REMOVE_ACCOUNT_COORDS: called"); - if (slurmdbd_unpack_acct_coord_msg(slurmdbd_conn->rpc_version, - &get_msg, in_buffer) != + if (slurmdbd_unpack_acct_coord_msg(&get_msg, + slurmdbd_conn->rpc_version, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_REMOVE_ACCOUNT_COORDS message"; error("%s", comment); @@ -2762,8 +2783,8 @@ static int _remove_account_coords(slurmdbd_conn_t *slurmdbd_conn, slurmdbd_free_acct_coord_msg(get_msg); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_LIST, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_LIST, &list_msg, *out_buffer); + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_LIST, *out_buffer); if(list_msg.my_list) list_destroy(list_msg.my_list); @@ -2779,8 +2800,8 @@ static int _remove_assocs(slurmdbd_conn_t *slurmdbd_conn, char *comment = NULL; debug2("DBD_REMOVE_ASSOCS: called"); - if (slurmdbd_unpack_cond_msg(slurmdbd_conn->rpc_version, - DBD_REMOVE_ASSOCS, &get_msg, in_buffer) != + if (slurmdbd_unpack_cond_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_REMOVE_ASSOCS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_REMOVE_ASSOCS message"; error("%s", comment); @@ -2815,17 +2836,17 @@ static int _remove_assocs(slurmdbd_conn_t *slurmdbd_conn, comment = "Unknown issue"; } error("%s", comment); - slurmdbd_free_cond_msg(DBD_REMOVE_ASSOCS, get_msg); + slurmdbd_free_cond_msg(get_msg, DBD_REMOVE_ASSOCS); *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, rc, comment, DBD_REMOVE_ASSOCS); return rc; } - slurmdbd_free_cond_msg(DBD_REMOVE_ASSOCS, get_msg); + slurmdbd_free_cond_msg(get_msg, DBD_REMOVE_ASSOCS); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_LIST, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_LIST, &list_msg, *out_buffer); + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_LIST, *out_buffer); if(list_msg.my_list) list_destroy(list_msg.my_list); @@ -2855,8 +2876,8 @@ static int _remove_clusters(slurmdbd_conn_t *slurmdbd_conn, return ESLURM_ACCESS_DENIED; } - if (slurmdbd_unpack_cond_msg(slurmdbd_conn->rpc_version, - DBD_REMOVE_CLUSTERS, &get_msg, + if (slurmdbd_unpack_cond_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_REMOVE_CLUSTERS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_REMOVE_CLUSTERS message"; error("%s", comment); @@ -2886,17 +2907,17 @@ static int _remove_clusters(slurmdbd_conn_t *slurmdbd_conn, comment = "Unknown issue"; } error("%s", comment); - slurmdbd_free_cond_msg(DBD_REMOVE_CLUSTERS, get_msg); + slurmdbd_free_cond_msg(get_msg, DBD_REMOVE_CLUSTERS); *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, rc, comment, DBD_REMOVE_CLUSTERS); return rc; } - slurmdbd_free_cond_msg(DBD_REMOVE_CLUSTERS, get_msg); + slurmdbd_free_cond_msg(get_msg, DBD_REMOVE_CLUSTERS); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_LIST, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_LIST, &list_msg, *out_buffer); + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_LIST, *out_buffer); if(list_msg.my_list) list_destroy(list_msg.my_list); @@ -2925,8 +2946,8 @@ static int _remove_qos(slurmdbd_conn_t *slurmdbd_conn, return ESLURM_ACCESS_DENIED; } - if (slurmdbd_unpack_cond_msg(slurmdbd_conn->rpc_version, - DBD_REMOVE_QOS, &get_msg, + if (slurmdbd_unpack_cond_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_REMOVE_QOS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_REMOVE_QOS message"; error("%s", comment); @@ -2956,17 +2977,17 @@ static int _remove_qos(slurmdbd_conn_t *slurmdbd_conn, comment = "Unknown issue"; } error("%s", comment); - slurmdbd_free_cond_msg(DBD_REMOVE_QOS, get_msg); + slurmdbd_free_cond_msg(get_msg, DBD_REMOVE_QOS); *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, rc, comment, DBD_REMOVE_QOS); return rc; } - slurmdbd_free_cond_msg(DBD_REMOVE_QOS, get_msg); + slurmdbd_free_cond_msg(get_msg, DBD_REMOVE_QOS); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_LIST, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_LIST, &list_msg, *out_buffer); + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_LIST, *out_buffer); if(list_msg.my_list) list_destroy(list_msg.my_list); @@ -2995,8 +3016,8 @@ static int _remove_users(slurmdbd_conn_t *slurmdbd_conn, return ESLURM_ACCESS_DENIED; } - if (slurmdbd_unpack_cond_msg(slurmdbd_conn->rpc_version, - DBD_REMOVE_USERS, &get_msg, in_buffer) != + if (slurmdbd_unpack_cond_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_REMOVE_USERS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_REMOVE_USERS message"; error("%s", comment); @@ -3026,17 +3047,17 @@ static int _remove_users(slurmdbd_conn_t *slurmdbd_conn, comment = "Unknown issue"; } error("%s", comment); - slurmdbd_free_cond_msg(DBD_REMOVE_USERS, get_msg); + slurmdbd_free_cond_msg(get_msg, DBD_REMOVE_USERS); *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, rc, comment, DBD_REMOVE_USERS); return rc; } - slurmdbd_free_cond_msg(DBD_REMOVE_USERS, get_msg); + slurmdbd_free_cond_msg(get_msg, DBD_REMOVE_USERS); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_LIST, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_LIST, &list_msg, *out_buffer); + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_LIST, *out_buffer); if(list_msg.my_list) list_destroy(list_msg.my_list); @@ -3065,8 +3086,8 @@ static int _remove_wckeys(slurmdbd_conn_t *slurmdbd_conn, return ESLURM_ACCESS_DENIED; } - if (slurmdbd_unpack_cond_msg(slurmdbd_conn->rpc_version, - DBD_REMOVE_WCKEYS, &get_msg, + if (slurmdbd_unpack_cond_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_REMOVE_WCKEYS, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_REMOVE_WCKEYS message"; error("%s", comment); @@ -3096,17 +3117,17 @@ static int _remove_wckeys(slurmdbd_conn_t *slurmdbd_conn, comment = "Unknown issue"; } error("%s", comment); - slurmdbd_free_cond_msg(DBD_REMOVE_WCKEYS, get_msg); + slurmdbd_free_cond_msg(get_msg, DBD_REMOVE_WCKEYS); *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, rc, comment, DBD_REMOVE_WCKEYS); return rc; } - slurmdbd_free_cond_msg(DBD_REMOVE_WCKEYS, get_msg); + slurmdbd_free_cond_msg(get_msg, DBD_REMOVE_WCKEYS); *out_buffer = init_buf(1024); pack16((uint16_t) DBD_GOT_LIST, *out_buffer); - slurmdbd_pack_list_msg(slurmdbd_conn->rpc_version, - DBD_GOT_LIST, &list_msg, *out_buffer); + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_LIST, *out_buffer); if(list_msg.my_list) list_destroy(list_msg.my_list); @@ -3126,8 +3147,9 @@ static int _remove_reservation(slurmdbd_conn_t *slurmdbd_conn, rc = ESLURM_ACCESS_DENIED; goto end_it; } - if (slurmdbd_unpack_rec_msg(slurmdbd_conn->rpc_version, DBD_REMOVE_RESV, - &rec_msg, in_buffer) != SLURM_SUCCESS) { + if (slurmdbd_unpack_rec_msg(&rec_msg, slurmdbd_conn->rpc_version, + DBD_REMOVE_RESV, + in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_REMOVE_RESV message"; error("%s", comment); rc = SLURM_ERROR; @@ -3139,7 +3161,7 @@ static int _remove_reservation(slurmdbd_conn_t *slurmdbd_conn, rec_msg->rec); end_it: - slurmdbd_free_rec_msg(DBD_REMOVE_RESV, rec_msg); + slurmdbd_free_rec_msg(rec_msg, DBD_REMOVE_RESV); *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, rc, comment, DBD_REMOVE_RESV); return rc; @@ -3163,8 +3185,8 @@ static int _roll_usage(slurmdbd_conn_t *slurmdbd_conn, goto end_it; } - if (slurmdbd_unpack_roll_usage_msg(slurmdbd_conn->rpc_version, - &get_msg, in_buffer) != + if (slurmdbd_unpack_roll_usage_msg(&get_msg, + slurmdbd_conn->rpc_version, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_ROLL_USAGE message"; error("%s", comment); @@ -3183,6 +3205,56 @@ end_it: return rc; } +static int _send_mult_job_start(slurmdbd_conn_t *slurmdbd_conn, + Buf in_buffer, Buf *out_buffer, + uint32_t *uid) +{ + dbd_list_msg_t *get_msg = NULL; + dbd_list_msg_t list_msg; + char *comment = NULL; + ListIterator itr = NULL; + dbd_job_start_msg_t *job_start_msg; + dbd_id_rc_msg_t *id_rc_msg; + + if (*uid != slurmdbd_conf->slurm_user_id) { + comment = "DBD_SEND_MULT_JOB_START message from invalid uid"; + error("%s %u", comment, *uid); + *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, + ESLURM_ACCESS_DENIED, comment, + DBD_JOB_START); + return SLURM_ERROR; + } + + if (slurmdbd_unpack_list_msg(&get_msg, slurmdbd_conn->rpc_version, + DBD_SEND_MULT_JOB_START, + in_buffer) != SLURM_SUCCESS) { + comment = "Failed to unpack DBD_MULT_JOB_START message"; + error("%s", comment); + *out_buffer = make_dbd_rc_msg(slurmdbd_conn->rpc_version, + SLURM_ERROR, comment, + DBD_SEND_MULT_JOB_START); + return SLURM_ERROR; + } + + list_msg.my_list = list_create(slurmdbd_free_id_rc_msg); + + itr = list_iterator_create(get_msg->my_list); + while((job_start_msg = list_next(itr))) { + id_rc_msg = xmalloc(sizeof(dbd_id_rc_msg_t)); + list_append(list_msg.my_list, id_rc_msg); + + _process_job_start(slurmdbd_conn, job_start_msg, id_rc_msg); + } + list_iterator_destroy(itr); + + *out_buffer = init_buf(1024); + pack16((uint16_t) DBD_GOT_MULT_JOB_START, *out_buffer); + slurmdbd_pack_list_msg(&list_msg, slurmdbd_conn->rpc_version, + DBD_GOT_MULT_JOB_START, *out_buffer); + + return SLURM_SUCCESS; +} + static int _step_complete(slurmdbd_conn_t *slurmdbd_conn, Buf in_buffer, Buf *out_buffer, uint32_t *uid) { @@ -3199,8 +3271,8 @@ static int _step_complete(slurmdbd_conn_t *slurmdbd_conn, rc = ESLURM_ACCESS_DENIED; goto end_it; } - if (slurmdbd_unpack_step_complete_msg(slurmdbd_conn->rpc_version, - &step_comp_msg, in_buffer) != + if (slurmdbd_unpack_step_complete_msg(&step_comp_msg, + slurmdbd_conn->rpc_version, in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_STEP_COMPLETE message"; error("%s", comment); @@ -3262,9 +3334,9 @@ static int _step_start(slurmdbd_conn_t *slurmdbd_conn, rc = ESLURM_ACCESS_DENIED; goto end_it; } - if (slurmdbd_unpack_step_start_msg(slurmdbd_conn->rpc_version, - &step_start_msg, in_buffer) != - SLURM_SUCCESS) { + if (slurmdbd_unpack_step_start_msg(&step_start_msg, + slurmdbd_conn->rpc_version, + in_buffer) != SLURM_SUCCESS) { comment = "Failed to unpack DBD_STEP_START message"; error("%s", comment); rc = SLURM_ERROR; diff --git a/src/slurmdbd/rpc_mgr.c b/src/slurmdbd/rpc_mgr.c index acc8ea8e12f..93d8c19a819 100644 --- a/src/slurmdbd/rpc_mgr.c +++ b/src/slurmdbd/rpc_mgr.c @@ -266,7 +266,7 @@ extern Buf make_dbd_rc_msg(uint16_t rpc_version, msg.return_code = rc; msg.comment = comment; msg.sent_type = sent_type; - slurmdbd_pack_rc_msg(rpc_version, &msg, buffer); + slurmdbd_pack_rc_msg(&msg, rpc_version, buffer); return buffer; } -- GitLab