Commit d84bf602 authored by Sebastian Oeste's avatar Sebastian Oeste 🐧
Browse files

Add pika collectd python plugins to package

parent b66868bf
......@@ -11,6 +11,9 @@ URL:
Source1: collectd.conf
BuildRequires: perl(ExtUtils::MakeMaker)
BuildRequires: perl(ExtUtils::Embed)
......@@ -625,6 +628,10 @@ make install DESTDIR="%{buildroot}"
install -Dp -m0644 %{SOURCE1} %{buildroot}%{_sysconfdir}/collectd.conf
install -Dp -m0644 contrib/systemd.collectd.service %{buildroot}%{_unitdir}/collectd.service
install -d %{buildroot}%{_datadir}/collectd/plugins/
install -Dp -m0644 %{SOURCE2} %{buildroot}%{_datadir}/collectd/plugins/
install -Dp -m0644 %{SOURCE3} %{buildroot}%{_datadir}/collectd/plugins/
install -Dp -m0644 %{SOURCE4} %{buildroot}%{_datadir}/collectd/plugins/
install -d %{buildroot}%{_sharedstatedir}/collectd/
install -d %{buildroot}%{_sysconfdir}/collectd.d/
......@@ -677,6 +684,10 @@ make check
# coding=utf-8
Collect data from InfiniBand devices.
!!! This plugin resets the InfiniBand counters, when initialized!!!
by Robert Dietrich ( for the ProPE project
#### Dependencies
* [subprocess](
import time
import os
import sys
import subprocess
import re
import collectd
except ImportError:
import dummy_collectd as collectd"Using dummy collectd for testing")
# get available file systems
from subprocess import Popen, PIPE, STDOUT
### global variables ###
directory = "/sys/class/infiniband"
devices = None
enabled = False
num_reads = 0
recheck_limit = 0 # number of intervals/collects after re-checking the available IB devices (default is off: 0)
ibPortList = []
# values of previous counter read
recv_prev = sys.maxsize
send_prev = sys.maxsize
time_prev = 0
perfquery_filepath = "/usr/sbin/perfquery"
### END: global variables ###
### utility functions
def is_exe(fpath):
return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
def which(program):
fpath, fname = os.path.split(program)
if fpath:
if is_exe(program):
return program
for path in os.environ["PATH"].split(os.pathsep):
exe_file = os.path.join(path, program)
if is_exe(exe_file):
return exe_file
return None
# reset counters, if perfquery is available
def _reset_counters():
if perfquery_filepath:
collectd.debug("ib_bw plugin: Reset counters!")
proc = subprocess.Popen([perfquery_filepath], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
(out, err) = proc.communicate()
except subprocess.CalledProcessError as e:"ib_bw plugin: %s error launching: %s; skipping" % (perfquery_filepath, e))
return -1
if proc.returncode:
collectd.error("ib_bw plugin: %s return exit value %s; skipping" % (perfquery_filepath, proc.returncode))
return -1
if err:
collectd.error("ib_bw plugin: %s return error output: %s" % (perfquery_filepath, err))
return -1
else:"ib_bw plugin: Cannot reset counters!" )
brief Determine the files and paths where the IB counters are read from.
Find infiniband devices, if they have not been specified in the collectd.conf.
Directory where infiniband devices are located, default: /sys/class/infiniband
def _setupSourcefiles():
global enabled
enabled = False
# if no devices are explicitly specified, detect them
if devices == None:
if not os.path.isdir( directory ):
collectd.error("ib_bw plugin: Infiniband directory %s does not exist!" % (directory,))
# find all infiniband devices
cmd = "find " + directory + "/* -maxdepth 0"
p = Popen( cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE )
detectedDevices, stderr = p.communicate()
except subprocess.CalledProcessError as e:"ib_bw plugin: %s; Error launching '%s'! Disable plugin." % (repr(e), cmd))
detectedDevices = detectedDevices.decode('utf-8')
ibDevices = filter( None, detectedDevices.split('\n') )
# devices from collectd.conf are a comma-separated list
ibDevices = devices.split(',')
# find ports for all devices and add them to the list
global ibPortList
ibPortList = []
for ibDevice in ibDevices:
if not os.path.isdir( ibDevice + "/ports" ):"ib_bw plugin: No ports for device %s found" % (ibDevice,))
collectd.debug("ib_bw plugin: Found device with ports: " + ibDevice)
cmd = "find " + ibDevice + "/ports/* -maxdepth 0 -type d 2>/dev/null"
p = Popen( cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE )
ibDevicePorts, stderr = p.communicate()
except subprocess.CalledProcessError as e:"ib_bw plugin: %s error launching: %s; Disable plugin." % (repr(e), cmd))
ibDevicePorts = ibDevicePorts.decode('utf-8')
for ibDevicePort in filter( None, ibDevicePorts.split('\n') ):
if not os.path.isdir( ibDevicePort + "/counters" ):"ib_bw plugin: No counters for device port %s found." % (ibDevicePort,))
ibPortList.append( ibDevicePort )
collectd.debug("ib_bw plugin: Found port with counters: " + ibDevicePort)
if len(ibPortList) == 0:"ib_bw plugin: No devices/ports found!" )
enabled = True
def _read_counter(file):
f = open( file, "r" )
finput =
except IOError as ioe:
collectd.error("ib_bw plugin: Cannot read %s (%s)" % (file, repr(ioe)) )
return float(finput)
return float(-1)
def ib_plugin_config(config):
if config.values[0] == 'ib_bw':"ib_bw plugin: Get configuration")
for value in config.children:
if value.key == 'directory':
global directory
directory = value.values[0]"ib_bw plugin: Use directory %s from config file" % (directory,))
elif value.key == 'devices': # InfiniBand devices (comma separated)
global devices
devices = value.values[0]"ib_bw plugin: Use ib_devices %s from config file" % (devices,))
elif value.key == 'recheck_limit':
global recheck_limit
recheck_limit = int(value.values[0])
if recheck_limit > 0:"ib_bw plugin: Check for available IB devices every %d collects" % (recheck_limit,))
def ib_plugin_initialize():
collectd.debug("ib_bw plugin: Initialize ...")
# check for perfquery
global perfquery_filepath
if not perfquery_filepath or not is_exe(perfquery_filepath):
perfquery_filepath = which("perfquery")
if perfquery_filepath:
collectd.debug("ib_bw plugin: %s is available to reset counters" % (perfquery_filepath,))
# add -R option to reset counters
perfquery_filepath += " -R"
# initial reset of IB counters
# determine the paths to the IB counter files
brief Read send and receive counters from Infiniband devices
def ib_plugin_read(data=None):
# check for available IB files every #recheck_limit reads
global num_reads
num_reads += 1
if num_reads == recheck_limit:
num_reads = 0
if not enabled:
# set receive and send values to zero
recv = 0
send = 0
overflow = False
value_error = False
# one time stamp for all IB metrics
timestamp = time.time()
# iterate over all ports (of all devices)
for ibPort in ibPortList:
# Total number of data octets, divided by 4 (lanes), transmitted/receied on
# all VLs. This seems to be a 32 bit unsigned counter.
# get port receive data
counter_value = _read_counter(ibPort + "/counters/port_rcv_data")
if counter_value < 0:
value_error = True
# check if counter value stops at 32 bit and reset it
if counter_value == 4294967295:
overflow = True
recv += counter_value * 4
# get port send data
counter_value = _read_counter(ibPort + "/counters/port_xmit_data")
if counter_value < 0:
value_error = True
# check if counter value stops at 32 bit and reset it
if counter_value == 4294967295:
overflow = True
send += counter_value * 4
global send_prev, recv_prev, time_prev
if overflow:
# set new previous values
recv_prev = 0
send_prev = 0
if recv >= recv_prev and send >= send_prev:
ib_bw = ( recv - recv_prev + send - send_prev ) / ( timestamp - time_prev )
# TODO: change to derive type?
vl = collectd.Values(type='gauge')
vl.values = [ib_bw]
vl.time = timestamp
vl.type_instance = 'bw'
# set new previous values
recv_prev = recv
send_prev = send
time_prev = timestamp
# check the IB files again, if an error occurred
if value_error:
# paste on command line
#echo "PUTNOTIF severity=okay time=$(date +%s) message=hello" | socat - UNIX-CLIENT:/home/rdietric/sw/collectd/5.8.0/var/run/collectd-unixsock
def ib_plugin_notify(notification, data=None):"infiniband plugin: Notification: %s" % (str(notification),))
if notification.plugin is None or notification.plugin == "" or notification.plugin == "ib_bw":
global enabled
if notification.message == "check":"ib_bw plugin: Check IB files ...")
global num_reads
num_reads = 0
elif notification.message == "disable":"ib_bw plugin: Disable reading")
enabled = False
elif notification.message == "enable":"ib_bw plugin: Enable reading")
enabled = True
elif notification.message == "unregister":"ib_bw plugin: Unregister read callback ...")
collectd.error("ib_bw plugin: Could not unregister read callback!")
elif notification.message == "register":"ib_bw plugin: Register read callback ...")
collectd.error("ib_bw plugin: Could not register read callback!")
if __name__ != "__main__":
# when running inside plugin register each callback
# always register read plugin, which triggers the file checks once a while
# outside plugin just collect the info
if len(sys.argv) < 2:
while True:
# coding=utf-8
Send metrics to InfluxDB ( using the
InfluxDBClient interface.
Collectd Values are sent/mapped to InfluxDB as follows:
measurement <- plugin
field/metric name <- type instance, if available, otherwise type
tag name/type (metric specific) <- either the plugin name or 'cpu', if the
plugin ends with 'cpu' or '_socket'
tag value (metric specific) <- plugin instance
Additionally, the host name is written as tag for 'hostname'.
A collectd value is identified by plugin, plugin instance, type and type instance.
import collectd
import os
import math
import subprocess
import re
from influxdb.client import InfluxDBClient
except ImportError:
InfluxDBClient = None
influx = None
ssl = False
hostname = 'localhost'
port = 8086
username = None
password = None
database = None # name of the database
conf_batch_size = 200 # number of metrics to be sent in one batch
conf_cache_size = 2000 # maximum number of metrics to store locally (e.g. if sends fail)
batch_count = 0
batch = {} # all unsent value lists are stored here
batch_size = conf_batch_size
store_rates = False
batch_derive = {} # storage for previous value lists of derived/counter types
#### Mapping of HW threads to cores ####
per_core_plugins = None
per_core_avg_plugins = None
# default: SMT is disabled: no. of HW threads == no. of physical cores
threads_per_core = 1
# hardware thread ID is provided by the OS contiguous, starting from zero
coreMapping = None
# timestamp of the current group of values (see write() and _collect()) with seconds precision
currentTimestamp = 0
#num_aggregated = 0
time_precision = 's'
Connect to the InfluxDB server
def _connect():
# Open Connection
global influx
influx = InfluxDBClient(host=hostname, port=port, username=username,
password=password, database=database, ssl=ssl)"InfluxDB write: established connection to %s:%d/%s." % (hostname, port, database) )
except Exception as ex:
# Log Error"InfluxDB write: failed to connect to %s:%s/%s. (%s:%s) - %s" % (hostname, port, database, username, password, ex) )
Close the socket = do nothing for influx which is http stateless
def _close():
global influx
influx = None
Mapping of HW threads to CPU cores (via parsing the output of likwid-topology)
def _setHWThreadMapping():
cmd = 'likwid-topology -O' # comma separated topology output
status, result = subprocess.getstatusoutput(cmd)
except Exception as ex:"InfluxDB write: error launching '%s': %s" % (cmd, repr(ex)))
return False
# a zero status means without errors, 13 means permission denied (maybe only for some mounts)
if status != 0 and status != 13:"InfluxDB write: get HW thread mapping failed (status: %s): %s" % (status, result))
return False
startIdx = 0
num_threads = 0
lines = result.split('\n')
# determine start and end of thread mapping lines
for line in lines:
if line.startswith("Threads per core:"):
global threads_per_core
threads_per_core = int('\d+', line).group())
if threads_per_core == 1:
return False
startIdx += 1
if line.startswith('TABLE,Topology,'):
num_threads = int('\d+', line).group())
startIdx += 1 # skip table header
# initialize and fill mapping array
global coreMapping
coreMapping = [None]*num_threads
for line in lines[startIdx:startIdx+num_threads]:
v = line.split(',')
coreMapping[int(v[0])] = v[2]
#coreMapping[v[0]] = v[2]"InfluxDB write: HW thread {:3d} -> Core {:3d}".format(int(v[0]), int(v[2])))
except:"InfluxDB write: HWThread-to-core mapping out of bound error")
return False
return True
brief: Store values per plugin instance.
Value lists are stored per plugin and plugin instance. The plugin instance is
used as a tag. For per-core plugins (see configuration), the plugin instance is
assumed to be the processor ID (given by the OS).
Return True, if a value has been added to the batch, otherwise False.
def _collect(valueList):
global batch
if valueList.plugin:
plugin_name = valueList.plugin
collectd.error('InfluxDB writer: plugin member is required!')
return False
tag = valueList.plugin_instance
# first check for the tag, which is None for many plugins
is_per_core = tag and per_core_plugins and valueList.plugin in per_core_plugins
# map to core
if is_per_core:"value: " + str(valueList))
tag = coreMapping[int(tag)]
valueList.plugin_instance = tag
# create array for plugin and tag, if it is not available yet
if plugin_name in batch:
if tag in batch[plugin_name]:
# aggregate (sum up) per core, if configured
if is_per_core:
# iterate reversed as matches are most probable at the end of the list
# lists should be very short
valueListTimeInt = int(valueList.time)
for vlStored in reversed(batch[plugin_name][tag]):
if vlStored.type == valueList.type and vlStored.type_instance == valueList.type_instance and int(vlStored.time) == valueListTimeInt:
for idx in range(len(vlStored.values)):
vlStored.values[idx] += valueList.values[idx]
#global num_aggregated
#num_aggregated += 1
return False
# append value
# create array of values for new tag
batch[plugin_name][tag] = [valueList]
# add the plugin and the tag with a new value
batch[plugin_name] = {tag:[valueList]}
return True
Send data to InfluxDB. Data that cannot be sent will be kept in cache.
def _send():
global batch
global batch_count
global batch_derive
global batch_size
#global num_aggregated
if not influx:'InfluxDB write: connection not available. Try reconnect ...')
metrics = _prepare_metrics()
# reset batch which only contains initial values of derived metrics
if len(metrics) == 0:
batch = {}
batch_count = 0
if len(batch_derive) == 0:'InfluxDB write: no metrics to send. '
'No previous values are stored. Should not happen!')
# Send data to InfluxDB (len(metrics) <= batch_count as NaN and inf are not moved from batch to metrics)'InfluxDB write: %d lines (%d series)' % (len(metrics), batch_count))'InfluxDB write: %d lines (%d series incl. %d rates), %d aggregated' % (len(metrics), batch_count, len(batch_derive), num_aggregated) )
ret = False
if influx:
ret = influx.write_points(metrics, time_precision=time_precision)
except Exception as ex: # batch could not be sent
collectd.error("InfluxDB write: error sending metrics(%s)" % (ex,))
# derived metrics batch is created again from metrics in current batch
batch_derive = {}
# increase batch size (but not above the configured cache size)
batch_size += conf_batch_size
if batch_size > conf_cache_size:
batch_size = conf_cache_size
# empty batch buffer for successful writes
if ret:"reset batch")
batch = {}
batch_count = 0
batch_size = conf_batch_size
#num_aggregated = 0
def _prepare_metrics():
global batch
# build metrics data
metrics = []
for measurement in batch:
for tag in batch[measurement]:
last_time = -1
fields = {}