Commit ecf16152 authored by Frank Winkler

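Modified redis script: read the Redis port and password from the REDIS_PORT and REDIS_PASSWORD environment variables, look up jobs under the "pika_" key prefix instead of "prope_", and read job data from the JOB_RECORD__* fields.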

parent 82b4d262
......@@ -12,6 +12,7 @@ import argparse
from time import sleep
from itertools import count, groupby
#from pprint import pprint
def list_to_ranges(L):
G=(list(x) for _,x in groupby(L, lambda x,c=count(): next(c)-x))
......@@ -20,7 +21,9 @@ def list_to_ranges(L):
def main(job_id, debug_path, env_file, force):
redis_host = os.environ["REDIS_HOST"]
connection = redis.StrictRedis(host=redis_host, port=6379, socket_timeout=10, socket_connect_timeout=10)
redis_port = os.environ["REDIS_PORT"]
redis_password = os.environ["REDIS_PASSWORD"]
connection = redis.StrictRedis(host=redis_host, port=redis_port, password=redis_password, socket_timeout=10, socket_connect_timeout=10)
debug_file = None
if debug_path:
......@@ -31,14 +34,14 @@ def main(job_id, debug_path, env_file, force):
slurm_env_string = None
haveConnectionError = False
try:
slurm_env_string = connection.get("prope_" + str(job_id))
slurm_env_string = connection.get("pika_" + str(job_id))
except: # redis.exceptions.TimeoutError:
haveConnectionError = True
t = 0
while slurm_env_string == None and t < 10:
try:
slurm_env_string = connection.get("prope_" + str(job_id))
slurm_env_string = connection.get("pika_" + str(job_id))
except: # redis.exceptions.TimeoutError:
haveConnectionError = True
continue
......@@ -63,6 +66,8 @@ def main(job_id, debug_path, env_file, force):
except:
slurm_env = {}
#pprint(slurm_env)
if debug_path and debug_file:
if slurm_env:
debug_file.write(str(slurm_env))
......@@ -72,13 +77,13 @@ def main(job_id, debug_path, env_file, force):
# check if this job is exclusive and no monitoring is requested
monitoring_on = -1
if slurm_env:
nodes_shared = str(slurm_env['shared'])
nodes_exclusive = str(slurm_env['JOB_RECORD__WHOLE_NODE'])
if nodes_shared == "OK":
if nodes_exclusive == "0":
monitoring_on = 1
else:
#check no_monitoring comment
slurm_comment = str(slurm_env['comment'])
slurm_comment = str(slurm_env['JOB_RECORD__COMMENT'])
if 'no_monitoring' in slurm_comment:
monitoring_on = 0
else:
......@@ -91,115 +96,63 @@ def main(job_id, debug_path, env_file, force):
# print result as return value of the script
print(monitoring_on)
def save_job_env(env_file, slurm_env, connection, debug_file):
# set default values
start_time = 0
work_dir = "'n/a'"
exclusive = 0
partition_name = "'n/a'"
num_cpus = 0
cpu_allocated = "'n/a'"
job_name = "'n/a'"
walltime = 0
walltime_formatted = "'n/a'"
cpu_allocated = "'n/a'"
job_array_id = "None"
account = "'n/a'"
num_cores = 0
job_user = None
if "SLURM_JOB_USER" in os.environ:
job_user = os.environ["SLURM_JOB_USER"]
# get selected job information
if slurm_env:
start_time = str(slurm_env['start_time'])
partition_name = str(slurm_env['partition'])
nodes_shared = str(slurm_env['shared'])
work_dir = str(slurm_env['work_dir'])
if nodes_shared == "OK":
total_cpus_allocated = int(slurm_env['num_cpus'])
node_count = int(slurm_env['num_nodes'])
#print "Shared: " + str(total_cpus_allocated / node_count) + " cpus per node on partition " + partition_name
# get partition data
partition_data_string = connection.get(partition_name)
partition_data = cPickle.loads(partition_data_string)
#partition_data = ast.literal_eval(str(partition_data_string))
try:
cpus_avail = int(partition_data['max_cpus_per_node'])
#print(str(partition_data))
except:
cpus_avail = -1
#print(str(partition_data))
if (total_cpus_allocated / node_count) == cpus_avail:
#print "Exclusive with " + str(total_cpus_allocated / node_count) + " cpus per node on partition " + partition_name
exclusive = 1
else:
#print "Exclusive flag set by user"
exclusive = 1
exclusive = str(slurm_env['JOB_RECORD__WHOLE_NODE'])
#determine number of CPUs (hardware threads)
num_cpus = int(slurm_env['JOB_RECORD__CPU_CNT'])
cpu_allocated = ''
if exclusive == 0:
for key, value in slurm_env['cpus_alloc_layout'].items():
cpu_allocated += str(key) + str('[') + str(list_to_ranges(value)) + str('],')
for idx, val in enumerate(slurm_env['JOB_RECORD__CPU_IDS']):
cpu_allocated += slurm_env['JOB_RECORD__NODE_NAMES'][idx] + str('[') + str(list_to_ranges(val)) + str('],')
#remove last comma from string
cpu_allocated = cpu_allocated[:-1]
#remove last comma from string
cpu_allocated = cpu_allocated[:-1]
try:
job_name = str(slurm_env['name'])
except:
job_name = 'corrupt'
# try:
# job_name = str(slurm_env['name'])
# except:
# job_name = 'corrupt'
#determine job user
if not job_user:
job_user = pwd.getpwuid(slurm_env['user_id']).pw_name
try:
walltime = slurm_env['time_limit']
walltime = slurm_env['JOB_RECORD__TIME_LIMIT']
except:
walltime = 0
#convert walltime from minutes to seconds
walltime *= 60
walltime_formatted = slurm_env['time_limit_str']
job_array_id = slurm_env['array_job_id']
#determine account
account = str(slurm_env['account'])
#determine number of cores
num_cores = str(slurm_env['num_cpus'])
#account = str(slurm_env['account'])
if env_file:
f = open(env_file,'w')
print( "#!/bin/bash", file=f )
print( "export PIKA_JOB_START=" + str(start_time), file=f )
print( "export PIKA_JOB_USER=" + str(job_user), file=f )
print( "export PIKA_JOB_EXCLUSIVE=" + str(exclusive), file=f )
print( "export PIKA_JOB_PARTITION=" + partition_name, file=f )
print( "export PIKA_JOB_NUM_CORES=" + str(num_cpus), file=f )
print( "export PIKA_JOB_CPUS_ALLOCATED=" + str(cpu_allocated), file=f )
print( "export PIKA_JOB_NAME='" + job_name + "'", file=f )
print( "export PIKA_JOB_WALLTIME=" + str(walltime), file=f )
print( "export PIKA_JOB_WALLTIME_FORMATTED=" + str(walltime_formatted), file=f )
print( "export PIKA_JOB_CPUS_ALLOCATED=" + str(cpu_allocated), file=f )
print( "export PIKA_JOB_ARRAY_ID=" + str(job_array_id), file=f )
print( "export PIKA_JOB_ACCOUNT=" + str(account), file=f )
print( "export PIKA_JOB_NUM_CORES=" + str(num_cores), file=f )
print( "export PIKA_JOB_WORK_DIR=" + str(work_dir), file=f )
f.close()
if debug_file:
debug_file.write("\nReservation: " + str(exclusive))
debug_file.write("\nPartition: " + partition_name)
debug_file.write("\nJob Name: " + str(slurm_env['name']))
debug_file.write("\nWalltime: " + str(slurm_env['time_limit']))
#debug_file.write("\n" + str(cpu_allocated))
debug_file.write("\n\n")
debug_file.close()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment