Skip to content
Snippets Groups Projects
Commit 11fb91dd authored by Moe Jette's avatar Moe Jette
Browse files

Added support for normal job/step completion RPC.

parent 30ecfdb3
No related branches found
No related tags found
No related merge requests found
...@@ -24,14 +24,15 @@ lib_LTLIBRARIES = libslurm.la ...@@ -24,14 +24,15 @@ lib_LTLIBRARIES = libslurm.la
libslurm_la_SOURCES = \ libslurm_la_SOURCES = \
allocate.c \ allocate.c \
config_info.c \ cancel.c \
complete.c \
config_info.c \
job_info.c \ job_info.c \
node_info.c \ node_info.c \
partition_info.c \ partition_info.c \
submit.c \ submit.c \
reconfigure.c \ reconfigure.c \
update_config.c \ update_config.c
cancel.c
common_dir = $(top_srcdir)/src/common common_dir = $(top_srcdir)/src/common
...@@ -62,7 +63,7 @@ libslurm_la_DEPENDENCIES = libslurm.sym $(libslurm_la_LIBADD) ...@@ -62,7 +63,7 @@ libslurm_la_DEPENDENCIES = libslurm.sym $(libslurm_la_LIBADD)
# place of the rules that follow. # place of the rules that follow.
#allocate_CFLAGS = -DDEBUG_MODULE $(AM_CFLAGS) #allocate_CFLAGS = -DDEBUG_MODULE $(AM_CFLAGS)
#config_info_CFLAGS = -DDEBUG_MODULE $(AM_CFLAGS) #config_info_CFLAGS = -DDEBUG_MODULE $(AM_CFLAGS)
#cancel_CFLAGS = -DDEBUG_MODULE $(AM_CFLAGS) #cancel_CFLAGS = -DDEBUG_MODULE $(AM_CFLAGS)
#job_info_CFLAGS = -DDEBUG_MODULE $(AM_CFLAGS) #job_info_CFLAGS = -DDEBUG_MODULE $(AM_CFLAGS)
#node_info_CFLAGS = -DDEBUG_MODULE $(AM_CFLAGS) #node_info_CFLAGS = -DDEBUG_MODULE $(AM_CFLAGS)
...@@ -82,7 +83,7 @@ libslurm_la_DEPENDENCIES = libslurm.sym $(libslurm_la_LIBADD) ...@@ -82,7 +83,7 @@ libslurm_la_DEPENDENCIES = libslurm.sym $(libslurm_la_LIBADD)
maintests : allocate cancel job_info node_info partition_info reconfigure submit update_config maintests : allocate cancel job_info node_info partition_info reconfigure submit update_config
allocate : allocate_d.o allocate : allocate_d.o
config_info : config_info_d.o config_info : config_info_d.o
cancel : cancel_d.o cancel : cancel_d.o
job_info : job_info_d.o job_info : job_info_d.o
node_info : node_info_d.o node_info : node_info_d.o
......
/*****************************************************************************\ /*****************************************************************************\
* cancel.c - cancel a slurm job * cancel.c - cancel a slurm job or job step
***************************************************************************** *****************************************************************************
* Copyright (C) 2002 The Regents of the University of California. * Copyright (C) 2002 The Regents of the University of California.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
......
/*****************************************************************************\
* complete.c - note the completion a slurm job or job step
*****************************************************************************
* Copyright (C) 2002 The Regents of the University of California.
* Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
* Written by moe jette <jette1@llnl.gov>.
* UCRL-CODE-2002-040.
*
* This file is part of SLURM, a resource management program.
* For details, see <http://www.llnl.gov/linux/slurm/>.
*
* SLURM is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the License, or (at your option)
* any later version.
*
* SLURM is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along
* with SLURM; if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
\*****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <src/api/slurm.h>
#include <src/common/slurm_protocol_api.h>
/* slurm_complete_job - note the completion of a job and all of its steps */
int
slurm_complete_job ( uint32_t job_id )
{
return slurm_complete_job_step ( job_id, NO_VAL);
}
/* slurm_complete_job_step - note the completion of a specific job step
* (or all steps if step_id==NO_VAL) */
int
slurm_complete_job_step ( uint32_t job_id, uint32_t step_id )
{
int msg_size ;
int rc ;
slurm_fd sockfd ;
slurm_msg_t request_msg ;
slurm_msg_t response_msg ;
job_step_id_msg_t job_step_id_msg ;
return_code_msg_t * slurm_rc_msg ;
/* init message connection for message communication with controller */
if ( ( sockfd = slurm_open_controller_conn ( ) ) == SLURM_SOCKET_ERROR ) {
slurm_seterrno ( SLURM_COMMUNICATIONS_CONNECTION_ERROR );
return SLURM_SOCKET_ERROR ;
}
/* send request message */
job_step_id_msg . job_id = job_id ;
job_step_id_msg . job_step_id = step_id ;
request_msg . msg_type = REQUEST_COMPLETE_JOB_STEP ;
request_msg . data = &job_step_id_msg ;
if ( ( rc = slurm_send_controller_msg ( sockfd , & request_msg ) ) == SLURM_SOCKET_ERROR ) {
slurm_seterrno ( SLURM_COMMUNICATIONS_SEND_ERROR );
return SLURM_SOCKET_ERROR ;
}
/* receive message */
if ( ( msg_size = slurm_receive_msg ( sockfd , & response_msg ) ) == SLURM_SOCKET_ERROR ) {
slurm_seterrno ( SLURM_COMMUNICATIONS_RECEIVE_ERROR );
return SLURM_SOCKET_ERROR ;
}
/* shutdown message connection */
if ( ( rc = slurm_shutdown_msg_conn ( sockfd ) ) == SLURM_SOCKET_ERROR ) {
slurm_seterrno ( SLURM_COMMUNICATIONS_SHUTDOWN_ERROR );
return SLURM_SOCKET_ERROR ;
}
if ( msg_size )
return msg_size;
switch ( response_msg . msg_type )
{
case RESPONSE_SLURM_RC:
slurm_rc_msg = ( return_code_msg_t * ) response_msg . data ;
rc = slurm_rc_msg->return_code;
slurm_free_return_code_msg ( slurm_rc_msg );
if (rc) {
slurm_seterrno ( rc );
return SLURM_PROTOCOL_ERROR;
}
break ;
default:
slurm_seterrno ( SLURM_UNEXPECTED_MSG_ERROR );
return SLURM_PROTOCOL_ERROR;
break ;
}
return SLURM_PROTOCOL_SUCCESS ;
}
...@@ -19,6 +19,9 @@ extern int slurm_allocate_resources (job_desc_msg_t * job_desc_msg , resource_al ...@@ -19,6 +19,9 @@ extern int slurm_allocate_resources (job_desc_msg_t * job_desc_msg , resource_al
extern int slurm_cancel_job (uint32_t job_id); extern int slurm_cancel_job (uint32_t job_id);
extern int slurm_cancel_job_step (uint32_t job_id, uint32_t step_id); extern int slurm_cancel_job_step (uint32_t job_id, uint32_t step_id);
extern int slurm_complete_job (uint32_t job_id);
extern int slurm_complete_job_step (uint32_t job_id, uint32_t step_id);
/*************************** /***************************
* slurm_ctl_conf.c * slurm_ctl_conf.c
......
...@@ -134,6 +134,8 @@ typedef enum { ...@@ -134,6 +134,8 @@ typedef enum {
RESPONSE_SIGNAL_JOB_STEP, RESPONSE_SIGNAL_JOB_STEP,
REQUEST_CANCEL_JOB_STEP, REQUEST_CANCEL_JOB_STEP,
RESPONSE_CANCEL_JOB_STEP, RESPONSE_CANCEL_JOB_STEP,
REQUEST_COMPLETE_JOB_STEP,
RESPONSE_COMPLETE_JOB_STEP,
REQUEST_LAUNCH_TASKS = 6001, REQUEST_LAUNCH_TASKS = 6001,
RESPONSE_LAUNCH_TASKS, RESPONSE_LAUNCH_TASKS,
......
...@@ -163,6 +163,7 @@ int pack_msg ( slurm_msg_t const * msg , char ** buffer , uint32_t * buf_len ) ...@@ -163,6 +163,7 @@ int pack_msg ( slurm_msg_t const * msg , char ** buffer , uint32_t * buf_len )
break ; break ;
case REQUEST_CANCEL_JOB_STEP : case REQUEST_CANCEL_JOB_STEP :
case REQUEST_COMPLETE_JOB_STEP :
pack_cancel_job_step_msg ( ( job_step_id_msg_t * ) msg->data , pack_cancel_job_step_msg ( ( job_step_id_msg_t * ) msg->data ,
( void ** ) buffer , buf_len ) ; ( void ** ) buffer , buf_len ) ;
break ; break ;
...@@ -172,6 +173,7 @@ int pack_msg ( slurm_msg_t const * msg , char ** buffer , uint32_t * buf_len ) ...@@ -172,6 +173,7 @@ int pack_msg ( slurm_msg_t const * msg , char ** buffer , uint32_t * buf_len )
break ; break ;
case RESPONSE_RECONFIGURE : case RESPONSE_RECONFIGURE :
case RESPONSE_CANCEL_JOB_STEP : case RESPONSE_CANCEL_JOB_STEP :
case RESPONSE_COMPLETE_JOB_STEP :
case RESPONSE_SIGNAL_JOB : case RESPONSE_SIGNAL_JOB :
case RESPONSE_SIGNAL_JOB_STEP : case RESPONSE_SIGNAL_JOB_STEP :
break ; break ;
...@@ -297,6 +299,7 @@ int unpack_msg ( slurm_msg_t * msg , char ** buffer , uint32_t * buf_len ) ...@@ -297,6 +299,7 @@ int unpack_msg ( slurm_msg_t * msg , char ** buffer , uint32_t * buf_len )
( void ** ) buffer , buf_len ) ; ( void ** ) buffer , buf_len ) ;
break ; break ;
case REQUEST_CANCEL_JOB_STEP : case REQUEST_CANCEL_JOB_STEP :
case REQUEST_COMPLETE_JOB_STEP :
unpack_cancel_job_step_msg ( ( job_step_id_msg_t ** ) & ( msg->data ) , unpack_cancel_job_step_msg ( ( job_step_id_msg_t ** ) & ( msg->data ) ,
( void ** ) buffer , buf_len ) ; ( void ** ) buffer , buf_len ) ;
break ; break ;
...@@ -306,6 +309,7 @@ int unpack_msg ( slurm_msg_t * msg , char ** buffer , uint32_t * buf_len ) ...@@ -306,6 +309,7 @@ int unpack_msg ( slurm_msg_t * msg , char ** buffer , uint32_t * buf_len )
break ; break ;
case RESPONSE_RECONFIGURE : case RESPONSE_RECONFIGURE :
case RESPONSE_CANCEL_JOB_STEP : case RESPONSE_CANCEL_JOB_STEP :
case RESPONSE_COMPLETE_JOB_STEP :
case RESPONSE_SIGNAL_JOB : case RESPONSE_SIGNAL_JOB :
case RESPONSE_SIGNAL_JOB_STEP : case RESPONSE_SIGNAL_JOB_STEP :
break ; break ;
......
...@@ -63,6 +63,7 @@ inline static void slurm_rpc_dump_nodes ( slurm_msg_t * msg ) ; ...@@ -63,6 +63,7 @@ inline static void slurm_rpc_dump_nodes ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_dump_partitions ( slurm_msg_t * msg ) ; inline static void slurm_rpc_dump_partitions ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_dump_jobs ( slurm_msg_t * msg ) ; inline static void slurm_rpc_dump_jobs ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_job_step_cancel ( slurm_msg_t * msg ) ; inline static void slurm_rpc_job_step_cancel ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_job_step_complete ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_submit_batch_job ( slurm_msg_t * msg ) ; inline static void slurm_rpc_submit_batch_job ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_reconfigure_controller ( slurm_msg_t * msg ) ; inline static void slurm_rpc_reconfigure_controller ( slurm_msg_t * msg ) ;
inline static void slurm_rpc_node_registration ( slurm_msg_t * msg ) ; inline static void slurm_rpc_node_registration ( slurm_msg_t * msg ) ;
...@@ -212,6 +213,10 @@ slurmctld_req ( slurm_msg_t * msg ) ...@@ -212,6 +213,10 @@ slurmctld_req ( slurm_msg_t * msg )
slurm_rpc_job_step_cancel ( msg ) ; slurm_rpc_job_step_cancel ( msg ) ;
slurm_free_job_step_id_msg ( msg -> data ) ; slurm_free_job_step_id_msg ( msg -> data ) ;
break; break;
case REQUEST_COMPLETE_JOB_STEP:
slurm_rpc_job_step_complete ( msg ) ;
slurm_free_job_step_id_msg ( msg -> data ) ;
break;
case REQUEST_SUBMIT_BATCH_JOB: case REQUEST_SUBMIT_BATCH_JOB:
slurm_rpc_submit_batch_job ( msg ) ; slurm_rpc_submit_batch_job ( msg ) ;
slurm_free_job_desc_msg ( msg -> data ) ; slurm_free_job_desc_msg ( msg -> data ) ;
...@@ -446,6 +451,59 @@ slurm_rpc_job_step_cancel ( slurm_msg_t * msg ) ...@@ -446,6 +451,59 @@ slurm_rpc_job_step_cancel ( slurm_msg_t * msg )
schedule(); schedule();
} }
/* slurm_rpc_job_step_complete - process RPC to note the completion an entire job or
* an individual job step */
void
slurm_rpc_job_step_complete ( slurm_msg_t * msg )
{
/* init */
int error_code;
clock_t start_time;
job_step_id_msg_t * job_step_id_msg = ( job_step_id_msg_t * ) msg-> data ;
start_time = clock ();
/* do RPC call */
if (job_step_id_msg->job_step_id == NO_VAL) {
error_code = job_complete ( job_step_id_msg->job_id );
/* return result */
if (error_code)
{
info ("slurm_rpc_job_step_complete error %d for %u, time=%ld",
error_code, job_step_id_msg->job_id, (long) (clock () - start_time));
slurm_send_rc_msg ( msg , error_code );
}
else
{
info ("slurm_rpc_job_step_complete success for JobId=%u, time=%ld",
job_step_id_msg->job_id, (long) (clock () - start_time));
slurm_send_rc_msg ( msg , SLURM_SUCCESS );
}
}
else {
error_code = job_step_complete ( job_step_id_msg->job_id ,
job_step_id_msg->job_step_id);
/* return result */
if (error_code)
{
info ("slurm_rpc_job_step_complete error %d for %u.%u, time=%ld", error_code,
job_step_id_msg->job_id, job_step_id_msg->job_step_id,
(long) (clock () - start_time));
slurm_send_rc_msg ( msg , error_code );
}
else
{
info ("slurm_rpc_job_step_complete success for %u.%u, time=%ld",
job_step_id_msg->job_id, job_step_id_msg->job_step_id,
(long) (clock () - start_time));
slurm_send_rc_msg ( msg , SLURM_SUCCESS );
}
}
schedule();
}
/* slurm_rpc_update_job - process RPC to update the configuration of a job (e.g. priority) */ /* slurm_rpc_update_job - process RPC to update the configuration of a job (e.g. priority) */
void void
slurm_rpc_update_job ( slurm_msg_t * msg ) slurm_rpc_update_job ( slurm_msg_t * msg )
......
...@@ -519,7 +519,9 @@ job_cancel (uint32_t job_id) ...@@ -519,7 +519,9 @@ job_cancel (uint32_t job_id)
return 0; return 0;
} }
if (job_ptr->job_state == JOB_STAGE_IN) { if ((job_ptr->job_state == JOB_STAGE_IN) ||
(job_ptr->job_state == JOB_RUNNING) ||
(job_ptr->job_state == JOB_STAGE_OUT)) {
last_job_update = time (NULL); last_job_update = time (NULL);
job_ptr->job_state = JOB_FAILED; job_ptr->job_state = JOB_FAILED;
job_ptr->end_time = time(NULL); job_ptr->end_time = time(NULL);
...@@ -536,6 +538,53 @@ job_cancel (uint32_t job_id) ...@@ -536,6 +538,53 @@ job_cancel (uint32_t job_id)
return ESLURM_TRANSITION_STATE_NO_UPDATE; return ESLURM_TRANSITION_STATE_NO_UPDATE;
} }
/*
* job_complete - note the normal termination the specified job
* input: job_id - id of the job which completed
* output: returns 0 on success, otherwise ESLURM error code
* global: job_list - pointer global job list
* last_job_update - time of last job table update
*/
int
job_complete (uint32_t job_id)
{
struct job_record *job_ptr;
/* Locks: Write job, write node */
slurmctld_lock_t job_write_lock = { NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK };
lock_slurmctld (job_write_lock);
job_ptr = find_job_record(job_id);
if (job_ptr == NULL) {
unlock_slurmctld (job_write_lock);
info ("job_complete: invalid job id %u", job_id);
return ESLURM_INVALID_JOB_ID;
}
if ((job_ptr->job_state == JOB_FAILED) ||
(job_ptr->job_state == JOB_COMPLETE) ||
(job_ptr->job_state == JOB_TIMEOUT)) {
unlock_slurmctld (job_write_lock);
return ESLURM_ALREADY_DONE;
}
if ((job_ptr->job_state == JOB_STAGE_IN) ||
(job_ptr->job_state == JOB_RUNNING) ||
(job_ptr->job_state == JOB_STAGE_OUT)) {
deallocate_nodes (job_ptr->node_bitmap);
verbose ("job_complete for job id %u successful", job_id);
}
else {
error ("job_complete for job id %u from bad state", job_id, job_ptr->job_state);
}
last_job_update = time (NULL);
job_ptr->job_state = JOB_COMPLETE;
job_ptr->end_time = time(NULL);
delete_job_details(job_ptr);
unlock_slurmctld (job_write_lock);
return 0;
}
/* /*
* job_create - create a job table record for the supplied specifications. * job_create - create a job table record for the supplied specifications.
* this performs only basic tests for request validity (access to partition, * this performs only basic tests for request validity (access to partition,
...@@ -958,7 +1007,9 @@ job_step_cancel (uint32_t job_id, uint32_t step_id) ...@@ -958,7 +1007,9 @@ job_step_cancel (uint32_t job_id, uint32_t step_id)
return ESLURM_ALREADY_DONE; return ESLURM_ALREADY_DONE;
} }
if (job_ptr->job_state == JOB_STAGE_IN) { if ((job_ptr->job_state == JOB_STAGE_IN) ||
(job_ptr->job_state == JOB_RUNNING) ||
(job_ptr->job_state == JOB_STAGE_OUT)) {
last_job_update = time (NULL); last_job_update = time (NULL);
error_code = delete_step_record (job_ptr, step_id); error_code = delete_step_record (job_ptr, step_id);
unlock_slurmctld (job_write_lock); unlock_slurmctld (job_write_lock);
...@@ -977,6 +1028,50 @@ job_step_cancel (uint32_t job_id, uint32_t step_id) ...@@ -977,6 +1028,50 @@ job_step_cancel (uint32_t job_id, uint32_t step_id)
} }
/*
* job_step_complete - note normal completion the specified job step
* input: job_id, step_id - id of the job to be completed
* output: returns 0 on success, otherwise ESLURM error code
* global: job_list - pointer global job list
* last_job_update - time of last job table update
*/
int
job_step_complete (uint32_t job_id, uint32_t step_id)
{
struct job_record *job_ptr;
int error_code;
/* Locks: Write job */
slurmctld_lock_t job_write_lock = { NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK };
lock_slurmctld (job_write_lock);
job_ptr = find_job_record(job_id);
if (job_ptr == NULL) {
unlock_slurmctld (job_write_lock);
info ("job_step_complete: invalid job id %u", job_id);
return ESLURM_INVALID_JOB_ID;
}
if ((job_ptr->job_state == JOB_FAILED) ||
(job_ptr->job_state == JOB_COMPLETE) ||
(job_ptr->job_state == JOB_TIMEOUT)) {
unlock_slurmctld (job_write_lock);
return ESLURM_ALREADY_DONE;
}
last_job_update = time (NULL);
error_code = delete_step_record (job_ptr, step_id);
unlock_slurmctld (job_write_lock);
if (error_code == ENOENT) {
info ("job_step_complete step %u.%u not found", job_id, step_id);
return ESLURM_ALREADY_DONE;
}
return 0;
unlock_slurmctld (job_write_lock);
return ESLURM_TRANSITION_STATE_NO_UPDATE;
}
/* validate_job_desc - validate that a job descriptor for job submit or /* validate_job_desc - validate that a job descriptor for job submit or
* allocate has valid data, set values to defaults as required */ * allocate has valid data, set values to defaults as required */
int int
......
...@@ -298,6 +298,12 @@ extern int job_cancel (uint32_t job_id); ...@@ -298,6 +298,12 @@ extern int job_cancel (uint32_t job_id);
/* job_step_cancel - cancel the specified job step */ /* job_step_cancel - cancel the specified job step */
extern int job_step_cancel (uint32_t job_id, uint32_t job_step_id); extern int job_step_cancel (uint32_t job_id, uint32_t job_step_id);
/* job_complete - note the completion the specified job */
extern int job_complete (uint32_t job_id);
/* job_step_complete - note the completion the specified job step*/
extern int job_step_complete (uint32_t job_id, uint32_t job_step_id);
/* job_create - create a job table record for the supplied specifications */ /* job_create - create a job table record for the supplied specifications */
extern int job_create (job_desc_msg_t * job_specs, uint32_t *new_job_id, int allocate, extern int job_create (job_desc_msg_t * job_specs, uint32_t *new_job_id, int allocate,
int will_run, struct job_record **job_rec_ptr); int will_run, struct job_record **job_rec_ptr);
......
AUTOMAKE_OPTIONS = foreign AUTOMAKE_OPTIONS = foreign
noinst_PROGRAMS = cancel-tst \ noinst_PROGRAMS = cancel-tst \
complete-tst \
allocate-tst \ allocate-tst \
job_info-tst \ job_info-tst \
node_info-tst \ node_info-tst \
......
...@@ -21,7 +21,7 @@ main (int argc, char *argv[]) ...@@ -21,7 +21,7 @@ main (int argc, char *argv[])
for (i=1; i<argc; i++) { for (i=1; i<argc; i++) {
error_code = slurm_cancel_job ((uint32_t) atoi(argv[i])); error_code = slurm_cancel_job ((uint32_t) atoi(argv[i]));
if (error_code) if (error_code)
printf ("slurm_cancel error %d for job %s\n", printf ("slurm_cancel_job error %d for job %s\n",
errno, argv[i]); errno, argv[i]);
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment