diff --git a/src/common/slurm_protocol_api.c b/src/common/slurm_protocol_api.c
index d7c679d8cf9fe07869b3701ef959dad0d1288c63..a3fdd58ee1b7771cf2acd32272c92e6f350d8b7a 100644
--- a/src/common/slurm_protocol_api.c
+++ b/src/common/slurm_protocol_api.c
@@ -3648,6 +3648,56 @@ static void _print_data(char *data, int len)
 
 #endif
 
+/*
+ * slurm_forward_data - forward arbitrary data to unix domain sockets on nodes
+ * IN nodelist: nodes to forward data to
+ * IN address: address of unix domain socket
+ * IN len: length of data
+ * IN data: real data
+ * RET: 0 when every reply yields a zero return code, else the last non-zero
+ *	reply code, or SLURM_ERROR when no reply list comes back
+ */
+extern int
+slurm_forward_data(char *nodelist, char *address, uint32_t len, char *data)
+{
+	List ret_list = NULL;
+	int temp_rc = 0, rc = 0;
+	ret_data_info_t *ret_data_info = NULL;
+	slurm_msg_t *msg = xmalloc(sizeof(slurm_msg_t));
+	forward_data_msg_t req;
+
+	slurm_msg_t_init(msg);
+
+	debug("slurm_forward_data: nodelist=%s, address=%s, len=%u",
+	      nodelist, address, len);
+	/* req lives on this stack frame and only borrows the caller's pointers */
+	req.address = address;
+	req.len = len;
+	req.data = data;
+
+	msg->msg_type = REQUEST_FORWARD_DATA;
+	msg->data = &req;
+
+	if ((ret_list = slurm_send_recv_msgs(nodelist, msg, 0, false))) {
+		/*
+		 * NOTE(review): neither the popped entries nor ret_list are
+		 * released in this function - confirm who owns them.
+		 */
+		while ((ret_data_info = list_pop(ret_list))) {
+			temp_rc = slurm_get_return_code(ret_data_info->type,
+							ret_data_info->data);
+			if (temp_rc)
+				rc = temp_rc;
+		}
+	} else {
+		error("slurm_forward_data: no list was returned");
+		rc = SLURM_ERROR;
+	}
+
+	slurm_free_msg(msg);
+	return rc;
+}
+
 
 /*
  * vi: shiftwidth=8 tabstop=8 expandtab
diff --git a/src/common/slurm_protocol_defs.c b/src/common/slurm_protocol_defs.c
index 62d0249bada2c0c991e7d6d6efab928fc2ca112a..df45a90d27f0bc7cdcb4f08c2603d6ec7a93130b 100644
--- a/src/common/slurm_protocol_defs.c
+++ b/src/common/slurm_protocol_defs.c
@@ -968,6 +968,20 @@ extern void slurm_free_will_run_response_msg(will_run_response_msg_t *msg)
 	}
 }
 
+/*
+ * slurm_free_forward_data_msg - release a forward_data_msg_t together with
+ *	the address string and data buffer it points to
+ * IN msg: message to free; NULL is accepted and ignored
+ */
+extern void slurm_free_forward_data_msg(forward_data_msg_t *msg)
+{
+	if (msg) {
+		xfree(msg->address);
+		xfree(msg->data);
+		xfree(msg);
+	}
+}
+
 extern char *preempt_mode_string(uint16_t preempt_mode)
 {
 	char *gang_str;
diff --git a/src/common/slurm_protocol_defs.h b/src/common/slurm_protocol_defs.h
index 155527dcf2eec5fddbacb547c80198ac9ec2e10a..52dbe6a2958a555fd0a0e7f98e18e4ef8f0b61ad 100644
--- a/src/common/slurm_protocol_defs.h
+++ b/src/common/slurm_protocol_defs.h
@@ -284,7 +284,8 @@ typedef enum {
 	RESPONSE_SLURMD_STATUS,
 	RESPONSE_SLURMCTLD_STATUS,
 	REQUEST_JOB_STEP_PIDS,
-        RESPONSE_JOB_STEP_PIDS,
+	RESPONSE_JOB_STEP_PIDS,
+	REQUEST_FORWARD_DATA,
 
 	REQUEST_LAUNCH_TASKS = 6001,
 	RESPONSE_LAUNCH_TASKS,
@@ -900,6 +901,12 @@ typedef struct will_run_response_msg {
 	time_t start_time;		/* time when job will start */
 } will_run_response_msg_t;
 
+typedef struct forward_data_msg {
+	char *address;		/* address of the unix domain socket */
+	uint32_t len;		/* length of data */
+	char *data;		/* payload to forward */
+} forward_data_msg_t;
+
 /*****************************************************************************\
  * Slurm API Message Types
 \*****************************************************************************/
@@ -1000,6 +1007,7 @@ extern void slurm_free_priority_factors_request_msg(
 	priority_factors_request_msg_t *msg);
 extern void slurm_free_priority_factors_response_msg(
 	priority_factors_response_msg_t *msg);
+extern void slurm_free_forward_data_msg(forward_data_msg_t *msg);
 
 #define	slurm_free_timelimit_msg(msg) \
 	slurm_free_kill_job_msg(msg)
diff --git a/src/common/slurm_protocol_pack.c b/src/common/slurm_protocol_pack.c
index 0184391c531f82e819a0121a556db2bb9c5d39aa..45eee976d520f05b8496d04e819a9a05c2d22c63 100644
--- a/src/common/slurm_protocol_pack.c
+++ b/src/common/slurm_protocol_pack.c
@@ -613,6 +613,11 @@ static int _unpack_stats_request_msg(stats_info_request_msg_t **msg_ptr,
 static int _unpack_stats_response_msg(stats_info_response_msg_t **msg_ptr,
 				      Buf buffer, uint16_t protocol_version);
 
+static void _pack_forward_data_msg(forward_data_msg_t *msg,
+				   Buf buffer, uint16_t protocol_version);
+static int _unpack_forward_data_msg(forward_data_msg_t **msg_ptr,
+				    Buf buffer, uint16_t protocol_version);
+
 /* pack_header
  * packs a slurm protocol header that precedes every slurm message
  * IN header - the header structure to pack
@@ -1188,6 +1193,11 @@ pack_msg(slurm_msg_t const *msg, Buf buffer)
 		_pack_stats_response_msg((slurm_msg_t *)msg, buffer);
 		break;
 
+	case REQUEST_FORWARD_DATA:
+		_pack_forward_data_msg((forward_data_msg_t *)msg->data,
+				       buffer, msg->protocol_version);
+		break;
+
 	default:
 		debug("No pack method for msg type %u", msg->msg_type);
 		return EINVAL;
@@ -1749,7 +1759,12 @@ unpack_msg(slurm_msg_t * msg, Buf buffer)
 			&msg->data, buffer,
 			msg->protocol_version);
 		break;
-	
+
+	case REQUEST_FORWARD_DATA:
+		rc = _unpack_forward_data_msg((forward_data_msg_t **)&msg->data,
+					      buffer, msg->protocol_version);
+		break;
+
 	default:
 		debug("No unpack method for msg type %u", msg->msg_type);
 		return EINVAL;
@@ -10130,6 +10145,56 @@ unpack_error:
 	return SLURM_ERROR;
 }
 
+/*
+ * _pack_forward_data_msg - pack a forward_data_msg_t into buffer
+ * IN msg: message to pack, must not be NULL
+ * IN/OUT buffer: destination of the packed fields
+ * IN protocol_version: not referenced by this function
+ */
+static void _pack_forward_data_msg(forward_data_msg_t *msg,
+				   Buf buffer, uint16_t protocol_version)
+{
+	xassert (msg != NULL);
+	packstr(msg->address, buffer);
+	pack32(msg->len, buffer);
+	packmem(msg->data, msg->len, buffer);
+}
+
+/*
+ * _unpack_forward_data_msg - unpack a forward_data_msg_t from buffer
+ * OUT msg_ptr: set to the newly allocated message, or to NULL on error;
+ *	release it with slurm_free_forward_data_msg()
+ * IN/OUT buffer: source of the packed fields
+ * IN protocol_version: not referenced by this function
+ * RET: SLURM_SUCCESS or SLURM_ERROR
+ */
+static int _unpack_forward_data_msg(forward_data_msg_t **msg_ptr,
+				    Buf buffer, uint16_t protocol_version)
+{
+	forward_data_msg_t *msg;
+	uint32_t temp32;
+
+	xassert (msg_ptr != NULL);
+	msg = xmalloc(sizeof(forward_data_msg_t));
+	*msg_ptr = msg;
+	safe_unpackstr_xmalloc(&msg->address, &temp32, buffer);
+	safe_unpack32(&msg->len, buffer);
+	safe_unpackmem_xmalloc(&msg->data, &temp32, buffer);
+	/*
+	 * len is packed separately from the data blob's own size, so a peer
+	 * can make the two disagree.  The receiver later writes len bytes
+	 * out of data, so refuse any message where they do not match.
+	 */
+	if (msg->len != temp32)
+		goto unpack_error;
+
+	return SLURM_SUCCESS;
+
+unpack_error:
+	slurm_free_forward_data_msg(msg);
+	*msg_ptr = NULL;
+	return SLURM_ERROR;
+}
 
 static void
 _pack_checkpoint_msg(checkpoint_msg_t *msg, Buf buffer,
diff --git a/src/slurmd/slurmd/req.c b/src/slurmd/slurmd/req.c
index 3c94540460ed42fad902d66fbc5dd40ce6e0ba63..cc8d5fe1a9621aa4278fd6bd99a89d0617746c82 100644
--- a/src/slurmd/slurmd/req.c
+++ b/src/slurmd/slurmd/req.c
@@ -53,6 +53,7 @@
 #include <sys/stat.h>
 #include <sys/types.h>
 #include <sys/wait.h>
+#include <sys/un.h>
 #include <utime.h>
 #include <grp.h>
 
@@ -161,6 +162,8 @@ static int _run_prolog(uint32_t jobid, uid_t uid, char *resv_id,
 		       char **spank_job_env, uint32_t spank_job_env_size);
 static int _run_epilog(uint32_t jobid, uid_t uid, char *resv_id,
 		       char **spank_job_env, uint32_t spank_job_env_size);
+static void _rpc_forward_data(slurm_msg_t *msg);
+
 
 static bool _pause_for_job_completion(uint32_t jobid, char *nodes,
 		int maxtime);
@@ -367,6 +370,10 @@ slurmd_req(slurm_msg_t *msg)
 		_rpc_job_notify(msg);
 		slurm_free_job_notify_msg(msg->data);
 		break;
+	case REQUEST_FORWARD_DATA:
+		_rpc_forward_data(msg);
+		slurm_free_forward_data_msg(msg->data);
+		break;
 	default:
 		error("slurmd_req: invalid request msg type %d",
 		      msg->msg_type);
@@ -4417,3 +4424,85 @@ static void _wait_for_job_running_prolog(uint32_t job_id)
 	slurm_mutex_unlock(&conf->prolog_running_lock);
 	debug( "Finished wait for job %d's prolog to complete", job_id);
 }
+
+
+/*
+ * _rpc_forward_data - handle REQUEST_FORWARD_DATA: relay the payload to a
+ *	unix domain socket on this node
+ * IN msg: request whose data is a forward_data_msg_t
+ *
+ * The stream written to the socket is: the requester's uid (uint32_t,
+ * network byte order), the payload length (uint32_t, network byte order),
+ * then the payload itself.  The outcome is always reported to the requester
+ * through slurm_send_rc_msg(): 0 on success, an errno value otherwise.
+ */
+static void
+_rpc_forward_data(slurm_msg_t *msg)
+{
+	forward_data_msg_t *req = (forward_data_msg_t *)msg->data;
+	uint32_t req_uid, net_len;
+	struct sockaddr_un sa;
+	int fd = -1, rc = 0;
+
+	/* guard against a missing address before strlen() and %s use it */
+	if (req->address == NULL) {
+		rc = EINVAL;
+		goto done;
+	}
+
+	debug3("Entering _rpc_forward_data, address: %s, len: %u",
+	       req->address, req->len);
+
+	/* sanity check: the path must fit in sun_path with its terminator */
+	if (strlen(req->address) > sizeof(sa.sun_path) - 1) {
+		rc = EINVAL;
+		goto done;
+	}
+
+	/* connect to specified address */
+	fd = socket(AF_UNIX, SOCK_STREAM, 0);
+	if (fd < 0) {
+		rc = errno;	/* capture before anything can disturb errno */
+		error("failed creating UNIX domain socket: %m");
+		goto done;
+	}
+	memset(&sa, 0, sizeof(sa));
+	sa.sun_family = AF_UNIX;
+	strcpy(sa.sun_path, req->address);
+	/*
+	 * The assignment needs its own parentheses: without them rc receives
+	 * the result of the "< 0" comparison and a failed connect() is never
+	 * noticed below.
+	 */
+	while (((rc = connect(fd, (struct sockaddr *)&sa,
+			      SUN_LEN(&sa))) < 0) &&
+	       (errno == EINTR));
+	if (rc < 0) {
+		rc = errno;
+		debug2("failed connecting to specified socket '%s': %m",
+		       req->address);
+		goto done;
+	}
+
+	req_uid = (uint32_t)g_slurm_auth_get_uid(msg->auth_cred, NULL);
+	/*
+	 * although always in localhost, we still convert it to network
+	 * byte order, to make it consistent with pack/unpack.
+	 */
+	req_uid = htonl(req_uid);
+	safe_write(fd, &req_uid, sizeof(uint32_t));
+	net_len = htonl(req->len);
+	safe_write(fd, &net_len, sizeof(uint32_t));
+	safe_write(fd, req->data, req->len);
+	goto done;	/* everything was written, rc is still 0 */
+
+rwfail:
+	/* no explicit goto targets this label; only safe_write() gets here */
+	rc = errno ? errno : EIO;
+done:
+	if (fd >= 0)
+		close(fd);
+	slurm_send_rc_msg(msg, rc);
+}
+
+