Source code for stacker.hooks.iam

from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
from builtins import input
import copy
import logging

from stacker.session_cache import get_session
from botocore.exceptions import ClientError

from awacs.aws import Statement, Allow, Policy
from awacs import ecs
from awacs.helpers.trust import get_ecs_assumerole_policy

from . import utils

logger = logging.getLogger(__name__)


[docs]def create_ecs_service_role(provider, context, **kwargs): """Used to create the ecsServieRole, which has to be named exactly that currently, so cannot be created via CloudFormation. See: http://docs.aws.amazon.com/AmazonECS/latest/developerguide/IAM_policies.html#service_IAM_role Args: provider (:class:`stacker.providers.base.BaseProvider`): provider instance context (:class:`stacker.context.Context`): context instance Returns: boolean for whether or not the hook succeeded. """ role_name = kwargs.get("role_name", "ecsServiceRole") client = get_session(provider.region).client('iam') try: client.create_role( RoleName=role_name, AssumeRolePolicyDocument=get_ecs_assumerole_policy().to_json() ) except ClientError as e: if "already exists" in str(e): pass else: raise policy = Policy( Version='2012-10-17', Statement=[ Statement( Effect=Allow, Resource=["*"], Action=[ecs.CreateCluster, ecs.DeregisterContainerInstance, ecs.DiscoverPollEndpoint, ecs.Poll, ecs.Action("Submit*")] ) ]) client.put_role_policy( RoleName=role_name, PolicyName="AmazonEC2ContainerServiceRolePolicy", PolicyDocument=policy.to_json() ) return True
def _get_cert_arn_from_response(response): result = copy.deepcopy(response) # GET response returns this extra key if "ServerCertificate" in response: result = response["ServerCertificate"] return result["ServerCertificateMetadata"]["Arn"]
[docs]def get_cert_contents(kwargs): """Builds parameters with server cert file contents. Args: kwargs(dict): The keyword args passed to ensure_server_cert_exists, optionally containing the paths to the cert, key and chain files. Returns: dict: A dictionary containing the appropriate parameters to supply to upload_server_certificate. An empty dictionary if there is a problem. """ paths = { "certificate": kwargs.get("path_to_certificate"), "private_key": kwargs.get("path_to_private_key"), "chain": kwargs.get("path_to_chain"), } for key, value in paths.items(): if value is not None: continue path = input("Path to %s (skip): " % (key,)) if path == "skip" or not path.strip(): continue paths[key] = path parameters = { "ServerCertificateName": kwargs.get("cert_name"), } for key, path in paths.items(): if not path: continue # Allow passing of file like object for tests try: contents = path.read() except AttributeError: with open(utils.full_path(path)) as read_file: contents = read_file.read() if key == "certificate": parameters["CertificateBody"] = contents elif key == "private_key": parameters["PrivateKey"] = contents elif key == "chain": parameters["CertificateChain"] = contents return parameters
[docs]def ensure_server_cert_exists(provider, context, **kwargs): client = get_session(provider.region).client('iam') cert_name = kwargs["cert_name"] status = "unknown" try: response = client.get_server_certificate( ServerCertificateName=cert_name ) cert_arn = _get_cert_arn_from_response(response) status = "exists" logger.info("certificate exists: %s (%s)", cert_name, cert_arn) except ClientError: if kwargs.get("prompt", True): upload = input( "Certificate '%s' wasn't found. Upload it now? (yes/no) " % ( cert_name, ) ) if upload != "yes": return False parameters = get_cert_contents(kwargs) if not parameters: return False response = client.upload_server_certificate(**parameters) cert_arn = _get_cert_arn_from_response(response) status = "uploaded" logger.info( "uploaded certificate: %s (%s)", cert_name, cert_arn, ) return { "status": status, "cert_name": cert_name, "cert_arn": cert_arn, }