from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
from builtins import object
import os
import logging
import time
import uuid
import threading
from .util import stack_template_key_name
from .exceptions import (
GraphError,
PlanFailed,
)
from .ui import ui
from .dag import DAG, DAGValidationError, walk
from .status import (
FailedStatus,
PENDING,
SUBMITTED,
COMPLETE,
SKIPPED,
FAILED,
)
logger = logging.getLogger(__name__)
COLOR_CODES = {
SUBMITTED.code: 33, # yellow
COMPLETE.code: 32, # green
FAILED.code: 31, # red
}
[docs]def log_step(step):
msg = "%s: %s" % (step, step.status.name)
if step.status.reason:
msg += " (%s)" % (step.status.reason)
color_code = COLOR_CODES.get(step.status.code, 37)
ui.info(msg, extra={"color": color_code})
[docs]class Step(object):
"""State machine for executing generic actions related to stacks.
Args:
stack (:class:`stacker.stack.Stack`): the stack associated
with this step
fn (func): the function to run to execute the step. This function will
be ran multiple times until the step is "done".
watch_func (func): an optional function that will be called to "tail"
the step action.
"""
def __init__(self, stack, fn, watch_func=None):
self.stack = stack
self.status = PENDING
self.last_updated = time.time()
self.fn = fn
self.watch_func = watch_func
def __repr__(self):
return "<stacker.plan.Step:%s>" % (self.stack.name,)
def __str__(self):
return self.stack.name
[docs] def run(self):
"""Runs this step until it has completed successfully, or been
skipped.
"""
stop_watcher = threading.Event()
watcher = None
if self.watch_func:
watcher = threading.Thread(
target=self.watch_func,
args=(self.stack, stop_watcher)
)
watcher.start()
try:
while not self.done:
self._run_once()
finally:
if watcher:
stop_watcher.set()
watcher.join()
return self.ok
def _run_once(self):
try:
status = self.fn(self.stack, status=self.status)
except Exception as e:
logger.exception(e)
status = FailedStatus(reason=str(e))
self.set_status(status)
return status
@property
def name(self):
return self.stack.name
@property
def requires(self):
return self.stack.requires
@property
def required_by(self):
return self.stack.required_by
@property
def completed(self):
"""Returns True if the step is in a COMPLETE state."""
return self.status == COMPLETE
@property
def skipped(self):
"""Returns True if the step is in a SKIPPED state."""
return self.status == SKIPPED
@property
def failed(self):
"""Returns True if the step is in a FAILED state."""
return self.status == FAILED
@property
def done(self):
"""Returns True if the step is finished (either COMPLETE, SKIPPED or FAILED)
"""
return self.completed or self.skipped or self.failed
@property
def ok(self):
"""Returns True if the step is finished (either COMPLETE or SKIPPED)"""
return self.completed or self.skipped
@property
def submitted(self):
"""Returns True if the step is SUBMITTED, COMPLETE, or SKIPPED."""
return self.status >= SUBMITTED
[docs] def set_status(self, status):
"""Sets the current step's status.
Args:
status (:class:`Status <Status>` object): The status to set the
step to.
"""
if status is not self.status:
logger.debug("Setting %s state to %s.", self.stack.name,
status.name)
self.status = status
self.last_updated = time.time()
if self.stack.logging:
log_step(self)
[docs] def complete(self):
"""A shortcut for set_status(COMPLETE)"""
self.set_status(COMPLETE)
[docs] def skip(self):
"""A shortcut for set_status(SKIPPED)"""
self.set_status(SKIPPED)
[docs] def submit(self):
"""A shortcut for set_status(SUBMITTED)"""
self.set_status(SUBMITTED)
[docs]def build_plan(description, graph,
targets=None, reverse=False):
"""Builds a plan from a list of steps.
Args:
description (str): an arbitrary string to
describe the plan.
graph (:class:`Graph`): a list of :class:`Graph` to execute.
targets (list): an optional list of step names to filter the graph to.
If provided, only these steps, and their transitive dependencies
will be executed. If no targets are specified, every node in the
graph will be executed.
reverse (bool): If provided, the graph will be walked in reverse order
(dependencies last).
"""
# If we want to execute the plan in reverse (e.g. Destroy), transpose the
# graph.
if reverse:
graph = graph.transposed()
# If we only want to build a specific target, filter the graph.
if targets:
nodes = []
for target in targets:
for k, step in graph.steps.items():
if step.name == target:
nodes.append(step.name)
graph = graph.filtered(nodes)
return Plan(description=description, graph=graph)
[docs]def build_graph(steps):
"""Builds a graph of steps.
Args:
steps (list): a list of :class:`Step` objects to execute.
"""
graph = Graph()
for step in steps:
graph.add_step(step)
for step in steps:
for dep in step.requires:
graph.connect(step.name, dep)
for parent in step.required_by:
graph.connect(parent, step.name)
return graph
[docs]class Graph(object):
"""Graph represents a graph of steps.
The :class:`Graph` helps organize the steps needed to execute a particular
action for a set of :class:`stacker.stack.Stack` objects. When initialized
with a set of steps, it will first build a Directed Acyclic Graph from the
steps and their dependencies.
Example:
>>> dag = DAG()
>>> a = Step("a", fn=build)
>>> b = Step("b", fn=build)
>>> dag.add_step(a)
>>> dag.add_step(b)
>>> dag.connect(a, b)
Args:
steps (list): an optional list of :class:`Step` objects to execute.
dag (:class:`stacker.dag.DAG`): an optional :class:`stacker.dag.DAG`
object. If one is not provided, a new one will be initialized.
"""
def __init__(self, steps=None, dag=None):
self.steps = steps or {}
self.dag = dag or DAG()
[docs] def add_step(self, step):
self.steps[step.name] = step
self.dag.add_node(step.name)
[docs] def connect(self, step, dep):
try:
self.dag.add_edge(step, dep)
except KeyError as e:
raise GraphError(e, step, dep)
except DAGValidationError as e:
raise GraphError(e, step, dep)
[docs] def transitive_reduction(self):
self.dag.transitive_reduction()
[docs] def walk(self, walker, walk_func):
def fn(step_name):
step = self.steps[step_name]
return walk_func(step)
return walker(self.dag, fn)
[docs] def downstream(self, step_name):
"""Returns the direct dependencies of the given step"""
return list(self.steps[dep] for dep in self.dag.downstream(step_name))
[docs] def transposed(self):
"""Returns a "transposed" version of this graph. Useful for walking in
reverse.
"""
return Graph(steps=self.steps, dag=self.dag.transpose())
[docs] def filtered(self, step_names):
"""Returns a "filtered" version of this graph."""
return Graph(steps=self.steps, dag=self.dag.filter(step_names))
[docs] def topological_sort(self):
nodes = self.dag.topological_sort()
return [self.steps[step_name] for step_name in nodes]
[docs] def to_dict(self):
return self.dag.graph
[docs]class Plan(object):
"""A convenience class for working on a Graph.
Args:
description (str): description of the plan.
graph (:class:`Graph`): a graph of steps.
"""
def __init__(self, description, graph):
self.id = uuid.uuid4()
self.description = description
self.graph = graph
[docs] def outline(self, level=logging.INFO, message=""):
"""Print an outline of the actions the plan is going to take.
The outline will represent the rough ordering of the steps that will be
taken.
Args:
level (int, optional): a valid log level that should be used to log
the outline
message (str, optional): a message that will be logged to
the user after the outline has been logged.
"""
steps = 1
logger.log(level, "Plan \"%s\":", self.description)
for step in self.steps:
logger.log(
level,
" - step: %s: target: \"%s\", action: \"%s\"",
steps,
step.name,
step.fn.__name__,
)
steps += 1
if message:
logger.log(level, message)
[docs] def dump(self, directory, context, provider=None):
logger.info("Dumping \"%s\"...", self.description)
directory = os.path.expanduser(directory)
if not os.path.exists(directory):
os.makedirs(directory)
def walk_func(step):
step.stack.resolve(
context=context,
provider=provider,
)
blueprint = step.stack.blueprint
filename = stack_template_key_name(blueprint)
path = os.path.join(directory, filename)
blueprint_dir = os.path.dirname(path)
if not os.path.exists(blueprint_dir):
os.makedirs(blueprint_dir)
logger.info("Writing stack \"%s\" -> %s", step.name, path)
with open(path, "w") as f:
f.write(blueprint.rendered)
return True
return self.graph.walk(walk, walk_func)
[docs] def execute(self, *args, **kwargs):
"""Walks each step in the underlying graph, and raises an exception if
any of the steps fail.
Raises:
PlanFailed: Raised if any of the steps fail.
"""
self.walk(*args, **kwargs)
failed_steps = [step for step in self.steps if step.status == FAILED]
if failed_steps:
raise PlanFailed(failed_steps)
[docs] def walk(self, walker):
"""Walks each step in the underlying graph, in topological order.
Args:
walker (func): a walker function to be passed to
:class:`stacker.dag.DAG` to walk the graph.
"""
def walk_func(step):
# Before we execute the step, we need to ensure that it's
# transitive dependencies are all in an "ok" state. If not, we
# won't execute this step.
for dep in self.graph.downstream(step.name):
if not dep.ok:
step.set_status(FailedStatus("dependency has failed"))
return step.ok
return step.run()
return self.graph.walk(walker, walk_func)
@property
def steps(self):
steps = self.graph.topological_sort()
steps.reverse()
return steps
@property
def step_names(self):
return [step.name for step in self.steps]
[docs] def keys(self):
return self.step_names