Commit bdb6de72 authored by root

Added scripts for InfluxDB stress tests.

parent 7a5493de
#!/usr/bin/env python3
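# Prints the total number of points in the "stress" database. Expects
# STRESS_HOST, INFLUXDB_PORT, INFLUXDB_USER and INFLUXDB_PASSWORD in the
# environment (sourced from .pika_access in the driver script below).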
import os
try:
    from influxdb.client import InfluxDBClient
except ImportError:
    InfluxDBClient = None

influx = None
database = None
"""
Connect to the InfluxDB server
"""
def _connect():
try:
global influx
global database
host = os.environ["STRESS_HOST"]
port = os.environ["INFLUXDB_PORT"]
user = os.environ["INFLUXDB_USER"]
password = os.environ["INFLUXDB_PASSWORD"]
database = "stress"
influx = InfluxDBClient(host=host, port=port, username=user,
password=password, database=database)
#print("InfluxDB: established connection to %s:%s/%s." % (host, port, database) )
except Exception as ex:
# Log Error
#print("InfluxDB: failed to connect to %s:%s/%s. (%s:%s) - %s" % (host, port, database, username, password, ex) )
_close()
"""
Close the socket = do nothing for influx which is http stateless
"""
def _close():
global influx
influx = None
if __name__ == '__main__':
    _connect()
    counts = 0
    measurements = influx.get_list_measurements() if influx else None
    if measurements:
        for measurement in measurements:
            query_str = 'select count(*) from "' + measurement['name'] + '"'
            try:
                query_result = influx.query(query_str).get_points()
            except Exception:
                #print_info("Influx query failed! ", query_str)
                query_result = []
            for point in query_result:
                for key, count in point.items():
                    #print(key, " ", count)
                    if key != 'time':
                        counts += int(count)
    print(counts)
#!/usr/bin/env python3
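# Stress writer for the "stress" database. Expects STRESS_HOST, INFLUXDB_PORT,
# INFLUXDB_USER, INFLUXDB_PASSWORD, TIME_LOGGING, SAMPLE_RATE, SEND_NUMBER,
# BATCH_SIZE and VALUES_PER_MEASUREMENT in the environment (exported by the
# generated job script below).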
import time
import argparse
import os
import logging
import random

from influxdb import InfluxDBClient
from time import sleep
def publish_data(influx_con, iteration, json_body):
    """Write one batch of points to InfluxDB; return True on success."""
    time_logging = os.environ["TIME_LOGGING"]
    if time_logging == "true":
        logging.info("publish_data %d: time %d", iteration, int(time.time()))
    try:
        sent = influx_con.write_points(json_body, time_precision='s')
    except Exception as ex:
        print(ex)
        sent = False
    return sent
def get_max_duration(current_max_duration, duration):
    """Return the larger of the two durations."""
    return max(current_max_duration, duration)
def send_data_to_db(influx_con, hostname, send_number, sample_rate):
    """Generate synthetic monitoring data and write it to InfluxDB in batches."""
    batch_size = int(os.environ["BATCH_SIZE"])
    values_pm = int(os.environ["VALUES_PER_MEASUREMENT"])
    json_body = []
    max_duration = 0
    node_iterator = 0
    # each process cycles through 60/sample_rate simulated hostnames,
    # so every simulated host reports once per minute
    start_new_measurement = 60 // sample_rate
    send_iterator = 0
    batch_size_iterator = 0
    begin_time = int(time.time())
    while True:
        timestamp = int(time.time())
        # total send duration per sample
        send_duration = 0
        hostname_it = hostname + str(node_iterator)
        # write metrics
        # cpu: the fixed measurements below yield 106 values per host with 24 cores;
        # additional cpu points pad the count up to VALUES_PER_MEASUREMENT
        remaining_cpus = values_pm - 106
        for i in range(0, 24 + remaining_cpus):
            measurement = "cpu"
            json_body.append({
                "measurement": measurement,
                "tags": {
                    "hostname": hostname_it,
                    "cpu": i},
                "time": timestamp,
                "fields": {
                    "used": random.uniform(5.3, 3014.56)}}
            )
        batch_size_iterator += (24 + remaining_cpus)
        # disk
        measurement = "disk"
        json_body.append({
            "measurement": measurement,
            "tags": {
                "hostname": hostname_it},
            "time": timestamp,
            "fields": {
                "read_bytes": random.uniform(5.3, 3014.56),
                "read_ops": random.uniform(5.3, 3014.56),
                "write_bytes": random.uniform(5.3, 3014.56),
                "write_ops": random.uniform(5.3, 3014.56)}}
        )
        batch_size_iterator += 4
        # infiniband
        measurement = "infiniband"
        json_body.append({
            "measurement": measurement,
            "tags": {
                "hostname": hostname_it},
            "time": timestamp,
            "fields": {
                "bw": random.uniform(5.3, 3014.56)}}
        )
        batch_size_iterator += 1
        # likwid_cpu
        for i in range(0, 24):
            measurement = "likwid_cpu"
            json_body.append({
                "measurement": measurement,
                "tags": {
                    "hostname": hostname_it,
                    "cpu": i},
                "time": timestamp,
                "fields": {
                    "flops_any": random.uniform(5.3, 3014.56),
                    "ipc": random.uniform(5.3, 3014.56)}}
            )
        batch_size_iterator += 48  # 24 cpus x 2 fields
        # likwid_socket
        for i in range(0, 2):
            measurement = "likwid_socket"
            json_body.append({
                "measurement": measurement,
                "tags": {
                    "hostname": hostname_it,
                    "cpu": i},
                "time": timestamp,
                "fields": {
                    "mem_bw": random.uniform(5.3, 3014.56),
                    "rapl_power": random.uniform(5.3, 3014.56)}}
            )
        batch_size_iterator += 4  # 2 sockets x 2 fields
        # lustre_highiops
        measurement = "lustre_highiops"
        json_body.append({
            "measurement": measurement,
            "tags": {
                "hostname": hostname_it},
            "time": timestamp,
            "fields": {
                "close": random.uniform(5.3, 3014.56),
                "fsync": random.uniform(5.3, 3014.56),
                "open": random.uniform(5.3, 3014.56),
                "read_bw": random.uniform(5.3, 3014.56),
                "read_requests": random.uniform(5.3, 3014.56),
                "seek": random.uniform(5.3, 3014.56),
                "write_bw": random.uniform(5.3, 3014.56),
                "write_requests": random.uniform(5.3, 3014.56)
            }
        })
        batch_size_iterator += 8
        # lustre_scratch2
        measurement = "lustre_scratch2"
        json_body.append({
            "measurement": measurement,
            "tags": {
                "hostname": hostname_it},
            "time": timestamp,
            "fields": {
                "close": random.uniform(5.3, 3014.56),
                "fsync": random.uniform(5.3, 3014.56),
                "open": random.uniform(5.3, 3014.56),
                "read_bw": random.uniform(5.3, 3014.56),
                "read_requests": random.uniform(5.3, 3014.56),
                "seek": random.uniform(5.3, 3014.56),
                "write_bw": random.uniform(5.3, 3014.56),
                "write_requests": random.uniform(5.3, 3014.56)
            }
        })
        batch_size_iterator += 8
        # memory
        measurement = "memory"
        json_body.append({
            "measurement": measurement,
            "tags": {
                "hostname": hostname_it},
            "time": timestamp,
            "fields": {
                "used": random.uniform(5.3, 3014.56)}}
        )
        batch_size_iterator += 1
        # nvml
        for i in range(0, 2):
            measurement = "nvml"
            json_body.append({
                "measurement": measurement,
                "tags": {
                    "hostname": hostname_it,
                    "gpu": i},
                "time": timestamp,
                "fields": {
                    "gpu_used": random.uniform(5.3, 3014.56),
                    "mem_used": random.uniform(5.3, 3014.56),
                    "power": random.uniform(5.3, 3014.56),
                    "temp": random.uniform(5.3, 3014.56)}}
            )
        batch_size_iterator += 8  # 2 gpus x 4 fields
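        # with the default settings one pass produces exactly VALUES_PER_MEASUREMENT
        # values per host: (24+66) cpu + 4 disk + 1 infiniband + 48 likwid_cpu
        # + 4 likwid_socket + 8 + 8 lustre + 1 memory + 8 nvml = 172, so the
        # batch below is flushed once per simulated host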
        if batch_size_iterator == batch_size:
            batch_size_iterator = 0
            write_start = int(time.time())
            sent = publish_data(influx_con, send_iterator, json_body)
            write_end = int(time.time())
            send_duration += write_end - write_start
            # determine max duration of publish_data
            max_duration = get_max_duration(max_duration, write_end - write_start)
            if sent:
                json_body = []  # start a new data block
            elif len(json_body) < (5 * batch_size):
                print("Could not send batch, buffering for next write!")
            else:
                print("Max send buffer exceeded! Dropping metrics.")
                json_body = []
        node_iterator += 1
        if node_iterator == start_new_measurement:
            node_iterator = 0
        send_iterator += 1
        if send_iterator == send_number:
            return max_duration
        # determine sleep time until next sample
        if send_duration <= sample_rate:
            sleep(sample_rate - send_duration)
        else:
            print(str((int(time.time()) - begin_time) // 60) + " min: write duration ("
                  + str(send_duration) + " s) exceeded the sample rate!")
            return max_duration
def main(database, output, nodeid):
    # connection data for InfluxDB
    host = os.environ["STRESS_HOST"]
    port = os.environ["INFLUXDB_PORT"]
    user = os.environ["INFLUXDB_USER"]
    password = os.environ["INFLUXDB_PASSWORD"]
    time_logging = os.environ["TIME_LOGGING"]
    sample_rate = int(os.environ["SAMPLE_RATE"])
    send_number = int(os.environ["SEND_NUMBER"])
    hostname = nodeid
    # connect to database
    connect_start = int(time.time())
    influx_con = InfluxDBClient(host, port, user, password, database)
    connect_end = int(time.time())
    # wait until current time has delay value
    current_time = int(time.time())
    beforeLoop = current_time
    start_delay = current_time
    if time_logging == "true":
        logging_file = output + "/" + hostname + ".out"
        logging.basicConfig(filename=logging_file, level=logging.INFO)
    # send data
    max_duration = send_data_to_db(influx_con, hostname, send_number, sample_rate)
    if time_logging == "true":
        # logging.info("pytime: %d, inTime: %d, delay: %d", beforeLoop, start_delay, start_delay - beforeLoop)
        # logging.info("Connect: %d", connect_end - connect_start)
        logging.info("Duration: %d", max_duration)
def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--db_name', type=str, required=True)
    parser.add_argument('--output', type=str, required=True)
    parser.add_argument('--nodeid', type=str, required=True)
    return parser.parse_args()

if __name__ == '__main__':
    args = parse_args()
    main(database=args.db_name,
         output=args.output,
         nodeid=args.nodeid)
#!/bin/bash
STRESS_HOST="172.24.136.90"
JOB_SCRIPT=${PWD}/job_stress_db.sh
NODE_NUM=10 #11 #14 #42
#determine measurement parameters
SAMPLE_RATE=2
BATCH_SIZE=172 #106
VALUES_PER_MEASUREMENT=172 #106
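# 60/SAMPLE_RATE = 30 sends per minute per process, i.e. 30 * 120 = 3600 sends in total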
SEND_NUMBER=$((60/SAMPLE_RATE * 120)) #120 simulation minutes
#fix settings
NTASKS=24
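# with NODE_NUM=10: 10 nodes x 24 tasks = 240 stress processes, each simulating 60/SAMPLE_RATE = 30 hosts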
SIM_NODES=$((NODE_NUM*NTASKS))
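# 3600 sends at 30 per minute = 120 min of writing, plus a 60 min safety margin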
TIME_LIMIT=$(((SEND_NUMBER/30) + 60))
MEM_PER_CPU=2583
OUTPUT_PATH=${PWD}/result_${NODE_NUM}_$(date +%y%m%d-%H%M)
mkdir -p "$OUTPUT_PATH"
create_job_script()
{
echo "#!/bin/bash" > $JOB_SCRIPT
echo "" >> $JOB_SCRIPT
echo "#SBATCH -A zihforschung" >> $JOB_SCRIPT
echo "#SBATCH --job-name=STRESS_DB" >> $JOB_SCRIPT
echo "#SBATCH --nodes=$NODE_NUM" >> $JOB_SCRIPT
echo "#SBATCH --ntasks-per-node=$NTASKS" >> $JOB_SCRIPT
echo "#SBATCH --partition=haswell64" >> $JOB_SCRIPT
# echo "#SBATCH --reservation=zihforschung_407" >> $JOB_SCRIPT
# echo "#SBATCH -w taurusi5054" >> $JOB_SCRIPT
echo "#SBATCH --exclusive" >> $JOB_SCRIPT
echo "#SBATCH --mem-per-cpu=$MEM_PER_CPU" >> $JOB_SCRIPT
echo "#SBATCH --time=${TIME_LIMIT}" >> $JOB_SCRIPT
echo "#SBATCH --output=$OUTPUT_PATH/stress_test_job.out" >> $JOB_SCRIPT
echo "#SBATCH --comment=no_monitoring" >> $JOB_SCRIPT
echo "" >> $JOB_SCRIPT
echo "export OUTPUT_PATH=${OUTPUT_PATH}" >> $JOB_SCRIPT
echo "export STRESS_HOST=${STRESS_HOST}" >> $JOB_SCRIPT
echo "export SAMPLE_RATE=${SAMPLE_RATE}" >> $JOB_SCRIPT
echo "export BATCH_SIZE=${BATCH_SIZE}" >> $JOB_SCRIPT
echo "export VALUES_PER_MEASUREMENT=${VALUES_PER_MEASUREMENT}" >> $JOB_SCRIPT
echo "export SEND_NUMBER=${SEND_NUMBER}" >> $JOB_SCRIPT
echo "srun -n $SIM_NODES sh stress_start.sh" >> $JOB_SCRIPT
}
run_stress_test()
{
COUNTS_1=$(get_measurement_counts)
JOB_SUBMIT=`sbatch $JOB_SCRIPT`
JOB_ID=`echo $JOB_SUBMIT | cut -d " " -f 4`
echo "$JOB_ID submitted"
#wait until current job has finished
#job is running as long as squeue shows current jobid
JOB_FINISHED=1
while [ "$JOB_FINISHED" -gt "0" ]
do
JOB_IN_PROCESS=`squeue -u $USER -a | grep $JOB_ID`
JOB_FINISHED=${#JOB_IN_PROCESS}
sleep 60
done
echo "JOB $JOB_ID has finished!"
cat $OUTPUT_PATH/stress_test_job.out
# get max send duration
# cat $OUTPUT_PATH/taurusi* | grep Duration | cut -d ":" -f 4 | sort -nr | head -n1
COUNTS_2=$(get_measurement_counts)
echo "Before: $COUNTS_1"
echo "After: $COUNTS_2"
DIFF=$((COUNTS_2-COUNTS_1))
echo "Diff: $DIFF"
}
clean_influx()
{
source /sw/taurus/tools/pika/.pika_access
INFLUX_CONF="$STRESS_HOST:8086/query -u ${INFLUXDB_USER}:${INFLUXDB_PASSWORD} --data-urlencode"
curl -XPOST $INFLUX_CONF "q=DROP DATABASE stress"
sleep 3
curl -XPOST $INFLUX_CONF "q=CREATE DATABASE stress"
curl -XPOST $INFLUX_CONF "q=CREATE RETENTION POLICY frob ON stress DURATION 28d REPLICATION 1 SHARD DURATION 7d DEFAULT"
}
get_measurement_counts()
{
python3 get_points.py
}
clean_influx
create_job_script
run_stress_test
#!/bin/bash
source /sw/taurus/tools/pika/pika-current.conf
PIKA_HOSTNAME=$(hostname | cut -d. -f1)
#echo "$PIKA_HOSTNAME ${SLURM_PROCID}"
#switch time logging on/off
export TIME_LOGGING=true
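# hostname + SLURM task rank yields a unique node id per task, so the simulated
# hostnames of different tasks do not collide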
python3 $PWD/stress_influx_db.py --db_name="stress" --output=$OUTPUT_PATH --nodeid=${PIKA_HOSTNAME}${SLURM_PROCID}