Job scheduling

class manager.Manager[source]

Manager provides a high level interface for the scheduling and executing experiments. All queued experiments will be run in a background thread and Manager provides methods to pause, resume and abort the execution of experiments. All results will be kept in the result_queue for later retrieval.

Function calls can be queued as experiments by decorating the function with the Manager.queued_exec decorator:

>>> import time
>>> from customxepr.manager import Manager
>>> manager = Manager()
>>> # create test function
>>> @manager.queued_exec
... def decorated_test_func(*args):
...     # do something
...     for i in range(0, 10):
...         time.sleep(1)
...         # check for requested aborts
...         if manager.abort.is_set()
...             break
...     return args  # return input arguments
>>> # run the function: this will return immediately
>>> exp = decorated_test_func('test succeeded')

The result returned by test_func can be retrieved from the result queue or the returned Experiment instance:

>>> res1 = manager.result_queue.get()  # blocks until result is available
>>> res2 = exp.result()  # blocks until result is available
>>> print(res1)
test succeeded
>>> print(res1 is res2)
True

Alternatively, it is possible to manually create an experiment from a function call and queue it for processing as follows:

>>> import time
>>> from customxepr.manager import Manager, Experiment
>>> manager = Manager()
>>> # create test function
>>> def stand_alone_test_func(*args):
...     # do something
...     for i in range(0, 10):
...         time.sleep(1)
...         # check for requested aborts
...         if manager.abort.is_set()
...             break
...     return args  # return input arguments
>>> # create an experiment from function
>>> exp = Experiment(stand_alone_test_func, args=['test succeeded',], kwargs={})
>>> # queue the experiment
>>> manager.job_queue.put(exp)

This class provides an event abort which can queried periodically by any function to see if it should abort prematurely. Alternatively, functions and methods can provide their own abort events and register them with the manager as follows:

>>> from threading import Event
>>> abort_event = Event()
>>> # register the event with the manager instance
>>> manager.abort_events = [abort_event]

The manager will automatically set the status of an experiment to ABORTED if it finishes while an abort event is set and clear all abort events afterwards.

Variables
  • job_queue – An instance of ExperimentQueue holding all queued and finished experiments.

  • result_queue – A queue holding all results.

  • running – Event that causes the worker to pause between jobs if set.

  • abort – A generic event which can be used in long-running experiments to check if they should be aborted.

queued_exec(func)[source]

Decorator that puts a call to a wrapped function into the job_queue queue instead of executing it. Returns the queued Experiment which is similar to a Python’s concurrent.futures.Future.

pause_worker()[source]

Pauses the execution of jobs after the current job has been completed.

resume_worker()[source]

Resumes the execution of jobs.

abort_job()[source]

Aborts the current job and continues with the next.

clear_all_jobs()[source]

Clears all pending jobs in job_queue.

property notify_address

List of addresses for email notifications.

property log_file_dir

Directory for log files. Defaults to ‘~/.CustomXepr’.

property email_handler_level

Logging level for email notifications. Defaults to logging.WARNING.

class manager.Experiment(func, args, kwargs)[source]

Class to hold a scheduled job / experiment and keep track of its status and result. It is similar in functionality to concurrent.futures.Future.

Parameters
  • func – Function or method to call when running experiment.

  • args – Arguments for function call.

  • kwargs – Keyword arguments for function call.

running()[source]

Returns True if the job is running.

done()[source]

Returns True if the job is done, has failed or has been cancelled.

result(timeout=None)[source]

Returns the result of the experiment. If the experiment hasn’t yet completed then this method will wait up to timeout seconds. If the experiment hasn’t completed in timeout seconds, then a TimeoutError will be raised. If timeout is not specified or None, there is no limit to the wait time.

If the experiment is cancelled before completing then CancelledError will be raised.

If the call raised, this method will raise the same exception.

Parameters

timeout (int) – Time in seconds to wait for a result.

Raises

TimeoutError if no result becomes available within timeout, CancelledError if the experiment has been cancelled or Exception if an exception occurred during execution.

property status

Property that holds the status of the job.

class manager.ExpStatus(value)[source]

Enumeration to hold experiment status.

class manager.ExperimentQueue[source]

Queue to hold all jobs: Pending, running and already completed. Items in this queue should be of type Experiment.

Variables
  • added_signal – Emitted when a new job is added to the queue.

  • removed_signal – Emitted when a job is removed from the queue. Carriers the index of the job in ExperimentQueue.

  • status_changed_signal – Emitted when a job status changes, e.g., from ExpStatus.QUEUED to ExpStatus.RUNNING. Carries a tuple holding the job index and status.

property queue

Returns list of all items in queue (queued, running, and in history).

put(exp)[source]

Adds item exp to the end of the queue. Its status must be ExpStatus.QUEUED. Emits the added_signal signal.

get_next_job()[source]

Returns the next queued item and flags it as running. If there are no queued items, queue.Empty is raised. Emits the status_changed_signal with the item’s index and its new status.

job_done(exit_status, result=None)[source]

Call to inform the Experiment queue that a task is completed. Changes the corresponding item’s status to exit_status and its result to result. Emits the status_changed_signal with the item’s index and its new status.

Parameters
  • exit_status – Status of the completed job, i.e., ExpStatus.ABORTED.

  • result – Return value of job, if available.

remove_item(i)[source]

Removes the item with index i from the queue. Raises a ValueError if the item belongs to a running or already completed job. Emits the removed_signal carrying the index.

Parameters

i (int) – Index of item to remove.

remove_items(i_start, i_end=None)[source]

Removes the items from index i_start to i_end inclusive from the queue. Raises a ValueError if the item belongs to a running or already completed job. Emits the removed_signal for every removed item.

This call has O(n) performance with regards to the queue length and number of items to be removed.

Parameters
  • i_start (int) – Index of first item to remove.

  • i_end (int) – Index of last item to remove (defaults to i_end = i_start).

clear()[source]

Removes all queued experiments at once.

qsize(status=None)[source]

Return the approximate number of jobs with given status (not reliable!).

Parameters

status – Can be ‘queued’, ‘running’, ‘history’ or None. If None, the full queue size will be returned.

class manager.Worker(job_q, result_q, abort_events)[source]

Worker that gets all method calls with args from job_q and executes them. Results are then stored in the result_q.

Parameters
  • job_q – Queue with jobs to be performed.

  • result_q – Queue with results from completed jobs.

Variables

running – Event that causes the worker to pause between jobs if set.