Data Acquisition with PiMotion™

This Python code demonstrates collecting data (data acquisition) with the PiMotion™.

[source code]

import time
import pilib
import numpy as np

class Pidaq():    
    """ Class for managing data acquisition  """
    
    def __init__(self,pm,chan=2,collection_time=1.0,num_samples=20,trigger=0):
        self.pm = pm
        self.chan = chan

        # calculate the sampling rate in terms of the divisor
        self.divisor = int((float(collection_time) * pilib.DEFAULT_CLK_PIMOTION)/float(num_samples))
        self.valid = False
        self.samples_requested = num_samples
        self.samples_collected = 0
        self.avg_sum = 0.0

        # create a callback to collect the data
        self.pm.daq_register_callback(self.chan, self.samples_requested, 10, self.daq_status)

        # start the data acquisition and wait on trigger (if enabled)
        self.pm.daq_start(self.chan, self.divisor, 0, self.samples_requested, trigger)

        # create a place to store the data
        self.data = np.zeros(num_samples, dtype=np.int32)

    def daq_status(self, chan_id, samples_ready):
        """ Data acquisition callback. This method is called to notify that data is ready to be collected """
        if samples_ready > 0:
            # grab the data
            acquired, sample_id = self.pm.daq_acquire_data(chan_id, samples_ready, self.data[self.samples_collected:])
            self.samples_collected = self.samples_collected + samples_ready
            
        if self.samples_collected >= self.samples_requested:
            self.valid = True
            

if __name__ == "__main__":
    
    num_samples = 5000      # number of samples to collect
    collection_time = 2.0   # the number of seconds to collect data
 
    # make a connection to the PiMotion
    pm = pilib.Pidev('192.168.1.200')
    
    # create two daq channels
    mydaq = Pidaq(pm,chan=2,collection_time=collection_time,num_samples=num_samples,trigger=pilib.TRIG_SRC_MANUAL)
    mydaq2 = Pidaq(pm,chan=3,collection_time=collection_time,num_samples=num_samples,trigger=pilib.TRIG_SRC_MANUAL)

    # trigger the start of data collection
    pm.pidev_trigger_manual()

    # wait for all the data to be collected
    print ('Collecting...')
    while not mydaq.valid and not mydaq2.valid:
        print ('.')
        time.sleep(0.5)
    
    print ("\nData from chan 2 =")
    print (mydaq.data[:mydaq.samples_collected].astype(np.int32))

    print ("\nData from chan 3 =")
    print (mydaq2.data[:mydaq2.samples_collected].astype(np.int32))
    print ("Finished")