# -*- coding: utf-8 -*-
"""
@author: Sam Schott (ss2151@cam.ac.uk)
(c) Sam Schott; This work is licensed under a Creative Commons
Attribution-NonCommercial-NoDerivs 2.0 UK: England & Wales License.
"""
import os
import logging
import time
import types
import shlex
from datetime import timedelta, datetime
import tempfile
import numpy as np
from pint import UnitRegistry
from keithleygui.config.main import CONF as KCONF
from mercuryitc.mercury_driver import MercuryITC_TEMP
from customxepr.utils import EmailSender
from customxepr.experiment import ModePicture, XeprData, XeprParam
from customxepr.experiment.xepr_dataset import ParamGroupDSL
from customxepr.manager import Manager
from customxepr.config import CONF
try:
from XeprAPI import ExperimentError, ParameterError
except ImportError:
ExperimentError = RuntimeError
ParameterError = RuntimeError
_root = os.path.dirname(os.path.realpath(__file__))
logger = logging.getLogger(__name__)
ureg = UnitRegistry()
Q_ = ureg.Quantity
def cmp(a, b):
return bool(a > b) - bool(a < b) # convert possible numpy-bool to bool
# noinspection PyUnresolvedReferences
[docs]class CustomXepr(object):
"""
CustomXepr defines routines to control Bruker's Xepr software and to run full ESR
measurement cycles. When creating an instance of CustomXepr, you can pass instances
of :class:`XeprAPI.XeprAPI`, :class:`mercurygui.MercuryFeed` and
:py:class:`keithley2600.Keithley2600` to handle interactions with the respective
instruments.
All CustomXepr methods that do not end with '_sync' are executed in a worker thread
in the order of their calls. Scheduling of jobs and retrieval of results is handled
by :class:`manager.Manager`. For instructions on how to schedule your own
experiments, please refer to the documentation of :mod:`manager`.
Every asynchronous method has an equivalent method ending with '_sync' which will be
called immediately and block until it is done. Those synchronous equivalents are
generated at runtime and not documented here.
You can use :class:`CustomXepr` on its own, but it is recommended to start it with
the function :func:`startup.run` in the :mod:`startup` module. This will
automatically connect to available instruments and start CustomXepr's graphical user
interfaces.
:param xepr: Xepr instance from the Bruker Python XeprAPI. Defaults to `None` if not
provided.
:param mercuryfeed: :class:`mercurygui.MercuryFeed` instance for live feed from
MercuryiTC temperature controller. Defaults to `None` if not provided.
:param keithley: :class:`keithley2600.Keithley2600` instance from keithley2600
driver. Defaults to `None` if not provided.
"""
manager = Manager()
# ==================================================================================
# Set up basic CustomXepr functionality
# ==================================================================================
def __init__(self, xepr=None, mercury=None, keithley=None):
super(self.__class__, self).__init__()
self.emailSender = EmailSender(
mailhost=(CONF.get("SMTP", "mailhost"), CONF.get("SMTP", "port")),
fromaddr=CONF.get("SMTP", "fromaddr"),
credentials=CONF.get("SMTP", "credentials"),
secure=CONF.get("SMTP", "secure"),
)
# =====================================================================
# check if connections to Xepr, MercuryiTC and Keithley are present
# =====================================================================
self.xepr = xepr
self.mercury = mercury
self.keithley = keithley
# hidden Xepr experiment, created when EPR is connected:
self.hidden = None
self._check_for_xepr(raise_error=False)
self._check_for_keithley(raise_error=False)
if self._check_for_mercury(raise_error=False):
temperature_module_name = CONF.get("CustomXepr", "esr_temperature_nick")
cooling_module_name = CONF.get("CustomXepr", "cooling_temperature_nick")
(
self.esr_temperature,
self.esr_gasflow,
self.esr_heater,
) = self._select_temp_sensor(temperature_module_name)
self.cooling_sensor, _, _ = self._select_temp_sensor(cooling_module_name)
else:
self.esr_temperature = self.esr_gasflow = self.esr_heater = None
self.cooling_sensor = None
# =====================================================================
# define / load certain settings for customxepr functions
# =====================================================================
# settling time for cryostat temperature (in sec)
self._temp_wait_time = CONF.get("CustomXepr", "temp_wait_time")
# ESR temperature stability tolerance (in K)
self._temperature_tolerance = CONF.get(
"CustomXepr", "esr_temperature_tolerance"
)
# cooling temperature stability tolerance (in K)
self._max_cooling_temperature = CONF.get(
"CustomXepr", "max_cooling_temperature"
)
self._wait = 0.2 # waiting time for Xepr to process commands (in sec)
self._tuning_timeout = 60 # timeout for phase tuning (in sec)
self._last_qvalue = None # last measured Q-value
self._last_qvalue_err = None # last measured Q-value error
# =====================================================================
# interaction with manager
# =====================================================================
self.abort = self.manager.abort
if keithley is not None:
self.manager.abort_events.append(self.keithley.abort_event)
# =====================================================================
# create synchronous versions of CustomXepr methods
# =====================================================================
for attr_name in dir(self):
attr = getattr(self, attr_name)
if callable(attr) and hasattr(attr, "__wrapped__"):
setattr(
self, attr_name + "_sync", types.MethodType(attr.__wrapped__, self)
)
# ==================================================================================
# define basic functions for email notifications, pausing, etc.
# ==================================================================================
@property
def notify_address(self):
"""List with email addresses for status notifications."""
return self.manager.notify_address
@notify_address.setter
def notify_address(self, address_list):
"""Setter: List with email addresses for notifications."""
self.manager.notify_address = address_list
@property
def temp_wait_time(self):
"""Wait time until temperature is considered stable. Defaults to 120 sec."""
return self._temp_wait_time
@temp_wait_time.setter
def temp_wait_time(self, new_time):
"""Setter: Wait time until temperature is considered stable."""
self._temp_wait_time = new_time
# update config file
CONF.set("CustomXepr", "temp_wait_time", new_time)
@property
def temperature_tolerance(self):
"""Temperature fluctuation tolerance. Defaults to 0.1 Kelvin."""
return self._temperature_tolerance
@temperature_tolerance.setter
def temperature_tolerance(self, new_tol):
"""Setter: Temperature fluctuation tolerance."""
self._temperature_tolerance = new_tol
# update config file
CONF.set("CustomXepr", "temperature_tolerance", new_tol)
@manager.queued_exec
def sendEmail(self, body):
"""
Sends an email to the list of addresses given by :attr:`notify_address`.
The email server and sender address can be configured in the config file.
:param str body: Text to send.
"""
self.emailSender.sendmail(self.notify_address, "CustomXepr Notification", body)
@manager.queued_exec
def sleep(self, seconds):
"""
Pauses for the specified amount of seconds. Sleeping can be aborted
by the user.
:param int seconds: Number of seconds to pause.
"""
eta = time.time() + seconds
eta_string = time.strftime("%H:%M", time.localtime(eta))
message = "Waiting for {:.0f} seconds, ETA: {}.".format(seconds, eta_string)
logger.info(message)
# brake up into 1 sec sleep intervals, give option to abort
if seconds > 1:
for i in range(0, seconds):
time.sleep(1)
logger.debug("Waiting {:.0f}/{:.0f}.".format(i + 1, seconds))
# check for abort event
if self.abort.is_set():
logger.info("Aborted by user.")
return
# use a single sleep command for less than one second pause
else:
time.sleep(seconds)
# ==================================================================================
# set up Xepr functions
# ==================================================================================
@manager.queued_exec
def tune(self):
"""
Runs Xepr's built-in tuning routine.
"""
self._check_for_xepr()
idle_state = self.hidden["TuneState"].value
time.sleep(self._wait)
self.hidden["OpMode"].value = "Tune"
time.sleep(self._wait)
self.hidden["Tune"].value = "Up"
time.sleep(self._wait)
while self.hidden["TuneState"].value == idle_state:
if self.abort.is_set():
self.hidden["Tune"].value = "Stop"
time.sleep(self._wait)
logger.info("Tuning aborted by user.")
return
else:
time.sleep(1)
while self.hidden["TuneState"].value != idle_state:
if self.abort.is_set():
self.hidden["Tune"].value = "Stop"
time.sleep(self._wait)
logger.info("Tuning aborted by user.")
return
else:
time.sleep(1)
@manager.queued_exec
def finetune(self):
"""
Runs Xepr's built-in finetuning routine.
"""
self._check_for_xepr()
idle_state = self.hidden["TuneState"].value
self.hidden["Tune"].value = "Fine"
time.sleep(self._wait)
while self.hidden["TuneState"].value == idle_state:
if self.abort.is_set():
self.hidden["Tune"].value = "Stop"
time.sleep(self._wait)
logger.info("Tuning aborted by user.")
return
else:
time.sleep(1)
while self.hidden["TuneState"].value != idle_state:
if self.abort.is_set():
self.hidden["Tune"].value = "Stop"
time.sleep(self._wait)
logger.info("Tuning aborted by user.")
return
else:
time.sleep(1)
@manager.queued_exec
def customtune(self, low_q=False):
"""
Custom tuning routine with higher accuracy. It takes longer than :meth:`tune`
and requires the spectrometer to be already close to tuned. In case of lossy
samples, you can set ``lowQ`` to ``True`` so that the tuning routine will cycle
through a smaller range of microwave powers. For lossy samples where the Q-value
will be lower than 3000, it is recommended to manually tune the cavity.
:param bool low_q: If ``True``, the tuning routine will be adjusted for lossy
samples.
"""
self._check_for_xepr()
iris_tolerance = 3 if low_q else 1
bias_tolerance = 3 if low_q else 1
freq_tolerance = 5 if low_q else 2
logger.info("Tuning.")
# save current operation mode and attenuation
mode = self.hidden["OpMode"].value
time.sleep(self._wait)
atten_start = self.hidden["PowerAtten"].value
time.sleep(self._wait)
# switch mode to 'Operate'
if not mode == "Operate":
self.hidden["OpMode"].value = "Operate"
time.sleep(self._wait)
dB_min = 10 if not low_q else 20
dB_max = 50 if not low_q else 45
# tune frequency and phase at 30 dB
self.hidden["PowerAtten"].value = 30
time.sleep(self._wait)
self.tuneFreq(freq_tolerance)
time.sleep(self._wait)
self.tunePhase()
time.sleep(self._wait)
# tune bias of reference arm at dB_max
# (where diode current is determined by reference arm)
self.hidden["PowerAtten"].value = dB_max
time.sleep(self._wait)
self.tuneBias(bias_tolerance)
time.sleep(self._wait)
# tune iris at 40 dB and 30 dB
for atten in [40, 30]:
# check for abort event
if self.abort.is_set():
self.hidden["PowerAtten"].value = atten_start
time.sleep(self._wait)
logger.info("Aborted by user.")
return
self.hidden["PowerAtten"].value = atten
time.sleep(self._wait)
self.tuneIris(iris_tolerance)
time.sleep(self._wait)
# tune iris and phase and frequency at 20 dB and dB_min
for atten in [20, dB_min]:
# check for abort event, clear event
if self.abort.is_set():
self.hidden["PowerAtten"].value = atten_start
time.sleep(self._wait)
logger.info("Aborted by user.")
return
self.hidden["PowerAtten"].value = atten
time.sleep(self._wait)
self.tunePhase()
time.sleep(self._wait)
self.tuneIris(iris_tolerance)
time.sleep(self._wait)
self.tuneFreq(freq_tolerance)
time.sleep(self._wait)
# tune bias at dB_max
self.hidden["PowerAtten"].value = dB_max
time.sleep(self._wait)
self.tuneBias(bias_tolerance)
time.sleep(self._wait)
# tune iris at 15 dB
self.hidden["PowerAtten"].value = 20
time.sleep(self._wait)
self.tuneIris(iris_tolerance)
time.sleep(self._wait)
# tune bias at dB_max
self.hidden["PowerAtten"].value = dB_max
time.sleep(self._wait)
self.tuneBias(bias_tolerance)
time.sleep(self._wait)
# tune iris at dB_min
self.hidden["PowerAtten"].value = dB_min
time.sleep(self._wait)
self.tuneIris(iris_tolerance)
time.sleep(self._wait)
# reset attenuation to original value, tune frequency again
self.hidden["PowerAtten"].value = atten_start
time.sleep(self._wait)
self.tuneFreq(freq_tolerance)
time.sleep(self._wait)
logger.debug("Tuning done.")
@manager.queued_exec
def tuneBias(self, tolerance=1):
"""
Tunes the diode bias only. A perfectly tuned bias results in a diode current of
200 mA for all microwave powers.
:param int tolerance: Minimum diode current offset that must be achieved before
:meth:`tuneBias` returns.
"""
self._check_for_xepr()
# check for abort event
if self.abort.is_set():
return
logger.debug("Tuning (Bias).")
time.sleep(self._wait)
# get offset from 200 mA
diff = self.hidden["DiodeCurrent"].value - 200
time.sleep(self._wait)
tolerance1 = 10 # tolerance for fast tuning
tolerance2 = tolerance # tolerance for second fine tuning
# rapid tuning with high tolerance and large steps
while abs(diff) > tolerance1:
# check for abort event
if self.abort.is_set():
return
step = 1 * cmp(0, diff) # coarse step of 1
self.XeprCmds.aqParStep(
"AcqHidden", "*cwBridge.SignalBias", "Coarse {}".format(step)
) # TODO: migrate from XeprCmds
time.sleep(0.5)
diff = self.hidden["DiodeCurrent"].value - 200
time.sleep(self._wait)
# fine tuning with low tolerance and small steps
while abs(diff) > tolerance2:
# check for abort event
if self.abort.is_set():
return
step = 5 * cmp(0, diff) # fine step of 5
self.XeprCmds.aqParStep(
"AcqHidden", "*cwBridge.SignalBias", "Fine {}".format(step)
) # TODO: migrate from XeprCmds
time.sleep(0.5)
diff = self.hidden["DiodeCurrent"].value - 200
time.sleep(self._wait)
@manager.queued_exec
def tuneIris(self, tolerance=1):
"""
Tunes the cavity's iris only. A perfectly tuned iris results in a diode current
of 200 mA for all microwave powers.
:param int tolerance: Minimum diode current offset that must be achieved before
:meth:`tuneIris` returns.
"""
self._check_for_xepr()
# check for abort event
if self.abort.is_set():
return
logger.debug("Tuning (Iris).")
time.sleep(self._wait)
diff = self.hidden["DiodeCurrent"].value - 200
while abs(diff) > tolerance:
# check for abort event
if self.abort.is_set():
return
if diff < 0:
cmd = "*cwBridge.IrisUp"
elif diff > 0:
cmd = "*cwBridge.IrisDown"
else:
return
# determine step size for iris adjustment: slower adjustment when
# close to a diode current of 200, minimum step size of 0.3
step_size = max(abs(diff), 30) * 0.01
# scale step size for MW power: smaller steps at higher power
step = step_size * (self.hidden["PowerAtten"].value ** 2) / 400
time.sleep(self._wait)
# set value to 0.1 if step is smaller
# (usually only happens below 10dB)
step = max(step, 0.1)
# increase waiting time between steps when close to tuned
# with a maximum waiting of 1 sec
wait = min(5 / (abs(diff) + 0.1), 1)
self.XeprCmds.aqParSet(
"AcqHidden", cmd, "True"
) # TODO: migrate from XeprCmds
time.sleep(step)
self.XeprCmds.aqParSet(
"AcqHidden", cmd, "False"
) # TODO: migrate from XeprCmds
time.sleep(wait)
diff = self.hidden["DiodeCurrent"].value - 200
time.sleep(self._wait)
@manager.queued_exec
def tuneFreq(self, tolerance=2):
"""
Tunes the microwave frequency only, to a lock offset close to zero.
:param int tolerance: Minimum lock offset that must be achieved before
:meth:`tuneFreq` returns.
"""
self._check_for_xepr()
# check for abort event
if self.abort.is_set():
return
logger.debug("Tuning (Freq).")
time.sleep(self._wait)
fq_offset = self.hidden["LockOffset"].value
time.sleep(self._wait)
while abs(fq_offset) > tolerance:
# check for abort event
if self.abort.is_set():
return
step = 1 * cmp(0, fq_offset) * max(abs(int(fq_offset / 10)), 1)
self.XeprCmds.aqParStep(
"AcqHidden", "*cwBridge.Frequency", "Fine {}".format(step)
)
time.sleep(1)
fq_offset = self.hidden["LockOffset"].value
time.sleep(self._wait)
@manager.queued_exec
def tunePhase(self):
"""
Tunes the phase of the MW reference arm only such that the diode current is
maximized.
"""
self._check_for_xepr()
# check for abort event
if self.abort.is_set():
return
logger.debug("Tuning (Phase).")
time.sleep(self._wait)
t0 = time.time()
# get current phase and range
phase0 = self.hidden["SignalPhase"].value
time.sleep(self._wait)
phase_min = self.hidden["SignalPhase"].aqGetParMinValue()
time.sleep(self._wait)
phase_max = self.hidden["SignalPhase"].aqGetParMaxValue()
time.sleep(self._wait)
phase_step = self.hidden["SignalPhase"].aqGetParCoarseSteps()
time.sleep(self._wait)
# determine the direction of increasing diode current
diode_curr_array = np.array([])
interval_min = max(phase0 - 3 * phase_step, phase_min)
interval_max = min(phase0 + 4 * phase_step, phase_max)
phase_array = np.arange(interval_min, interval_max, phase_step)
for phase in phase_array:
# Check for abort event
if self.abort.is_set():
return
# Abort if phase at limit
if self._phase_at_limit(phase, phase_min, phase_max):
return
self.hidden["SignalPhase"].value = phase
time.sleep(1)
diode_curr = self.hidden["DiodeCurrent"].value
time.sleep(self._wait)
diode_curr_array = np.append(diode_curr_array, diode_curr)
if time.time() - t0 > self._tuning_timeout:
logger.info("Phase tuning timeout.")
break
upper = np.mean(diode_curr_array[phase_array > phase0])
lower = np.mean(diode_curr_array[phase_array < phase0])
direction = cmp(upper, lower)
# determine position of maximum phase by stepping until phase deceases again
self.hidden["SignalPhase"].value = phase0
time.sleep(1)
diode_curr_new = self.hidden["DiodeCurrent"].value
time.sleep(self._wait)
phase_array = np.array([phase0])
diode_curr_array = np.array([diode_curr_new])
new_phase = phase0
while diode_curr_new > np.max(diode_curr_array) - 15:
# get next phase step
new_phase += direction * phase_step
# check for abort event
if self.abort.is_set():
return
# check for limits of diode range, readjust iris if necessary and abort
if diode_curr_new in [0, 400]:
self.tuneIris(tolerance=10)
return
# abort if phase at limit
if self._phase_at_limit(new_phase, phase_min, phase_max):
return
# get new reading
self.hidden["SignalPhase"].value = new_phase
time.sleep(1)
diode_curr_new = self.hidden["DiodeCurrent"].value
time.sleep(self._wait)
diode_curr_array = np.append(diode_curr_array, diode_curr_new)
phase_array = np.append(phase_array, new_phase)
# timeout if Xepr is not responsive
if time.time() - t0 > self._tuning_timeout:
logger.info("Phase tuning timeout.")
break
# set phase to the best value
best_phase = phase_array[np.argmax(diode_curr_array)]
self.hidden["SignalPhase"].value = best_phase
time.sleep(self._wait)
@manager.queued_exec
def getQValueFromXepr(self):
"""
Gets the Q-Value as determined by Xepr, averaged over 20 readouts.
:returns: Measured Q-Value.
:rtype: float
"""
self._check_for_xepr()
wait_old = self._wait
self._wait = 1
logger.info("Reading Q-value.")
att = self.hidden["PowerAtten"].value # remember current attenuation
time.sleep(self._wait)
self.hidden["OpMode"].value = "Tune"
time.sleep(self._wait)
self.hidden["RefArm"].value = "On"
time.sleep(self._wait)
self.hidden["PowerAtten"].value = 33
time.sleep(self._wait)
self.hidden["ModeZoom"].value = 2
time.sleep(self._wait)
q_values = np.array([])
time.sleep(1)
for iteration in range(0, 40):
# check for abort event
if self.abort.is_set():
logger.info("Aborted by user.")
return
q_values = np.append(q_values, self.hidden["QValue"].value)
time.sleep(1)
self.hidden["PowerAtten"].value = 32
time.sleep(self._wait)
self.hidden["ModeZoom"].value = 1
time.sleep(self._wait)
self.hidden["RefArm"].value = "On"
time.sleep(self._wait)
self.hidden["OpMode"].value = "Operate"
time.sleep(3)
self.tuneFreq()
self.tuneFreq()
self.tuneBias()
self.tuneFreq()
self.hidden["PowerAtten"].value = att
time.sleep(self._wait)
q_mean = q_values.mean()
q_stderr = q_values.std()
if q_mean > 3000:
logger.info("Q = {:.0f}+/-{:.0f}.".format(q_mean, q_stderr))
elif q_mean <= 3000:
logger.warning(
"Q = {:.0f}+/-{:.0f} is very small. Please check on "
"experiment.".format(q_mean, q_stderr)
)
self._wait = wait_old
return q_mean
@manager.queued_exec
def getQValueCalc(self, path=None):
"""
Calculates the Q-value by fitting the cavity mode picture to a Lorentzian
resonance with a polynomial baseline. It uses all available zoom factors to
resolve both sharp and broad resonances (high and low Q-values, respectively)
and is therefore more accurate than :meth:`getQValueFromXepr`.
:param str path: Filename where Q-Value mode picture is saved together with any
metadata (temperature, time stamp).
:returns: Mode picture instance.
:rtype: :class:`experiment.ModePicture`
"""
self._check_for_xepr()
wait_old = self._wait
self._wait = 1
logger.info("Reading Q-value.")
att = self.hidden["PowerAtten"].value # remember current attenuation
time.sleep(self._wait)
freq = self.hidden["FrequencyMon"].value # get current frequency
time.sleep(self._wait)
self.hidden["OpMode"].value = "Tune"
time.sleep(self._wait)
self.hidden["RefArm"].value = "Off"
time.sleep(self._wait)
self.hidden["LogScaleEnab"].value = False # ensure linear scale mode picture
time.sleep(self._wait)
self.hidden["PowerAtten"].value = 33
time.sleep(1)
self.hidden["PowerAtten"].value = 20
time.sleep(2)
# collect mode pictures for different zoom levels
mode_pic_data = {}
for mode_zoom in [1, 2, 4, 8]:
# check for abort event
if self.abort.is_set():
logger.info("Aborted by user.")
return
y_data = np.array([])
self.hidden["ModeZoom"].value = mode_zoom
time.sleep(2)
n_points = int(self.hidden["DataRange"][1])
time.sleep(self._wait)
for i in range(0, n_points):
y_data = np.append(y_data, self.hidden["Data"][i])
mode_pic_data[mode_zoom] = y_data
mp = ModePicture(mode_pic_data, freq)
self._last_qvalue = mp.qvalue
self._last_qvalue_err = mp.qvalue_stderr
self.hidden["PowerAtten"].value = 30
time.sleep(self._wait)
self.hidden["ModeZoom"].value = 1
time.sleep(self._wait)
self.hidden["RefArm"].value = "On"
time.sleep(self._wait)
self.hidden["OpMode"].value = "Operate"
time.sleep(2)
self.tuneFreq()
self.tuneFreq()
self.tuneBias()
self.tuneFreq()
self.hidden["PowerAtten"].value = att
time.sleep(self._wait)
if mp.qvalue > 3000:
logger.info("Q = {:.0f}+/-{:.0f}.".format(mp.qvalue, mp.qvalue_stderr))
elif mp.qvalue <= 3000:
logger.warning(
"Q = {:.0f}+/-{:.0f} is very small. Please check on "
"experiment.".format(mp.qvalue, mp.qvalue_stderr)
)
# add temperature to metadata
if self._check_for_mercury(raise_error=False):
temperature = self.esr_temperature.temp[0]
else:
temperature = 298
mp.metadata["Temperature"] = f"{temperature:.3f}K"
if path is not None:
path = os.path.expanduser(path)
dirname = os.path.dirname(path)
os.makedirs(dirname, exist_ok=True)
mp.save(path)
self._wait = wait_old
return mp
[docs] @staticmethod
def getExpDuration(exp):
"""
Estimates the time required to run the given experiment. The returned value is
given in seconds and is a lower limit, the actual run time may be longer due to
fine-tuning between scans, waiting times for stabilization and flybacks.
:param exp: Xepr experiment object to run.
:returns: Estimated experiment duration in seconds.
:rtype: float
"""
sweep_time_par = exp["signalChannel.SweepTime"]
sweep_time = Q_(sweep_time_par.value, sweep_time_par.aqGetParUnits())
field_delay_par = exp["fieldCtrl.Delay"] # given in s
field_delay = Q_(field_delay_par.value, field_delay_par.aqGetParUnits())
nb_scans = exp["NbScansToDo"].value
ramp_step_time = (
sweep_time + field_delay
) * nb_scans # total time for one step
# check if we have a secondary axis
if "ramp2.*" in exp:
if "User defined" in exp["ramp2.sweepType"].value:
nb_ramp = len(
exp["ramp2.SweepData"].value.split()
) # returns a space delimited str
else:
nb_ramp = exp["ramp2.NbPoints"].value
else:
nb_ramp = 1
if "delay2.*" in exp:
ramp_delay_par = exp["delay2.Delay"]
ramp_delay = Q_(ramp_delay_par.value, ramp_delay_par.aqGetParUnits())
else:
ramp_delay = 0
total = (ramp_step_time + ramp_delay) * nb_ramp
return float(total / Q_("1 sec"))
@manager.queued_exec
def runXeprExperiment(
self, exp, retune=True, settling_time=0, path=None, callback=None, **kwargs
):
"""
Runs the Xepr experiment ``exp``. Keyword arguments ``kwargs`` allow the user to
pass experiment settings to Xepr (e.g., 'ModAmp' for modulation amplitude).
Allowed parameters will depend on the type of experiment and its functional
units, e.g., 'mwBridge', 'fieldCtrl' etc. You can get a list of all units and
their parameters for a given experiment ``exp`` as follows:
>>> print(exp.getFuList())
['acqStart',
'fieldCtrl',
'fieldSweep',
'freqCounter',
'mwBridge',
'recorder',
'scanEnd',
'signalChannel']
>>> print(exp.getFuParList('mwBridge'))
['AcqFineTuning',
'AcqScanFTuning',
'AcqSliceFTuning',
'BrConnStatus',
'BridgeCalib',
'EMBType',
'EWSMBC',
'EmbBridge',
'Power',
'PowerAt0DB',
'PowerAtten',
'PowerAttenMon',
'QValue',
'TuneStateExpMon']
If a temperature controller is connected, CustomXepr monitors the temperature
during the measurement and emits warnings when fluctuations repeatedly exceed
the given temperature stability requirement.
If the ``path`` argument is given, the resulting data set is saved to the drive.
Otherwise, a temporary file will be created (those are deleted periodically by
the operating system). The temperature and its stability as well as the last-
measurement Q-value (if available) are written to the Bruker '.DSC' file.
:param exp: Xepr experiment object to run.
:param bool retune: Retune iris and freq between scans (default: True).
:param settling_time: Settling time in seconds between individual slice scans
(default: 0).
:param str path: Path to file. If given, the data set will be saved to this
path, otherwise, a temporary file will be created. No Xepr file name
restrictions apply.
:param callback: Optional function to be called between individual scans. Will
be called after retuning and the optional settling time. Will be called
with the number of completed scans as a single argument. May return a single
number which will be saved in the DSC file.
:param kwargs: Keyword arguments corresponding to Xepr experiment parameters.
Allowed parameters will depend on the type of experiment.
:returns: Xepr dataset.
:rtype: :class:`experiment.XeprData`
"""
self._check_for_xepr()
seconds = int(settling_time)
pause_between_scans = callback or retune or settling_time > 0
callback_results = []
# ----------- set experiment parameters if given in kwargs ---------------------
for key, value in kwargs.items():
exp[key].value = value
time.sleep(self._wait)
# ----------- notify user, estimate runtime for cw experiments -----------------
try:
duration = timedelta(seconds=self.getExpDuration(exp))
eta = datetime.now() + duration
logger.info(
'Measurement "{0}" is running. Estimated duration: {1} min (ETA {2}).'.format(
exp.aqGetExpName(),
int(duration.total_seconds() / 60),
eta.strftime("%H:%M"),
)
)
except ParameterError:
logger.info('Measurement "{0}" is running.')
# ----------- start experiment -------------------------------------------------
has_mercury = self._check_for_mercury(raise_error=False)
if has_mercury:
temperature_fluct_history = np.array([])
temperature_setpoint = self.esr_temperature.loop_tset
n_temperature_volatile = 0
else:
temperature_fluct_history = None
temperature_setpoint = None
n_temperature_volatile = None
if has_mercury and not self._cooling_temperature_ok():
return
exp.select()
time.sleep(self._wait)
exp.aqExpRun()
time.sleep(self._wait)
# wait for experiment to start
while not exp.isRunning:
time.sleep(self._wait)
if pause_between_scans: # schedule pause after scan to tune / settle
time.sleep(1)
exp.aqExpPause()
time.sleep(self._wait)
def is_running_or_paused():
running = exp.isRunning
time.sleep(self._wait)
paused = exp.isPaused
time.sleep(self._wait)
return running or paused
while is_running_or_paused():
# check for abort event
if self.abort.is_set():
exp.aqExpPause()
exp.aqExpAbort()
time.sleep(self._wait)
logger.info("Aborted by user.")
return
nb_scans_done = exp["NbScansDone"].value
time.sleep(self._wait)
nb_scans_to_do = exp["NbScansToDo"].value
time.sleep(self._wait)
logger.debug(
"Recording scan {:.0f}/{:.0f}.".format(
nb_scans_done + 1, nb_scans_to_do
)
)
between_scans = exp.isPaused and not nb_scans_done == nb_scans_to_do
if between_scans:
if settling_time > 0:
# wait for requested settling time
for i in range(0, seconds):
time.sleep(1)
logger.debug("Waiting {:.0f}/{:.0f}.".format(i + 1, seconds))
# check for abort event
if self.abort.is_set():
logger.info("Aborted by user.")
return
if retune:
# tune frequency and iris when a new slice scan starts
if exp.isPaused and not nb_scans_done == nb_scans_to_do:
logger.debug("Checking tuned.")
self.tuneFreq(tolerance=3)
self.tuneFreq(tolerance=3)
self.tuneIris(tolerance=7)
if callback is not None:
res = callback(nb_scans_done)
callback_results.append(res)
# start next scan
exp.aqExpRun()
time.sleep(self._wait)
# wait for scan to start and schedule next pause
while not exp.isRunning:
time.sleep(1)
exp.aqExpPause()
time.sleep(self._wait)
# check cryostat and cooling water temperatures
if has_mercury:
if not self._cooling_temperature_ok():
return
diff = abs(self.esr_temperature.temp[0] - temperature_setpoint)
temperature_fluct_history = np.append(temperature_fluct_history, diff)
# increment the number of violations n_out if temperature is unstable
n_temperature_volatile += diff > 4 * self._temperature_tolerance
# warn once for every 120 temperature violations
if np.mod(n_temperature_volatile, 120) == 1:
max_diff = np.max(temperature_fluct_history)
logger.warning(
"Temperature fluctuations of +/-{:.2f}K.".format(max_diff)
)
n_temperature_volatile += (
1 # prevent from warning again the next second
)
# Pause measurement and raise error after 15 min of instability
if n_temperature_volatile > 60 * 15:
exp.aqExpPause()
raise RuntimeError(
"Temperature could not be kept stable for "
+ "15 min. Aborting current measurement and "
+ "pausing all pending jobs."
)
time.sleep(1)
# get temperature stability during scan if mercury was connected
if has_mercury:
max_diff = np.max(temperature_fluct_history)
logger.info(
"Temperature stable at ({:.2f}+/-{:.2f})K during "
"scans.".format(temperature_setpoint, max_diff)
)
logger.info("All scans complete.")
# ----------- save data with custom parameters ---------------------------------
# switch viewpoint to experiment which just finished running
time.sleep(self._wait)
exp_title = exp.aqGetExpName()
time.sleep(self._wait)
self.XeprCmds.aqExpSelect(1, exp_title)
time.sleep(self._wait)
# save the data to tmp file, this insures that we always save to a file path
# that Xepr can handle
with tempfile.NamedTemporaryFile(prefix="autosave_", delete=False) as f:
tmp_path = f.name
title = os.path.splitext(os.path.basename(path or tmp_path))[0]
self._saveData(tmp_path, exp=exp, title=title)
time.sleep(self._wait)
# add temperature data and Q-value if available
basename = tmp_path.split(".")[0]
dsc_path = basename + ".DSC"
dset = XeprData(dsc_path)
if self._last_qvalue is not None:
dsl_mwbridge = dset.dsl.groups["mwBridge"]
dsl_mwbridge.pars["QValue"] = XeprParam("QValue", round(self._last_qvalue))
dsl_mwbridge.pars["QValueErr"] = XeprParam(
"QValueErr", round(self._last_qvalue_err)
)
if has_mercury:
pars = [
XeprParam("Temperature", temperature_setpoint, "K"),
XeprParam("Stability", round(max_diff, 4), "K"),
XeprParam("AcqWaitTime", self._temp_wait_time, "s"),
XeprParam("Tolerance", self._temperature_tolerance, "K"),
]
dset.dsl.add_group(ParamGroupDSL("tempCtrl", pars))
if callback is not None:
try:
callback_results = list(map(float, callback_results))
except TypeError:
pass
else:
pars = [XeprParam("CallbackResults", np.array(callback_results))]
dset.dsl.add_group(ParamGroupDSL("CustomXepr", pars))
if retune:
dset.pars["AcqFineTuning"] = "Slice" # TODO: confirm correct value
dset.pars["AcqSliceFTuning"] = "On"
new_path = path or tmp_path
dset.save(new_path)
logger.info('Data saved to "{}".'.format(new_path))
# switch to saved folder
escaped_dirname = shlex.quote(os.path.dirname(new_path))
self.XeprCmds.ddPath(escaped_dirname)
time.sleep(self._wait)
return dset
def _cooling_temperature_ok(self):
if self.cooling_sensor:
cool_t_kelvin = self.cooling_sensor.temp[0]
cool_t_deg_c = round(cool_t_kelvin - 273.15, 2)
if cool_t_kelvin > self._max_cooling_temperature:
logger.warning(
"Cooling temperature at {} Celsius. Aborting "
"measurement.".format(cool_t_deg_c)
)
self.setStandby()
self.manager.pause_worker()
return False
else:
return True
else:
return True
@manager.queued_exec
def saveCurrentData(self, path, exp=None):
"""
Saves the data from a given experiment in Xepr to the specified path. If ``exp``
is `None` the currently displayed data set is saved.
Xepr only allows file paths shorter than 128 characters.
.. note::
To save a just completed measurement, please use the ``path`` argument of
:func:`runXeprExperiment`. This will automatically add temperature stability
and Q-value information to your data files.
:param str path: Absolute path to save data file. The path must be compatible
with Xepr, i.e., it must be shorter than 128 characters.
:param exp: Xepr experiment instance associated with data set. Defaults to
currently selected experiment if not given.
"""
print(
'To save a just completed measurement, please use the "path" argument '
+ 'of "runXeprExperiment". This will automatically add temperature '
+ "stability and Q-value information to your data files."
)
self._saveData(path, exp)
logger.info('Data saved to "{}".'.format(path))
@manager.queued_exec
def setStandby(self):
"""
Sets the magnetic field to zero and the MW bridge to standby.
"""
self._check_for_xepr()
# check if WindDown experiment already exists, otherwise create
try:
wd = self.xepr.XeprExperiment("WindDown")
time.sleep(self._wait)
except ExperimentError:
wd = self.xepr.XeprExperiment(
"WindDown", exptype="C.W.", axs1="Field", ordaxs="Signal channel"
)
time.sleep(self._wait)
wd.aqExpActivate()
time.sleep(self._wait)
wd["CenterField"].value = 0
time.sleep(self._wait)
wd["AtCenter"].value = True
time.sleep(self._wait)
self.hidden["OpMode"].value = "Tune"
time.sleep(3)
self.hidden["OpMode"].value = "Stand By"
time.sleep(self._wait)
logger.info("EPR set to standby.")
def _saveData(self, path, exp=None, title=None):
"""
Saves the data from a given experiment in Xepr to the specified path. If ``exp``
is `None` the currently displayed data set is saved.
Xepr only allows file paths shorter than 128 characters.
:param str path: Absolute path to save data file. Must be shorter than 128
characters and must comply with possibly other Xepr file name restrictions.
:param exp: Xepr experiment instance associated with data set. Defaults to
currently selected experiment if not given.
:param str title: Name of the data set. Will be saved as a parameter in the DSC
file. If not given, the basename of the path will be used.
"""
self._check_for_xepr()
path = os.path.expanduser(path)
if len(path) > 128:
raise ValueError(
"Only paths with with 128 characters or less are " + "allowed by Xepr."
)
directory, basename = os.path.split(path)
if not title:
title = os.path.splitext(basename)[0]
# check if directory exists, create otherwise
if not os.path.exists(directory):
os.makedirs(directory)
# switch viewpoint to experiment if given
if exp is not None:
exp_title = exp.aqGetExpName()
time.sleep(self._wait)
self.XeprCmds.aqExpSelect(1, exp_title)
time.sleep(self._wait)
# tell Xepr to save data
self.XeprCmds.vpSave("Current Primary", title, shlex.quote(path))
time.sleep(self._wait)
def _phase_at_limit(self, phase, phase_min, phase_max):
assert phase_max > phase_min
deg_step = 6.5 # approximate step of 1 deg
if phase_min < phase < phase_max:
return False
else:
# shift by 360° if maximum or minimum is encountered
direction = int(phase <= phase_min) - int(phase >= phase_max)
self.hidden["SignalPhase"].value = phase + direction * 360 * deg_step
logger.info("Phase at limit, cycling by 360 deg.")
time.sleep(4)
return True
# ==================================================================================
# set up cryostat functions
# ==================================================================================
@manager.queued_exec
def setTemperature(self, target, wait_stable=True):
"""
Sets the target temperature for the ESR900 cryostat and waits for it to
stabilize within :attr:`temp_wait_time` with fluctuations below
:attr:`temperature_tolerance`. Warns the user if this takes too long.
:param float target: Target temperature in Kelvin.
:param bool wait_stable: If ``True``, this function will wait until the
temperature is stable before it returns. See :func:`waitTemperatureStable`
"""
self._check_for_mercury()
logger.info("Setting target temperature to {}K.".format(target))
# set temperature and wait to stabilize
self.esr_temperature.loop_tset = target
if wait_stable:
self.waitTemperatureStable(target)
# check if gas flow is too high for temperature set point
# if yes, reduce minimum value until target is reached
ht = self._heater_target(target)
fmin = self.esr_gasflow.gmin
above_heater_target = self.esr_heater.volt[0] > 1.1 * ht
flow_at_min = self.esr_gasflow.perc[0] == fmin
if above_heater_target and flow_at_min:
logger.warning("Gas flow is too high, trying to reduce.")
self.esr_temperature.loop_faut = "ON"
self.esr_gasflow.gmin = max(fmin - 1, 1)
self.waitTemperatureStable(target, wait_time=30)
@manager.queued_exec
def setTemperatureRamp(self, ramp):
"""
Sets the temperature ramp speed for the cryostat in K/min.
:param float ramp: Ramp in Kelvin per minute.
"""
self._check_for_mercury()
# set temperature and wait to stabilize
self.esr_temperature.loop_rset = ramp
logger.info("Temperature ramp set to {} K/min.".format(ramp))
@manager.queued_exec
def waitTemperatureStable(self, target, tolerance=None, wait_time=None):
"""
Waits for the cryostat temperature to stabilize.
:param float target: Target temperature in Kelvin.
:param float tolerance: Allowed fluctuations in kelvin. Defaults to
:attr:`temperature_tolerance` if not given.
:param float wait_time: Time to wait for temperature to remain stable before
returning (seconds). Defaults to :attr:`temp_wait_time` if not given.
"""
# time in sec after which a timeout warning is issued
temperature_timeout = self._ramp_time(target) + 30 * 60 # in sec
# counter for elapsed seconds since temperature has been stable
stable_counter = 0
# counter for temperature warnings
temperature_warning_counter = 0
# starting time
t0 = time.time()
logger.info("Waiting for temperature to stabilize.")
while stable_counter < (wait_time or self._temp_wait_time):
# check for abort command
if self.abort.is_set():
logger.info("Aborted by user.")
return
# check temperature deviation
self.T_diff = abs(target - self.esr_temperature.temp[0])
if self.T_diff > (tolerance or self._temperature_tolerance):
stable_counter = 0
time.sleep(1)
logger.debug("Waiting for temperature to stabilize.")
else:
stable_counter += 1
logger.debug(
"Stable for {}/{} sec.".format(
stable_counter, wait_time or self._temp_wait_time
)
)
time.sleep(1)
# warn if stabilization is taking longer than expected
if (
time.time() - t0 > temperature_timeout
and temperature_warning_counter == 0
):
logger.warning("Temperature is taking a long time to stabilize.")
t0 = time.time()
temperature_timeout = self._ramp_time(target) + 30 * 60
temperature_warning_counter += 1
message = "Mercury iTC: Temperature is stable at {}K.".format(target)
logger.info(message)
@manager.queued_exec
def getTemperature(self):
"""Returns the current temperature in Kelvin."""
self._check_for_mercury()
return self.esr_temperature.temp[0]
@manager.queued_exec
def getTemperatureSetpoint(self):
"""Returns the temperature setpoint in Kelvin."""
self._check_for_mercury()
return self.esr_temperature.loop_tset
@staticmethod
def _heater_target(temperature, htt_file=None):
"""
Returns the ideal heater voltage for a given temperature. This function can be
used to check the current gas flow: If the heater voltage exceeds its target
value, the gas flow likely is too high (and vice versa).
:func:`heater_target` accepts a file path ``htt_file`` to a custom heater target
table file, used instead of the default values for the ESR900 cryostat. The file
must contain comma-delimited pairs of temperature (in Kelvin) and heater target
voltage (in Volts) with a new line for each pair.
:param float temperature: Temperature in Kelvin.
:param str htt_file: Path to file with custom heater target table.
"""
if htt_file is None:
htt_file = os.path.join(_root, "experiment", "mercury_htt.txt")
htt = np.loadtxt(htt_file, delimiter=",")
return np.interp(temperature, htt[:, 0], htt[:, 1])
def _ramp_time(self, target):
"""
Calculates the expected time in sec to reach the target temperature.
Assumes a ramp speed of 5 K/min if 'ramp' is turned off.
:param float target: Target temperature in Kelvin.
"""
if self.esr_temperature.loop_rena == "ON":
expected_time = (
abs(target - self.esr_temperature.temp[0])
/ self.esr_temperature.loop_rset
) # in min
else: # assume ramp of 5 K/min
expected_time = abs(target - self.esr_temperature.temp[0]) / 5
return expected_time * 60 # return value in sec
# ==================================================================================
# set up Keithley functions
# ==================================================================================
@manager.queued_exec
def transferMeasurement(
self,
smu_gate=KCONF.get("Sweep", "gate"),
smu_drain=KCONF.get("Sweep", "drain"),
vg_start=KCONF.get("Sweep", "VgStart"),
vg_stop=KCONF.get("Sweep", "VgStop"),
vg_step=KCONF.get("Sweep", "VgStep"),
vd_list=KCONF.get("Sweep", "VdList"),
t_int=KCONF.get("Sweep", "tInt"),
delay=KCONF.get("Sweep", "delay"),
pulsed=KCONF.get("Sweep", "pulsed"),
path=None,
):
"""
Records a transfer curve and returns the resulting data. If a valid path is path
given, the data is also saved as a .txt file.
:param smu_gate: Name of SMU attached to the gate electrode of an FET.
:param smu_drain: Name of SMU attached to the drain electrode of an FET.
:param float vg_start: Start voltage of transfer sweep in Volts.
:param float vg_stop: End voltage of transfer sweep in Volts.
:param float vg_step: Voltage step size for transfer sweep in Volts.
:param list vd_list: List of drain voltage steps in Volts.
:param float t_int: Integration time in sec for every data point.
:param float delay: Settling time in sec before every measurement. Set to -1 for
automatic delay.
:param bool pulsed: True or False for pulsed or continuous measurements.
:param str path: File path to save transfer curve data as .txt file.
:returns: Transfer curve data.
:rtype: :class:`keithley2600.TransistorSweepData`
"""
self._check_for_keithley()
smu_gate = getattr(self.keithley, smu_gate)
smu_drain = getattr(self.keithley, smu_drain)
sd = self.keithley.transfer_measurement(
smu_gate,
smu_drain,
vg_start,
vg_stop,
vg_step,
vd_list,
t_int,
delay,
pulsed,
)
if path is not None:
sd.save(path)
return sd
@manager.queued_exec
def outputMeasurement(
self,
smu_gate=KCONF.get("Sweep", "gate"),
smu_drain=KCONF.get("Sweep", "drain"),
vd_start=KCONF.get("Sweep", "VdStart"),
vd_stop=KCONF.get("Sweep", "VdStop"),
vd_step=KCONF.get("Sweep", "VdStep"),
vg_list=KCONF.get("Sweep", "VgList"),
t_int=KCONF.get("Sweep", "tInt"),
delay=KCONF.get("Sweep", "delay"),
pulsed=KCONF.get("Sweep", "pulsed"),
path=None,
):
"""
Records an output curve and returns the resulting data. If a valid path is
given, the data is also saved as a .txt file.
:param smu_gate: Name of SMU attached to the gate electrode of an FET.
:param smu_drain: Name of SMU attached to the drain electrode of an FET.
:param float vd_start: Start voltage of output sweep in Volts .
:param float vd_stop: End voltage of output sweep in Volts.
:param float vd_step: Voltage step size for output sweep in Volts.
:param list vg_list: List of gate voltage steps in Volts.
:param float t_int: Integration time in sec for every data point.
:param float delay: Settling time in sec before every measurement. Set to -1 for
automatic delay.
:param bool pulsed: True or False for pulsed or continuous measurements.
:param str path: File path to save output curve data as .txt file.
:returns: Output curve data.
:rtype: :class:`keithley2600.TransistorSweepData`
"""
self._check_for_keithley()
smu_gate = getattr(self.keithley, smu_gate)
smu_drain = getattr(self.keithley, smu_drain)
sd = self.keithley.output_measurement(
smu_gate,
smu_drain,
vd_start,
vd_stop,
vd_step,
vg_list,
t_int,
delay,
pulsed,
)
if path is not None:
sd.save(path)
return sd
@manager.queued_exec
def setVoltage(self, v, smu=KCONF.get("Sweep", "gate")):
"""
Sets the bias of the given Keithley SMU.
:param float v: Gate voltage in Volts.
:param str smu: Name of SMU. Defaults to the SMU saved as gate.
"""
self._check_for_keithley()
smu = getattr(self.keithley, smu)
self.keithley.apply_voltage(smu, v)
self.keithley.beeper.beep(0.3, 2400)
@manager.queued_exec
def setCurrent(self, i, smu=KCONF.get("Sweep", "drain")):
"""
Applies a specified current to the selected Keithley SMU.
:param float i: Current in Ampere.
:param str smu: Name of SMU. Defaults to the SMU saved as drain.
"""
self._check_for_keithley()
smu = getattr(self.keithley, smu)
self.keithley.apply_current(smu, i)
self.keithley.beeper.beep(0.3, 2400)
# ==================================================================================
# Helper methods
# ==================================================================================
def _check_for_mercury(self, raise_error=True):
"""
Checks if a MercuryITC is connect and correctly configured.
"""
if not self.mercury:
error_info = (
"No Mercury instance supplied. Functions that "
+ "require a connected cryostat will not work."
)
elif not self.mercury.connected:
error_info = (
"MercuryiTC is not connected. Functions that "
+ "require a connected cryostat will not work."
)
else:
temperature_module_name = CONF.get("CustomXepr", "esr_temperature_nick")
temp, gasflow, heater = self._select_temp_sensor(temperature_module_name)
if not temp:
error_info = (
'MercuryiTC error: temperature sensor "{}" not ' "found."
).format(temperature_module_name)
elif not heater:
error_info = (
'MercuryiTC error: No heater module configured for "{}". '
"Functions that require a connected cryostat will not "
"work."
).format(temperature_module_name)
elif not gasflow:
error_info = (
'MercuryiTC error: No gas flow module configured for "{}". '
"Functions that require a connected cryostat will not "
"work."
).format(temperature_module_name)
else:
error_info = False
if error_info:
if raise_error:
raise RuntimeError(error_info)
logger.info(error_info)
return False
else:
return True
def _check_for_keithley(self, raise_error=True):
"""
Checks if a keithley instance has been passed and is connected to an an actual
instrument.
"""
if not self.keithley:
error_info = (
"No Keithley instance supplied. Functions that "
+ "require a connected Keithley SMU will not work."
)
elif not self.keithley.connected:
error_info = (
"Keithley is not connected. Functions that "
+ "require a connected Keithley will not work."
)
else:
error_info = False
if error_info:
if raise_error:
raise RuntimeError(error_info)
logger.info(error_info)
return False
else:
return True
def _check_for_xepr(self, raise_error=True):
if not self.xepr:
error_info = (
"No Xepr instance supplied. Functions that "
+ "require Xepr will not work."
)
elif not self.xepr.XeprActive():
error_info = (
"Xepr API not active. Please activate Xepr API by "
+ 'pressing "Processing > XeprAPI > Enable Xepr API"'
)
else:
error_info = False
self.XeprCmds = self.xepr.XeprCmds
if not self.hidden:
try:
self.hidden = self.xepr.XeprExperiment("AcqHidden")
except Exception:
error_info = (
"Xepr is not connected to the spectrometer. "
+ 'Please connect by pressing "Acquisition > '
+ 'Connect To Spectrometer..."'
)
if error_info:
if raise_error:
raise RuntimeError(error_info)
logger.info(error_info)
return False
else:
return True
def _select_temp_sensor(self, nick):
# find all temperature modules
temp_mods = [m for m in self.mercury.modules if type(m) == MercuryITC_TEMP]
if len(temp_mods) == 0:
raise IOError("MercuryITC does not have any connected temperature modules")
# find the temperature module with given name
temperature = next((m for m in temp_mods if m.nick == nick), None)
if temperature:
htr_nick = temperature.loop_htr
aux_nick = temperature.loop_aux
heater = next((m for m in self.mercury.modules if m.nick == htr_nick), None)
gasflow = next(
(m for m in self.mercury.modules if m.nick == aux_nick), None
)
else:
gasflow = None
heater = None
return temperature, gasflow, heater