-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
Copy pathcompanion_stack_manager.py
317 lines (278 loc) · 12.3 KB
/
companion_stack_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
"""
Companion stack manager
"""
import logging
from typing import Dict, List, Optional
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError, NoCredentialsError, NoRegionError
from mypy_boto3_cloudformation.client import CloudFormationClient
from mypy_boto3_cloudformation.type_defs import WaiterConfigTypeDef
from mypy_boto3_s3.client import S3Client
from samcli.commands.exceptions import AWSServiceClientError, RegionError
from samcli.lib.bootstrap.companion_stack.companion_stack_builder import CompanionStackBuilder
from samcli.lib.bootstrap.companion_stack.data_types import CompanionStack, ECRRepo
from samcli.lib.package.artifact_exporter import mktempfile
from samcli.lib.package.s3_uploader import S3Uploader
from samcli.lib.providers.sam_function_provider import SamFunctionProvider
from samcli.lib.providers.sam_stack_provider import SamLocalStackProvider
from samcli.lib.utils.packagetype import IMAGE
from samcli.lib.utils.s3 import parse_s3_url
LOG = logging.getLogger(__name__)
class CompanionStackManager:
"""
Manager class for a companion stack
Used to create/update the remote stack
"""
_companion_stack: CompanionStack
_builder: CompanionStackBuilder
_boto_config: Config
_update_stack_waiter_config: WaiterConfigTypeDef
_delete_stack_waiter_config: WaiterConfigTypeDef
_s3_bucket: str
_s3_prefix: str
_cfn_client: CloudFormationClient
_s3_client: S3Client
def __init__(self, stack_name, region, s3_bucket, s3_prefix):
self._companion_stack = CompanionStack(stack_name)
self._builder = CompanionStackBuilder(self._companion_stack)
self._boto_config = Config(region_name=region if region else None)
self._update_stack_waiter_config = {"Delay": 10, "MaxAttempts": 120}
self._delete_stack_waiter_config = {"Delay": 10, "MaxAttempts": 120}
self._s3_bucket = s3_bucket
self._s3_prefix = s3_prefix
try:
self._cfn_client = boto3.client("cloudformation", config=self._boto_config)
self._ecr_client = boto3.client("ecr", config=self._boto_config)
self._s3_client = boto3.client("s3", config=self._boto_config)
self._account_id = boto3.client("sts").get_caller_identity().get("Account")
self._region_name = self._cfn_client.meta.region_name
except NoCredentialsError as ex:
raise AWSServiceClientError(
"Error Setting Up Managed Stack Client: Unable to resolve "
"credentials for the AWS SDK for Python client. "
"Please see their documentation for options to pass in credentials: "
"https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html"
) from ex
except NoRegionError as ex:
raise RegionError(
"Error Setting Up Managed Stack Client: Unable to resolve a region. "
"Please provide a region via the --region parameter or by the AWS_DEFAULT_REGION environment variable."
) from ex
def set_functions(
self, function_logical_ids: List[str], image_repositories: Optional[Dict[str, str]] = None
) -> None:
"""
Sets functions that need to have ECR repos created
Parameters
----------
function_logical_ids: List[str]
Function logical IDs that need to have ECR repos created
image_repositories: Optional[Dict[str, str]]
Optional image repository mapping. Functions with non-auto-ecr URIs
will be ignored.
"""
self._builder.clear_functions()
if image_repositories is None:
image_repositories = dict()
for function_logical_id in function_logical_ids:
if function_logical_id not in image_repositories or self.is_repo_uri(
image_repositories.get(function_logical_id), function_logical_id
):
self._builder.add_function(function_logical_id)
def update_companion_stack(self) -> None:
"""
Blocking call to create or update the companion stack based on current functions
Companion stack template will be updated to the s3 bucket first before deployment
"""
if not self._builder.repo_mapping:
return
stack_name = self._companion_stack.stack_name
template = self._builder.build()
with mktempfile() as temporary_file:
temporary_file.write(template)
temporary_file.flush()
s3_uploader = S3Uploader(
self._s3_client, bucket_name=self._s3_bucket, prefix=self._s3_prefix, no_progressbar=True
)
# TemplateUrl property requires S3 URL to be in path-style format
parts = parse_s3_url(
s3_uploader.upload_with_dedup(temporary_file.name, "template"), version_property="Version"
)
template_url = s3_uploader.to_path_style_s3_url(parts["Key"], parts.get("Version", None))
exists = self.does_companion_stack_exist()
if exists:
self._cfn_client.update_stack(
StackName=stack_name, TemplateURL=template_url, Capabilities=["CAPABILITY_AUTO_EXPAND"]
)
update_waiter = self._cfn_client.get_waiter("stack_update_complete")
update_waiter.wait(StackName=stack_name, WaiterConfig=self._update_stack_waiter_config)
else:
self._cfn_client.create_stack(
StackName=stack_name, TemplateURL=template_url, Capabilities=["CAPABILITY_AUTO_EXPAND"]
)
create_waiter = self._cfn_client.get_waiter("stack_create_complete")
create_waiter.wait(StackName=stack_name, WaiterConfig=self._update_stack_waiter_config)
def _delete_companion_stack(self) -> None:
"""
Blocking call to delete the companion stack
"""
stack_name = self._companion_stack.stack_name
waiter = self._cfn_client.get_waiter("stack_delete_complete")
self._cfn_client.delete_stack(StackName=stack_name)
waiter.wait(StackName=stack_name, WaiterConfig=self._delete_stack_waiter_config)
def list_deployed_repos(self) -> List[ECRRepo]:
"""
List deployed ECR repos for this companion stack
Not using create_change_set as it is slow.
Returns
-------
List[ECRRepo]
List of ECR repos deployed for this companion stack
Returns empty list if companion stack does not exist
"""
if not self.does_companion_stack_exist():
return []
repos: List[ECRRepo] = list()
stack = boto3.resource("cloudformation", config=self._boto_config).Stack(self._companion_stack.stack_name)
for resource in stack.resource_summaries.all():
if resource.resource_type == "AWS::ECR::Repository":
repos.append(
ECRRepo(logical_id=resource.logical_resource_id, physical_id=resource.physical_resource_id)
)
return repos
def get_unreferenced_repos(self) -> List[ECRRepo]:
"""
List deployed ECR repos that is not referenced by current list of functions
Returns
-------
List[ECRRepo]
List of deployed ECR repos that is not referenced by current list of functions
Returns empty list if companion stack does not exist
"""
if not self.does_companion_stack_exist():
return []
deployed_repos: List[ECRRepo] = self.list_deployed_repos()
current_mapping = self._builder.repo_mapping
unreferenced_repos: List[ECRRepo] = list()
for deployed_repo in deployed_repos:
for _, current_repo in current_mapping.items():
if current_repo.logical_id == deployed_repo.logical_id:
break
else:
unreferenced_repos.append(deployed_repo)
return unreferenced_repos
def delete_unreferenced_repos(self) -> None:
"""
Blocking call to delete all deployed ECR repos that are unreferenced by a function
If repo does not exist, this will simply skip it.
"""
repos = self.get_unreferenced_repos()
for repo in repos:
try:
self._ecr_client.delete_repository(repositoryName=repo.physical_id, force=True)
except self._ecr_client.exceptions.RepositoryNotFoundException:
LOG.debug("Image repo [%s] not found in companion stack. Skipping deletion.", repo.physical_id)
def sync_repos(self) -> None:
"""
Blocking call to sync companion stack with the following actions
Creates the stack if it does not exist, and updates it if it does.
Deletes unreferenced repos if they exist.
Deletes companion stack if there isn't any repo left.
"""
has_repo = bool(self.get_repository_mapping())
if self.does_companion_stack_exist():
self.delete_unreferenced_repos()
if has_repo:
self.update_companion_stack()
else:
self._delete_companion_stack()
elif has_repo:
self.update_companion_stack()
def does_companion_stack_exist(self) -> bool:
"""
Does companion stack exist
Returns
-------
bool
Returns True if companion stack exists
"""
try:
self._cfn_client.describe_stacks(StackName=self._companion_stack.stack_name)
return True
except ClientError as e:
error_message = e.response.get("Error", {}).get("Message")
if error_message == f"Stack with id {self._companion_stack.stack_name} does not exist":
return False
raise e
def get_repository_mapping(self) -> Dict[str, str]:
"""
Get current function to repo mapping
Returns
-------
Dict[str, str]
Dictionary with key as function logical ID and value as ECR repo URI.
"""
return dict((k, self.get_repo_uri(v)) for (k, v) in self._builder.repo_mapping.items())
def get_repo_uri(self, repo: ECRRepo) -> str:
"""
Get repo URI for a ECR repo
Parameters
----------
repo: ECRRepo
Returns
-------
str
ECR repo URI based on account ID and region.
"""
return repo.get_repo_uri(self._account_id, self._region_name)
def is_repo_uri(self, repo_uri: Optional[str], function_logical_id: str) -> bool:
"""
Check whether repo URI is a companion stack repo
Parameters
----------
repo_uri: str
Repo URI to be checked.
function_logical_id: str
Function logical ID associated with the image repo.
Returns
-------
bool
Returns True if repo_uri is a companion stack repo.
"""
return repo_uri == self.get_repo_uri(ECRRepo(self._companion_stack, function_logical_id))
def sync_ecr_stack(
template_file: str, stack_name: str, region: str, s3_bucket: str, s3_prefix: str, image_repositories: Dict[str, str]
) -> Dict[str, str]:
"""Blocking call to sync local functions with ECR Companion Stack
Parameters
----------
template_file : str
Template file path.
stack_name : str
Stack name
region : str
AWS region
s3_bucket : str
S3 bucket
s3_prefix : str
S3 prefix for the bucket
image_repositories : Dict[str, str]
Mapping between function logical ID and ECR URI
Returns
-------
Dict[str, str]
Updated mapping of image_repositories. Auto ECR URIs are added
for Functions without a repo specified.
"""
image_repositories = image_repositories.copy() if image_repositories else {}
manager = CompanionStackManager(stack_name, region, s3_bucket, s3_prefix)
stacks = SamLocalStackProvider.get_stacks(template_file)[0]
function_provider = SamFunctionProvider(stacks, ignore_code_extraction_warnings=True)
function_logical_ids = [
function.full_path for function in function_provider.get_all() if function.packagetype == IMAGE
]
manager.set_functions(function_logical_ids, image_repositories)
image_repositories.update(manager.get_repository_mapping())
manager.sync_repos()
return image_repositories