Add and update calibration
authorDan White <dan@whiteaudio.com>
Sat, 27 Oct 2012 03:17:20 +0000 (22:17 -0500)
committerDan White <dan@whiteaudio.com>
Sat, 27 Oct 2012 03:17:20 +0000 (22:17 -0500)
python-lib/calibrate.py
python-lib/devboard.py

index a6e332e2a4563de0f0882bc481a884f4de8722be..98e43ac9c232883534f52e9b35576a68da711047 100644 (file)
@@ -1,6 +1,5 @@
 
 from time import sleep
-
 from termplotter import TermPlotter
 
 tplot = TermPlotter([-2**11, 2**11], width=80)
@@ -8,12 +7,14 @@ tpx = TermPlotter([-128, 127], width=80)
 
 
 
-# devboard is assumed to be imported as:
-#
-# import devboard as dev
 #
-# and already initialized/setup to the desired state
+# devboard is assumed to be already initialized to the desired state
 #
+# This import, then, does not trigger *creation* of the objects, just
+# gets the references already setup.
+import devboard as dev
+
+
 
 #helpers
 chain = dev.chain
@@ -24,6 +25,16 @@ adc = dev.adc
 dac = dev.dac
 
 
+#
+# configuration
+#
+N_CHANNELS = 48
+MUX_SETTLE_DELAY = 0.1
+MUX_CAL_DELAY = 0.5
+N_SAMPLES = 1
+SAMPLE_DELAY = 0.02
+
+
 def bisect(fset, limits=[0.0, 1.0], dgmin=None, up='uk', down='dj'):
     """Given a one-input function fset and list of limits, interactively bisect
     the range by sending fset(guess) and asking to go (u)p or (d)own for the
@@ -71,13 +82,15 @@ def bisect(fset, limits=[0.0, 1.0], dgmin=None, up='uk', down='dj'):
 
 
 
-def secant_opt(func, x0, limits, x1scale=0.1, maxiter=50, k=1.0):
+def secant_opt(func, x0, limits, elements=None, x1scale=0.1, maxiter=25, verbose=True):
     """
     Find the function's zero on a bounded interval using the secant method.
 
     x0 - initial integer guess
     limits - tuple of min,max for x
 
+    elements - coordinates of x0 to calibrate, leaving others alone
+
     Evaluates func(x0), guesses x1 by sign of result and x1scale'th of the
     range in that direction.  The expected function shape is atan()-like.
     """
@@ -87,20 +100,26 @@ def secant_opt(func, x0, limits, x1scale=0.1, maxiter=50, k=1.0):
     xmin = min(limits)
     xmax = max(limits)
 
-    converging = ones(x0.shape, dtype=int)
+    # unspecified means: do all items of x0
+    if elements is None:
+        converging = ones(x0.shape, dtype=int)
+    else:
+        converging = zeros(x0.shape, dtype=int)
+        for idx in tuple(elements):
+            converging[tuple(idx)] = 1
+
     evaluations = zeros(x0.shape, dtype=int)
+    # channels of interest get 2 evaluations before starting the loop
+    evaluations += 2*converging
 
     def saturate(x):
         imax = find(x > xmax)
         imin = find(x < xmin)
         x.flat[imax] = xmax
         x.flat[imin] = xmin
-        return int64(around(x))
-        #return int(round(max(min(x, xmax), xmin)))
+        return array(around(x), dtype=int)
 
     def feval(x, conv):
-        #print 'feval:'
-        #print x[0:N_CHANNELS]
         r = func(x, conv)
         guesses.append(x)
         results.append(r)
@@ -114,14 +133,11 @@ def secant_opt(func, x0, limits, x1scale=0.1, maxiter=50, k=1.0):
     for niter in range(maxiter):
         iz = find(q1==q0)
         q1.flat[iz] = q1.flat[iz] + 1e-6
-        #dp = q1*(p1 - p0)/(q1 - q0)
         dp = q1*abs(p1 - p0)/abs(q1 - q0)
-        #dp = k*q1*(p1 - p0)/(q1 - q0)
-        #dp = k*sign(q1)*abs(p1-p0)
 
         max_step = r * 2**(-niter + 1)
         ia = find(abs(dp) > max_step)
-        if len(ia) > 0: print 'limiting max steps:', ia
+        if verbose and (len(ia) > 0): print 'limiting max steps:', ia
         dp.flat[ia] = sign(dp.flat[ia]) * max_step
 
         #force a +-1 step
@@ -134,6 +150,8 @@ def secant_opt(func, x0, limits, x1scale=0.1, maxiter=50, k=1.0):
         p = saturate(p1 - dp)
         q = feval(p, converging)
 
+        evaluations += converging
+
         # Convergence criteria
         #   x to varies by (+1,-1) or (-1,+1)
         #   or 3 consecutive guesses the same
@@ -147,9 +165,10 @@ def secant_opt(func, x0, limits, x1scale=0.1, maxiter=50, k=1.0):
                 #ensure we pick the correct value
                 if (a0 <= a1) and (a <= a1):
                     converging.flat[i] = 0
-                    evaluations.flat[i] = 3 + niter
+                    #evaluations.flat[i] = 3 + niter
+
 
-        print converging[0:N_CHANNELS]
+        if verbose: print converging
         if not (sum(converging[0:N_CHANNELS]) > 0.0): 
             return (p0, {'nevals': evaluations,
                          'guesses': guesses,
@@ -160,11 +179,11 @@ def secant_opt(func, x0, limits, x1scale=0.1, maxiter=50, k=1.0):
         q0 = q1
         p1 = p
         q1 = q
+    # failed to completely converge in niter steps, return the current state
     return (p0, {'nevals': evaluations,
                  'guesses': guesses,
                  'results': results,
                  'converged': False})
-    #raise Error("Failed to converge.")
 
 
 
@@ -172,32 +191,33 @@ def secant_opt(func, x0, limits, x1scale=0.1, maxiter=50, k=1.0):
 def offset2signed(v, bits):
     return (v - 2**(bits-1)) >> 2
 
-def mux_a_offset(x, c):
-    xi = int(x[0])
+def mux_a_offset(x, c, verbose=True):
+    #xi = int(x[0])
+    xi = int(x)
     mux.otaA.offset = xi
     mux.write()
-    #sleep(2.0)
+    sleep(MUX_CAL_DELAY)
     adc.read()
     values = []
     for i in range(N_SAMPLES):
         values.append(offset2signed(adc.read(), 16))
         sleep(SAMPLE_DELAY)
     mv = mean(values)
-    print tpx(xi), tplot(mv)
+    #if verbose: print tpx(xi), tplot(mv)
     return array((mv,))
 
-def mux_b_offset(x, c):
-    xi = int(x[0])
+def mux_b_offset(x, c, verbose=True):
+    xi = int(x)
     mux.otaB.offset = xi
     mux.write()
-    #sleep(2.0)
+    sleep(MUX_CAL_DELAY)
     adc.read()
     values = []
     for i in range(N_SAMPLES):
         values.append(offset2signed(adc.read(), 16))
         sleep(SAMPLE_DELAY)
     mv = mean(values)
-    print tpx(xi), tplot(mv)
+    #if verbose: print tpx(xi), tplot(mv)
     return array((mv,))
 
 def chain_a_offset(x, n, mux_offset):
@@ -221,40 +241,42 @@ def chain_a_offset(x, n, mux_offset):
         values.append(offset2signed(adc.read(), 16))
         sleep(DELAY)
     mv = mean(values)
-    print tpx(xi), tplot(mv)
+    if verbose: print tpx(xi), tplot(mv)
     return mv
 
 
-def chain_outputs(n, converged):
+def chain_outputs(converged, verbose=True):
     adc.triggerMode(adc.MODE_IDLE)
     adc.mux(4) #mux.otaA
     adc.fifo(1)
     adc.triggerMode(adc.MODE_MANUAL_MANUAL)
 
-    outs = zeros((2, n+1))
-    for i in range(n+1):
+    outs = zeros(converged.shape)
+    #each harmonic
+    for i in range(outs.shape[1]):
         # skip evaluating converged channel groups
         # output values are ignored for these channels
         if sum(converged[:,i]) == 0:
             outs[0, i] = 0.0
             outs[1, i] = 0.0
-            print 'skipping converged channel pair: %02i' % i
+            if verbose: print 'skipping converged channel pair: %02i' % i
             continue
 
         mux.selA = i
         mux.selB = i
         mux.write()
-
         sleep(MUX_SETTLE_DELAY)
+
+        # I, Q
         for j in range(2):
             # also ignore individual channel
             # (small speedup, but it all helps...)
             if converged[j, i] == 0:
                 outs[j, i] = 0.0
-                print 'skipping converged channel %i %s' % (i, ('A','B')[j])
+                if verbose: print 'skipping converged channel %i %s' % (i, ('A','B')[j])
                 continue
 
-            adc.mux(4+j)
+            adc.mux(4+j) #vouts on channels 4,5
             adc.read()
             values = []
             v = offset2signed(adc.read(), 16)
@@ -264,42 +286,53 @@ def chain_outputs(n, converged):
                 v = offset2signed(adc.read(), 16)
                 values.append(v)
             if N_SAMPLES >= 2:
-                mv = mean(values[-1*min(10, N_SAMPLES):-1])
+                mv = mean(values[-1*min(10, N_SAMPLES):-1]) #TODO
             else:
                 mv = values[0]
             outs[j, i] = mv
 
-    print '*** measured outputs: ***'
-    print outs[:,0:N_CHANNELS]
+    if verbose:
+        print '*** measured outputs: ***'
+        print outs
+
     return outs
 
-def chain_offsets(values, converged, mux_offset):
-    print
-    print '*** sending values: ***'
-    print values[0:N_CHANNELS]
-    for i,n in enumerate(values[0]):
-        chain.h[i].otaA.offset = int(n)
-        chain.h[i].otaA.fast = 1
-        chain.h[i].cal = 1
-        chain.h[i].otaA.se = 1
-        chain.h[i].otaA.cint = 0
-        chain.h[i].otaA.zero = 0
-
-    for i,n in enumerate(values[1]):
-        chain.h[i].otaB.offset = int(n)
-        chain.h[i].otaB.fast = 1
-        chain.h[i].otaB.se = 1
-        chain.h[i].otaB.cint = 0
-        chain.h[i].otaB.zero = 0
+
+def chain_offsets(values, converged, mux_offset, verbose=True):
+    if verbose:
+        print
+        print '*** sending values: ***'
+        print values
+
+    for hi in range(N_CHANNELS):
+        for iq in range(2):
+            # only update flagged values
+            if converged[iq, hi] == 1:
+                chain.h[hi].ota[iq].offset = int(values[iq, hi])
+
+        #for i,n in enumerate(values[0]):
+            ## only update flagged values
+            #if converged[0, i]:
+                #chain.h[i].otaA.offset = int(n)
+
+        #for i,n in enumerate(values[1]):
+            ## only update flagged values
+            #if converged[0, i]:
+                #chain.h[i].otaB.offset = int(n)
+
     chain.write()
-    sleep(1.0)
+    sleep(1.0) # TODO: better delay value?
 
-    outs = chain_outputs(N_CHANNELS-1, converged)
+    outs = chain_outputs(converged, verbose=verbose)
 
+    # remove measured residual offset from mux to "see through" the pad buffer
     for i in range(2):
         outs[i,:] -= mux_offset[i]
 
-    return array(outs)
+    return outs
+
+
+
 
 dac.vina(1.25)
 dac.vinb(1.25)
@@ -319,15 +352,11 @@ adc.triggerMode(adc.MODE_MANUAL_MANUAL)
 
 
 
-N_CHANNELS = 48
-MUX_SETTLE_DELAY = 0.1
-N_SAMPLES = 1
-SAMPLE_DELAY = 0.02
 
 
 import datetime as dt
 
-while True:
+while False:
     if 1:
         mux.otaA.mode = mux.otaA.CAL_BUF
         mux.otaA.fast = 1
@@ -336,7 +365,11 @@ while True:
         print
         print 'Calibrating mux otaA'
         adc.mux(4)
-        resA, tmp = secant_opt(mux_a_offset, zeros((1,), dtype=int), [-128, 127])
+        resA, tmp = secant_opt(
+                mux_a_offset,
+                zeros((1,), dtype=int),
+                [-128, 127],
+                )
         muxA_offset = mean([offset2signed(adc.read(), 16) for i in range(1000)])
         print '*** mux.otaA.offset =', muxA_offset, ' @', resA
     else:
@@ -353,7 +386,11 @@ while True:
         print 'Calibrating mux otaB'
         adc.mux(5)
         #resB = secant_opt(mux_b_offset, 0, [-128, 127])
-        resB, tmp = secant_opt(mux_b_offset, zeros((1,), dtype=int), [-128, 127])
+        resB, tmp = secant_opt(
+                mux_b_offset,
+                zeros((1,), dtype=int),
+                [-128, 127],
+                )
         muxB_offset = mean([offset2signed(adc.read(), 16) for i in range(1000)])
         print '*** mux.otaB.offset =', muxB_offset, ' @', resB
     else:
@@ -383,7 +420,15 @@ while True:
         mux.otaA.mode = mux.otaA.MUX_BUF
         mux.otaB.mode = mux.otaB.MUX_BUF
         mux.write()
-        chain_os, stats = secant_opt(lambda x,y: chain_offsets(x, y, (muxA_offset, muxB_offset)), resA*ones((2,N_CHANNELS), dtype=int), (-128, 127), k=0.5)
+        #def secant_opt(func, x0, limits, elements=None, x1scale=0.1, maxiter=50):
+
+        chain_os, stats = secant_opt(
+                lambda x,y: chain_offsets(x, y, (muxA_offset, muxB_offset)),
+                resA*ones((2,N_CHANNELS), dtype=int),
+                (-128, 127),
+                verbose=True,
+                )
+
         print '-'*80
         print 'offsets:', chain_os[:N_CHANNELS]
         print 'N-evals:', stats['nevals'][:N_CHANNELS]
@@ -405,3 +450,73 @@ while True:
             )
     break
 
+
+
+def calibrate(name, elements=None, verbose=True):
+    """Calibrate the block identified by name.  If ids is specified, this is
+    an element or list of the specific elements to be calibrated, leaving
+    others alone.
+
+    NB: The calibration routines only modify the relevant ota.offset values,
+    and sets the .cal bit for the chains.  All other settings (.fast, cint,
+    etc) are left un-touched.
+    """
+    #all amplifiers share this
+    limits = (-128, 127)
+
+    if name == 'chain':
+        def func(values, converging):
+            return chain_offsets(
+                    values,
+                    converging,
+                    (int(mux.otaA.offset), int(mux.otaB.offset))
+                    )
+
+        #setup initial guess, just use the current values
+        x0 = zeros((2, N_CHANNELS), dtype=int)
+        for i in range(N_CHANNELS):
+            for j in range(2):
+                x0[j,i] = chain.h[i].ota[j].offset
+        offsets, stats = secant_opt(func, x0, limits,
+                elements=elements, verbose=verbose)
+
+    elif name == 'mux':
+        #save mux state
+        old_mode = [mux.ota[i].mode for i in range(2)]
+
+        mux.otaA.mode = mux.otaA.CAL_CMP
+        mux.otaB.mode = mux.otaB.CAL_CMP
+        mux.write()
+        adc.mux(4)
+
+        def func(values, converging):
+            outA = mux_a_offset(values[0], converging[0], verbose)
+            outB = mux_b_offset(values[1], converging[1], verbose)
+            out = zeros((2,))
+            out[0] = outA
+            out[1] = outB
+            return out
+            return array([outA, outB])
+
+        #setup initial guess, just use the current values
+        x0 = zeros((2,), dtype=int)
+        for i in range(2):
+            x0[i] = int(mux.ota[i].offset)
+
+        offsets, stats = secant_opt(func, x0, limits,
+                elements=elements, verbose=verbose)
+
+        #restore mux state
+        for i,mode in enumerate(old_mode):
+            mux.ota[i].mode = mode
+        mux.write()
+
+    else:
+        print 'ERROR: unknown name %s' % name
+
+    return (offsets, stats)
+
+
+
+
+
index 2d0c91e4a113afee583c3c890d8784aa30542b7d..709793b4c3f2c5fb4d5c781a38a7b9136e2224db 100644 (file)
@@ -3,8 +3,12 @@
 from datetime import datetime as dt
 from time import sleep
 import yaml
+
+#local
 import usbio
 
+
+
 #
 # devboard conversion constants
 #
@@ -167,6 +171,8 @@ def save_config(name=None):
 # devboard configuration
 _config = {}
 
+is_initialized = False
+
 spi0 = None
 spi1 = None
 i2c = None
@@ -184,19 +190,41 @@ def init_devboard(name='devboard-defaults.yaml'):
     """Initialize devboard <-> usbio with a known configuration.  `name` may be
     a string filename, defaulting to 'devboard-defaults.yaml' or an integer
     representing the file 'chip%02i.calibration.yaml'.
+
+    If the board has been initialized, start fresh by re-initing the usb also.
     """
+    global is_initialized
     global spi0, spi1, i2c
     global ibias, vatoi, v430
     global dac, adc
     global chain, mux
     global arb, amux
 
+    #blank the slate
+    if is_initialized:
+        _config = {}
+        spi0 = None
+        spi1 = None
+        i2c = None
+        ibias = None
+        vatoi = None
+        v430 = None
+        dac = None
+        adc = None
+        chain = None
+        mux = None
+        arb = None
+        amux = None
+        is_initialized = False
+
+
     # try loading config for given chip ID
     if isinstance(name, int):
         name = 'chip%02i-calibration.yaml' % name
 
     try:
         load_config(name)
+        is_initialized = True
         return
     except IOError:
         s = '* WARNING: could not find "%s", using built-in defaults' % name
@@ -262,6 +290,7 @@ def init_devboard(name='devboard-defaults.yaml'):
 
     psdefaults()
     ## done with init_devboard()
+    is_initialized = True