Commit d5e46a8d authored by fwinkler

Added new scripts for post processing.

parent 41c91f80
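The recurring one-line fix in this commit concerns the debug logging: in "cmd |2>&1 tee -a $DEBUG_PATH" the redirection 2>&1 applies to tee itself, so the stderr of cmd never reaches the log. The post-processing script corrects this to "cmd 2>&1 | tee -a $DEBUG_PATH", which merges stderr into the pipe before tee (the backup script below simply drops the broken tee). This matters once the new check_debug function greps the log for errors. A minimal sketch with a placeholder path:

    some_command |2>&1 tee -a /tmp/debug.log   # stderr of some_command bypasses the log
    some_command 2>&1 | tee -a /tmp/debug.log  # stderr is captured as well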
@@ -58,7 +58,7 @@ while IFS= read -r line ; do
# check if the shard is already in the backup table
if [ "x$backup_avail" != "x" ] && [ $backup_avail -eq 0 ]; then
#echo "Shard ${shard_id} is not in backup"
echo -e "Backup shard ${shard_id} of ${shard_arr[1]} (${shard_arr[2]}): about $shard_duration hours ($start_seconds s -> $end_seconds s)" |2>&1 tee -a $DEBUG_PATH
echo -e "Backup shard ${shard_id} of ${shard_arr[1]} (${shard_arr[2]}): about $shard_duration hours ($start_seconds s -> $end_seconds s)"
backup_dir="${backup_path}/${shard_id}_${start_seconds}"
@@ -70,27 +70,27 @@ while IFS= read -r line ; do
#if directory exists, directory is not empty, no file name contains "pending", and a "tar.gz" file exists, then add backup to backup table
if [ -d "${backup_dir}" ] && [ "$(ls -A ${backup_dir})" ] && [ $(ls -l ${backup_dir} | grep -c pending) -eq 0 ] && [ $(ls -l ${backup_dir} | grep -c tar.gz) -gt 0 ]; then
success=1
echo -e "Backup of shard ${shard_id} into ${backup_dir} successful!" |2>&1 tee -a $DEBUG_PATH
echo -e "Backup of shard ${shard_id} into ${backup_dir} successful!"
else
success=0
echo -e "Backup of shard ${shard_id} into ${backup_dir} failed!" |2>&1 tee -a $DEBUG_PATH
echo -e "Backup of shard ${shard_id} into ${backup_dir} failed!"
if [ -d "${backup_dir}" ]; then
echo -e "Backup directory ${backup_dir} exists." |2>&1 tee -a $DEBUG_PATH
echo -e "Backup directory ${backup_dir} exists."
fi
if [ "$(ls -A ${backup_dir})" ]; then
echo -e "Backup directory ${backup_dir} is not empty." |2>&1 tee -a $DEBUG_PATH
echo -e "Backup directory ${backup_dir} is not empty."
else
echo -e "Backup directory ${backup_dir} is empty: ls -A ${backup_dir} = $(ls -A ${backup_dir})" |2>&1 tee -a $DEBUG_PATH
echo -e "Backup directory ${backup_dir} is empty: ls -A ${backup_dir} = $(ls -A ${backup_dir})"
fi
if [ $(ls -l ${backup_dir} | grep -c pending) -eq 0 ]; then
echo -e "No filename contains pending." |2>&1 tee -a $DEBUG_PATH
echo -e "No filename contains pending."
fi
if [ $(ls -l ${backup_dir} | grep -c tar.gz) -gt 0 ]; then
echo -e "An archive tar.gz exists" |2>&1 tee -a $DEBUG_PATH
echo -e "An archive tar.gz exists"
fi
[[ ${backup_dir} = *influx* ]] && [[ ${backup_dir} = *backup* ]] && rm -rf ${backup_dir}
@@ -100,10 +100,10 @@ while IFS= read -r line ; do
mysql_command "${query}"
else
echo -e "Shard $shard_id is already in backup ($backup_avail)." |2>&1 tee -a $DEBUG_PATH
echo -e "Shard $shard_id is already in backup ($backup_avail)."
fi
else
echo -e "Shard $shard_id is incomplete" |2>&1 tee -a $DEBUG_PATH
echo -e "Shard $shard_id is incomplete"
fi
done <<< "$(echo "$shards")"
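The success test above accepts a backup when the directory is non-empty, no file name contains "pending", and at least one tar.gz exists. Parsing ls -l works here but is fragile with unusual file names; a sketch of the same three checks with find (equivalent semantics; the helper name is hypothetical):

    backup_ok() {
        local dir=$1
        [ -d "$dir" ] || return 1
        [ -n "$(find "$dir" -mindepth 1 -print -quit)" ] || return 1         # non-empty
        [ -z "$(find "$dir" -name '*pending*' -print -quit)" ] || return 1   # no pending files
        [ -n "$(find "$dir" -name '*.tar.gz' -print -quit)" ]                # archive exists
    }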
This diff is collapsed.
@@ -11,8 +11,8 @@ function create_lock()
{
# lock script
if [ -f ${LOCK_POST_PROCESSING_SCRIPT} ]; then
echo -e "Post processing is already running..." |2>&1 tee -a $DEBUG_PATH
echo -e "exit 0" |2>&1 tee -a $DEBUG_PATH
echo -e "Post processing is already running..." 2>&1 | tee -a $DEBUG_PATH
echo -e "exit 0" 2>&1 | tee -a $DEBUG_PATH
exit 0
else
touch ${LOCK_POST_PROCESSING_SCRIPT}
@@ -22,31 +22,54 @@ function create_lock()
function release_lock()
{
#check debug output
echo "failed" 2>&1 | tee -a $DEBUG_PATH
echo "halloerrorhallo" 2>&1 | tee -a $DEBUG_PATH
echo "ModuleNotFoundError" 2>&1 | tee -a $DEBUG_PATH
check_debug
if [ -f ${LOCK_POST_PROCESSING_SCRIPT} ]; then
rm -f ${LOCK_POST_PROCESSING_SCRIPT}
echo -e "Post processing finished." |2>&1 tee -a $DEBUG_PATH
echo -e "Post processing finished." 2>&1 | tee -a $DEBUG_PATH
fi
}
+function check_debug()
+{
+#grep for error or failed and send email if necessary
+email_text=/tmp/pika_pp_email.txt
+touch ${email_text}
+egrep -i "error|failed" $DEBUG_PATH >> ${email_text}
+#check size of mail text
+email_text_size=$(stat -c%s "${email_text}")
+if [ ${email_text_size} -gt 0 ]; then
+send_email "[PIKA] post processing errors" ${email_text}
+fi
+rm -f ${email_text}
+}
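# Note: the three echo lines added in release_lock above match the egrep pattern
# in check_debug, so every run will mail their output; they look like temporary
# test injections for the new error-report path. Also, the stat-based size check
# is equivalent to the shell's builtin non-empty-file test, e.g.:
#   [ -s "${email_text}" ] && send_email "[PIKA] post processing errors" ${email_text}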
function backup_influx()
{
echo "Backup influxdb..." |2>&1 tee -a $DEBUG_PATH
source ${PIKA_ROOT}/post_processing/backup_influxdb.sh |2>&1 tee -a $DEBUG_PATH
echo "Backup influxdb..." 2>&1 | tee -a $DEBUG_PATH
source ${PIKA_ROOT}/post_processing/backup_influxdb.sh 2>&1 | tee -a $DEBUG_PATH
}
function restore_influx()
{
echo "Restore influxdb..." |2>&1 tee -a $DEBUG_PATH
echo "Restore influxdb..." 2>&1 | tee -a $DEBUG_PATH
# get all shards which are not restored yet
restore_shards=`mysql_command "select shard_id, start from Backup where success=1 and restored=0"`
-echo ${restore_shards} |2>&1 tee -a $DEBUG_PATH
+#echo ${restore_shards} 2>&1 | tee -a $DEBUG_PATH
# convert string output into indexed array
IFS=', ' read -r -a shard_array <<< "${restore_shards}"
len=${#shard_array[@]}
if [ ${len} -lt 2 ]; then
echo "Cannot find shards for restoring." |2>&1 tee -a $DEBUG_PATH
echo "Cannot find shards for restoring." 2>&1 | tee -a $DEBUG_PATH
return 0
fi
@@ -56,8 +79,8 @@ function restore_influx()
do
shard_id=${shard_array[i]}
start_seconds=${shard_array[i+1]}
-#source ${PIKA_ROOT}/post_processing/restore_influxdb.sh ${shard_id}_${start_seconds} |2>&1 tee -a $DEBUG_PATH
-echo "Test $i"
+source ${PIKA_ROOT}/post_processing/restore_influxdb.sh ${shard_id}_${start_seconds} 2>&1 | tee -a $DEBUG_PATH
if [ $? -eq 0 ]; then
mysql_command "UPDATE Backup SET restored=1 WHERE shard_id=${shard_id} AND start=${start_seconds};"
fi
@@ -67,17 +90,117 @@ function restore_influx()
function revise_metadata()
{
echo "Revise metadata..." |2>&1 tee -a $DEBUG_PATH
echo "Revise metadata..." 2>&1 | tee -a $DEBUG_PATH
+# get all shards which are not revised yet
+revise_shards=`mysql_command "select shard_id,start,end from Backup where success=1 and restored=1 and metadata_corrected=0"`
+#echo ${revise_shards} 2>&1 | tee -a $DEBUG_PATH
+# convert string output into indexed array
+IFS=', ' read -r -a shard_array <<< "${revise_shards}"
+len=${#shard_array[@]}
+if [ ${len} -lt 3 ]; then
+echo "Cannot find shards for metadata revision." 2>&1 | tee -a $DEBUG_PATH
+return 0
+fi
+len=$(( ${len} - 3 ))
+i=0
+while [ $i -le ${len} ]
+do
+shard_id=${shard_array[i]}
+start_seconds=${shard_array[i+1]}
+end_seconds=${shard_array[i+2]}
+python3 ${PIKA_ROOT}/post_processing/revise_mariadb.py --start=${start_seconds} --end=${end_seconds} 2>&1 | tee -a $DEBUG_PATH
+if [ $? -eq 0 ]; then
+#delete jobs which have corrupt data
+mysql_command "delete from Job_Data where NAME='n/a' and START>${start_seconds} and START<${end_seconds};"
+#update jobs with negative duration or pending time
+mysql_command "update Job_Data set START=END-40 where (END-START)<0 and STATUS<>'running' and START>${start_seconds} and START<${end_seconds};"
+mysql_command "update Job_Data set SUBMIT=START-1 where (START-SUBMIT)<0 and START>${start_seconds} and START<${end_seconds};"
+mysql_command "update Backup set metadata_corrected=1 WHERE shard_id=${shard_id} AND start=${start_seconds};"
+fi
+i=$(( $i + 3 ))
+done
}
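# Note on the pattern shared by revise_metadata, create_footprints and create_tags:
# the query result is flattened into one indexed array and walked in strides of
# three (shard_id, start, end), e.g. for "42, 1600000000, 1600600000, 43, ...":
#   i=0 -> shard_array[0]=42, shard_array[1]=1600000000, shard_array[2]=1600600000
#   i=3 -> the next shard, and so on.
# Two caveats, assuming mysql_command prints plain comma/space separated values:
# read -a only consumes a single line, so multi-row results must arrive on one
# line; and since python3 is piped into tee, $? reports tee's exit status, not
# python's -- set -o pipefail or ${PIPESTATUS[0]} would make the success check
# reliable.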
function create_footprints()
{
echo "Create footprints..." |2>&1 tee -a $DEBUG_PATH
echo "Create footprints..." 2>&1 | tee -a $DEBUG_PATH
+# get all shards without footprints
+footprint_shards=`mysql_command "select shard_id,start,end from Backup where success=1 and restored=1 and metadata_corrected=1 and footprints_created=0"`
+#echo ${footprint_shards} 2>&1 | tee -a $DEBUG_PATH
+# convert string output into indexed array
+IFS=', ' read -r -a shard_array <<< "${footprint_shards}"
+len=${#shard_array[@]}
+if [ ${len} -lt 3 ]; then
+echo "Cannot find shards for footprint generation." 2>&1 | tee -a $DEBUG_PATH
+return 0
+fi
+len=$(( ${len} - 3 ))
+i=0
+while [ $i -le ${len} ]
+do
+shard_id=${shard_array[i]}
+start_seconds=${shard_array[i+1]}
+#set start one week before to get footprints for overlapping jobs
+if [ $i -eq 0 ]; then
+start_footprint=$((start_seconds-(3600*24*7)))
+else
+start_footprint=${start_seconds}
+fi
+end_seconds=${shard_array[i+2]}
+python3 ${PIKA_ROOT}/post_processing/footprints.py --start=${start_footprint} --end=${end_seconds} --store_to_db 2>&1 | tee -a $DEBUG_PATH
+if [ $? -eq 0 ]; then
+mysql_command "update Backup set footprints_created=1 WHERE shard_id=${shard_id} AND start=${start_seconds};"
+fi
+i=$(( $i + 3 ))
+done
}
function create_tags()
{
echo "Create tags..." |2>&1 tee -a $DEBUG_PATH
echo "Create tags..." 2>&1 | tee -a $DEBUG_PATH
+# get all shards without tags
+tag_shards=`mysql_command "select shard_id,start,end from Backup where success=1 and restored=1 and metadata_corrected=1 and footprints_created=1 and tags_created=0"`
+#echo ${tag_shards} 2>&1 | tee -a $DEBUG_PATH
+# convert string output into indexed array
+IFS=', ' read -r -a shard_array <<< "${tag_shards}"
+len=${#shard_array[@]}
+if [ ${len} -lt 3 ]; then
+echo "Cannot find shards for tag generation." 2>&1 | tee -a $DEBUG_PATH
+return 0
+fi
+len=$(( ${len} - 3 ))
+i=0
+while [ $i -le ${len} ]
+do
+shard_id=${shard_array[i]}
+start_seconds=${shard_array[i+1]}
+#set start one week before to get tags for overlapping jobs
+if [ $i -eq 0 ]; then
+start_tag=$((start_seconds-(3600*24*7)))
+else
+start_tag=${start_seconds}
+fi
+end_seconds=${shard_array[i+2]}
+python3 ${PIKA_ROOT}/post_processing/tags.py --start=${start_tag} --end=${end_seconds} 2>&1 | tee -a $DEBUG_PATH
+if [ $? -eq 0 ]; then
+mysql_command "update Backup set tags_created=1 WHERE shard_id=${shard_id} AND start=${start_seconds};"
+fi
+i=$(( $i + 3 ))
+done
}
if [ ! $# -eq 1 ]; then
@@ -90,6 +213,9 @@ else
create_lock
backup_influx
restore_influx
+revise_metadata
+create_footprints
+create_tags
release_lock
;;
backup)
@@ -102,6 +228,21 @@ else
restore_influx
release_lock
;;
+revise)
+create_lock
+revise_metadata
+release_lock
+;;
+footprint)
+create_lock
+create_footprints
+release_lock
+;;
+tag)
+create_lock
+create_tags
+release_lock
+;;
*)
echo "$1 is not supported."
;;
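# Usage note (the script name is a placeholder; exactly one mode argument is
# required, see the $# check above). Modes visible in this diff: backup, revise,
# footprint, tag, a branch that runs the whole chain, and, by inference from its
# body, restore:
#   ./post_processing.sh <mode>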
#!/usr/bin/env python
from __future__ import print_function
import argparse
import os
#import mysql.connector as mariadb
import pymysql as mariadb
def getPartitionCoreNumber(partition):
    if 'haswell' in partition: return 24
    if 'west' in partition: return 12
    if 'sandy' in partition: return 16
    if 'broadwell' in partition: return 28
    if 'gpu1' in partition: return 16
    if 'gpu2' in partition: return 24
    if 'smp1' in partition: return 32
    if 'smp2' in partition: return 64
    if 'knl' in partition: return 64
    if 'ml' in partition: return 176
    else: return 0
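# A data-driven alternative to the if-chain above (a sketch; the substring
# matching and the 0 fallback mirror the function's behavior):
#   PARTITION_CORES = {'haswell': 24, 'west': 12, 'sandy': 16, 'broadwell': 28,
#                      'gpu1': 16, 'gpu2': 24, 'smp1': 32, 'smp2': 64,
#                      'knl': 64, 'ml': 176}
#   return next((n for key, n in PARTITION_CORES.items() if key in partition), 0)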
def getStatus(state):
    # map slurm state codes to readable status strings
    statusdict = {0: 'pending', 1: 'running', 2: 'suspended', 3: 'completed',
                  4: 'cancelled', 5: 'failed', 6: 'timeout', 7: 'failed',
                  8: 'failed', 9: 'failed', 10: 'failed', 11: 'OOM'}
    if 0 <= state <= 11:
        return statusdict[state]
    else:
        # unknown state codes are reported as 'running'
        return statusdict[1]
def connect_to_mariadb_slurm():
    # connection data for MariaDB
    HOST = os.environ["MARIADB_SLURM_HOST"]
    PORT = os.environ["MARIADB_SLURM_PORT"]
    USER = os.environ["MARIADB_SLURM_USER"]
    PASSWORD = os.environ["MARIADB_SLURM_PASSWORD"]
    DATABASE = os.environ["MARIADB_SLURM_DATABASE"]
    mariadb_connection = mariadb.connect(host=HOST, port=int(PORT),
                                         user=USER, passwd=PASSWORD,
                                         db=DATABASE, connect_timeout=10)
    return mariadb_connection
def connect_to_mariadb_prope():
    # connection data for MariaDB
    HOST = os.environ["MARIADB_HOST"]
    PORT = os.environ["MARIADB_PORT"]
    USER = os.environ["MARIADB_USER"]
    PASSWORD = os.environ["MARIADB_PASSWORD"]
    DATABASE = os.environ["MARIADB_DATABASE"]
    mariadb_connection = mariadb.connect(host=HOST, port=int(PORT),
                                         user=USER, passwd=PASSWORD,
                                         db=DATABASE, connect_timeout=10)
    return mariadb_connection
def get_slurm_metadata(mariadb_connection, start, end):
    cursor = mariadb_connection.cursor()
    sql_cmd = """select
        id_job as JID,
        account as PROJECT,
        state,
        nodes_alloc as NUM_NODES,
        nodelist as NODELIST,
        cpus_req as NUM_CORES,
        time_start as START,
        time_end as END,
        job_name as NAME,
        (timelimit * 60) as WALLTIME,
        time_submit as SUBMIT,
        `partition` as P_PARTITION,
        id_array_job as ARRAY_ID
        from taurus_job_table
        where time_start >= """ + str(start) + """
        and time_start <= """ + str(end) #+ """ limit 10"""
    #print(sql_cmd)
    try:
        cursor.execute(sql_cmd)
    except mariadb.Error as error:
        print("Error: {}".format(error))
    results = cursor.fetchall()
    all_meta_data = []
    for job_data in results:
        meta_data = {}
        meta_data['JID'] = job_data[0]
        meta_data['PROJECT'] = job_data[1]
        meta_data['state'] = job_data[2]
        meta_data['NUM_NODES'] = job_data[3]
        meta_data['NODELIST'] = job_data[4]
        meta_data['NUM_CORES'] = job_data[5]
        meta_data['START'] = job_data[6]
        meta_data['END'] = job_data[7]
        meta_data['NAME'] = job_data[8]
        meta_data['WALLTIME'] = job_data[9]
        meta_data['SUBMIT'] = job_data[10]
        meta_data['P_PARTITION'] = job_data[11]
        meta_data['ARRAY_ID'] = job_data[12]
        meta_data['STATUS'] = getStatus(meta_data['state'])
        core_num = meta_data['NUM_NODES'] * getPartitionCoreNumber(meta_data['P_PARTITION'])
        if core_num == meta_data['NUM_CORES']:
            meta_data['EXCLUSIVE'] = 1
        else:
            meta_data['EXCLUSIVE'] = 0
        all_meta_data.append(meta_data)
    cursor.close()
    return all_meta_data
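# The field-by-field copy above could also be written with zip (a sketch; the
# column list follows the same order as the SELECT):
#   cols = ['JID', 'PROJECT', 'state', 'NUM_NODES', 'NODELIST', 'NUM_CORES',
#           'START', 'END', 'NAME', 'WALLTIME', 'SUBMIT', 'P_PARTITION', 'ARRAY_ID']
#   meta_data = dict(zip(cols, job_data))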
def update_prope(mariadb_connection, slurm_data):
    cursor = mariadb_connection.cursor()
    sql_filter = " WHERE JID=" + str(slurm_data['JID']) + " AND START BETWEEN " + str(slurm_data['START']) + "-600 AND " + str(slurm_data['START']) + "+600"
    sql_cmd = "SELECT JID,EXCLUSIVE FROM Job_Data" + str(sql_filter) + " LIMIT 1"
    try:
        cursor.execute(sql_cmd)
    except mariadb.Error as error:
        print("Error: {}".format(error))
    results = cursor.fetchone()
    if results is not None:
        #print('Correct JID:',results[0])
        exclusive = results[1]
        sql_cmd = "UPDATE Job_Data SET "
        sql_cmd += "PROJECT=\"" + str(slurm_data['PROJECT']) + "\","
        sql_cmd += "STATUS=\"" + str(slurm_data['STATUS']) + "\","
        sql_cmd += "NUM_CORES=" + str(slurm_data['NUM_CORES']) + ","
        sql_cmd += "END=" + str(slurm_data['END']) + ","
        try:
            sql_cmd += "NAME=\"" + str(slurm_data['NAME']) + "\","
        except Exception:
            # job names that cannot be converted to a string
            sql_cmd += "NAME=\"unreadable\","
        sql_cmd += "WALLTIME=" + str(slurm_data['WALLTIME']) + ","
        sql_cmd += "SUBMIT=" + str(slurm_data['SUBMIT']) + ","
        sql_cmd += "P_PARTITION=\"" + str(slurm_data['P_PARTITION']) + "\","
        if exclusive == 0:
            sql_cmd += "EXCLUSIVE=" + str(slurm_data['EXCLUSIVE']) + ","
        sql_cmd += "ARRAY_ID=" + str(slurm_data['ARRAY_ID']) + str(sql_filter)
        try:
            cursor.execute(sql_cmd)
            mariadb_connection.commit()
        except mariadb.Error as error:
            print("Error: {}".format(error))
    cursor.close()
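# Note: the UPDATE above is assembled by string concatenation; pymysql also
# supports parameterized queries, which sidesteps quoting problems in fields
# like NAME (a sketch with a shortened column list):
#   cursor.execute("UPDATE Job_Data SET PROJECT=%s, STATUS=%s WHERE JID=%s",
#                  (slurm_data['PROJECT'], slurm_data['STATUS'], slurm_data['JID']))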
def main(start, end):
    mariadb_slurm_connection = connect_to_mariadb_slurm()
    mariadb_prope_connection = connect_to_mariadb_prope()
    # get slurm data
    slurm_meta_data = get_slurm_metadata(mariadb_slurm_connection, start, end)
    # if no data found, exit
    if len(slurm_meta_data) == 0:
        exit(1)
    for slurm_data in slurm_meta_data:
        # print("--")
        # print('JID from Backup:',slurm_data['JID'])
        # print('PROJECT:',slurm_data['PROJECT'])
        # print('STATUS:',slurm_data['STATUS'])
        # print('NUM_NODES:',slurm_data['NUM_NODES'])
        # print('NODELIST:',slurm_data['NODELIST'])
        # print('NUM_CORES:',slurm_data['NUM_CORES'])
        # print('START:',slurm_data['START'])
        # print('END:',slurm_data['END'])
        # print('NAME:',slurm_data['NAME'])
        # print('WALLTIME:',slurm_data['WALLTIME'])
        # print('P_PARTITION:',slurm_data['P_PARTITION'])
        # print('EXCLUSIVE:',slurm_data['EXCLUSIVE'])
        # print('ARRAY_ID:',slurm_data['ARRAY_ID'])
        update_prope(mariadb_prope_connection, slurm_data)
    mariadb_slurm_connection.close()
    mariadb_prope_connection.close()
    print("Data correction finished successfully.")
def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--start', type=int, required=True)
    parser.add_argument('--end', type=int, required=True)
    return parser.parse_args()

if __name__ == '__main__':
    args = parse_args()
    main(start=args.start,
         end=args.end)
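The file above is presumably the revise_mariadb.py invoked by revise_metadata; it reads all connection settings from the environment. A usage sketch (host, credential, and epoch values are placeholders):

    export MARIADB_HOST=db.example MARIADB_PORT=3306 MARIADB_USER=pika \
           MARIADB_PASSWORD=secret MARIADB_DATABASE=prope
    export MARIADB_SLURM_HOST=db.example MARIADB_SLURM_PORT=3306 MARIADB_SLURM_USER=slurm \
           MARIADB_SLURM_PASSWORD=secret MARIADB_SLURM_DATABASE=slurm_acct
    python3 revise_mariadb.py --start=1600000000 --end=1600600000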
#!/usr/bin/env python
from __future__ import print_function
import argparse
import os
import subprocess
import sys
import hostlist
import pymysql as mariadb
import ast
from datetime import datetime
from datetime import timedelta
# Taurus partition: maximal/optimal mem_bw, flops and ipc values
partition_max_metric = {
    "haswell":   {"MEM_BW": 58800000000, "FLOPS": 40000000000, "IPC": 4},
    "west":      {"MEM_BW": 21000000000, "FLOPS": 40000000000, "IPC": 4},
    "sandy":     {"MEM_BW": 38700000000, "FLOPS": 40000000000, "IPC": 4},
    "broadwell": {"MEM_BW": 65600000000, "FLOPS": 40000000000, "IPC": 4},
    "gpu1":      {"MEM_BW": 38700000000, "FLOPS": 40000000000, "IPC": 4},  # sandy
    "gpu2":      {"MEM_BW": 58800000000, "FLOPS": 40000000000, "IPC": 4},  # haswell
    "smp1":      {"MEM_BW": 58800000000, "FLOPS": 40000000000, "IPC": 4},  # haswell
    "smp2":      {"MEM_BW": 58800000000, "FLOPS": 40000000000, "IPC": 4},  # haswell
    "knl":       {"MEM_BW": 58800000000, "FLOPS": 40000000000, "IPC": 4},
    "hpdlf":     {"MEM_BW": 65600000000, "FLOPS": 40000000000, "IPC": 4},  # skylake
    "ml":        {"MEM_BW": 65600000000, "FLOPS": 40000000000, "IPC": 4}   # power9
}
# Maximal IO bandwidth in B/s
lustre_io_max_bw = {
    "SCRATCH2": 19171839421,
    "HIGHIOPS": 24286765005
}
# Maximal network traffic in B/s
network_max_bw = {
    "INFINIBAND": 7000000000
}
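# These tables feed the tag thresholds below: a job's measured value is
# normalized against the partition/filesystem maximum and compared to a
# threshold, e.g. (a sketch using the haswell entry):
#   util = mem_bw_measured / partition_max_metric['haswell']['MEM_BW']
#   memory_bound = util > 0.8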
def connect_to_mariadb():
    # connection data for MariaDB
    HOST = os.environ["MARIADB_HOST"]
    PORT = os.environ["MARIADB_PORT"]
    USER = os.environ["MARIADB_USER"]
    PASSWORD = os.environ["MARIADB_PASSWORD"]
    DATABASE = os.environ["MARIADB_DATABASE"]
    mariadb_connection = mariadb.connect(host=HOST, port=int(PORT),
                                         user=USER, passwd=PASSWORD,
                                         db=DATABASE, connect_timeout=10)
    return mariadb_connection
def get_all_metadata(mariadb_connection, start, end):
    cursor = mariadb_connection.cursor()
    # get job data for footprint
    sql_cmd = """SELECT Job_Data.UID,
        JID,
        START,
        END,
        P_PARTITION,
        ipc_mean_per_core,
        flops_any_mean_per_core,
        mem_bw_mean_per_socket,
        (lustre_scratch2_read_bytes + lustre_scratch2_write_bytes)/(END-START) AS IO_SCRATCH2_BW,
        (lustre_highiops_read_bytes + lustre_highiops_write_bytes)/(END-START) AS IO_HIOPS_BW,
        ib_bw_mean_per_node
        FROM Job_Data
        LEFT OUTER JOIN footprint_base ON Job_Data.UID = footprint_base.uid
        LEFT OUTER JOIN footprint_fileio ON Job_Data.UID = footprint_fileio.uid
        LEFT OUTER JOIN footprint_gpu ON Job_Data.UID = footprint_gpu.uid
        WHERE (STATUS='completed' OR STATUS='timeout') AND
        START>=""" + str(start) + " AND START<=" + str(end) #+ " limit 5"
    try:
        #print (sql_cmd)
        cursor.execute(sql_cmd)
    except mariadb.Error as error:
        print("Error: {}".format(error))
    results = cursor.fetchall()
    all_meta_data = []
    for job_data in results:
        meta_data = {}
        meta_data['UID'] = job_data[0]
        meta_data['JID'] = job_data[1]
        meta_data['START'] = job_data[2]
        meta_data['END'] = job_data[3]
        meta_data['PARTITION'] = job_data[4]
        meta_data['IPC'] = job_data[5]
        meta_data['FLOPS'] = job_data[6]
        meta_data['MEM_BW'] = job_data[7]
        meta_data['LUSTRE_SCRATCH2_IO_BW'] = job_data[8]
        meta_data['LUSTRE_HIGHIOPS_IO_BW'] = job_data[9]
        meta_data['IB_BW'] = job_data[10]
        all_meta_data.append(meta_data)
    cursor.close()
    return all_meta_data
def save_tags(mariadb_connection, uid, tag_id):
    cursor = mariadb_connection.cursor()
    sql_cmd = "UPDATE Job_Data SET TAGS=" + str(tag_id) + " WHERE UID=" + str(uid)
    try:
        #print (sql_cmd)
        cursor.execute(sql_cmd)
    except mariadb.Error as error:
        print("Error: {}".format(error))
    mariadb_connection.commit()
    cursor.close()
def tag_memory_bound(mem_bw_measured, mem_bw_max):
    val = float(mem_bw_measured) / float(mem_bw_max)  # threshold 0.8
    if val > 0.8:
        return 1  # 0001