Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from ..extension import __Extension__
from networkUtilities import PlatformHandler
import numpy as np
class Sampler(__Extension__):
"""
Extends a platform with ADC sampling capability
"""
def __init__(self, platform, configHandler, dataHandler):
super().__init__(self, platform, configHandler, "Sampler")
self.configHandler = configHandler
self.dataHandler = dataHandler
def configDuration(self, duration):
"""
Program duration of the sampling
"""
self.configHandler.communicate(self.interface.sendDouble(["CONFIG", "SAMPLING", "DURATION"], duration))
def configFinished(self):
"""
Signal end of sampler configuration
"""
self.configHandler.communicate(self.interface.send(["CONFIG", "SAMPLING", "FINISHED"]))
def start(self):
"""
Make ready for sampling! This connects to the platform and opens the connection. Platform starts sampling automatically.
"""
self.dataHandler.connect()
def stop(self):
"""
End sampling by disconnecting from the platform.
"""
self.dataHandler.disconnect()
def get(self):
"""
Single shot sampling and storage on this platform
"""
# Do the sampling
self.adcBitFormatString = "int16"
rawBlock = self.dataHandler.receive(leaveOpen=True, sections=[("uint32", 8), (self.adcBitFormatString, None)])
measurement = np.asarray(rawBlock)
measurementIndex = measurement[0]
errorCounter = measurement[1]
data = measurement[2:]
try:
result = ((data, self.platform.idx), measurementIndex, errorCounter)
except AttributeError:
result = ((data, 0), measurementIndex, errorCounter)
return result
def configure(self, duration):
"""
Configures the sampling settings on the platform
"""
print("Configure Sampling on %s." % self.platform.name)
# Transmit individual configuration settings
self.configDuration(duration)
self.configFinished()