"""Python library for accessing the Phoenix ker* fronel driver. The Phoenix kernel driver provides a set of functions for precise time measurements, A/D, D/A conversions, digital I/O etc. As all the timing/hardware related actions are performed in the kernel, we have the freedom to use a language like Python which traditionally is not suited for real-time work. """ START_HIST = 1000 #For ppdas hardware (not for Phoenix) STOP_HIST = 1001 READ_HIST = 1002 CLEAR_HIST = 1003 DIGIN = 0 DIGOUT = 1 SETDAC = 2 MOTORCW = 3 MOTORCCW = 4 SETMOTOR = 5 SELECTADC = 6 READADC = 7 READBLOCK = 8 MULTIREADBLOCK = 9 TRIGREADBLOCK = 10 PERIOD = 11 R2RTIME = 12 R2FTIME = 13 F2RTIME = 14 F2FTIME = 15 SET2RTIME = 16 SET2FTIME = 17 CLR2RTIME = 18 CLR2FTIME = 19 PULSE2RTIME = 20 PULSE2FTIME = 21 PULSEOUT = 22 SLOWPULSEOUT = 23 SETMAXWAIT = 24 ENBIRQ = 30 IRQCOUNT = 31 TIMESTAMP = 32 OUTDATA = 51 OUTCTL = 52 INSTAT = 53 PULSEDATA = 54 PULSECTL = 55 MAXPOINTS = 1024 DEVICE_NAME = '/dev/phoenix' import fcntl, types import os, array, time from Tkinter import * __phoenix = None # old ones plot_colours = ['black', 'red', 'green', 'blue'] WIDTH = 400.0 HALF_HEIGHT = 150.0 YMAX = 5000.0 #5000 mV WIDTH = 300.0 # used by plot() HALF_HEIGHT = 100.0 YMAX = 5000.0 #5000 mV BUFSIZE = 800 # ADC Buffer size should not exceed this (bytes) # how to make the following as static variables of the class ???. buf = array.array('I',range(6)) # used for passing ioctl data buf2 = array.array('I', range(12)) # for getting two timestamps mbuf = buf * MAXPOINTS # for getting ADC block data HISTSIZE = 4096 # PPDAS , 12 bit ADC hist = array.array('I',range(HISTSIZE)) # for ppdas read_hist() data def phoenix(dev=DEVICE_NAME): """Return an object of class Phoenix. 1 Takes care not to return multiple instances.""" global __phoenix if __phoenix == None: try: p = Phoenix(dev) except: print 'Could not open/initialize ', dev print 'Maybe, you have not loaded the phoenix driver' return __phoenix = p return __phoenix class Phoenix: fd = None # file handle plotwin = None canvas =None trace = [] last_message = '' hradda_active = 0 seeprom_active = 0 colors = ['black', 'red', 'green', 'blue'] plotwin = None # used by plot_data() plot_trace = [] border = 5 # used by window() etc. root = None line_data = [] line_trace = [] box_trace = [] box_data = [] grid_trace = [] bordcol = '#555555' gridcol = '#f0f0f0' gridcol2 ='#d0d0d0' fd = None # file handle num_samples = 100 # 1 to 800 num_chans = 1 # 1 to 4 current_chan = 1 # 0 to 3 adc_size = 1 # 1 or 2 bytes adc_delay = 10 # 10 to 1000 usecs, Atmega16 adc_format_bip = 0; # 1 if through level shifter amplifier maxwaitds = 40 # timeout = 40 * 50 msec pulse_width = 13 # 1 to 1000 usecs pulse_pol = 0 # HIGH TRUE (0) or LOW TRUE (1) def __init__(self, dev): self.fd = os.open(dev, os.O_RDWR) # Put the hardware in a `known' state buf[0] = 128 # DAC to zero volt fcntl.ioctl(self.fd, SETDAC, buf, 0) self.adc_chan = 0 buf[0] = 0 fcntl.ioctl(self.fd, SELECTADC, buf, 0) fcntl.ioctl(self.fd, DIGOUT, buf ,0) fcntl.ioctl(self.fd, SETMOTOR, buf, 0) #---------- World Coordinate Graphics Routines using Tkinter ------------ def set_scale(self, x1, y1, x2, y2): """ Calculate the scale factors to be used by draw functions from the upper and lower limits of the X and Y coordinates to be plotted. usage p.set_scale(xmin, ymin, xmax, ymax) """ Phoenix.xmin = float(x1) Phoenix.ymin = float(y1) Phoenix.xmax = float(x2) Phoenix.ymax = float(y2) Phoenix.xscale = (Phoenix.xmax - Phoenix.xmin) / (Phoenix.SCX) Phoenix.yscale = (Phoenix.ymax - Phoenix.ymin) / (Phoenix.SCY) def w2s(self, p): #change from world to screen coordinates ip = [] for xy in p: ix = Phoenix.border + int( (xy[0] - Phoenix.xmin) / Phoenix.xscale) iy = Phoenix.border + int( (xy[1] - Phoenix.ymin) / Phoenix.yscale) iy = Phoenix.YLIM - iy ip.append((ix,iy)) return ip def box(self, points, col = '#e0e0e0'): """ Draws a rectangle on the window opened earlier. Accepts a list of coordinate pairs. usage: p.box([(x1,y1),....,(xn,yn)], 'red') """ ip = self.w2s(points) t = Phoenix.canvas.create_rectangle(ip, fill=col) Phoenix.box_trace.append(t) Phoenix.box_data.append((points, col)) def remove_boxes(self): for ch in range(len(Phoenix.box_trace)): Phoenix.canvas.delete(Phoenix.box_trace[ch]) Phoenix.box_trace = [] Phoenix.box_data = [] def line(self, points, col = 'black', grid = 0): """ Draws a line on the window opened earlier. Accepts a list of coordinate pairs. usage: p.line([(x1,y1),....,(xn,yn)], 'red') """ ip = self.w2s(points) t = Phoenix.canvas.create_line(ip, fill=col, smooth = 0) if grid == 0: Phoenix.line_trace.append(t) Phoenix.line_data.append((points, col)) else: Phoenix.grid_trace.append(t) def remove_lines(self): """ Deletes all the lines drawn by functions line() and plot() """ for ch in range(len(Phoenix.line_trace)): Phoenix.canvas.delete(Phoenix.line_trace[ch]) Phoenix.line_trace = [] Phoenix.line_data = [] def draw_grid(self): major = 10 minor = 100 dx = (Phoenix.xmax - Phoenix.xmin) / major dy = (Phoenix.ymax - Phoenix.ymin) / major x = Phoenix.xmin while x <= Phoenix.xmax: self.line([(x,Phoenix.ymin),(x,Phoenix.ymax)],Phoenix.gridcol,1) x = x +dx y = Phoenix.ymin while y <= Phoenix.ymax: self.line([(Phoenix.xmin,y),(Phoenix.xmax,y)],Phoenix.gridcol,1) y = y +dy dx = (Phoenix.xmax - Phoenix.xmin) / minor dy = (Phoenix.ymax - Phoenix.ymin) / minor x = Phoenix.xmin while x <= Phoenix.xmax: self.line([(x, 0.),(x, dy)],Phoenix.gridcol2,1) x = x +dx y = Phoenix.ymin while y <= Phoenix.ymax: self.line([(0., y),(dx,y)],Phoenix.gridcol2,1) y = y +dy def close_window(self): Phoenix.root.destroy() Phoenix.root = None def resize(self, event): if event.widget != Phoenix.canvas: return Phoenix.SCX = event.width - 2 * (Phoenix.border+1) Phoenix.SCY = event.height - 2 * (Phoenix.border+1) Phoenix.XLIM = event.width Phoenix.YLIM = event.height self.set_scale(Phoenix.xmin, Phoenix.ymin, Phoenix.xmax, Phoenix.ymax) for ch in range(len(Phoenix.grid_trace)): Phoenix.canvas.delete(Phoenix.grid_trace[ch]) Phoenix.grid_trace = [] self.draw_grid() for ch in range(len(Phoenix.line_trace)): Phoenix.canvas.delete(Phoenix.line_trace[ch]) ip = self.w2s(Phoenix.line_data[ch][0]) col = self.line_data[ch][1] Phoenix.line_trace[ch] = Phoenix.canvas.create_line(ip, fill=col) def show_xy(self,event): """ Prints the XY coordinated of the current cursor position """ ix = Phoenix.canvas.canvasx(event.x) - Phoenix.border iy = Phoenix.YLIM - Phoenix.canvas.canvasy(event.y) - Phoenix.border x = ix * Phoenix.xscale + Phoenix.xmin y = iy * Phoenix.yscale + Phoenix.ymin if event.num == 1: s = 'x = %5.0f\ny = %5.3f' % (x,y) Phoenix.marker = (x,y) elif event.num == 3 and Phoenix.marker != None: s = 'x = %5.0f dx = %5.0f\ny = %5.3f dy = %5.3f' % \ (Phoenix.marker[0], x-Phoenix.marker[0], Phoenix.marker[1], y - Phoenix.marker[1]) try: Phoenix.canvas.delete(Phoenix.xydisp) except: pass Phoenix.xydisp = Phoenix.canvas.create_text(Phoenix.border+1,Phoenix.SCY-1, \ anchor = SW, justify = LEFT, text = s) def window(self, width=400, height=300, parent = None): """ Opens a Tkinter window. If no parent window given, a new root window is created and used as the parent. """ if Phoenix.root == None: # create a new window if parent == None: Phoenix.root = Tk() # Inside Toplevel Phoenix.root.title('Phoenix plot') else: Phoenix.root = parent # Inside the given parent window Phoenix.SCX = width Phoenix.SCY = height self.set_scale(0., -5000., 1000., 5000.) # temporary scale Phoenix.XLIM = width + 2 * Phoenix.border Phoenix.YLIM = height + 2 * Phoenix.border Phoenix.canvas = Canvas(Phoenix.root, background="white",\ width = Phoenix.XLIM, height = Phoenix.YLIM) Phoenix.canvas.pack(expand = 1, fill = BOTH) Phoenix.canvas.bind("", self.show_xy) Phoenix.canvas.bind("", self.show_xy) Phoenix.root.bind("", self.resize) Phoenix.root.protocol("WM_DELETE_WINDOW", self.close_window) def plot(self, data, width=400, height=300, parent = None): """ Plots the result of read_block() functions. Provides Grid, window resizing and coordinate measurement. Multiple traces in case of multi_read_block() results. Will delete all the previous plots existing on the window. usage: v = p.read_block(200,10,1) p.plot(v, 400, 300) """ if Phoenix.root == None: self.window(width,height,parent) self.remove_lines() Phoenix.xmax = data[-1][0] self.set_scale(Phoenix.xmin, Phoenix.ymin, Phoenix.xmax, Phoenix.ymax) numchans = len(data[0]) - 1 npoints = len(data) for ch in range(numchans): points = [] for i in range(npoints): points.append((data[i][0], data[i][ch+1])) self.line(points, Phoenix.colors[ch]) """ try: Phoenix.canvas.delete(Phoenix.tbdisp) except: pass mspd = (data[1][0] + data[-1][0]) /10000.0 Phoenix.tbdisp = Phoenix.canvas.create_text(Phoenix.SCX - Phoenix.border - 1, Phoenix.SCY-1, \ anchor = SE, justify = RIGHT, text = 'x = %6.3f ms/div'%(mspd) ) """ # The simple window to plot Data returned by read_block() functions def plot_data(self,v): """ Simple plot window that can be updated very fast. No grid or resize like plot() """ if self.plotwin == None: self.plotwin = Tk() self.plotwin.title('Phoenix plot') self.plotwin.protocol("WM_DELETE_WINDOW", self.clean_qplot) self.canvas = Canvas(self.plotwin, background='white', width=WIDTH + 20, height=HALF_HEIGHT*2 + 20) self.canvas.pack() self.canvas.create_rectangle(10, 10, WIDTH+10, HALF_HEIGHT*2 + 10, outline='#009900') self.canvas.create_line([(10, HALF_HEIGHT+10), (WIDTH+10, HALF_HEIGHT+10)], fill='#00ff00') if len(Phoenix.plot_trace) != 0: map(lambda x: self.canvas.delete(x), Phoenix.plot_trace) Phoenix.plot_trace = [] self.plotwin.update() numchans = len(v[0]) - 1 npoints = len(v) xscale = WIDTH/v[-1][0] yscale = HALF_HEIGHT/YMAX for ch in range(numchans): a = [] for i in range(npoints): x = 10 + v[i][0] * xscale y = (HALF_HEIGHT + 10) - v[i][ch+1] * yscale a.append((x, y)) line = self.canvas.create_line(a, fill=Phoenix.colors[ch]) Phoenix.plot_trace.append(line) self.plotwin.update() def clean_qplot(self): self.plotwin.destroy() self.plotwin = None self.trace = [] def save_data(self, v, fn = 'plot.dat'): """ Saves the dataset returned by read_block() functions to a file in multi-column format default filename is 'plot.dat' Usage: v = p.read_block(200,10,1) p.save_data(v, 'myfile.dat') """ f = open(fn,'w') numchans = len(v[0]) - 1 npoints = len(v) for x in v: s = '' for i in x: s = s + str(i) + ' ' s = s + '\n' f.write(s) f.close() #--------------------Phoenix hardware Routines-------------------- def read_inputs(self): """Return a 4 bit number representing the logic levels on the four digital input pins. Usage: p = Phoenix() p.read_inputs() """ fcntl.ioctl(self.fd, DIGIN, buf, 1) return buf[0] & 15 def write_outputs(self, val): """Write an 8-bit value to the digital output pins. Usage: p = Phoenix() p.write_outputs(integer) """ buf[0] = val fcntl.ioctl(self.fd, DIGOUT, buf, 0) def set_dac(self, val): """Set output voltage of Digital to Analog converter. val from 0 to 255 will set DAC from -5V to +5V """ if ((val < 0) or (val > 255)): print 'DAC o/p should be between 0 and 255' return buf[0] = val fcntl.ioctl(self.fd, SETDAC, buf, 0) def set_voltage(self, val): """Set output voltage of Digital to Analog converter. Usage: p = Phoenix() p = set_voltage(-2345) # o/p -2345 mV p = set_voltage(4315) # o/p 4315 mV Note that the argument `val' should be between -5000.0 and +5000.0.""" if ((val < -5000) or (val > 5000)): print 'DAC o/p should be between -5000 and +5000 mV' return m = 10000.0/255.0 buf[0] = int(128.0 + (val/m)) fcntl.ioctl(self.fd, SETDAC, buf, 0) def write_motor(self, val): """Writes a 4-bit pattern to the stepper motor driver pins. Useful when using relay coils.""" buf[0] = val fcntl.ioctl(self.fd, SETMOTOR, buf, 0) def rotate_motor(self, nsteps, dir): """Rotates a stepper motor `nsteps'; if `dir' is 1, direction is clockwise, else it is counter clockwise.""" buf[0] = nsteps if (dir): fcntl.ioctl(self.fd, MOTORCW, buf, 0) else: fcntl.ioctl(self.fd, MOTORCCW, buf, 0) def select_adc(self, chan): """Select ADC channel to use. Usage: p = Phoenix() p.select_adc(0) Note: Channel number should be between 0 and 7 """ if chan > 7: print 'ADC channel number should be 0 - 7' return self.adc_chan = chan buf[0] = chan fcntl.ioctl(self.fd, SELECTADC, buf, 0) def read_adc(self): """Read from the current ADC channel and return a number between 0 and 255. Usage: p = Phoenix() p.read_adc() """ fcntl.ioctl(self.fd, READADC, buf, 1) ts = float(buf[0]) + float(buf[1])/ 1.0e6; return ts, (buf[2] & 255); def zero_to_5000(self): """Read from ADC and return a voltage in the range 0 to +5000 mV. Usage: p = Phoenix() p.zero_to_5000() """ res = self.read_adc() return res[0], (5000.0/255) * res[1] def minus5000_to_5000(self): """Read from ADC and return a voltage in the range -5000 to +5000 mV. The ADC accepts only 0 to 5V inputs. A bipolar signal will be fed to the ADC input after shifting it up to the 0-5V range. This function simply interprets the digital output of the ADC as a voltage in the -5000 to +5000 mV range.""" res = self.read_adc() return res[0], ((10000.0/255) * res[1]) - 5000.0 def read_block(self,npoints,delay,bipolar): """Read a block of data from the current ADC channel and return an array containg timestamps as well as sampled values. Note: if npoints is 2 the timestaps are absolute. Otherwise they are relative to the first timestamp. This is done for implementing subsecond pendulum digitization delays Usage: p = Phoenix() p.read_block(10, 0, 1) The returned list is of the form: [(timestamp1, adval1), (timestamp2, adval2) ....] If bipolar=1, the returned value will be between -5000mV and +5000mV. The argument `delay' specifies microsecond delay between ADC reads. """ if (npoints > MAXPOINTS): print 'Only ', MAXPOINTS, 'points can be sampled ', print 'at a time.' return mbuf[0] = npoints mbuf[1] = delay fcntl.ioctl(self.fd, READBLOCK, mbuf, 1) if npoints == 2: start = 0 else: start = mbuf[0] + mbuf[1]/1.0e6 dat = [] i = 0; j = 0 while (i < npoints): t = (mbuf[j] + mbuf[j+1]/1.0e6) - start if(bipolar): adval = mbuf[j+2] * (10000.0/255.0) - 5000.0 else: adval = mbuf[j+2] * (5000.0/255.0) dat.append((t, adval)) i = i + 1 j = j + 6 return dat def multi_read_block(self, npoints, nchan, delay, bip): """Digitize more than two ADC channels simultaneously. Always starts with channel zero. Returns a list of groups containg timestamps as well as the required sampled values. Usage: p = Phoenix() p.multi_read_block(256, 3, 0, 1) The returned list is of the form: [(timestamp1, adval1, ..,advalN), ....] ADC value returned is from 0 to 255 size of each element of the list depends on number of channels requested. """ if (npoints > MAXPOINTS): print 'Only ', MAXPOINTS, 'points can be sampled ', print 'at a time.' return mbuf[0] = npoints mbuf[1] = nchan mbuf[2] = delay fcntl.ioctl(self.fd, MULTIREADBLOCK, mbuf, 1) dat = [] i = 0; j = 0 start = mbuf[0] + mbuf[1]/1.0e6 while (i < npoints): item = [] t = (mbuf[j] + mbuf[j+1]/1.0e6) - start item.append(t) for k in range(nchan): if bip: mv = mbuf[j+2+k] * (10000.0/255.0) - 5000.0 else: mv = mbuf[j+2+k] * (5000.0/255.0) item.append(mv) dat.append(item) i = i + 1 j = j + 6 return dat def trig_read_block(self, pin, npoints, deadtime, delay, trigpol, bipolar): """Read a block of data from the current ADC channel and return an array containg timestamps as well as sampled values. The reading starts after 'delay' usecs when specified digital input pin goes zero. Usage: p = Phoenix() p.trig_read_block(0, 100, 0, 200, 0, 1) The returned list is of the form: [(timestamp1, adval1), (timestamp2, adval2) ....] """ if (npoints > MAXPOINTS): print 'Only ', MAXPOINTS, 'points can be sampled ', print 'at a time.' return mbuf[0] = pin mbuf[1] = npoints mbuf[2] = deadtime mbuf[3] = delay mbuf[4] = trigpol fcntl.ioctl(self.fd, TRIGREADBLOCK, mbuf, 1) dat = [] i = 0; j = 0 start = mbuf[0] + mbuf[1]/1.0e6 while (i < npoints): t = (mbuf[j] + mbuf[j+1]/1.0e6) - start if(bipolar): adval = mbuf[j+2] * (10000.0/255.0) - 5000.0 else: adval = mbuf[j+2] * (5000.0/255.0) dat.append((t, adval)) i = i + 1 j = j + 6 return dat #----------------------Time measurement calls start from here def __helper(self, cmd, pin1, pin2): buf2[0] = pin1 buf2[1] = pin2 try: fcntl.ioctl(self.fd, cmd, buf2, 1) except: print 'Timeout error.' return -1.0 t0 = buf2[0] * 1.0e6 + buf2[1] t1 = buf2[6] * 1.0e6 + buf2[7] return t1 - t0 def period(self, pin): """Return period of waveform on `pin'. Pin numbers are from 0 to 3. Return value is in micro seconds.""" buf2[0] = pin buf2[1] = 10 try: fcntl.ioctl(self.fd, PERIOD, buf2, 1) except: print 'get_period: I/O error' return t0 = buf2[0] * 1.0e6 + buf2[1] t1 = buf2[6] * 1.0e6 + buf2[7] return (t1 - t0)/10 def r2rtime(self, pin1, pin2): """Returns delay (in micro seconds) between two consecutive rising edges on pin1 & pin2. """ return self.__helper(R2RTIME, pin1, pin2) def r2ftime(self, pin1, pin2): """Returns delay (in micro seconds) between two consecutive rising and falling edges on two pins, pins[0] and pins[1]. The pins can be the same.""" return self.__helper(R2FTIME, pin1, pin2) def f2rtime(self, pin1, pin2): """Returns delay (in micro seconds) between two consecutive falling and rising edges on two pins. The pins can be the same.""" return self.__helper(F2RTIME, pin1, pin2) def f2ftime(self, pin1, pin2): """Returns delay (in micro seconds) between two consecutive falling edges on two pins, The pins must be distinct.""" return self.__helper(F2FTIME, pin1, pin2) def set2rtime(self, pin1, pin2): """ Returns delay between setting pin1 to the rising of pin2""" return self.__helper(SET2RTIME, pin1, pin2) def set2ftime(self, pin1, pin2): """ Returns delay between setting pin1 to the falling of pin2""" return self.__helper(SET2FTIME, pin1, pin2) def clr2rtime(self, pin1, pin2): """ Returns delay between clearing pin1 to the rising of pin2""" return self.__helper(CLR2RTIME, pin1, pin2) def clr2ftime(self, pin1, pin2): """ Returns delay between clearing pin1 to the falling of pin2""" return self.__helper(CLR2FTIME, pin1, pin2) def pulse2rtime(self, pin1, pin2, width, deadtime, pol): """Send one pulse out on `pin1' with specified width Returns the time delay for a rising edge on pin2 width - width of the pulse deadtime - look for falling edge only after that pol - polarity of output. zero for rising pulse (low to high) one means falling pulse """ buf2[0] = pin1 buf2[1] = pin2 buf2[2] = width buf2[3] = deadtime buf2[4] = pol try: fcntl.ioctl(self.fd, PULSE2RTIME, buf2, 1) except: return -1.0 t0 = buf2[0] * 1.0e6 + buf2[1] t1 = buf2[6] * 1.0e6 + buf2[7] return t1 - t0 def pulse2ftime(self, pin1, pin2, width, deadtime, pol): """Send one pulse out on `pin1' with specified width Returns the time delay for a falling edge on pin2 width - width of the pulse deadtime - look for falling edge only after that pol - polarity of output. zero for rising pulse (low to high) """ buf2[0] = pin1 buf2[1] = pin2 buf2[2] = width buf2[3] = deadtime buf2[4] = pol try: fcntl.ioctl(self.fd, PULSE2FTIME, buf2, 1) except: return -1.0 t0 = buf2[0] * 1.0e6 + buf2[1] t1 = buf2[6] * 1.0e6 + buf2[7] return t1 - t0 def mid_high_time(self, pin): """Look for a HIGH TRUE TTL pulse on the specified input pin and returns the absolute time stamp of its midpoint. Make sense only as a difference between two time stamps """ buf2[0] = pin buf2[1] = pin try: fcntl.ioctl(self.fd, R2FTIME, buf2, 1) except: print 'mid_high_time: error.' return -1.0 t0 = buf2[0] * 1.0e6 + buf2[1] t1 = buf2[6] * 1.0e6 + buf2[7] return t0 + (t1 - t0) / 2.0; def mid_low_time(self, pin): """Look for a LOW TRUE TTL pulse on the specified input pin and returns the absolute time stamp of its midpoint. Make sense only as a difference between two time stamps """ buf2[0] = pin buf2[1] = pin try: fcntl.ioctl(self.fd, F2RTIME, buf2, 1) except: print 'mid_high_time: error.' return -1.0 t0 = buf2[0] * 1.0e6 + buf2[1] t1 = buf2[6] * 1.0e6 + buf2[7] return t0 + (t1 - t0) / 2.0; #------------------------------------------------------------------- def pulse_out(self, pin, hightime, lowtime, npulses, pol): """Send `npulses' pulses out on `pin' with specified `high' and `low' times, both not exceeding 1 millisecond. Returns total time (in microseconds) spent generating the pulses.""" buf2[0] = pin buf2[1] = hightime buf2[2] = lowtime buf2[3] = npulses buf2[4] = pol try: fcntl.ioctl(self.fd, PULSEOUT, buf2, 1) except: return -1.0 t0 = buf2[0] * 1.0e6 + buf2[1] t1 = buf2[6] * 1.0e6 + buf2[7] return t1 - t0 def set_timeout(self, timeout): """Time measurement calls wait inside the kernel and freezes the system for that period. It is essential to provide maximum limits for waiting. The default is around 1000000 micro seconds (ie, 1 second). Users can change this to any value between 5 milli second and 5 second. The argument `timeout' specifies the new timeout in microseconds.""" buf[0] = timeout try: fcntl.ioctl(self.fd, SETMAXWAIT, buf, 0) except: print 'set_timeout: error in changing timeout' def timestamp(self): """Return the time stamp. Time elapsed after AD1970""" try: fcntl.ioctl(self.fd, TIMESTAMP, buf2, 1) except: print 'timestamp: I/O error' return t0 = buf2[0] * 1.0e6 + buf2[1] return t0 #----------------- functions for hardware trouble shooting------------- def write_dataport(self, dat): """Writes to dataport 0x378""" buf[0] = dat try: fcntl.ioctl(self.fd, OUTDATA, buf, 0) except: print 'write_data_port: I/O error' def write_ctlport(self, dat): """Writes to control port""" buf[0] = dat try: fcntl.ioctl(self.fd, OUTCTL, buf, 0) except: print 'write_ctl_port: I/O error' def pulse_dataport(self, mask): """pulse dataport. 1024 pulses at full speed""" buf[0] = mask try: fcntl.ioctl(self.fd, OUTDATA, buf, 0) except: print 'pulse_data_port: I/O error' def write_ctlport(self, mask): """pulse control port. 1024 pulses at full speed""" buf[0] = mask try: fcntl.ioctl(self.fd, OUTCTL, buf, 0) except: print 'write_ctl_port: I/O error' def read_statport(self): """Returns status port value""" try: fcntl.ioctl(self.fd, INSTAT, buf, 1) except: print 'read_statport: I/O error' return return buf[0] #------------------------ PPDAS Hardware routines ------------------------ def start_hist(self): """ Starts the ppdas box """ if (fcntl.ioctl(self.fd, START_HIST, hist, 1) < 0): return False return True def stop_hist(self): """ Stops the ppdas box """ if (fcntl.ioctl(self.fd, STOP_HIST, hist, 1) < 0): return False return True def clear_hist(self): """ Clears the ppdas histogram """ if (fcntl.ioctl(self.fd, CLEAR_HIST, hist, 1) < 0): return False return True def read_hist(self): """ Returns the Histogram from ppdas box in a 4096 element list """ if (fcntl.ioctl(self.fd, READ_HIST, hist, 1) < 0): return None data = [] for k in range(HISTSIZE): data.append((k,hist[k])) return data;