Data Acquisition with PiMotion™
This Python code demonstrates collecting data (data acquisition) with the PiMotion™.
import time
import pilib
import numpy as np
class Pidaq:
    """Manage data acquisition (DAQ) for a single channel of a PiMotion device.

    Constructing an instance allocates the sample buffer, registers
    ``daq_status`` as the channel's DAQ callback and starts the acquisition.
    Samples are copied into ``data`` as the callback reports them, and
    ``valid`` becomes True once ``samples_requested`` samples have been
    collected.

    Args:
        pm: Device connection object. It must provide
            ``daq_register_callback``, ``daq_start`` and ``daq_acquire_data``.
        chan: Identifier of the DAQ channel to acquire from.
        collection_time: Total time span, in seconds, over which the samples
            are spread.
        num_samples: Number of samples to collect; must be non-zero.
        trigger: Trigger source forwarded unchanged to ``pm.daq_start``.
    """

    def __init__(self, pm, chan=2, collection_time=1.0, num_samples=20, trigger=0):
        self.pm = pm
        self.chan = chan
        # Express the sampling rate as a clock divisor: the number of device
        # clock ticks between two consecutive samples.
        ticks_total = float(collection_time) * pilib.DEFAULT_CLK_PIMOTION
        self.divisor = int(ticks_total / float(num_samples))
        self.valid = False
        self.samples_requested = num_samples
        self.samples_collected = 0
        # Not used inside this class; kept so existing readers of the
        # attribute keep working.
        self.avg_sum = 0.0
        # Allocate the sample buffer BEFORE registering the callback and
        # starting the acquisition: daq_status() writes into self.data and may
        # be invoked as soon as the acquisition is running.
        self.data = np.zeros(num_samples, dtype=np.int32)
        # Register the callback that collects the data.
        # NOTE(review): the meaning of the positional 10 is not visible here;
        # confirm against the pilib documentation.
        self.pm.daq_register_callback(self.chan, self.samples_requested, 10, self.daq_status)
        # Start the data acquisition and wait on the trigger (if enabled).
        # NOTE(review): the meaning of the positional 0 is not visible here;
        # confirm against the pilib documentation.
        self.pm.daq_start(self.chan, self.divisor, 0, self.samples_requested, trigger)

    def daq_status(self, chan_id, samples_ready):
        """Data acquisition callback: collect samples that are ready.

        Args:
            chan_id: Channel the notification refers to.
            samples_ready: Number of samples reported as ready to be read.
        """
        if samples_ready <= 0:
            return
        # Hand the device the unfilled tail of the buffer. The slice is a
        # view, so whatever is written into it lands directly in self.data.
        self.pm.daq_acquire_data(chan_id, samples_ready, self.data[self.samples_collected:])
        self.samples_collected += samples_ready
        if self.samples_collected >= self.samples_requested:
            self.valid = True
if __name__ == "__main__":
    # Example: acquire the same number of samples on two DAQ channels that
    # are started together by a single manual trigger.
    num_samples = 5000      # number of samples to collect
    collection_time = 2.0   # the number of seconds to collect data

    # make a connection to the PiMotion
    pm = pilib.Pidev('192.168.1.200')

    # create two daq channels, both armed to wait for the manual trigger
    mydaq = Pidaq(pm, chan=2, collection_time=collection_time,
                  num_samples=num_samples, trigger=pilib.TRIG_SRC_MANUAL)
    mydaq2 = Pidaq(pm, chan=3, collection_time=collection_time,
                   num_samples=num_samples, trigger=pilib.TRIG_SRC_MANUAL)

    # trigger the start of data collection
    pm.pidev_trigger_manual()

    # wait for all the data to be collected: keep polling until BOTH
    # channels report completion, not just the first one to finish
    print('Collecting...')
    while not (mydaq.valid and mydaq2.valid):
        print('.')
        time.sleep(0.5)

    print("\nData from chan 2 =")
    print(mydaq.data[:mydaq.samples_collected].astype(np.int32))
    print("\nData from chan 3 =")
    print(mydaq2.data[:mydaq2.samples_collected].astype(np.int32))
    print("Finished")