from time import sleep
-
from termplotter import TermPlotter
tplot = TermPlotter([-2**11, 2**11], width=80)
-# devboard is assumed to be imported as:
-#
-# import devboard as dev
#
-# and already initialized/setup to the desired state
+# devboard is assumed to be already initialized to the desired state
#
+# This import, then, does not trigger *creation* of the objects, just
+# gets the references already setup.
+import devboard as dev
+
+
#helpers
chain = dev.chain
dac = dev.dac
+#
+# configuration
+#
+N_CHANNELS = 48
+MUX_SETTLE_DELAY = 0.1
+MUX_CAL_DELAY = 0.5
+N_SAMPLES = 1
+SAMPLE_DELAY = 0.02
+
+
def bisect(fset, limits=[0.0, 1.0], dgmin=None, up='uk', down='dj'):
"""Given a one-input function fset and list of limits, interactively bisect
the range by sending fset(guess) and asking to go (u)p or (d)own for the
-def secant_opt(func, x0, limits, x1scale=0.1, maxiter=50, k=1.0):
+def secant_opt(func, x0, limits, elements=None, x1scale=0.1, maxiter=25, verbose=True):
"""
Find the function's zero on a bounded interval using the secant method.
x0 - initial integer guess
limits - tuple of min,max for x
+ elements - coordinates of x0 to calibrate, leaving others alone
+
Evaluates func(x0), guesses x1 by sign of result and x1scale'th of the
range in that direction. The expected function shape is atan()-like.
"""
xmin = min(limits)
xmax = max(limits)
- converging = ones(x0.shape, dtype=int)
+ # unspecified means: do all items of x0
+ if elements is None:
+ converging = ones(x0.shape, dtype=int)
+ else:
+ converging = zeros(x0.shape, dtype=int)
+ for idx in tuple(elements):
+ converging[tuple(idx)] = 1
+
evaluations = zeros(x0.shape, dtype=int)
+ # channels of interest get 2 evaluations before starting the loop
+ evaluations += 2*converging
def saturate(x):
imax = find(x > xmax)
imin = find(x < xmin)
x.flat[imax] = xmax
x.flat[imin] = xmin
- return int64(around(x))
- #return int(round(max(min(x, xmax), xmin)))
+ return array(around(x), dtype=int)
def feval(x, conv):
- #print 'feval:'
- #print x[0:N_CHANNELS]
r = func(x, conv)
guesses.append(x)
results.append(r)
for niter in range(maxiter):
iz = find(q1==q0)
q1.flat[iz] = q1.flat[iz] + 1e-6
- #dp = q1*(p1 - p0)/(q1 - q0)
dp = q1*abs(p1 - p0)/abs(q1 - q0)
- #dp = k*q1*(p1 - p0)/(q1 - q0)
- #dp = k*sign(q1)*abs(p1-p0)
max_step = r * 2**(-niter + 1)
ia = find(abs(dp) > max_step)
- if len(ia) > 0: print 'limiting max steps:', ia
+ if verbose and (len(ia) > 0): print 'limiting max steps:', ia
dp.flat[ia] = sign(dp.flat[ia]) * max_step
#force a +-1 step
p = saturate(p1 - dp)
q = feval(p, converging)
+ evaluations += converging
+
# Convergence criteria
# x to varies by (+1,-1) or (-1,+1)
# or 3 consecutive guesses the same
#ensure we pick the correct value
if (a0 <= a1) and (a <= a1):
converging.flat[i] = 0
- evaluations.flat[i] = 3 + niter
+ #evaluations.flat[i] = 3 + niter
+
- print converging[0:N_CHANNELS]
+ if verbose: print converging
if not (sum(converging[0:N_CHANNELS]) > 0.0):
return (p0, {'nevals': evaluations,
'guesses': guesses,
q0 = q1
p1 = p
q1 = q
+ # failed to completely converge in niter steps, return the current state
return (p0, {'nevals': evaluations,
'guesses': guesses,
'results': results,
'converged': False})
- #raise Error("Failed to converge.")
def offset2signed(v, bits):
return (v - 2**(bits-1)) >> 2
-def mux_a_offset(x, c):
- xi = int(x[0])
+def mux_a_offset(x, c, verbose=True):
+ #xi = int(x[0])
+ xi = int(x)
mux.otaA.offset = xi
mux.write()
- #sleep(2.0)
+ sleep(MUX_CAL_DELAY)
adc.read()
values = []
for i in range(N_SAMPLES):
values.append(offset2signed(adc.read(), 16))
sleep(SAMPLE_DELAY)
mv = mean(values)
- print tpx(xi), tplot(mv)
+ #if verbose: print tpx(xi), tplot(mv)
return array((mv,))
-def mux_b_offset(x, c):
- xi = int(x[0])
+def mux_b_offset(x, c, verbose=True):
+ xi = int(x)
mux.otaB.offset = xi
mux.write()
- #sleep(2.0)
+ sleep(MUX_CAL_DELAY)
adc.read()
values = []
for i in range(N_SAMPLES):
values.append(offset2signed(adc.read(), 16))
sleep(SAMPLE_DELAY)
mv = mean(values)
- print tpx(xi), tplot(mv)
+ #if verbose: print tpx(xi), tplot(mv)
return array((mv,))
def chain_a_offset(x, n, mux_offset):
values.append(offset2signed(adc.read(), 16))
sleep(DELAY)
mv = mean(values)
- print tpx(xi), tplot(mv)
+ if verbose: print tpx(xi), tplot(mv)
return mv
-def chain_outputs(n, converged):
+def chain_outputs(converged, verbose=True):
adc.triggerMode(adc.MODE_IDLE)
adc.mux(4) #mux.otaA
adc.fifo(1)
adc.triggerMode(adc.MODE_MANUAL_MANUAL)
- outs = zeros((2, n+1))
- for i in range(n+1):
+ outs = zeros(converged.shape)
+ #each harmonic
+ for i in range(outs.shape[1]):
# skip evaluating converged channel groups
# output values are ignored for these channels
if sum(converged[:,i]) == 0:
outs[0, i] = 0.0
outs[1, i] = 0.0
- print 'skipping converged channel pair: %02i' % i
+ if verbose: print 'skipping converged channel pair: %02i' % i
continue
mux.selA = i
mux.selB = i
mux.write()
-
sleep(MUX_SETTLE_DELAY)
+
+ # I, Q
for j in range(2):
# also ignore individual channel
# (small speedup, but it all helps...)
if converged[j, i] == 0:
outs[j, i] = 0.0
- print 'skipping converged channel %i %s' % (i, ('A','B')[j])
+ if verbose: print 'skipping converged channel %i %s' % (i, ('A','B')[j])
continue
- adc.mux(4+j)
+ adc.mux(4+j) #vouts on channels 4,5
adc.read()
values = []
v = offset2signed(adc.read(), 16)
v = offset2signed(adc.read(), 16)
values.append(v)
if N_SAMPLES >= 2:
- mv = mean(values[-1*min(10, N_SAMPLES):-1])
+ mv = mean(values[-1*min(10, N_SAMPLES):-1]) #TODO
else:
mv = values[0]
outs[j, i] = mv
- print '*** measured outputs: ***'
- print outs[:,0:N_CHANNELS]
+ if verbose:
+ print '*** measured outputs: ***'
+ print outs
+
return outs
-def chain_offsets(values, converged, mux_offset):
- print
- print '*** sending values: ***'
- print values[0:N_CHANNELS]
- for i,n in enumerate(values[0]):
- chain.h[i].otaA.offset = int(n)
- chain.h[i].otaA.fast = 1
- chain.h[i].cal = 1
- chain.h[i].otaA.se = 1
- chain.h[i].otaA.cint = 0
- chain.h[i].otaA.zero = 0
-
- for i,n in enumerate(values[1]):
- chain.h[i].otaB.offset = int(n)
- chain.h[i].otaB.fast = 1
- chain.h[i].otaB.se = 1
- chain.h[i].otaB.cint = 0
- chain.h[i].otaB.zero = 0
+
+def chain_offsets(values, converged, mux_offset, verbose=True):
+ if verbose:
+ print
+ print '*** sending values: ***'
+ print values
+
+ for hi in range(N_CHANNELS):
+ for iq in range(2):
+ # only update flagged values
+ if converged[iq, hi] == 1:
+ chain.h[hi].ota[iq].offset = int(values[iq, hi])
+
+ #for i,n in enumerate(values[0]):
+ ## only update flagged values
+ #if converged[0, i]:
+ #chain.h[i].otaA.offset = int(n)
+
+ #for i,n in enumerate(values[1]):
+ ## only update flagged values
+ #if converged[0, i]:
+ #chain.h[i].otaB.offset = int(n)
+
chain.write()
- sleep(1.0)
+ sleep(1.0) # TODO: better delay value?
- outs = chain_outputs(N_CHANNELS-1, converged)
+ outs = chain_outputs(converged, verbose=verbose)
+ # remove measured residual offset from mux to "see through" the pad buffer
for i in range(2):
outs[i,:] -= mux_offset[i]
- return array(outs)
+ return outs
+
+
+
dac.vina(1.25)
dac.vinb(1.25)
-N_CHANNELS = 48
-MUX_SETTLE_DELAY = 0.1
-N_SAMPLES = 1
-SAMPLE_DELAY = 0.02
import datetime as dt
-while True:
+while False:
if 1:
mux.otaA.mode = mux.otaA.CAL_BUF
mux.otaA.fast = 1
print
print 'Calibrating mux otaA'
adc.mux(4)
- resA, tmp = secant_opt(mux_a_offset, zeros((1,), dtype=int), [-128, 127])
+ resA, tmp = secant_opt(
+ mux_a_offset,
+ zeros((1,), dtype=int),
+ [-128, 127],
+ )
muxA_offset = mean([offset2signed(adc.read(), 16) for i in range(1000)])
print '*** mux.otaA.offset =', muxA_offset, ' @', resA
else:
print 'Calibrating mux otaB'
adc.mux(5)
#resB = secant_opt(mux_b_offset, 0, [-128, 127])
- resB, tmp = secant_opt(mux_b_offset, zeros((1,), dtype=int), [-128, 127])
+ resB, tmp = secant_opt(
+ mux_b_offset,
+ zeros((1,), dtype=int),
+ [-128, 127],
+ )
muxB_offset = mean([offset2signed(adc.read(), 16) for i in range(1000)])
print '*** mux.otaB.offset =', muxB_offset, ' @', resB
else:
mux.otaA.mode = mux.otaA.MUX_BUF
mux.otaB.mode = mux.otaB.MUX_BUF
mux.write()
- chain_os, stats = secant_opt(lambda x,y: chain_offsets(x, y, (muxA_offset, muxB_offset)), resA*ones((2,N_CHANNELS), dtype=int), (-128, 127), k=0.5)
+ #def secant_opt(func, x0, limits, elements=None, x1scale=0.1, maxiter=50):
+
+ chain_os, stats = secant_opt(
+ lambda x,y: chain_offsets(x, y, (muxA_offset, muxB_offset)),
+ resA*ones((2,N_CHANNELS), dtype=int),
+ (-128, 127),
+ verbose=True,
+ )
+
print '-'*80
print 'offsets:', chain_os[:N_CHANNELS]
print 'N-evals:', stats['nevals'][:N_CHANNELS]
)
break
+
+
+def calibrate(name, elements=None, verbose=True):
+ """Calibrate the block identified by name. If ids is specified, this is
+ an element or list of the specific elements to be calibrated, leaving
+ others alone.
+
+ NB: The calibration routines only modify the relevant ota.offset values,
+ and sets the .cal bit for the chains. All other settings (.fast, cint,
+ etc) are left un-touched.
+ """
+ #all amplifiers share this
+ limits = (-128, 127)
+
+ if name == 'chain':
+ def func(values, converging):
+ return chain_offsets(
+ values,
+ converging,
+ (int(mux.otaA.offset), int(mux.otaB.offset))
+ )
+
+ #setup initial guess, just use the current values
+ x0 = zeros((2, N_CHANNELS), dtype=int)
+ for i in range(N_CHANNELS):
+ for j in range(2):
+ x0[j,i] = chain.h[i].ota[j].offset
+ offsets, stats = secant_opt(func, x0, limits,
+ elements=elements, verbose=verbose)
+
+ elif name == 'mux':
+ #save mux state
+ old_mode = [mux.ota[i].mode for i in range(2)]
+
+ mux.otaA.mode = mux.otaA.CAL_CMP
+ mux.otaB.mode = mux.otaB.CAL_CMP
+ mux.write()
+ adc.mux(4)
+
+ def func(values, converging):
+ outA = mux_a_offset(values[0], converging[0], verbose)
+ outB = mux_b_offset(values[1], converging[1], verbose)
+ out = zeros((2,))
+ out[0] = outA
+ out[1] = outB
+ return out
+ return array([outA, outB])
+
+ #setup initial guess, just use the current values
+ x0 = zeros((2,), dtype=int)
+ for i in range(2):
+ x0[i] = int(mux.ota[i].offset)
+
+ offsets, stats = secant_opt(func, x0, limits,
+ elements=elements, verbose=verbose)
+
+ #restore mux state
+ for i,mode in enumerate(old_mode):
+ mux.ota[i].mode = mode
+ mux.write()
+
+ else:
+ print 'ERROR: unknown name %s' % name
+
+ return (offsets, stats)
+
+
+
+
+