From b3d07a4617ab2a452214a19faa101b924ef8b760 Mon Sep 17 00:00:00 2001
From: Moe Jette <jette1@llnl.gov>
Date: Wed, 26 Mar 2008 15:11:50 +0000
Subject: [PATCH] gang scheduling suspend/resume patch from Chris Holmes
 (slurmd.suspend.patch)

---
 src/slurmd/slurmd/req.c | 118 +++++++++++++++++++++++++++++++---------
 1 file changed, 93 insertions(+), 25 deletions(-)

diff --git a/src/slurmd/slurmd/req.c b/src/slurmd/slurmd/req.c
index 45f8be89e35..f98831fe47f 100644
--- a/src/slurmd/slurmd/req.c
+++ b/src/slurmd/slurmd/req.c
@@ -158,6 +158,14 @@ static pthread_mutex_t job_limits_mutex = PTHREAD_MUTEX_INITIALIZER;
 static List job_limits_list = NULL;
 static bool job_limits_loaded = false;
 
+/* NUM_PARALLEL_SUSPEND controls the number of jobs suspended/resumed
+ * at one time as well as the number of jobsteps per job that can be
+ * suspended at one time */
+#define NUM_PARALLEL_SUSPEND 8
+static pthread_mutex_t suspend_mutex = PTHREAD_MUTEX_INITIALIZER;
+static uint32_t job_suspend_array[NUM_PARALLEL_SUSPEND];
+static int job_suspend_size = 0;
+
 void
 slurmd_req(slurm_msg_t *msg)
 {
@@ -2253,7 +2261,51 @@ _rpc_signal_job(slurm_msg_t *msg)
 	}
 }
 
-#define NUM_PARALLEL_SUSPEND 8
+/* if a lock is granted to the job then return 1; else return 0 if
+ * the lock for the job is already taken or there's no more locks */
+static int
+_get_suspend_job_lock(uint32_t jobid)
+{
+	int i, spot = -1;
+	pthread_mutex_lock(&suspend_mutex);
+
+	for (i = 0; i < job_suspend_size; i++) {
+		if (job_suspend_array[i] == -1) {
+			spot = i;
+			continue;
+		}
+		if (job_suspend_array[i] == jobid) {
+			/* another thread already has the lock */
+			pthread_mutex_unlock(&suspend_mutex);
+			return 0;
+		}
+	}
+	i = 0;
+	if (spot != -1) {
+		/* nobody has the lock and here's an available used lock */
+		job_suspend_array[spot] = jobid;
+		i = 1;
+	} else if (job_suspend_size < NUM_PARALLEL_SUSPEND) {
+		/* a new lock is available */
+		job_suspend_array[job_suspend_size++] = jobid;
+		i = 1;
+	}
+	pthread_mutex_unlock(&suspend_mutex);
+	return i;
+}
+
+static void
+_unlock_suspend_job(uint32_t jobid)
+{
+	int i;
+	pthread_mutex_lock(&suspend_mutex);
+	for (i = 0; i < job_suspend_size; i++) {
+		if (job_suspend_array[i] == jobid)
+			job_suspend_array[i] = -1;
+	}
+	pthread_mutex_unlock(&suspend_mutex);
+}
+
 /*
  * Send a job suspend/resume request through the appropriate slurmstepds for
  * each job step belonging to a given job allocation.
@@ -2263,32 +2315,60 @@ _rpc_suspend_job(slurm_msg_t *msg)
 {
 	suspend_msg_t *req = msg->data;
 	uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred, NULL);
-	long job_uid;
 	List steps;
 	ListIterator i;
 	step_loc_t *stepd;
 	int step_cnt = 0;
-	int rc = SLURM_SUCCESS;
+	int first_time, rc = SLURM_SUCCESS;
 
 	if (req->op != SUSPEND_JOB && req->op != RESUME_JOB) {
 		error("REQUEST_SUSPEND: bad op code %u", req->op);
 		rc = ESLURM_NOT_SUPPORTED;
-		goto fini;
 	}
-	debug("_rpc_suspend_job jobid=%u uid=%d action=%s", req->job_id,
-	      req_uid, req->op == SUSPEND_JOB ? "suspend" : "resume");
-	job_uid = _get_job_uid(req->job_id);
-	if (job_uid < 0)
-		goto no_job;
+
 	/*
 	 * check that requesting user ID is the SLURM UID or root
 	 */
 	if (!_slurm_authorized_user(req_uid)) {
-		error("Security violation: signal_job(%u) from uid %ld",
+		error("Security violation: suspend_job(%u) from uid %ld",
 		      req->job_id, (long) req_uid);
 		rc = ESLURM_USER_ID_MISSING;
-		goto fini;
-	}
+	}
+
+	/* send a response now, which will include any errors
+	 * detected with the request */
+	if (msg->conn_fd >= 0) {
+		slurm_send_rc_msg(msg, rc);
+		if (slurm_close_accepted_conn(msg->conn_fd) < 0)
+			error ("_rpc_suspend_job: close(%d): %m", msg->conn_fd);
+		msg->conn_fd = -1;
+	}
+	if (rc != SLURM_SUCCESS)
+		return;
+
+	/* now we can focus on performing the requested action,
+	 * which could take a few seconds to complete */
+	debug("_rpc_suspend_job jobid=%u uid=%d action=%s", req->job_id,
+	      req_uid, req->op == SUSPEND_JOB ? "suspend" : "resume");
+
+	/* Try to get a thread lock for this job. If the lock
+	 * is not available then sleep and try again */
+	first_time = 1;
+	while (!_get_suspend_job_lock(req->job_id)) {
+		first_time = 0;
+		debug3("suspend lock sleep for %u", req->job_id);
+		sleep(1);
+	}
+
+	/* If suspending and you got the lock on the first try then
+	 * sleep for 1 second to give any launch requests a chance
+	 * to get started and avoid a race condition that would
+	 * effectively cause the suspend request to get ignored
+	 * because "there's no job to suspend" */
+	if (first_time && req->op == SUSPEND_JOB) {
+		debug3("suspend first sleep for %u", req->job_id);
+		sleep(1);
+	}
 
 	/*
 	 * Loop through all job steps and call stepd_suspend or stepd_resume
@@ -2350,24 +2430,12 @@ _rpc_suspend_job(slurm_msg_t *msg)
 	}
 	list_iterator_destroy(i);
 	list_destroy(steps);
+	_unlock_suspend_job(req->job_id);
 
-    no_job:
 	if (step_cnt == 0) {
 		debug2("No steps in jobid %u to suspend/resume",
 		       req->job_id);
 	}
-
-	/*
-	 * At this point, if connection still open, we send controller
-	 * a reply.
-	 */
-fini:
-	if (msg->conn_fd >= 0) {
-		slurm_send_rc_msg(msg, rc);
-		if (slurm_close_accepted_conn(msg->conn_fd) < 0)
-			error ("_rpc_signal_job: close(%d): %m", msg->conn_fd);
-		msg->conn_fd = -1;
-	}
 }
 
 static void
-- 
GitLab