tags.py 8.74 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
#!/usr/bin/env python

from __future__ import print_function

#!/usr/bin/env python

import argparse
import os
import subprocess
import sys
import hostlist
import pymysql as mariadb
import ast

from datetime import datetime
from datetime import timedelta

def _partition_limits(mem_bw, flops=40000000000, ipc=4):
  """Return the reference-value record for one partition.

  Every partition currently shares the same FLOPS and IPC reference values;
  only the memory-bandwidth reference differs between partitions.
  """
  return {"MEM_BW": mem_bw, "FLOPS": flops, "IPC": ipc}


# Taurus partitions: maximal/optimal memory bandwidth, FLOPS and IPC values.
# main() picks the first key (in dict iteration order) that is a substring of
# a job's partition name, so the entry order below is significant.
partition_max_metric = {
  "haswell": _partition_limits(58800000000),
  "west": _partition_limits(21000000000),
  "sandy": _partition_limits(38700000000),
  "broadwell": _partition_limits(65600000000),
  "gpu1": _partition_limits(38700000000),   # sandy
  "gpu2": _partition_limits(58800000000),   # haswell
  "smp1": _partition_limits(58800000000),   # haswell
  "smp2": _partition_limits(58800000000),   # haswell
  "knl": _partition_limits(58800000000),
  "hpdlf": _partition_limits(65600000000),  # skylake
  "ml": _partition_limits(65600000000),     # power9
}

# Maximal Lustre IO bandwidth per file system, in B/s
lustre_io_max_bw = {
  "SCRATCH2": 19171839421,
  "HIGHIOPS": 24286765005,
}

# Maximal InfiniBand network traffic, in B/s
network_max_bw = {
  "INFINIBAND": 7000000000,
}


def connect_to_mariadb():
  """Open and return a connection to the job database.

  Connection settings come from the environment variables MARIADB_HOST,
  MARIADB_PORT, MARIADB_USER, MARIADB_PASSWORD and MARIADB_DATABASE (a missing
  variable raises KeyError; a non-numeric port raises ValueError). The
  connect timeout is 10 seconds.
  """
  env = os.environ
  # Read every setting first (same order as before) so a missing variable is
  # reported before the port is converted.
  host, port, user, password, database = (
      env["MARIADB_HOST"],
      env["MARIADB_PORT"],
      env["MARIADB_USER"],
      env["MARIADB_PASSWORD"],
      env["MARIADB_DATABASE"],
  )
  return mariadb.connect(host=host, port=int(port),
                         user=user, passwd=password,
                         db=database, connect_timeout=10)


def get_all_metadata(mariadb_connection, start, end):
  """Fetch footprint metrics of finished jobs whose START lies in [start, end].

  Only jobs with STATUS 'completed' or 'timeout' are selected. Job_Data is
  LEFT OUTER JOINed with footprint_base, footprint_fileio and footprint_gpu,
  so metric fields are None for jobs without a matching footprint row. The
  Lustre IO values are (read + write bytes) / (END - START).

  Args:
    mariadb_connection: open DB-API connection (PyMySQL).
    start, end: inclusive bounds compared against Job_Data.START
      (presumably the same time unit as START, e.g. epoch seconds — confirm).

  Returns:
    A list with one dict per job, keyed UID, JID, START, END, PARTITION, IPC,
    FLOPS, MEM_BW, LUSTRE_SCRATCH2_IO_BW, LUSTRE_HIGHIOPS_IO_BW, IB_BW,
    CPU_USAGE and GPU_USAGE. If the query fails, the error is printed and an
    empty list is returned.
  """
  # Dict keys in the same order as the columns of the SELECT below.
  keys = ('UID', 'JID', 'START', 'END', 'PARTITION', 'IPC', 'FLOPS',
          'MEM_BW', 'LUSTRE_SCRATCH2_IO_BW', 'LUSTRE_HIGHIOPS_IO_BW',
          'IB_BW', 'CPU_USAGE', 'GPU_USAGE')

  # start/end are bound as query parameters (%s placeholders) rather than
  # concatenated into the SQL text.
  sql_cmd = """SELECT Job_Data.UID,
                JID,
                START,
                END,
                P_PARTITION,
                ipc_mean_per_core,
                flops_any_mean_per_core,
                mem_bw_mean_per_socket,
                (lustre_scratch2_read_bytes + lustre_scratch2_write_bytes)/(END-START) AS IO_SCRATCH2_BW,
                (lustre_highiops_read_bytes + lustre_highiops_write_bytes)/(END-START) AS IO_HIOPS_BW,
                ib_bw_mean_per_node,
                cpu_used_mean_per_core,
                used_mean_per_gpu
                FROM Job_Data
                LEFT OUTER JOIN footprint_base ON Job_Data.UID = footprint_base.uid
                LEFT OUTER JOIN footprint_fileio ON Job_Data.UID = footprint_fileio.uid
                LEFT OUTER JOIN footprint_gpu ON Job_Data.UID = footprint_gpu.uid
                WHERE (STATUS='completed' OR STATUS='timeout') AND
                START>=%s AND START<=%s"""

  cursor = mariadb_connection.cursor()
  try:
    cursor.execute(sql_cmd, (start, end))
    rows = cursor.fetchall()
  except mariadb.Error as error:
    # Previously fetchall() was still called after a failed execute and
    # raised; report the error and continue with no jobs instead.
    print("Error: {}".format(error))
    rows = ()
  finally:
    cursor.close()

  return [dict(zip(keys, row)) for row in rows]


def save_tags(mariadb_connection, uid, tag_id):
  """Store tag_id in Job_Data.TAGS for the job with the given UID.

  The UPDATE is parameterized and committed only if it succeeded. A failing
  UPDATE is printed, not raised. A failing commit still propagates. The
  cursor is closed in every case.
  """
  cursor = mariadb_connection.cursor()
  try:
    cursor.execute("UPDATE Job_Data SET TAGS=%s WHERE UID=%s", (tag_id, uid))
  except mariadb.Error as error:
    print("Error: {}".format(error))
  else:
    mariadb_connection.commit()
  finally:
    cursor.close()

def tag_memory_bound(mem_bw_measured, mem_bw_max):
  """Return the memory-bound tag bit (1, binary 00001) or 0.

  The bit is set when the measured memory bandwidth is above 80 % of
  mem_bw_max.
  """
  utilisation = float(mem_bw_measured) / float(mem_bw_max)  # threshold 0.8
  return 1 if utilisation > 0.8 else 0

def tag_compute_bound(flops_measured, flops_max, ipc_measured, ipc_opt, ipc_max=4):
  """Return the compute-bound tag bit (2, binary 00010) or 0.

  The bit is set when the FLOPS rate is above 70 % of flops_max or the IPC is
  above 60 % of ipc_opt.

  Args:
    flops_measured, flops_max: measured and reference FLOPS values.
    ipc_measured, ipc_opt: measured and optimal instructions per cycle.
    ipc_max: IPC readings above this value are treated as 0.0 (presumably
      discarded as implausible measurements — confirm). Defaults to 4, the
      previously hard-coded cutoff.
  """
  flops_ratio = float(flops_measured) / float(flops_max)  # threshold 0.7
  if ipc_measured > ipc_max:
    ipc_measured = 0.0
  # Convert before dividing so integer inputs are not floor-divided under
  # Python 2; this also matches the FLOPS computation above.
  ipc_ratio = float(ipc_measured) / float(ipc_opt)  # threshold 0.6
  if flops_ratio > 0.7 or ipc_ratio > 0.6:
    return 2
  return 0

def tag_io_heavy(lustre_io_bw_scratch2_measured, lustre_io_bw_scratch2_max,
                 lustre_io_bw_highiops_measured, lustre_io_bw_highiops_max):
  """Return the IO-heavy tag bit (4, binary 00100) or 0.

  The bit is set when the measured Lustre bandwidth of either file system
  (SCRATCH2 or HIGHIOPS) is above 80 % of that file system's maximum. Only one
  of the two has to exceed the threshold. A measured value of None counts as
  zero utilisation.
  """
  def _ratio(measured, maximum):
    # Utilisation of one file system; None (no measurement) counts as 0.0.
    if measured is None:
      return 0.0
    return float(measured) / float(maximum)

  scratch2_ratio = _ratio(lustre_io_bw_scratch2_measured, lustre_io_bw_scratch2_max)
  highiops_ratio = _ratio(lustre_io_bw_highiops_measured, lustre_io_bw_highiops_max)
  if scratch2_ratio > 0.8 or highiops_ratio > 0.8:  # threshold 0.8
    return 4
  return 0

def tag_network_heavy(ib_bw_measured, ib_bw_max):
  """Return the network-heavy tag bit (8, binary 01000) or 0.

  The bit is set when the measured InfiniBand bandwidth is above 80 % of
  ib_bw_max.
  """
  link_utilisation = float(ib_bw_measured) / float(ib_bw_max)  # threshold 0.8
  return 8 if link_utilisation > 0.8 else 0

def tag_gpu_bound(cpu_usage, gpu_usage):
  """Return the GPU-bound tag bit (16, binary 10000) or 0.

  The bit is set when gpu_usage is above 0.7, or when it is above cpu_usage.
  """
  gpu_dominant = gpu_usage > 0.7 or gpu_usage > cpu_usage
  return 16 if gpu_dominant else 0

def _compute_tag_id(meta_data):
  """Return the combined tag id for one job dict from get_all_metadata.

  The reference values come from the first key of partition_max_metric (in
  dict iteration order) that is a substring of the job's PARTITION. Jobs with
  no partition, or with no known partition, get tag id 0 ("normal" job).
  Metrics that are None are skipped. Each tag function contributes a distinct
  bit: memory bound 1, compute bound 2, IO heavy 4, network heavy 8,
  GPU bound 16.
  """
  job_partition = meta_data['PARTITION']
  if not job_partition:
    # Previously `partition in None` raised TypeError for jobs without a
    # partition; such jobs are now treated as untagged.
    return 0

  limits = next((partition_limits
                 for partition, partition_limits in partition_max_metric.items()
                 if partition in job_partition), None)
  if limits is None:
    return 0

  tag_id = 0
  if meta_data['MEM_BW'] is not None:
    tag_id |= tag_memory_bound(meta_data['MEM_BW'], limits["MEM_BW"])

  if meta_data['FLOPS'] is not None and meta_data['IPC'] is not None:
    tag_id |= tag_compute_bound(meta_data['FLOPS'], limits["FLOPS"],
                                meta_data['IPC'], limits["IPC"])

  # tag_io_heavy handles None values itself.
  tag_id |= tag_io_heavy(meta_data['LUSTRE_SCRATCH2_IO_BW'], lustre_io_max_bw["SCRATCH2"],
                         meta_data['LUSTRE_HIGHIOPS_IO_BW'], lustre_io_max_bw["HIGHIOPS"])

  if meta_data['IB_BW'] is not None:
    tag_id |= tag_network_heavy(meta_data['IB_BW'], network_max_bw["INFINIBAND"])

  if meta_data['CPU_USAGE'] is not None and meta_data['GPU_USAGE'] is not None:
    tag_id |= tag_gpu_bound(meta_data['CPU_USAGE'], meta_data['GPU_USAGE'])

  return tag_id


def main(start, end):
  """Compute and store the tag id of every job returned for [start, end].

  Each job from get_all_metadata gets a tag id computed by _compute_tag_id,
  which is saved with save_tags. The database connection is closed even if
  tagging fails part-way.
  """
  mariadb_connection = connect_to_mariadb()
  try:
    for meta_data in get_all_metadata(mariadb_connection, start, end):
      save_tags(mariadb_connection, meta_data['UID'], _compute_tag_id(meta_data))
  finally:
    mariadb_connection.close()


def parse_args():
  """Parse the required integer --start and --end command-line options."""
  arg_parser = argparse.ArgumentParser()
  for option in ('--start', '--end'):
    arg_parser.add_argument(option, type=int, required=True)
  return arg_parser.parse_args()


if __name__ == '__main__':
  # Script entry point: tag all jobs in the requested START range.
  cli_args = parse_args()
  main(cli_args.start, cli_args.end)