Source code for bouwer.action

#
# Copyright (C) 2012 Niek Linnenbank
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

"""
Bouwer action layer

This layer is responsible for executing actions. An action can
be a shell command or a python function. The action layer does not know
about the other layers (build and configuration).
"""

import multiprocessing
import subprocess
import os
import os.path
import sys
import datetime
import logging
import bouwer.util
from bouwer.cli import CommandLine

[docs]class Worker(multiprocessing.Process): """ Implements a consumer process for executable :class:`Action` objects. The :class:`Worker` class implements a simple consumer for executing :class:`Action` objects. It takes a :obj:`dict` of available `actions` and receives target names of the appropriate action to execute from the `work` :class:`Queue`. The worker sends an :class:`ActionEvent` on the `events` :class:`Queue` before with type `ActionEvent.EXECUTE` and after executing an action with type `ActionEvent.FINISH`. """ def __init__(self, actions, work, events): """ Constructor :param dict actions: Dictionary with :class:`Action` objects :param :class:`Queue` work: Queue to receive target names to execute :param :class:`Queue` events: Queue to publish events to the :class:`WorkerManager` """ super(Worker, self).__init__() self._actions = actions self._work = work self._events = events
[docs] def run(self): """ Main execution loop of the Worker. Does not return. """ while True: # Retrieve the next Action target target = self._work.get() # Trigger ActionEvents and execute the action self._events.put(ActionEvent(self.name, target, ActionEvent.EXECUTE)) result = self._actions[target]() self._events.put(ActionEvent(self.name, target, ActionEvent.FINISH, result))
[docs]class WorkerManager: """ Manages a pool of :class:`Worker` processes """ def __init__(self, actions): """ Constructor :param dict actions: Dictionary with :class:`Action` objects """ self.actions = actions self.work = multiprocessing.Queue() self.events = multiprocessing.Queue() self.workers = [] self.log = logging.getLogger(__name__) self.output_plugin = getattr(CommandLine.Instance().args, 'output_plugin', None) self.running = [] self.pending = self.actions.copy() # Create workers for i in range(CommandLine.Instance().args.workers): worker = Worker(self.actions, self.work, self.events) self.workers.append(worker) worker.start() def __del__(self): """ Destructor """ for proc in self.workers: proc.terminate() proc.join()
[docs] def execute(self): """ Execute all :class:`.Action` objects """ self.log.debug("running actions") while True: collecting = True # Make as much as possible work available while collecting or (self.pending and not self.running): collecting = False for key in list(self.pending.keys()): action = self.pending[key] if self.decide(action): action.status = ActionEvent.EXECUTE self.work.put(action.target) self.running.append(action) del self.pending[key] collecting = True # Wait for events if not self.running: break event = self.events.get() action = self.actions[event.target] action.status = event.type self.log.debug("event: " + str(event)) if action.status == ActionEvent.FINISH: self.running.remove(action) # Report the event to the builder action.builder.action_event(action, event) # Invoke output plugin. if self.output_plugin: self.output_plugin.action_event(action, event)
[docs] def decide(self, action): """ Decide if this action needs to run. :param :class:`Action` action: the action to try Returns `True` if the :class:`Action` needs to run or `False` otherwise. """ need_run = False # Is/has the action already ran? if action.status is not ActionEvent.CREATE: return False # Try to see if the target exists try: my_st = os.stat(action.target) except OSError: need_run = True # Do all our dependencies satisfy? for src in action.sources: try: if self.actions[src].status != ActionEvent.FINISH: return False except KeyError: pass # See if their timestamp is larger than ours try: st = os.stat(src) if not need_run and st.st_mtime > my_st.st_mtime: need_run = True except OSError: pass # Allow override to enfore full build from command line if CommandLine.Instance().args.force or need_run: return True # None of the sources is updated and we exist. Don't build. action.status = ActionEvent.FINISH del self.pending[action.target] return False
[docs]class ActionEvent: """ Represents an event which occurred for an :class:`.Action` """ CREATE = 'create' EXECUTE = 'execute' FINISH = 'finish' def __init__(self, worker, target, event_type, result = None): """ Constructor :param str worker: the name of the :class:`Worker` that caused the event :param str target: target of the :class:`Action` for this event :param str event_type: type of event :param int result: exit code of the :class:`Action` """ self.worker = worker self.target = target self.type = event_type self.result = result self.time = datetime.datetime.now() def __str__(self): """ Convert to string representation """ return 'ActionEvent.' + self.type.upper() + ' : ' + self.target + \ ' @ worker[' + self.worker + '] type=' + self.type + \ ' result=' + str(self.result) + ' time=' + str(self.time) def __repr__(self): """ Convert to short string representation """ return self.__str__()
[docs]class Action: """ Represents an executable action. """ def __init__(self, target, sources, command, tags, builder): """ Constructor :param str target: target file for the action :param list sources: a `list` of source files as dependency :param str command: Either a `str` with a shell command or a python function :param dict tags: Dictionary with parameters called tags :param :class:`Plugin` builder: the builder that generated this action """ self.target = target self.sources = sources self.command = command self.tags = tags self.builder = builder self.status = ActionEvent.CREATE def __call__(self): """ Execute the action """ # If the command is a python function, run it. if callable(self.command): return self.command(self) # If quiet mode is set, do not show any output if 'quiet' in self.tags and self.tags['quiet']: try: subprocess.check_output(self.command, stderr=subprocess.PIPE, shell=True) return 0 except subprocess.CalledProcessError as e: return e.returncode else: return os.system(self.command) def __str__(self): """ Convert to string representation """ return self.target + " <<< sources=" + \ str(self.sources) + " status=" + self.status + " : [" + str(self.command) + "]" def __repr__(self): """ Convert to short string representation """ return self.__str__()
[docs]class ActionManager(object): """ Manages all :class:`.Action` objects registered for execution. """ def __init__(self): """ Constructor """ self.log = logging.getLogger(__name__) self.actions = {}
[docs] def submit(self, target, sources, command, tags, builder): """ Submit a new :class:`.Action` for execution :param str target: target file for the action :param list sources: a `list` of source files as dependency :param str command: Either a `str` with a shell command or a python function :param dict tags: Dictionary with parameters called tags :param :class:`Plugin` builder: the builder that generated this action """ if target in self.actions: raise Exception("target " + target + " already submitted") self.actions[target] = Action(target, sources, command, tags, builder) self.log.debug("submitted: " + str(self.actions[target]))
[docs] def run(self, clean = False): """ Run all registered :class:`.Action` objects :param bool clean: True to remove the targets. False to execute. """ if clean: for action in self.actions.values(): self.log.debug("removing: " + action.target) try: os.remove(action.target) except OSError: pass else: # Allow output plugins self.workers = WorkerManager(self.actions) self.workers.execute() self.workers = None self.actions.clear()