Commit 5187573c authored by elagil's avatar elagil
Browse files

Remade vital sensing

parent aef62cbe
from .correlator import Correlator
from .sensor import Sensor, SensorSettings
from typing import Type
import numpy as np
from collections import deque
class SensorSettings():
def __init__(self, frequencyRange, measurementRate):
self.frequencyRange = frequencyRange
self.measurementRate = measurementRate
@property
def frequencyRange(self):
return self._frequencyRange
@frequencyRange.setter
def frequencyRange(self, new_frequencyRange):
if isinstance(new_frequencyRange, tuple) and len(new_frequencyRange) == 2:
if new_frequencyRange[0] < new_frequencyRange[1]:
self._frequencyRange = new_frequencyRange
else:
raise ValueError(
"Tuple is expected to have rising values in its range.")
else:
raise TypeError(
"Expected a tuple of size 2 for the frequency range.")
class Sensor():
def __init__(self, sensorSettings):
self.sensorSettings = sensorSettings
self.buffer = deque(maxlen=sensorSettings.bufferLength)
def correlate(self, data):
self.buffer.append(data)
import numpy as np
import scipy.signal as ss
from collections import deque
class SensorSettings():
def __init__(self, frequencyRange, measurementRate, distanceRangeBins=None, points=100):
self.frequencyRange = frequencyRange
self.distanceRangeBins = distanceRangeBins
self.measurementRate = measurementRate
self.points = points
self.fftPoints = 1024
self.bufferDuration = 0
self.calculateSettings()
def frequenciesToIndices(self, f):
return (self.fftPoints * np.array(f) / self.measurementRate).astype("int")
def indicesToFrequencies(self, i):
return (np.array(i) * self.measurementRate / self.fftPoints).astype("float")
def calculateSettings(self):
self.bufferDuration = self.points / self.measurementRate
self.actualfrequencyRangeBins = self.frequenciesToIndices(
self.frequencyRange)
self.actualFrequencyRange = self.indicesToFrequencies(
self.actualfrequencyRangeBins)
self.frequencyScale = np.linspace(
0, self.measurementRate/2, num=int(self.fftPoints/2+1), endpoint=False)
@ property
def points(self):
return self._points
@ points.setter
def points(self, new_points):
if isinstance(new_points, int):
self._points = new_points
else:
raise TypeError(
"An integer is expected for the amount of sampling points")
@ property
def frequencyRange(self):
return self._frequencyRange
@ frequencyRange.setter
def frequencyRange(self, new_frequencyRange):
if isinstance(new_frequencyRange, tuple) and len(new_frequencyRange) == 2:
if new_frequencyRange[0] < new_frequencyRange[1]:
self._frequencyRange = new_frequencyRange
else:
raise ValueError(
"Tuple is expected to have rising values in its range.")
else:
raise TypeError(
"Expected a tuple of size 2 for the frequency range.")
@ property
def distanceRangeBins(self):
return self._distanceRangeBins
@ distanceRangeBins.setter
def distanceRangeBins(self, new_distanceRangeBins):
if new_distanceRangeBins is None:
self._distanceRangeBins = None
else:
if isinstance(new_distanceRangeBins, tuple) and len(new_distanceRangeBins) == 2:
if new_distanceRangeBins[0] < new_distanceRangeBins[1]:
self._distanceRangeBins = new_distanceRangeBins
else:
raise ValueError(
"Tuple is expected to have rising values in its range.")
else:
raise TypeError(
"Expected a tuple of size 2 for the distance range.")
class Sensor():
def __init__(self, sensorSettings=None):
if sensorSettings is not None:
self.setup(sensorSettings)
else:
self.sensorSettings = None
self.buffer = None
self.filtered = None
def setup(self, sensorSettings, fmcwHelper):
self.fmcwHelper = fmcwHelper
if isinstance(sensorSettings, SensorSettings):
self.sensorSettings = sensorSettings
self.buffer = deque(maxlen=sensorSettings.points)
self.window = ss.windows.hamming(self.sensorSettings.points)
def run(self, data):
self.__feed__(data)
self.__sense__()
self.__findMaximumVibration__()
return self.__bufferFilled__()
def __bufferFilled__(self):
filled = False
if self.buffer is not None:
if len(self.buffer) >= self.sensorSettings.points:
filled = True
return filled
def __findMaximumVibration__(self):
if self.__bufferFilled__():
lowFrequency, highFrequency = self.sensorSettings.actualfrequencyRangeBins
lowDistance, highDistance = self.sensorSettings.distanceRangeBins
reducedVibration = self.vibration[lowDistance:highDistance,
lowFrequency:highFrequency]
energy = np.sum(np.square(np.abs(reducedVibration)), axis=-1)
localMaxEnergyIndex = np.argmax(energy)
localPeaks, properties = ss.find_peaks(
np.abs(reducedVibration[localMaxEnergyIndex, :]))
self.maxEnergyIndex = localMaxEnergyIndex + lowDistance
self.peaks = localPeaks + lowFrequency
def __feed__(self, data):
"""
Adds a dataset to the circular buffer.
If the maximum number of elements is reached, this discards the oldest one
"""
if self.buffer is not None:
self.buffer.append(data)
else:
raise AttributeError(
"Sensor has not been setup yet. Feeding data not possible.")
def __sense__(self):
"""
Correlation functionality for filtering the dataset
"""
if self.__bufferFilled__():
data = np.array(self.buffer).swapaxes(0, -1)
shape = data.shape
reduced = data[:, int(shape[1]/2), :]
self.phaseProgression = np.unwrap(np.angle(reduced))
self.vibration = np.fft.rfft(
self.phaseProgression, n=self.sensorSettings.fftPoints)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment