Job scheduling
- class manager.Manager
Manager provides a high-level interface for scheduling and executing experiments. All queued experiments are run in a background thread, and Manager provides methods to pause, resume and abort their execution. All results are kept in the result_queue for later retrieval.
Function calls can be queued as experiments by decorating the function with the Manager.queued_exec decorator:
>>> import time
>>> from customxepr.manager import Manager
>>> manager = Manager()
>>> # create test function
>>> @manager.queued_exec
... def decorated_test_func(*args):
...     # do something
...     for i in range(0, 10):
...         time.sleep(1)
...         # check for requested aborts
...         if manager.abort.is_set():
...             break
...     return args  # return input arguments
>>> # run the function: this will return immediately
>>> exp = decorated_test_func('test succeeded')
The result returned by decorated_test_func can be retrieved from the result queue or from the returned Experiment instance:
>>> res1 = manager.result_queue.get()  # blocks until result is available
>>> res2 = exp.result()  # blocks until result is available
>>> print(res1)
test succeeded
>>> print(res1 is res2)
True
Alternatively, an experiment can be created manually from a function call and queued for processing as follows:
>>> import time
>>> from customxepr.manager import Manager, Experiment
>>> manager = Manager()
>>> # create test function
>>> def stand_alone_test_func(*args):
...     # do something
...     for i in range(0, 10):
...         time.sleep(1)
...         # check for requested aborts
...         if manager.abort.is_set():
...             break
...     return args  # return input arguments
>>> # create an experiment from function
>>> exp = Experiment(stand_alone_test_func, args=['test succeeded'], kwargs={})
>>> # queue the experiment
>>> manager.job_queue.put(exp)
This class provides an event, abort, which can be queried periodically by any function to see if it should abort prematurely. Alternatively, functions and methods can provide their own abort events and register them with the manager as follows:
>>> from threading import Event
>>> abort_event = Event()
>>> # register the event with the manager instance
>>> manager.abort_events = [abort_event]
The manager will automatically set the status of an experiment to ABORTED if it finishes while an abort event is set and clear all abort events afterwards.
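The abort handling described above can be sketched with the standard library alone. This is only an illustration of the idea, not CustomXepr's implementation: run_job, long_task and the status strings are hypothetical names introduced for this example.

```python
import threading


def run_job(func, abort_events):
    """Run func, then report 'ABORTED' if any abort event is set.

    All abort events are cleared afterwards so the next job starts clean.
    (Hypothetical helper; the status strings are placeholders.)
    """
    result = func()
    aborted = any(event.is_set() for event in abort_events)
    for event in abort_events:
        event.clear()
    return ("ABORTED" if aborted else "FINISHED"), result


generic_abort = threading.Event()  # plays the role of a manager-wide event
custom_abort = threading.Event()   # an event owned by the job itself


def long_task():
    """Loop that polls its own abort event once per step."""
    for step in range(100):
        if custom_abort.is_set():
            return step  # stop early and return the progress so far
        if step == 3:
            custom_abort.set()  # simulate an abort request arriving mid-run
    return 100


status, result = run_job(long_task, [generic_abort, custom_abort])
```

Polling the event once per loop iteration keeps the job responsive to abort requests without requiring the worker to interrupt it from outside.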
- Variables
job_queue – An instance of ExperimentQueue holding all queued and finished experiments.
result_queue – A queue holding all results.
running – Event that causes the worker to pause between jobs if set.
abort – A generic event which can be used in long-running experiments to check if they should be aborted.
- queued_exec(func)
Decorator that puts a call to the wrapped function into the job_queue instead of executing it. Returns the queued Experiment, which is similar to Python's concurrent.futures.Future.
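The general pattern behind such a decorator can be sketched as follows. This is a simplified stand-in built on the standard library, not the actual Manager.queued_exec code: the module-level job_queue, the worker function and the use of concurrent.futures.Future in place of Experiment are all assumptions made for illustration.

```python
import functools
import queue
import threading
from concurrent.futures import Future

job_queue = queue.Queue()  # hypothetical stand-in for the manager's job queue


def queued_exec(func):
    """Queue a call to func instead of running it and return a Future."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        future = Future()
        job_queue.put((future, func, args, kwargs))
        return future  # returned immediately, before func has run
    return wrapper


def worker():
    """Execute queued calls one at a time until a None sentinel arrives."""
    while True:
        job = job_queue.get()
        if job is None:
            break
        future, func, args, kwargs = job
        try:
            future.set_result(func(*args, **kwargs))
        except Exception as exc:
            future.set_exception(exc)


threading.Thread(target=worker, daemon=True).start()


@queued_exec
def add(a, b):
    return a + b


fut = add(1, 2)                # returns immediately
value = fut.result(timeout=5)  # blocks until the worker has run the call
job_queue.put(None)            # tell the worker thread to exit
```

Returning a future-like object from the wrapper lets the caller choose between waiting for this specific result and carrying on with other work.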
- property notify_address
List of addresses for email notifications.
- property log_file_dir
Directory for log files. Defaults to ‘~/.CustomXepr’.
- property email_handler_level
Logging level for email notifications. Defaults to logging.WARNING.
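How a level threshold on an email handler filters log records can be illustrated with the standard logging module. This sketch assumes an SMTPHandler; whether the Manager uses this exact handler class is not stated here, and the addresses and mail host are hypothetical placeholders.

```python
import logging
from logging.handlers import SMTPHandler

notify_address = ["user@example.com"]  # hypothetical recipient list

# Constructing the handler does not open a network connection;
# mail is only sent when a record is actually emitted.
email_handler = SMTPHandler(
    mailhost="localhost",                # hypothetical mail server
    fromaddr="experiment@example.com",   # hypothetical sender
    toaddrs=notify_address,
    subject="Experiment notification",
)
email_handler.setLevel(logging.WARNING)

# A handler only processes records at or above its own level.
would_email = {
    name: level >= email_handler.level
    for name, level in [
        ("INFO", logging.INFO),
        ("WARNING", logging.WARNING),
        ("ERROR", logging.ERROR),
    ]
}
```

With the WARNING threshold, informational messages stay in the regular logs and only warnings and errors would trigger an email.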
- class manager.Experiment(func, args, kwargs)
Class to hold a scheduled job / experiment and keep track of its status and result. It is similar in functionality to concurrent.futures.Future.
- Parameters
func – Function or method to call when running experiment.
args – Arguments for function call.
kwargs – Keyword arguments for function call.
- result(timeout=None)
Returns the result of the experiment. If the experiment hasn’t yet completed then this method will wait up to timeout seconds. If the experiment hasn’t completed in timeout seconds, then a TimeoutError will be raised. If timeout is not specified or None, there is no limit to the wait time.
If the experiment is cancelled before completing then CancelledError will be raised.
If the call raised, this method will raise the same exception.
- Parameters
timeout (int) – Time in seconds to wait for a result.
- Raises
TimeoutError if no result becomes available within timeout, CancelledError if the experiment has been cancelled, or Exception if an exception occurred during execution.
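Since Experiment is described as similar to concurrent.futures.Future, the timeout and exception behaviour can be demonstrated with a standard-library Future. The example below uses ThreadPoolExecutor rather than a Manager, so it shows the analogous semantics only; slow and failing are hypothetical functions.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def slow():
    time.sleep(0.5)
    return "done"


def failing():
    raise ValueError("bad input")


with ThreadPoolExecutor(max_workers=2) as pool:
    slow_future = pool.submit(slow)
    failing_future = pool.submit(failing)

    # Waiting for less time than the job needs raises TimeoutError.
    try:
        slow_future.result(timeout=0.05)
        timed_out = False
    except TimeoutError:
        timed_out = True

    # An exception raised inside the job is re-raised by result().
    try:
        failing_future.result(timeout=5)
        error = None
    except ValueError as exc:
        error = str(exc)

    # Without a timeout, result() waits until the job has finished.
    final = slow_future.result()
```

A timeout does not cancel the job: the later call to result() without a timeout still returns the value once the job completes.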
- property status
Property that holds the status of the job.
- class manager.ExperimentQueue
Queue to hold all jobs: pending, running and already completed. Items in this queue should be of type Experiment.
- Variables
added_signal – Emitted when a new job is added to the queue.
removed_signal – Emitted when a job is removed from the queue. Carries the index of the job in ExperimentQueue.
status_changed_signal – Emitted when a job status changes, e.g., from ExpStatus.QUEUED to ExpStatus.RUNNING. Carries a tuple holding the job index and status.
- property queue
Returns a list of all items in the queue (queued, running, and in history).
- put(exp)
Adds item exp to the end of the queue. Its status must be ExpStatus.QUEUED. Emits the added_signal.
- get_next_job()
Returns the next queued item and flags it as running. If there are no queued items, queue.Empty is raised. Emits the status_changed_signal with the item's index and its new status.
- job_done(exit_status, result=None)
Call to inform the Experiment queue that a task is completed. Changes the corresponding item’s status to exit_status and its result to result. Emits the status_changed_signal with the item’s index and its new status.
- Parameters
exit_status – Status of the completed job, e.g., ExpStatus.ABORTED.
result – Return value of job, if available.
- remove_item(i)
Removes the item with index i from the queue. Raises a ValueError if the item belongs to a running or already completed job. Emits the removed_signal carrying the index.
- Parameters
i (int) – Index of item to remove.
- remove_items(i_start, i_end=None)
Removes the items from index i_start to i_end inclusive from the queue. Raises a ValueError if any of the items belongs to a running or already completed job. Emits the removed_signal for every removed item.
This call has O(n) performance with regard to the queue length and the number of items to be removed.
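The life cycle of a job in such a queue (queued, running, completed) and the removal rule can be sketched with a small stand-alone class. MiniQueue and Status are hypothetical names, signals are left out entirely, and the real ExperimentQueue may be organised differently.

```python
import queue
from enum import Enum


class Status(Enum):
    QUEUED = "queued"
    RUNNING = "running"
    FINISHED = "finished"


class MiniQueue:
    """Keeps queued, running and completed jobs together in one list."""

    def __init__(self):
        self._items = []  # each entry is a [name, status] pair

    def put(self, name):
        self._items.append([name, Status.QUEUED])

    def get_next_job(self):
        """Return the first queued job and flag it as running."""
        for item in self._items:
            if item[1] is Status.QUEUED:
                item[1] = Status.RUNNING
                return item[0]
        raise queue.Empty()

    def job_done(self, exit_status):
        """Give the currently running job its final status."""
        for item in self._items:
            if item[1] is Status.RUNNING:
                item[1] = exit_status
                return

    def remove_item(self, i):
        """Remove job i, but only if it has not started yet."""
        if self._items[i][1] is not Status.QUEUED:
            raise ValueError("only queued jobs can be removed")
        del self._items[i]


q = MiniQueue()
q.put("a")
q.put("b")
first = q.get_next_job()     # "a" is now running
q.job_done(Status.FINISHED)  # "a" moves into the history

try:
    q.remove_item(0)         # "a" is completed, so removal is refused
    removal_error = None
except ValueError as exc:
    removal_error = str(exc)

q.remove_item(1)             # "b" is still queued and can be removed
remaining = [(name, status.name) for name, status in q._items]
```

Keeping completed jobs in the same list as pending ones is what lets a single index identify a job throughout its life.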
- class manager.Worker(job_q, result_q, abort_events)
Worker that gets all method calls with args from job_q and executes them. Results are then stored in result_q.
- Parameters
job_q – Queue with jobs to be performed.
result_q – Queue with results from completed jobs.
- Variables
running – Event that causes the worker to pause between jobs if set.