# Source code for manager

# -*- coding: utf-8 -*-
"""
@author: Sam Schott  (ss2151@cam.ac.uk)

(c) Sam Schott; This work is licensed under a Creative Commons
Attribution-NonCommercial-NoDerivs 2.0 UK: England & Wales License.

"""
import os
import time
import logging
import logging.handlers
from PySignal import ClassSignal
from queue import Queue, Empty
from threading import RLock, Event, Thread, current_thread
from enum import Enum
import collections
from functools import wraps

from customxepr.config import CONF


# Logger for this module, named after the module's import path.
logger = logging.getLogger(__name__)
# Logger named "customxepr"; Manager._setup_root_logger() attaches the email and
# file handlers to it.
root_logger = logging.getLogger("customxepr")


# ======================================================================================
# class to wrap queued function calls ('experiments') and provide metadata
# ======================================================================================


class CancelledError(Exception):
    """Raised by :meth:`Experiment.result` when the experiment has been cancelled."""


class TimeoutError(Exception):
    """
    Raised by :meth:`Experiment.result` when no result became available in time.

    Note that this name shadows the builtin ``TimeoutError`` inside this module; it
    derives directly from :class:`Exception`, not from :class:`OSError`.
    """


class ExpStatus(Enum):
    """
    Enumeration to hold experiment status.

    Every member wraps its own ``object()`` instance, so no two members can alias
    each other and members are only meaningful when compared by identity/equality.
    """

    QUEUED = object()  # waiting in the queue
    RUNNING = object()  # currently being executed by the worker
    ABORTED = object()  # finished while an abort was requested
    FAILED = object()  # raised an exception
    FINISHED = object()  # completed normally
class Experiment(object):
    """
    Class to hold a scheduled job / experiment and keep track of its status and result.
    It is similar in functionality to :class:`concurrent.futures.Future`.

    :param func: Function or method to call when running experiment.
    :param args: Arguments for function call.
    :param kwargs: Keyword arguments for function call.
    """

    def __init__(self, func, args, kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs
        # set once a result (or exception) has been stored via _set_result()
        self._done_event = Event()
        self._status = ExpStatus.QUEUED
        self._result = None

    def running(self):
        """Returns ``True`` if the job is running."""
        return self._status == ExpStatus.RUNNING

    def done(self):
        """Returns ``True`` if the job is done, has failed or has been cancelled."""
        return self._status in (ExpStatus.FINISHED, ExpStatus.FAILED, ExpStatus.ABORTED)

    def _set_result(self, result):
        """Stores ``result`` and wakes up every thread blocked in :meth:`result`."""
        self._result = result
        self._done_event.set()

    def result(self, timeout=None):
        """
        Returns the result of the experiment. If the experiment hasn't yet completed
        then this method will wait up to timeout seconds. If the experiment hasn't
        completed in timeout seconds, then a TimeoutError will be raised. If timeout is
        not specified or None, there is no limit to the wait time.

        If the experiment is cancelled before completing then CancelledError will be
        raised.

        If the call raised, this method will raise the same exception.

        :param int timeout: Time in seconds to wait for a result.
        :raises: TimeoutError if no result becomes available within ``timeout``,
            CancelledError if the experiment has been cancelled or Exception if an
            exception occurred during execution.
        """
        if self._status == ExpStatus.ABORTED:
            raise CancelledError("Experiment has been cancelled.")

        if not self._done_event.wait(timeout):
            raise TimeoutError("No result available yet.")

        # The experiment may have been aborted while we were blocked in wait(); check
        # again so that the outcome does not depend on when result() was called.
        if self._status == ExpStatus.ABORTED:
            raise CancelledError("Experiment has been cancelled.")

        if isinstance(self._result, Exception):
            raise self._result
        return self._result

    @property
    def status(self):
        """
        Property that holds the status of the job.

        :raises: ValueError when set to anything but a member of :class:`ExpStatus`.
        """
        return self._status

    @status.setter
    def status(self, s):
        # isinstance() instead of ``s in ExpStatus``: on Python 3.8 - 3.11 the
        # containment test raises TypeError for values which are not enum members.
        if not isinstance(s, ExpStatus):
            raise ValueError("Argument must be of type %s" % ExpStatus)
        self._status = s

    def __repr__(self):
        # callables such as functools.partial objects have no __name__
        func_name = getattr(self.func, "__name__", repr(self.func))
        info_strings = ["func={}".format(func_name)]
        if len(self.args) > 0:
            info_strings.append("args={}".format(self.args))
        if len(self.kwargs) > 0:
            info_strings.append("kwargs={}".format(self.kwargs))
        info_strings.append("status={}".format(self.status))
        if self.status is ExpStatus.FINISHED:
            info_strings.append("has_result={}".format(self._result is not None))

        return "<{0}({1})>".format(self.__class__.__name__, ", ".join(info_strings))
# ======================================================================================
# custom queue which emits PySignal signals on put and get
# ======================================================================================


class SignalQueue(Queue):
    """
    Custom queue that emits PySignal signals if an item is added or removed. Inherits
    from :class:`queue.Queue` and provides a thread-safe method to remove items from
    the center of the queue.

    :cvar added_signal: Is emitted when an item is put into the queue.
    :cvar removed_signal: Is emitted when items are removed from the queue. Carries
        the index of the first removed item and the number of removed items.
    """

    added_signal = ClassSignal()
    removed_signal = ClassSignal()

    def __init__(self):
        # zero-argument super(): ``super(self.__class__, self)`` recurses forever as
        # soon as this class is subclassed
        super().__init__()

    def _put(self, item):
        Queue._put(self, item)
        self.added_signal.emit()

    def _get(self):
        item = Queue._get(self)
        self.removed_signal.emit(0, 1)
        return item

    def remove_item(self, i):
        """
        Removes item from the queue in a thread safe manner. Calls :meth:`task_done`
        when done.

        :param int i: Index of item to remove.
        """
        self.remove_items(i)

    def remove_items(self, i_start, i_end=None):
        """
        Removes the items from index `i_start` to `i_end` inclusive from the queue.
        Emits the :attr:`removed_signal` once, carrying the first index and the number
        of removed items. Calls :meth:`task_done` for every item removed.

        Indices are taken modulo the current queue size, i.e., negative indices count
        from the end of the queue.

        This call has O(n) performance with regards to the queue length and number of
        items to be removed.

        :param int i_start: Index of first item to remove.
        :param int i_end: Index of last item to remove (defaults to i_end = i_start).
        :raises: IndexError if the queue is empty, ValueError if `i_end` lies before
            `i_start`.
        """
        if i_end is None:
            i_end = i_start

        with self.mutex:
            # read the size while holding the mutex so that it cannot change between
            # the index conversion and the removal; qsize() would try to take the
            # (non-reentrant) mutex again, hence _qsize()
            size = self._qsize()
            if size == 0:
                raise IndexError("Cannot remove items from an empty queue.")

            # convert negative to positive indices
            i0 = i_start % size
            i1 = i_end % size

            if not i0 <= i1:
                raise ValueError("'i_end' must be larger than or equal to 'i_start'.")

            new_items = [x for i, x in enumerate(self.queue) if i < i0 or i > i1]
            self.queue = collections.deque(new_items)

        n_items = i1 - i0 + 1
        self.removed_signal.emit(i0, n_items)

        # task_done() acquires the queue mutex itself and therefore must be called
        # after the mutex has been released
        for _ in range(n_items):
            self.task_done()

    def __repr__(self):
        return "<{0}({1} results)>".format(self.__class__.__name__, self.qsize())


# ======================================================================================
# custom queue for experiments where all history is kept
# ======================================================================================
class ExperimentQueue(object):
    """
    Queue to hold all jobs: Pending, running and already completed. Items in this queue
    should be of type :class:`Experiment`.

    Indices used by this class refer to the combined sequence "history, running,
    queued" as returned by :attr:`queue`.

    :cvar added_signal: Emitted when a new job is added to the queue.
    :cvar removed_signal: Emitted when a job is removed from the queue. Carries the
        index of the first removed job in :class:`ExperimentQueue` and the number of
        removed jobs.
    :cvar status_changed_signal: Emitted when a job status changes, e.g., from
        :class:`ExpStatus.QUEUED` to :class:`ExpStatus.RUNNING`. Carries the job index
        and status.
    """

    added_signal = ClassSignal()
    removed_signal = ClassSignal()
    status_changed_signal = ClassSignal()

    # re-entrant so that public methods can call each other while holding the lock
    _lock = RLock()

    def __init__(self):
        # zero-argument super(): ``super(self.__class__, self)`` recurses forever as
        # soon as this class is subclassed
        super().__init__()
        self._queued = Queue()
        self._running = Queue(maxsize=1)
        self._history = Queue()

    @property
    def queue(self):
        """
        Returns list of all items in queue (queued, running, and in history).
        """
        with self._lock:
            return (
                list(self._history.queue)
                + list(self._running.queue)
                + list(self._queued.queue)
            )

    def put(self, exp):
        """
        Adds item `exp` to the end of the queue. Its status must be
        :class:`ExpStatus.QUEUED`. Emits the :attr:`added_signal` signal.

        :raises: ValueError if the status of `exp` is not :class:`ExpStatus.QUEUED`.
        """
        if not exp.status == ExpStatus.QUEUED:
            raise ValueError('Can only append experiments with status "QUEUED".')

        with self._lock:
            self._queued.put(exp)
            self.added_signal.emit()

    def get_next_job(self):
        """
        Returns the next queued item and flags it as running. If there are no queued
        items, :class:`queue.Empty` is raised. Emits the :attr:`status_changed_signal`
        with the item's index and its new status.
        """
        with self._lock:
            exp = self._queued.get_nowait()
            exp.status = ExpStatus.RUNNING
            self._running.put(exp)

            # the running job sits directly in front of the first queued job
            index = self.first_queued_index() - 1
            self.status_changed_signal.emit(index, exp.status)

        return exp

    def job_done(self, exit_status, result=None):
        """
        Call to inform the Experiment queue that a task is completed. Changes the
        corresponding item's status to `exit_status` and its result to `result`.
        Emits the `status_changed_signal` with the item's index and its new status.

        :param exit_status: Status of the completed job, i.e.,
            :class:`ExpStatus.ABORTED`.
        :param result: Return value of job, if available.
        """
        with self._lock:
            exp = self._running.get_nowait()
            # set the status first: threads woken up by _set_result() then already
            # see the final status
            exp.status = exit_status
            exp._set_result(result)
            self._history.put(exp)

            index = self._history.qsize() - 1
            self.status_changed_signal.emit(index, exit_status)

    def remove_item(self, i):
        """
        Removes the item with index `i` from the queue. Raises a :class:`ValueError`
        if the item belongs to a running or already completed job. Emits the
        :attr:`removed_signal` carrying the index.

        :param int i: Index of item to remove.
        """
        self.remove_items(i)

    def remove_items(self, i_start, i_end=None):
        """
        Removes the items from index `i_start` to `i_end` inclusive from the queue.
        Raises a :class:`ValueError` if the item belongs to a running or already
        completed job. Emits the :attr:`removed_signal` once, carrying the first index
        and the number of removed items.

        Indices are taken modulo the total queue size, i.e., negative indices count
        from the end of the queue.

        This call has O(n) performance with regards to the queue length and number of
        items to be removed.

        :param int i_start: Index of first item to remove.
        :param int i_end: Index of last item to remove (defaults to i_end = i_start).
        :raises: IndexError if the queue holds no items at all, ValueError if the
            range includes running or completed jobs or if `i_end` lies before
            `i_start`.
        """
        if i_end is None:
            i_end = i_start

        with self._lock:
            total = self.qsize()
            if total == 0:
                # without this guard the modulo below fails with ZeroDivisionError
                raise IndexError("Cannot remove items from an empty queue.")

            # convert negative indices to positive
            i_start = i_start % total
            i_end = i_end % total

            # convert to index of self._queued.queue
            offset = self.first_queued_index()
            i0 = i_start - offset
            i1 = i_end - offset

            if i0 < 0:
                raise ValueError("Only queued experiments can be removed.")
            if not i0 <= i1:
                raise ValueError("'i_end' must be larger than or equal to 'i_start'.")

            new_items = [
                x for i, x in enumerate(self._queued.queue) if i < i0 or i > i1
            ]
            self._queued.queue = collections.deque(new_items)

            n_items = i_end - i_start + 1
            self.removed_signal.emit(i_start, n_items)

    def clear(self):
        """
        Removes all queued experiments at once. Emits the :attr:`removed_signal` with
        the index of the first queued job and the number of queued jobs, if any.
        """
        with self._lock:
            if self.has_queued():
                self.removed_signal.emit(
                    self.first_queued_index(), self._queued.qsize()
                )
                self._queued.queue.clear()

    def has_running(self):
        """Returns ``True`` if a job is currently flagged as running."""
        return self._running.qsize() > 0

    def has_queued(self):
        """Returns ``True`` if there are jobs waiting to be run."""
        return self._queued.qsize() > 0

    def has_history(self):
        """Returns ``True`` if there are completed jobs."""
        return self._history.qsize() > 0

    def first_queued_index(self):
        """Returns the index of the first queued job in the combined queue."""
        with self._lock:
            return self._history.qsize() + self._running.qsize()

    def qsize(self, status=None):
        """
        Return the approximate number of jobs with given status (not reliable!).

        :param status: Can be 'queued', 'running', 'history' or `None`. If `None`, the
            full queue size will be returned.
        """
        with self._lock:
            return self._qsize(status)

    def _qsize(self, status):
        if status == "queued":
            return self._queued.qsize()
        elif status == "running":
            return self._running.qsize()
        elif status == "history":
            return self._history.qsize()
        else:
            return self._history.qsize() + self._running.qsize() + self._queued.qsize()

    def __repr__(self):
        return "<{0}({1} done, {2} running, {3} queued)>".format(
            self.__class__.__name__,
            self.qsize("history"),
            self.qsize("running"),
            self.qsize("queued"),
        )
# ======================================================================================
# worker that gets function / method calls from queue and carries them out
# ======================================================================================


class Worker(object):
    """
    Worker that gets all method calls with args from :attr:`job_q` and executes them.
    Results are then stored in the :attr:`result_q`.

    :param job_q: Queue with jobs to be performed.
    :param result_q: Queue with results from completed jobs.
    :param abort_events: Iterable of events which signal that the current job should
        be aborted.

    :cvar running: Event that lets the worker process jobs while it is set. When
        cleared, the worker pauses before picking up the next job. It is a class
        attribute and therefore shared by all instances.
    """

    running = Event()

    def __init__(self, job_q, result_q, abort_events):
        # zero-argument super(): ``super(self.__class__, self)`` recurses forever as
        # soon as this class is subclassed
        super().__init__()
        self.job_q = job_q
        self.result_q = result_q
        self.abort_events = abort_events

    def abort_is_set(self):
        """Returns ``True`` if at least one of the abort events is set."""
        return any(event.is_set() for event in self.abort_events)

    def clear_abort(self):
        """Clears all abort events."""
        for event in self.abort_events:
            event.clear()

    def process(self):
        """
        Runs forever: Polls :attr:`job_q` for the next job, executes it and reports
        the outcome back to :attr:`job_q`. Return values other than ``None`` are
        additionally put into :attr:`result_q`. A job which raises is flagged as
        failed and pauses the worker.
        """
        while True:
            time.sleep(0.1)

            if not self.running.is_set():
                logger.debug("PAUSED")
                self.running.wait()

            try:
                exp = self.job_q.get_next_job()  # get next job
            except Empty:
                continue

            # noinspection PyBroadException
            try:
                result = exp.func(*exp.args, **exp.kwargs)  # run the job
            except Exception as e:
                # log exception and pause execution of jobs
                logger.exception("Job error")
                # drop abort requests aimed at the failed job, otherwise they would
                # still be set when the next job starts after resuming
                self.clear_abort()
                self.job_q.job_done(ExpStatus.FAILED, result=e)
                self.running.clear()
                logger.debug("PAUSED")
            else:
                if result is not None:
                    self.result_q.put(result)

                if self.abort_is_set():
                    exit_status = ExpStatus.ABORTED
                    self.clear_abort()
                else:
                    exit_status = ExpStatus.FINISHED

                self.job_q.job_done(exit_status, result)
                logger.debug("IDLE")
# ======================================================================================
# manager to coordinate everything
# ======================================================================================


class Manager(object):
    """
    :class:`Manager` provides a high level interface for the scheduling and executing
    experiments. All queued experiments will be run in a background thread and
    :class:`Manager` provides methods to pause, resume and abort the execution of
    experiments. All results will be kept in the `result_queue` for later retrieval.

    Function calls can be queued as experiments by decorating the function with the
    :attr:`Manager.queued_exec` decorator:

    >>> import time
    >>> from customxepr.manager import Manager
    >>> manager = Manager()
    >>> # create test function
    >>> @manager.queued_exec
    ... def decorated_test_func(*args):
    ...     # do something
    ...     for i in range(0, 10):
    ...         time.sleep(1)
    ...         # check for requested aborts
    ...         if manager.abort.is_set():
    ...             break
    ...     return args  # return input arguments
    >>> # run the function: this will return immediately
    >>> exp = decorated_test_func('test succeeded')

    The result returned by `decorated_test_func` can be retrieved from the result
    queue or the returned Experiment instance:

    >>> res1 = manager.result_queue.get()  # blocks until result is available
    >>> res2 = exp.result()  # blocks until result is available
    >>> print(res1)
    ('test succeeded',)
    >>> print(res1 is res2)
    True

    Alternatively, it is possible to manually create an experiment from a function
    call and queue it for processing as follows:

    >>> import time
    >>> from customxepr.manager import Manager, Experiment
    >>> manager = Manager()
    >>> # create test function
    >>> def stand_alone_test_func(*args):
    ...     # do something
    ...     for i in range(0, 10):
    ...         time.sleep(1)
    ...         # check for requested aborts
    ...         if manager.abort.is_set():
    ...             break
    ...     return args  # return input arguments
    >>> # create an experiment from function
    >>> exp = Experiment(stand_alone_test_func, args=['test succeeded',], kwargs={})
    >>> # queue the experiment
    >>> manager.job_queue.put(exp)

    This class provides an event `abort` which can be queried periodically by any
    function to see if it should abort prematurely. Alternatively, functions and
    methods can provide their own abort events and register them with the manager as
    follows:

    >>> from threading import Event
    >>> abort_event = Event()
    >>> # register the event with the manager instance
    >>> manager.abort_events = [abort_event]

    The manager will automatically set the status of an experiment to ABORTED if it
    finishes while an abort event is set and clear all abort events afterwards.

    :cvar job_queue: An instance of :class:`ExperimentQueue` holding all queued and
        finished experiments.
    :cvar result_queue: A queue holding all results.
    :ivar running: Event which lets the worker process jobs while it is set. Clearing
        it pauses the worker between jobs.
    :cvar abort: A generic event which can be used in long-running experiments to
        check if they should be aborted.
    """

    job_queue = ExperimentQueue()
    result_queue = SignalQueue()

    def __init__(self):
        # zero-argument super(): ``super(self.__class__, self)`` recurses forever as
        # soon as this class is subclassed
        super().__init__()

        self.abort = Event()
        self.abort_events = [self.abort]

        # create background thread to process all executions in queue
        self.worker = Worker(self.job_queue, self.result_queue, self.abort_events)
        self.thread = Thread(
            target=self.worker.process,
            name="ExperimentManagerThread",
        )
        self.thread.daemon = True

        self.worker.running.set()
        self.thread.start()

        self.running = self.worker.running

        # set up logging functionality
        self._setup_root_logger()

    # ==================================================================================
    # job execution management
    # ==================================================================================

    def queued_exec(self, func):
        """
        Decorator that puts a call to a wrapped function into the job_queue queue
        instead of executing it. Returns the queued :class:`Experiment` which is
        similar to a Python's :class:`concurrent.futures.Future`.

        When the wrapped function is called from the worker thread itself, i.e., from
        within another queued job, it is executed directly and its return value is
        returned instead.
        """

        @wraps(func)
        def wrapper(*args, **kwargs):
            if current_thread() is self.thread:
                # already inside a job: run right away instead of queuing
                return func(*args, **kwargs)

            exp = Experiment(func, args, kwargs)
            self.job_queue.put(exp)
            return exp

        return wrapper

    def pause_worker(self):
        """
        Pauses the execution of jobs after the current job has been completed.
        """
        self.worker.running.clear()

    def resume_worker(self):
        """
        Resumes the execution of jobs.
        """
        self.worker.running.set()
        logger.debug("IDLE")

    def abort_job(self):
        """
        Aborts the current job and continues with the next. Does nothing if no job
        is running.
        """
        if self.job_queue.has_running():
            for event in self.abort_events:
                event.set()

    def clear_all_jobs(self):
        """
        Clears all pending jobs in :attr:`job_queue`.
        """
        self.job_queue.clear()

    # ==================================================================================
    # logging facilities
    # ==================================================================================

    @staticmethod
    def _handlers_of_type(handler_class):
        """Returns all handlers of the "customxepr" logger of the given class."""
        return [h for h in root_logger.handlers if isinstance(h, handler_class)]

    @staticmethod
    def _setup_root_logger():
        """
        Configures the "customxepr" logger: Removes all existing handlers, adds an
        email handler and a file handler and deletes log files older than one year.
        """
        root_logger.setLevel(logging.DEBUG)
        root_logger.handlers.clear()

        # define standard format of logging messages
        formatter = logging.Formatter(
            fmt="%(asctime)s %(name)s %(levelname)s: " + "%(message)s", datefmt="%H:%M"
        )

        # create and add email handler
        email_handler = logging.handlers.SMTPHandler(
            mailhost=(CONF.get("SMTP", "mailhost"), CONF.get("SMTP", "port")),
            fromaddr=CONF.get("SMTP", "fromaddr"),
            toaddrs=CONF.get("CustomXepr", "notify_address"),
            subject="CustomXepr logger",
            credentials=CONF.get("SMTP", "credentials"),
            secure=CONF.get("SMTP", "secure"),
        )
        email_handler.setFormatter(formatter)
        email_handler.setLevel(CONF.get("CustomXepr", "email_handler_level"))

        root_logger.addHandler(email_handler)

        # add file handler
        home_path = os.path.expanduser("~")
        logging_path = os.path.join(home_path, ".CustomXepr", "LOG_FILES")
        os.makedirs(logging_path, exist_ok=True)

        log_file = os.path.join(
            logging_path,
            "root_logger " + time.strftime("%Y-%m-%d_%H-%M-%S") + ".txt",
        )

        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        file_handler.setLevel(logging.INFO)
        root_logger.addHandler(file_handler)

        # delete old log files
        days_to_keep = 365
        cutoff = time.time() - days_to_keep * 24 * 60 * 60

        for name in os.listdir(logging_path):
            path = os.path.join(logging_path, name)
            # isfile() first: it is False for broken links on which os.stat() fails
            if os.path.isfile(path) and os.stat(path).st_mtime < cutoff:
                os.remove(path)

    @property
    def notify_address(self):
        """List of addresses for email notifications."""
        return CONF.get("CustomXepr", "notify_address")

    @notify_address.setter
    def notify_address(self, email_list):
        """Setter: Address list for email notifications."""
        # find all email handlers (there should be only one)
        eh = self._handlers_of_type(logging.handlers.SMTPHandler)

        if len(eh) == 0:
            logger.warning("No email handler could be found.")
        else:
            for handler in eh:
                handler.toaddrs = email_list

        email_list_str = ", ".join(email_list)
        logger.info("Email notifications will be sent to " + email_list_str + ".")
        # update conf file
        CONF.set("CustomXepr", "notify_address", email_list)

    @property
    def log_file_dir(self):
        """
        Directory for log files. Defaults to '~/.CustomXepr/LOG_FILES'. Returns
        ``None`` if no file handler is registered.
        """
        # find all file handlers (there should be only one)
        fh = self._handlers_of_type(logging.FileHandler)

        if len(fh) == 0:
            logger.warning("No file handler could be found.")
            return None

        return os.path.dirname(fh[0].baseFilename)

    @property
    def email_handler_level(self):
        """
        Logging level for email notifications. Defaults to :class:`logging.WARNING`.
        Returns ``None`` if no email handler is registered.
        """
        # find all email handlers (there should be only one)
        eh = self._handlers_of_type(logging.handlers.SMTPHandler)

        if len(eh) == 0:
            logger.warning("No email handler could be found.")
            return None

        return eh[0].level

    @email_handler_level.setter
    def email_handler_level(self, level=logging.WARNING):
        """Setter: Logging level for email notifications."""
        # find all email handlers (there should be only one)
        eh = self._handlers_of_type(logging.handlers.SMTPHandler)

        if len(eh) == 0:
            logger.warning("No email handler could be found.")
        else:
            eh[0].setLevel(level)

        # update conf file
        CONF.set("CustomXepr", "email_handler_level", level)