Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Tools
General Python Modules
Vital sensing toolbox
Commits
36d97a92
Commit
36d97a92
authored
Dec 08, 2020
by
elagil
Browse files
various fixes to sensor
parent
5187573c
Changes
1
Hide whitespace changes
Inline
Side-by-side
vitalSensing/sensor.py
View file @
36d97a92
from
typing
import
Type
import
numpy
as
np
import
scipy.signal
as
ss
from
collections
import
deque
class
SensorSettings
():
def
__init__
(
self
,
frequency
Range
,
measurementRate
,
distance
RangeBin
s
=
None
,
points
=
100
):
self
.
frequency
Range
=
frequency
Range
self
.
distance
RangeBin
s
=
distance
RangeBin
s
def
__init__
(
self
,
frequency
Limit
,
measurementRate
,
distance
LimitIndice
s
=
None
,
points
=
100
):
self
.
frequency
Limit
=
frequency
Limit
self
.
distance
LimitIndice
s
=
distance
LimitIndice
s
self
.
measurementRate
=
measurementRate
self
.
points
=
points
self
.
fftPoints
=
1024
self
.
fftPoints
=
4096
self
.
bufferDuration
=
0
...
...
@@ -24,11 +25,11 @@ class SensorSettings():
def
calculateSettings
(
self
):
self
.
bufferDuration
=
self
.
points
/
self
.
measurementRate
self
.
actualfrequency
RangeBin
s
=
self
.
frequenciesToIndices
(
self
.
frequency
Range
)
self
.
actualfrequency
LimitIndice
s
=
self
.
frequenciesToIndices
(
self
.
frequency
Limit
)
self
.
actualFrequency
Range
=
self
.
indicesToFrequencies
(
self
.
actualfrequency
RangeBin
s
)
self
.
actualFrequency
Limit
=
self
.
indicesToFrequencies
(
self
.
actualfrequency
LimitIndice
s
)
self
.
frequencyScale
=
np
.
linspace
(
0
,
self
.
measurementRate
/
2
,
num
=
int
(
self
.
fftPoints
/
2
+
1
),
endpoint
=
False
)
...
...
@@ -46,14 +47,14 @@ class SensorSettings():
"An integer is expected for the amount of sampling points"
)
@
property
def
frequency
Range
(
self
):
return
self
.
_frequency
Range
@
frequency
Range
.
setter
def
frequency
Range
(
self
,
new_frequency
Range
):
if
isinstance
(
new_frequency
Range
,
tuple
)
and
len
(
new_frequency
Range
)
==
2
:
if
new_frequency
Range
[
0
]
<
new_frequency
Range
[
1
]:
self
.
_frequency
Range
=
new_frequency
Range
def
frequency
Limit
(
self
):
return
self
.
_frequency
Limit
@
frequency
Limit
.
setter
def
frequency
Limit
(
self
,
new_frequency
Limit
):
if
isinstance
(
new_frequency
Limit
,
tuple
)
and
len
(
new_frequency
Limit
)
==
2
:
if
new_frequency
Limit
[
0
]
<
new_frequency
Limit
[
1
]:
self
.
_frequency
Limit
=
new_frequency
Limit
else
:
raise
ValueError
(
"Tuple is expected to have rising values in its range."
)
...
...
@@ -62,18 +63,18 @@ class SensorSettings():
"Expected a tuple of size 2 for the frequency range."
)
@
property
def
distance
RangeBin
s
(
self
):
return
self
.
_distance
RangeBin
s
def
distance
LimitIndice
s
(
self
):
return
self
.
_distance
LimitIndice
s
@
distance
RangeBin
s
.
setter
def
distance
RangeBin
s
(
self
,
new_distance
RangeBin
s
):
if
new_distance
RangeBin
s
is
None
:
self
.
_distance
RangeBins
=
None
@
distance
LimitIndice
s
.
setter
def
distance
LimitIndice
s
(
self
,
new_distance
LimitIndice
s
):
if
new_distance
LimitIndice
s
is
None
:
self
.
_distance
LimitIndices
=
(
0
,
-
1
)
else
:
if
isinstance
(
new_distance
RangeBin
s
,
tuple
)
and
len
(
new_distance
RangeBin
s
)
==
2
:
if
new_distance
RangeBin
s
[
0
]
<
new_distance
RangeBin
s
[
1
]:
self
.
_distance
RangeBin
s
=
new_distance
RangeBin
s
if
isinstance
(
new_distance
LimitIndice
s
,
tuple
)
and
len
(
new_distance
LimitIndice
s
)
==
2
:
if
new_distance
LimitIndice
s
[
0
]
<
new_distance
LimitIndice
s
[
1
]:
self
.
_distance
LimitIndice
s
=
new_distance
LimitIndice
s
else
:
raise
ValueError
(
"Tuple is expected to have rising values in its range."
)
...
...
@@ -92,6 +93,8 @@ class Sensor():
self
.
buffer
=
None
self
.
filtered
=
None
self
.
phaseProgressionSignal
=
None
def
setup
(
self
,
sensorSettings
,
fmcwHelper
):
self
.
fmcwHelper
=
fmcwHelper
...
...
@@ -102,10 +105,18 @@ class Sensor():
def
run
(
self
,
data
):
self
.
__feed__
(
data
)
self
.
__sense__
()
self
.
__findMaximumVibration__
()
return
self
.
__bufferFilled__
()
bufferFilled
=
self
.
__bufferFilled__
()
if
bufferFilled
:
self
.
__sense__
()
self
.
__findMaximumVibration__
()
self
.
__extractPhaseProgression__
()
return
bufferFilled
def
__calculateEnergy__
(
self
,
dataset
,
axis
=-
1
):
return
np
.
sum
(
np
.
abs
(
dataset
),
axis
=
axis
)
def
__bufferFilled__
(
self
):
filled
=
False
...
...
@@ -117,23 +128,25 @@ class Sensor():
return
filled
def
__findMaximumVibration__
(
self
):
if
self
.
__bufferFilled__
():
lowFrequency
,
highFrequency
=
self
.
sensorSettings
.
actualfrequencyRangeBins
lowDistance
,
highDistance
=
self
.
sensorSettings
.
distanceRangeBins
reducedVibration
=
self
.
vibration
[
lowDistance
:
highDistance
,
lowFrequency
:
highFrequency
]
reducedVibrationAmplitude
=
np
.
abs
(
self
.
reducedVibration
)
meanVibration
=
np
.
mean
(
reducedVibrationAmplitude
,
axis
=-
1
)
maxVibration
=
np
.
max
(
reducedVibrationAmplitude
,
axis
=-
1
)
self
.
maxEnergyIndex
=
np
.
argmax
(
np
.
divide
(
maxVibration
,
meanVibration
))
energy
=
np
.
sum
(
np
.
square
(
np
.
abs
(
reducedVibration
)),
axis
=-
1
)
localPeaks
,
_
=
ss
.
find_peaks
(
reducedVibrationAmplitude
[
self
.
maxEnergyIndex
,
:])
localMaxEnergyIndex
=
np
.
argmax
(
energy
)
self
.
peakValues
=
reducedVibrationAmplitude
[
self
.
maxEnergyIndex
,
localPeaks
]
localPeaks
,
properties
=
ss
.
find_peaks
(
np
.
abs
(
reducedVibration
[
localMaxEnergyIndex
,
:]))
self
.
rateIndices
=
localPeaks
+
\
self
.
sensorSettings
.
actualfrequencyLimitIndices
[
0
]
self
.
maxEnergyIndex
=
localMaxEnergyIndex
+
lowDistance
self
.
rates
=
self
.
sensorSettings
.
frequencyScale
[
self
.
rateIndices
]
self
.
peaks
=
localPeaks
+
lowFrequency
self
.
dominantRate
=
self
.
rates
[
np
.
argmax
(
self
.
peakValues
)]
def
__feed__
(
self
,
data
):
"""
...
...
@@ -150,12 +163,22 @@ class Sensor():
"""
Correlation functionality for filtering the dataset
"""
if
self
.
__bufferFilled__
():
data
=
np
.
array
(
self
.
buffer
).
swapaxes
(
0
,
-
1
)
data
=
np
.
array
(
self
.
buffer
).
swapaxes
(
0
,
-
1
)
shape
=
data
.
shape
lowDistance
,
highDistance
=
self
.
sensorSettings
.
distanceLimitIndices
reduced
=
data
[
lowDistance
:
highDistance
,
int
(
shape
[
1
]
/
2
),
:]
self
.
rangeEnergies
=
self
.
__calculateEnergy__
(
reduced
)
self
.
phaseProgression
=
np
.
unwrap
(
np
.
angle
(
reduced
))
self
.
vibration
=
np
.
fft
.
rfft
(
self
.
phaseProgression
,
n
=
self
.
sensorSettings
.
fftPoints
)
shape
=
data
.
shape
reduced
=
data
[:,
int
(
shape
[
1
]
/
2
),
:
]
lowFrequency
,
highFrequency
=
self
.
sensorSettings
.
actualfrequencyLimitIndices
self
.
reduced
Vibration
=
self
.
vibration
[:,
lowFrequency
:
highFrequency
]
self
.
phaseProgression
=
np
.
unwrap
(
np
.
angle
(
reduced
))
self
.
vibration
=
np
.
fft
.
rfft
(
self
.
phaseProgression
,
n
=
self
.
sensorSettings
.
fftPoints
)
def
__extractPhaseProgression__
(
self
):
self
.
phaseProgressionSignal
=
self
.
phaseProgression
[
self
.
maxEnergyIndex
,
:]
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment