-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
Copy pathwatch_manager.py
412 lines (369 loc) · 16.6 KB
/
watch_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
"""
WatchManager for Sync Watch Logic
"""
import logging
import platform
import threading
import time
from pathlib import Path
from typing import TYPE_CHECKING, Dict, List, Optional, Set
from watchdog.events import EVENT_TYPE_MODIFIED, EVENT_TYPE_OPENED, FileSystemEvent
from samcli.lib.providers.exceptions import InvalidTemplateFile, MissingCodeUri, MissingLocalDefinition
from samcli.lib.providers.provider import ResourceIdentifier, Stack, get_all_resource_ids
from samcli.lib.providers.sam_stack_provider import SamLocalStackProvider
from samcli.lib.sync.continuous_sync_flow_executor import ContinuousSyncFlowExecutor
from samcli.lib.sync.exceptions import InfraSyncRequiredError, MissingPhysicalResourceError, SyncFlowException
from samcli.lib.sync.infra_sync_executor import InfraSyncExecutor, InfraSyncResult
from samcli.lib.sync.sync_flow_factory import SyncFlowFactory
from samcli.lib.utils.code_trigger_factory import CodeTriggerFactory
from samcli.lib.utils.colors import Colored, Colors
from samcli.lib.utils.path_observer import HandlerObserver
from samcli.lib.utils.resource_trigger import OnChangeCallback, TemplateTrigger
from samcli.local.lambdafn.exceptions import ResourceNotFound
if TYPE_CHECKING: # pragma: no cover
from samcli.commands.build.build_context import BuildContext
from samcli.commands.deploy.deploy_context import DeployContext
from samcli.commands.package.package_context import PackageContext
from samcli.commands.sync.sync_context import SyncContext
# Delay handed to the sync flow executor (as ``wait_time``) for sync flows that are
# created from code-change events. Presumably expressed in seconds - TODO confirm
# against ContinuousSyncFlowExecutor.add_delayed_sync_flow.
DEFAULT_WAIT_TIME = 1
# Module-level logger named after this module.
LOG = logging.getLogger(__name__)
class WatchManager:
    """Manager for sync watch execution logic.

    Observes the template and its code resources through a ``HandlerObserver``.
    Template changes (and some code sync failures) queue an infra sync, which is
    picked up by the polling loop in ``_start``. Code changes create sync flows that
    are handed to a ``ContinuousSyncFlowExecutor`` running on a separate thread.
    """

    _stacks: Optional[List[Stack]]
    _template: str
    _build_context: "BuildContext"
    _package_context: "PackageContext"
    _deploy_context: "DeployContext"
    _sync_context: "SyncContext"
    _sync_flow_factory: Optional[SyncFlowFactory]
    _sync_flow_executor: ContinuousSyncFlowExecutor
    _executor_thread: Optional[threading.Thread]
    _observer: HandlerObserver
    _trigger_factory: Optional[CodeTriggerFactory]
    _waiting_infra_sync: bool
    _color: Colored
    _auto_dependency_layer: bool
    _disable_infra_syncs: bool
    _watch_exclude: Dict[str, List[str]]
    _infra_sync_executor: Optional[InfraSyncExecutor]

    def __init__(
        self,
        template: str,
        build_context: "BuildContext",
        package_context: "PackageContext",
        deploy_context: "DeployContext",
        sync_context: "SyncContext",
        auto_dependency_layer: bool,
        disable_infra_syncs: bool,
        watch_exclude: Dict[str, List[str]],
    ):
        """Manager for sync watch execution logic.
        This manager will observe template and its code resources.
        Automatically execute infra/code syncs when changes are detected.

        Parameters
        ----------
        template : str
            Template file path
        build_context : BuildContext
            BuildContext
        package_context : PackageContext
            PackageContext
        deploy_context : DeployContext
            DeployContext
        sync_context : SyncContext
            SyncContext
        auto_dependency_layer : bool
            Forwarded unchanged to SyncFlowFactory whenever the stacks are reloaded
        disable_infra_syncs : bool
            When True, queue_infra_sync() only logs a notice and never queues an infra sync
        watch_exclude : Dict[str, List[str]]
            Additional excludes per resource, looked up by str(resource_id) and passed to
            the code trigger factory when the trigger for that resource is created
        """
        self._stacks = None
        self._template = template
        self._build_context = build_context
        self._package_context = package_context
        self._deploy_context = deploy_context
        self._sync_context = sync_context
        self._auto_dependency_layer = auto_dependency_layer
        self._disable_infra_syncs = disable_infra_syncs

        self._sync_flow_factory = None
        self._sync_flow_executor = ContinuousSyncFlowExecutor()
        self._executor_thread = None
        # Created lazily on every infra sync; see _execute_infra_context()
        self._infra_sync_executor = None

        self._observer = HandlerObserver()
        self._trigger_factory = None

        self._waiting_infra_sync = False
        self._color = Colored()

        self._watch_exclude = watch_exclude

    def queue_infra_sync(self) -> None:
        """Queue up an infrastructure sync.
        A simple bool flag suffices; it is polled by the loop in _start().
        When infra syncs are disabled, only a notice is logged and nothing is queued.
        """
        if self._disable_infra_syncs:
            LOG.info(
                self._color.color_log(
                    msg="You have enabled the --code flag, which limits sam sync updates to code changes only. To do a "
                    "complete infrastructure and code sync, remove the --code flag.",
                    color=Colors.WARNING,
                ),
                extra=dict(markup=True),
            )
            return
        self._waiting_infra_sync = True

    def _update_stacks(self) -> None:
        """
        Reloads template and its stacks.
        Update all other members that also depend on the stacks.
        This should be called whenever there is a change to the template.
        """
        self._stacks = SamLocalStackProvider.get_stacks(self._template, use_sam_transform=False)[0]
        self._sync_flow_factory = SyncFlowFactory(
            self._build_context,
            self._deploy_context,
            self._sync_context,
            self._stacks,
            self._auto_dependency_layer,
        )
        self._sync_flow_factory.load_physical_id_mapping()
        self._trigger_factory = CodeTriggerFactory(self._stacks, Path(self._build_context.base_dir))

    def _add_code_triggers(self) -> None:
        """Create CodeResourceTrigger for all resources and add their handlers to observer

        Does nothing until the stacks and the trigger factory have been loaded.
        Resources for which no trigger can be created are logged and skipped.
        """
        if not self._stacks or not self._trigger_factory:
            return
        resource_ids = get_all_resource_ids(self._stacks)
        for resource_id in resource_ids:
            try:
                additional_excludes = self._watch_exclude.get(str(resource_id), [])
                trigger = self._trigger_factory.create_trigger(
                    resource_id, self._on_code_change_wrapper(resource_id), additional_excludes
                )
            except (MissingCodeUri, MissingLocalDefinition):
                LOG.warning(
                    self._color.color_log(
                        msg="CodeTrigger not created as CodeUri or DefinitionUri is missing for %s.",
                        color=Colors.WARNING,
                    ),
                    str(resource_id),
                    extra=dict(markup=True),
                )
                continue
            except ResourceNotFound:
                LOG.warning(
                    self._color.color_log(
                        msg="CodeTrigger not created as %s is not found or is with a S3 Location.",
                        color=Colors.WARNING,
                    ),
                    str(resource_id),
                    extra=dict(markup=True),
                )
                continue

            # The factory may hand back a falsy value; there is nothing to schedule then
            if not trigger:
                continue
            self._observer.schedule_handlers(trigger.get_path_handlers())

    def _add_template_triggers(self) -> None:
        """Create TemplateTrigger and add its handlers to observer

        The stacks are re-read from the template here instead of reusing self._stacks.
        A template that fails validation is only logged; its handlers are still
        scheduled so that a later change to the file can queue another infra sync.
        """
        stacks = SamLocalStackProvider.get_stacks(self._template, use_sam_transform=False)[0]
        for stack in stacks:
            template = stack.location
            template_trigger = TemplateTrigger(template, stack.name, lambda _=None: self.queue_infra_sync())
            try:
                template_trigger.validate_template()
            except InvalidTemplateFile:
                LOG.warning(
                    self._color.color_log(msg="Template validation failed for %s in %s", color=Colors.WARNING),
                    template,
                    stack.name,
                    extra=dict(markup=True),
                )

            self._observer.schedule_handlers(template_trigger.get_path_handlers())

    def _execute_infra_context(self, first_sync: bool = False) -> InfraSyncResult:
        """Execute infrastructure sync

        Parameters
        ----------
        first_sync : bool
            Forwarded to InfraSyncExecutor.execute_infra_sync; the watch loop passes True
            only for the first infra sync it runs

        Returns
        ----------
        InfraSyncResult
            Returns information containing whether infra sync executed plus resources to do code sync on
        """
        # A fresh executor is built for every infra sync
        infra_sync_executor = InfraSyncExecutor(
            self._build_context, self._package_context, self._deploy_context, self._sync_context
        )
        self._infra_sync_executor = infra_sync_executor
        return infra_sync_executor.execute_infra_sync(first_sync)

    def _start_code_sync(self) -> None:
        """Start SyncFlowExecutor in a separate thread.
        No new thread is created while a previous executor thread is still alive.
        """
        if not self._executor_thread or not self._executor_thread.is_alive():
            self._executor_thread = threading.Thread(
                target=lambda: self._sync_flow_executor.execute(
                    exception_handler=self._watch_sync_flow_exception_handler
                )
            )
            self._executor_thread.start()

    def _stop_code_sync(self) -> None:
        """Blocking call that stops SyncFlowExecutor and waits for it to finish."""
        if self._executor_thread and self._executor_thread.is_alive():
            self._sync_flow_executor.stop()
            self._executor_thread.join()

    def start(self) -> None:
        """Start WatchManager and watch for changes to the template and its code resources."""

        # The actual execution is done in _start()
        # This is a wrapper for gracefully handling Ctrl+C or other termination cases.
        try:
            self.queue_infra_sync()
            if self._disable_infra_syncs:
                # No infra sync will ever run in this mode, so the triggers and the
                # code sync thread have to be set up here instead.
                self._start_sync()
                LOG.info(
                    self._color.color_log(msg="Sync watch started.", color=Colors.SUCCESS), extra=dict(markup=True)
                )
            self._start()
        except KeyboardInterrupt:
            LOG.info(
                self._color.color_log(msg="Shutting down sync watch...", color=Colors.PROGRESS), extra=dict(markup=True)
            )
            self._observer.stop()
            self._stop_code_sync()
            LOG.info(self._color.color_log(msg="Sync watch stopped.", color=Colors.SUCCESS), extra=dict(markup=True))

    def _start(self) -> None:
        """Start the observer and poll, once per second, for a queued infra sync.
        This loop never returns on its own; start() ends it on KeyboardInterrupt.
        """
        first_sync = True
        self._observer.start()
        while True:
            if self._waiting_infra_sync:
                self._execute_infra_sync(first_sync)
            first_sync = False
            time.sleep(1)

    def _start_sync(self) -> None:
        """Update stacks and populate all triggers"""
        self._observer.unschedule_all()
        self._update_stacks()
        self._add_template_triggers()
        self._add_code_triggers()
        self._start_code_sync()

    def _execute_infra_sync(self, first_sync: bool = False) -> None:
        """Logic to execute infra sync.

        Parameters
        ----------
        first_sync : bool
            Forwarded to _execute_infra_context
        """
        LOG.info(
            self._color.color_log(
                msg="Queued infra sync. Waiting for in progress code syncs to complete...", color=Colors.PROGRESS
            ),
            extra=dict(markup=True),
        )
        # Clear the flag before doing the work so a request queued meanwhile is not lost
        self._waiting_infra_sync = False
        self._stop_code_sync()
        try:
            LOG.info(self._color.color_log(msg="Starting infra sync.", color=Colors.PROGRESS), extra=dict(markup=True))
            infra_sync_result = self._execute_infra_context(first_sync)
        except Exception as e:
            # Deliberately broad: a failed infra sync must not terminate the watch loop
            LOG.error(
                self._color.color_log(
                    msg="Failed to sync infra. Code sync is paused until template/stack is fixed.", color=Colors.FAILURE
                ),
                exc_info=e,
                extra=dict(markup=True),
            )
            # Unschedule all triggers and only add back the template one as infra sync is incorrect.
            self._observer.unschedule_all()
            self._add_template_triggers()
        else:
            # Update stacks and repopulate triggers
            # Triggers are not removed until infra sync is finished as there
            # can be code changes during infra sync.
            self._start_sync()

            if not infra_sync_result.infra_sync_executed:
                # This is for initiating code sync for all resources
                # To improve: only initiate code syncs for ones with template changes
                self._queue_up_code_syncs(infra_sync_result.code_sync_resources)
                LOG.info(
                    self._color.color_log(
                        msg="Skipped infra sync as the local template is in sync with the cloud template.",
                        color=Colors.SUCCESS,
                    ),
                    extra=dict(markup=True),
                )
                if infra_sync_result.code_sync_resources:
                    LOG.info("Required code syncs are queued up.")
            else:
                LOG.info(
                    self._color.color_log(msg="Infra sync completed.", color=Colors.SUCCESS), extra=dict(markup=True)
                )

    def _queue_up_code_syncs(self, resource_ids_with_code_sync: Set[ResourceIdentifier]) -> None:
        """
        For the given resource IDs, create sync flow tasks in the queue

        Parameters
        ----------
        resource_ids_with_code_sync: Set[ResourceIdentifier]
            The set of resource IDs to be synced
        """
        if not self._sync_flow_factory:
            return
        for resource_id in resource_ids_with_code_sync:
            sync_flow = self._sync_flow_factory.create_sync_flow(resource_id, self._build_context.build_result)
            # The factory may return nothing for a resource; such resources are skipped
            if sync_flow:
                self._sync_flow_executor.add_delayed_sync_flow(sync_flow)

    def _on_code_change_wrapper(self, resource_id: ResourceIdentifier) -> OnChangeCallback:
        """Wrapper method that generates a callback for code changes.

        Parameters
        ----------
        resource_id : ResourceIdentifier
            Resource that associates to the callback

        Returns
        -------
        OnChangeCallback
            Callback function
        """

        def on_code_change(event: Optional[FileSystemEvent] = None) -> None:
            """
            Custom event handling to create a new sync flow if a file was modified.

            Parameters
            ----------
            event: Optional[FileSystemEvent]
                The event that triggered the change
            """
            if event and event.event_type == EVENT_TYPE_OPENED:
                # Ignore all file opened events since this event is
                # added in addition to a create or modified event,
                # causing an infinite loop of sync flow creations
                LOG.debug("Ignoring file system OPENED event")
                return

            if (
                platform.system().lower() == "linux"
                and event
                and event.event_type == EVENT_TYPE_MODIFIED
                and event.is_directory
            ):
                # Linux machines appear to emit an additional event when
                # a file gets updated; a folder modified event
                # If folder/file.txt gets updated, there will be two events:
                #   1. file.txt modified event
                #   2. folder modified event
                # We want to ignore the second event
                #
                # It looks like the other way a folder modified event can happen
                # is if the permissions of the folder were changed
                LOG.debug("Ignoring file system MODIFIED event for folder %r", event.src_path)
                return

            # sync flow factory should always exist, but guarding just in case
            if not self._sync_flow_factory:
                LOG.debug("Sync flow factory not defined, skipping trigger")
                return

            sync_flow = self._sync_flow_factory.create_sync_flow(resource_id)
            # While an infra sync is pending, code changes are not queued
            if sync_flow and not self._waiting_infra_sync:
                self._sync_flow_executor.add_delayed_sync_flow(sync_flow, dedup=True, wait_time=DEFAULT_WAIT_TIME)

        return on_code_change

    def _watch_sync_flow_exception_handler(self, sync_flow_exception: SyncFlowException) -> None:
        """Exception handler for watch.
        Simply logs unhandled exceptions instead of failing the entire process.
        A missing physical resource or an explicit "infra sync required" error
        additionally queues an infra sync.

        Parameters
        ----------
        sync_flow_exception : SyncFlowException
            SyncFlowException
        """
        exception = sync_flow_exception.exception
        if isinstance(exception, MissingPhysicalResourceError):
            LOG.warning(
                self._color.color_log(
                    msg="Missing physical resource. Infra sync will be started.", color=Colors.WARNING
                ),
                extra=dict(markup=True),
            )
            self.queue_infra_sync()
        elif isinstance(exception, InfraSyncRequiredError):
            # Uses color_log like every other message logged with markup=True in this class
            LOG.warning(
                self._color.color_log(
                    msg=f"Infra sync is required for {exception.resource_identifier} due to: "
                    + f"{exception.reason}. Infra sync will be started.",
                    color=Colors.WARNING,
                ),
                extra=dict(markup=True),
            )
            self.queue_infra_sync()
        else:
            LOG.error(
                self._color.color_log(msg="Code sync encountered an error.", color=Colors.FAILURE),
                exc_info=exception,
                extra=dict(markup=True),
            )