Commit 154c337c authored by Frank Winkler's avatar Frank Winkler
Browse files

New test version.

parent f51815cf
......@@ -8,7 +8,7 @@ PLUGIN_TYPE = prep
PLUGIN_NAME = pika
PLUGIN_FILE = $(PLUGIN_TYPE)_$(PLUGIN_NAME).so
SRC_FILE = slurm-prep-pika.c
SRC_FILE = slurm-prep-pika_v1.c
CC = gcc
CFLAGS ?= -Wall -fPIC -g -I$(SLURM_INC_DIR) -I$(SLURM_BUILD_DIR)
......
/*****************************************************************************\
* prep_script.c - PrEp script plugin, handles Prolog / Epilog /
* PrologSlurmctld / EpilogSlurmctld scripts
*****************************************************************************
* Written by Frank Winkler
*
* LICENSE NOTICE ?!?!?!?!
\*****************************************************************************/
#include "slurm/slurm.h"
#include "slurm/slurm_errno.h"
#include "src/common/prep.h"
#include "src/common/macros.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/common/parse_time.h"
#include "src/common/uid.h"
/*
* These variables are required by the generic plugin interface. If they
* are not found in the plugin, the plugin loader will ignore it.
*
* plugin_name - a string giving a human-readable description of the
* plugin. There is no maximum length, but the symbol must refer to
* a valid string.
*
* plugin_type - a string suggesting t#include <time.h>he type of the plugin or its
* applicability to a particular form of data or method of data handling.
* If the low-level plugin API is used, the contents of this string are
* unimportant and may be anything. Slurm uses the higher-level plugin
* interface which requires this string to be of the form
*
* <application>/<method>
*
* where <application> is a description of the intended application of
* the plugin (e.g., "auth" for Slurm authentication) and <method> is a
* description of how this plugin satisfies that application. Slurm will
* only load authentication plugins if the plugin_type string has a prefix
* of "auth/".
*
* plugin_version - an unsigned 32-bit integer containing the Slurm version
* (major.minor.micro combined into a single number).
*/
#define PLUGIN_NAME "PrEp-pika: "
#define JSON_LEN 128
const char plugin_name[] = "PrEp plugin pika";
const char plugin_type[] = "prep/pika";
const uint32_t plugin_version = SLURM_VERSION_NUMBER;
static bool have_prolog_slurmctld = false;
static bool have_epilog_slurmctld = false;
void (*prolog_slurmctld_callback)(int rc, uint32_t job_id) = NULL;
void (*epilog_slurmctld_callback)(int rc, uint32_t job_id) = NULL;
static pthread_mutex_t plugin_log_lock = PTHREAD_MUTEX_INITIALIZER;
/* json helper data types */
typedef enum
{
SUCCESS = 0,
JSON_OUT_OF_MEMORY = -2,
E_HOST_LIST_SHIFT = -3,
E_HOST_LIST_CREATE = -4,
} internal_error_t;
typedef enum
{
NONE = -1,
U16,
U32,
U64,
U16PP,
CHARPP,
CHARP,
TIMEP,
} value_type_t;
typedef struct {
uint16_t **items;
uint16_t *item_counts; // common type for sub count in slurm?
uint32_t count; // common type for over all count in slurm?
} uint16_2d_t;
typedef struct {
char **items;
int count;
} char_2d_t;
typedef union
{
uint16_t uint16;
uint32_t uint32;
uint64_t uint64;
char *charp;
char_2d_t charpp;
uint16_2d_t uint16pp;
time_t *timep;
} value_t;
typedef struct
{
const char *key;
value_type_t type;
value_t value;
} key_value_pair_t;
/* Added spank_like output functions for convenience */
extern void slurm_info (const char *format, ...)
__attribute__ ((format (printf, 1, 2)));
extern void slurm_error (const char *format, ...)
__attribute__ ((format (printf, 1, 2)));
extern void slurm_verbose (const char *format, ...)
__attribute__ ((format (printf, 1, 2)));
extern void slurm_debug (const char *format, ...)
__attribute__ ((format (printf, 1, 2)));
extern void slurm_debug2 (const char *format, ...)
__attribute__ ((format (printf, 1, 2)));
extern void slurm_debug3 (const char *format, ...)
__attribute__ ((format (printf, 1, 2)));
/* Forward declarations */
static int job_ptr_to_json(job_record_t *job_ptr, char **json, bool is_epilog);
static const char *error_to_string(internal_error_t error);
static int pika_metadata_log(int job_id, char *json, bool is_epilog);
/* Helper macro.
* Uses _expr (usually an expression returning a number/error)
* to either advance an offset value _o by that number or
* return the number as error _e if it is negative */
#define ADVANCE_OR_RETURN(_o, _e, _expr) \
{\
_e = _expr; \
if (_e < 0) return _e; \
else _o += _e; \
}
extern int init(void)
{
slurm_info(PLUGIN_NAME "init\n");
//gpu_plugin_init();
return SLURM_SUCCESS;
}
extern void fini(void)
{
slurm_info(PLUGIN_NAME "fini\n");
//gpu_plugin_fini();
}
extern void prep_p_register_callbacks(prep_callbacks_t *callbacks)
{
slurm_info(PLUGIN_NAME "prep_p_register_callbacks\n");
/*
* Cannot safely run these without a valid callback, so disable
* them.
*/
if (!(prolog_slurmctld_callback = callbacks->prolog_slurmctld))
have_prolog_slurmctld = false;
if (!(epilog_slurmctld_callback = callbacks->epilog_slurmctld))
have_epilog_slurmctld = false;
}
extern int prep_p_prolog(job_env_t *job_env, slurm_cred_t *cred)
{
slurm_info(PLUGIN_NAME "prep_p_prolog\n");
return SLURM_SUCCESS;
}
extern int prep_p_epilog(job_env_t *job_env, slurm_cred_t *cred)
{
slurm_info(PLUGIN_NAME "prep_p_epilog\n");
return SLURM_SUCCESS;
}
extern int prep_p_prolog_slurmctld(job_record_t *job_ptr, bool *async)
{
int rc = SLURM_SUCCESS;
char* user = uid_to_string_or_null(job_ptr->user_id);
if ( strcmp(user, "fwinkler") != 0 &&
strcmp(user, "rotscher") != 0 ) {
xfree(user);
return rc;
}
xfree(user);
slurm_info(PLUGIN_NAME "prep_p_prolog_slurmctld\n");
char *json;
internal_error_t _rc = job_ptr_to_json(job_ptr, &json, false);
if (_rc) {
slurm_error(PLUGIN_NAME "%s", error_to_string(_rc));
rc = SLURM_ERROR;
}
*async = have_prolog_slurmctld;
if (*async) {
/* MUST run before async task finishes */
prolog_slurmctld_callback(rc, job_ptr->job_id);
}
// Some async task
if (rc == SLURM_SUCCESS) {
slurm_info(PLUGIN_NAME "prep_p_prolog_slurmctld User %s\n", job_ptr->user_name);
pika_metadata_log(job_ptr->job_id, json, false);
}
return rc;
}
extern int prep_p_epilog_slurmctld(job_record_t *job_ptr, bool *async)
{
int rc = SLURM_SUCCESS;
char* user = uid_to_string_or_null(job_ptr->user_id);
if ( strcmp(user, "fwinkler") != 0 &&
strcmp(user, "rotscher") != 0 ) {
xfree(user);
return rc;
}
xfree(user);
slurm_info(PLUGIN_NAME "prep_p_epilog_slurmctld\n");
char *json;
internal_error_t _rc = job_ptr_to_json(job_ptr, &json, true);
if (_rc) {
slurm_error(PLUGIN_NAME "%s", error_to_string(_rc));
rc = SLURM_ERROR;
}
*async = have_epilog_slurmctld;
if (*async) {
/* MUST run before async task finishes */
epilog_slurmctld_callback(rc, job_ptr->job_id);
}
// Some async task
if (rc == SLURM_SUCCESS) {
slurm_info(PLUGIN_NAME "prep_p_epilog_slurmctld User %s %d\n", (job_ptr->user_name), job_ptr->user_id);
pika_metadata_log(job_ptr->job_id, json, true);
}
return rc;
}
/********************
* plugin functions *
********************/
static const char *error_to_string(internal_error_t error)
{
switch (error) {
case SUCCESS:
return "success";
case JSON_OUT_OF_MEMORY:
return "ran out of memory when building json string";
case E_HOST_LIST_SHIFT:
case E_HOST_LIST_CREATE:
return "failed to retrieve node names";
default:
return "unknown error";
}
}
static void get_memory(job_record_t *job_ptr,
uint64_t *mem_per_cpu,
uint64_t *mem_per_node)
{
uint64_t pn_min_memory = job_ptr->details->pn_min_memory;
*mem_per_cpu = NO_VAL64;
*mem_per_node = NO_VAL64;
if (pn_min_memory & MEM_PER_CPU){
*mem_per_cpu = pn_min_memory & ~MEM_PER_CPU;
} else {
*mem_per_node = pn_min_memory;
}
}
static int core_bitmaps_array(job_record_t *job_ptr,
uint16_t ***dest_ids,
uint16_t **dest_cnts)
{
int rc = SLURM_SUCCESS;
job_resources_t *job_resrcs = job_ptr->job_resrcs;
uint16_t ncores = 0;
uint32_t nhosts = job_resrcs->nhosts;
uint32_t reps_remain = 0;
int h, c, b, sock_ind;
bitoff_t offset;
bitstr_t *bitmap = job_resrcs->core_bitmap;
*dest_ids = xmalloc(sizeof(uint16_t*) * nhosts);
*dest_cnts = xmalloc(sizeof(uint16_t) * nhosts);
for(h = 0, sock_ind = 0, offset = 0; h < nhosts && !rc; ++h){
if(reps_remain == 0){
reps_remain = job_resrcs->sock_core_rep_count[sock_ind];
ncores = job_resrcs->sockets_per_node[sock_ind] *
job_resrcs->cores_per_socket[sock_ind];
++sock_ind;
}
--reps_remain;
(*dest_ids)[h] = xmalloc(sizeof(uint16_t) * ncores);
for(c = 0, b = 0; c < ncores && b < ncores; ++b){
if(bit_test(bitmap, b + offset)){
(*dest_ids)[h][c++] = b;
}
}
(*dest_cnts)[h] = c;
offset += ncores;
}
return rc;
}
static int snprintf_realloc(char **json, size_t offset, const char *fmt, ...)
{
va_list args;
size_t size = xsize(*json);
int rc;
va_start(args, fmt);
rc = vsnprintf(*json + offset, size - offset, fmt, args);
va_end(args);
if (rc >= size - offset) {
do {
if (size >= SIZE_MAX / 2) {
return JSON_OUT_OF_MEMORY;
}
size *= 2;
} while (rc >= size - offset);
*json = xrealloc(*json, sizeof(char) * size);
va_start(args, fmt);
rc = vsnprintf(*json + offset, size - offset, fmt, args);
va_end(args);
}
return rc;
}
static int json_init(char **json, size_t offset, uint32_t job_id)
{
return snprintf_realloc(json, offset, "{\"%"PRIu32"\":{", job_id);
}
static int json_append_uint16(char **json, size_t offset, const char *key, uint16_t value)
{
return snprintf_realloc(json, offset, "\"%s\": %"PRIu16",", key, value);
}
static int json_append_uint32(char **json, size_t offset, const char *key, uint32_t value)
{
return snprintf_realloc(json, offset, "\"%s\": %"PRIu32",", key, value);
}
static int json_append_uint64(char **json, size_t offset, const char *key, uint64_t value)
{
return snprintf_realloc(json, offset, "\"%s\": %"PRIu64",", key, value);
}
static int json_append_uint16_array_2d(char **json, size_t offset, const char *key, uint16_t **values, uint16_t *value_counts, uint32_t count)
{
int i, j;
int error = 0;
int rc = 0;
ADVANCE_OR_RETURN(rc, error, snprintf_realloc(json, offset, "\"%s\": [", key));
for (i = 0; i < count; ++i) {
ADVANCE_OR_RETURN(rc, error, snprintf_realloc(json, offset + rc, "["));
for (j = 0; j < value_counts[i]; ++j) {
ADVANCE_OR_RETURN(rc, error, snprintf_realloc(json, offset + rc, "%i,", values[i][j]));
}
if (j > 0) {
(*json)[strlen(*json) - 1] = '\0';
--rc;
}
ADVANCE_OR_RETURN(rc, error, snprintf_realloc(json, offset + rc, "],"));
}
if (i > 0) {
(*json)[strlen(*json) - 1] = '\0';
--rc;
}
ADVANCE_OR_RETURN(rc, error, snprintf_realloc(json, offset + rc, "],"));
return rc;
}
static int json_append_string_array(char **json, size_t offset, const char* key, char **values, int count)
{
int rc = 0, error = 0;
int i;
ADVANCE_OR_RETURN(rc, error, snprintf_realloc(json, offset, "\"%s\": [", key));
for (i = 0; i < count; ++i) {
ADVANCE_OR_RETURN(rc, error, snprintf_realloc(json, offset + rc, "\"%s\",", values[i]));
}
if (i > 0) {
(*json)[offset + rc] = '\0';
--rc;
}
ADVANCE_OR_RETURN(rc, error, snprintf_realloc(json, offset + rc, "],"));
return rc;
}
static int json_append_string(char **json, size_t offset, const char *key, const char *value)
{
return snprintf_realloc(json, offset, "\"%s\": \"%s\",", key, value);
}
static int json_append_time(char **json, size_t offset, const char *key,
time_t *time_ptr)
{
char time_str[32];
slurm_make_time_str(time_ptr, time_str, 32);
return snprintf_realloc(json, offset, "\"%s\": \"%s\",", key, time_str);
}
static int json_fini(char **json)
{
size_t offstet = strlen(*json);
(*json)[offstet - 1] = '}';
return snprintf_realloc(json, offstet, "%c", '}');
}
static int
json_append_key_value_pair(char **json, size_t offset, key_value_pair_t *kvp)
{
switch (kvp->type) {
case U16:
return json_append_uint16(
json, offset, kvp->key,
kvp->value.uint16
);
case U32:
return json_append_uint32(
json, offset, kvp->key,
kvp->value.uint32
);
case U64:
return json_append_uint64(
json, offset, kvp->key,
kvp->value.uint64
);
case CHARPP:
return json_append_string_array(
json, offset, kvp->key,
kvp->value.charpp.items,
kvp->value.charpp.count
);
case CHARP:
return json_append_string(
json, offset, kvp->key,
kvp->value.charp
);
case U16PP:
return json_append_uint16_array_2d(
json, offset, kvp->key,
kvp->value.uint16pp.items,
kvp->value.uint16pp.item_counts,
kvp->value.uint16pp.count
);
case TIMEP:
return json_append_time(
json, offset, kvp->key, kvp->value.timep
);
default:
return snprintf_realloc(
json, offset,
"\"%s\":\"data type not recognised\",", kvp->key
);
}
}
static int get_nodes_names(job_record_t *job_ptr, char ***node_names, int *count)
{
int rc = 0, i;
char *host;
hostlist_t hl;
if ((hl = slurm_hostlist_create(job_ptr->job_resrcs->nodes))) {
*count = slurm_hostlist_count(hl);
*node_names = xmalloc(sizeof(char*) * *count);
for (i = 0; i < *count && !rc; ++i) {
if ((host = slurm_hostlist_shift(hl))) {
(*node_names)[i] = host;
} else {
rc = E_HOST_LIST_SHIFT;
}
}
hostlist_destroy(hl);
return rc;
}
return E_HOST_LIST_CREATE;
}
static void free_nodes_names(char **node_names, int count)
{
int i;
char *item;
for (i = 0; i < count; ++i) {
item = node_names[i];
if (item) {
free(item);
}
}
xfree(node_names);
}
static void get_job_state(job_record_t *job_ptr, char **state)
{
if ( IS_JOB_RUNNING(job_ptr) )
sprintf(*state, "running");
else if ( IS_JOB_COMPLETE(job_ptr) )
sprintf(*state, "completed");
else if ( IS_JOB_CANCELLED(job_ptr) )
sprintf(*state, "cancelled");
else if ( IS_JOB_TIMEOUT(job_ptr) )
sprintf(*state, "timeout");
else if ( IS_JOB_OOM(job_ptr) )
sprintf(*state, "OOM");
else
sprintf(*state, "failed");
}
static void convert_gpu_string(char* gpu_gres, uint16_t **gpu_ids, uint16_t *ngpus)
{
int i, first, last;
first = 0;
last = 0;
bool d_first = false;
//printf("gpu_gres = %s\n", gpu_gres);
char *pch= strdup(gpu_gres);
char *ptr = strtok(pch, ":");
while(ptr != NULL)
{
if ( strstr(ptr, "-") ) {
//printf("'%s'\n", ptr);
char *token = strtok(ptr, "-)");
while(token != NULL)
{
//printf("'%s'\n", token);
if ( d_first == false ) {
d_first = true;
first = atoi(token);
} else {
last = atoi(token);
}
token = strtok(NULL, "-)");
}
free(token);
}
ptr = strtok(NULL, ":");
}
free(ptr);
free(pch);
int temp_ngpus = last - first + 1;
if ( temp_ngpus < 1 ) temp_ngpus = 1;
*ngpus = temp_ngpus;
*gpu_ids = xmalloc(sizeof(uint16_t*) * (*ngpus));
for (i = 0; i < (*ngpus); i++) {
(*gpu_ids)[i] = first++;
}
}
static int get_gpu_data( job_record_t *job_ptr,
uint32_t *gpu_alloc_cnt,
uint16_t ***gpu_ids,
uint16_t **gpu_cnts)
{
int rc = SLURM_SUCCESS;
//check gpu_alloc_cnt
char *gres_alloc = strdup(job_ptr->gres_alloc);
if ( gres_alloc != NULL ) {
char* ptr = strtok(gres_alloc, "gpu:");
while(ptr != NULL)
{
*gpu_alloc_cnt = atoi(ptr);
ptr = strtok(NULL, "gpu:");
}
free(ptr);
}
free(gres_alloc);
//determine gpu ids
int i,j;
uint16_t *temp_gpu_ids;
uint16_t temp_ngpus = 0;