from ..extension import __Extension__ from networkUtilities import PlatformHandler import numpy as np class Sampler(__Extension__): """ Extends a platform with ADC sampling capability """ def __init__(self, platform, configHandler, dataHandler, bitWidth = 15): super().__init__(self, platform, configHandler, "Sampler") self.configHandler = configHandler self.dataHandler = dataHandler self.adcBitFormatString = bitWidth @property def adcBitFormatString(self): return self._adcBitFormatString @adcBitFormatString.setter def adcBitFormatString(self, new_bitWidth): if new_bitWidth == 7: self._adcBitFormatString = "int8" elif new_bitWidth == 8: self._adcBitFormatString = "uint8" elif new_bitWidth == 16: self._adcBitFormatString = "uint16" else: self._adcBitFormatString = "int16" def configDuration(self, duration): """ Program duration of the sampling """ self.configHandler.communicate(self.interface.sendDouble(["CONFIG", "SAMPLING", "DURATION"], duration)) def configFinished(self): """ Signal end of sampler configuration """ self.configHandler.communicate(self.interface.send(["CONFIG", "SAMPLING", "FINISHED"])) def start(self): """ Make ready for sampling! This connects to the platform and opens the connection. Platform starts sampling automatically. """ self.dataHandler.connect() def stop(self): """ End sampling by disconnecting from the platform. """ self.dataHandler.disconnect() def get(self): """ Single shot sampling and storage on this platform """ # Do the sampling rawBlock = self.dataHandler.receive(leaveOpen=True, sections=[("uint32", 8), (self.adcBitFormatString, None)]) measurement = np.asarray(rawBlock) measurementIndex = measurement[0] errorCounter = measurement[1] data = measurement[2:] try: result = ((data, self.platform.idx), measurementIndex, errorCounter) except AttributeError: result = ((data, 0), measurementIndex, errorCounter) return result def configure(self, duration): """ Configures the sampling settings on the platform """ print("Configure Sampling on %s." % self.platform.name) # Transmit individual configuration settings self.configDuration(duration) self.configFinished()