Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
alesnovak-s1 committed Oct 16, 2024
1 parent a5d0a15 commit ea66872
Showing 1 changed file with 35 additions and 12 deletions.
47 changes: 35 additions & 12 deletions scalyr_agent/copying_manager/copying_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,10 @@ def get_path(self, path):

def get(self, path, worker_id, default=None):
    # type: (six.text_type, six.text_type, Optional[object]) -> Optional[object]
    """
    Look up the value stored for the (path, worker_id) pair.

    @param path: The path the value is stored under.
    @param worker_id: The worker ID the value is stored under for that path.
    @param default: Value handed back when the path, or the worker ID under that path, has no entry.

    @return: The stored value, or `default` when nothing is stored for the pair.
    """
    # Only index into the per-path mapping when the path is present; the membership test
    # itself never touches the mapping's contents.
    if path in self.__paths:
        return self.__paths[path].get(worker_id, default)

    return default

def complement_keys(self, path, worker_ids):
Expand Down Expand Up @@ -435,7 +439,13 @@ def keys(self):

def pop(self, path, worker_id, default=None):
    # type: (six.text_type, six.text_type, Optional[object]) -> Optional[object]
    """
    Remove and return the value stored for the (path, worker_id) pair.

    When the removal leaves the path with no remaining worker entries, the path itself is
    removed as well, so that empty per-path mappings do not accumulate.

    @param path: The path the value is stored under.
    @param worker_id: The worker ID the value is stored under for that path.
    @param default: Value handed back when the path, or the worker ID under that path, has no entry.

    @return: The removed value, or `default` when nothing was stored for the pair.
    """
    # Same guard as `get`: an unknown path yields the default instead of indexing into
    # the mapping for a path that has no entry.
    if path not in self.__paths:
        return default

    workers = self.__paths[path]
    value = workers.pop(worker_id, default)

    # Drop the path once its last worker entry is gone.
    if not workers:
        self.__paths.pop(path)

    return value

def __len__(self):
    """
    Return the number of entries, i.e. how many items `keys()` yields.
    """
    # Count while iterating rather than materializing the keys first.
    return sum(1 for _ in self.keys())
Expand Down Expand Up @@ -509,6 +519,8 @@ def __init__(self, configuration, monitors):
# log file has been processed yet
self.__logs_pending_removal = PathWorkerIdDict()

self.__logs_matcher_finishing = PathWorkerIdDict()

# Dict[Tuple(str, str), Dict]
# # a dict of (log path (str), processing CopyingManagerWorker ID (str)) -> Log config (Dict)
# a dict of log_configs keyed by log_path for logs with configs that need to be reloaded.
Expand All @@ -530,7 +542,7 @@ def __init__(self, configuration, monitors):

# A lock that protects the status variables and the __log_matchers variable, the only variables that
# are accessed in generate_status(), which needs to be thread safe.
self.__lock = threading.Lock()
self.__lock = threading.RLock()

# The last time we scanned for new files that match the __log_matchers.
self.__last_new_file_scan_time = 0
Expand Down Expand Up @@ -694,9 +706,23 @@ def add_log_config(self, monitor_name, log_config, force_add=False):
)
self.__logs_pending_removal.pop(path, worker_id)
return log_config
elif force_add and self.__logs_matcher_finishing.contains(path, worker_id):
log.log(
scalyr_logging.DEBUG_LEVEL_0,
"Tried to add new log file (path='%s', worker_id='%s') for monitor '%s', but it is already being monitored by '%s' "
"and scheduled for removal. Canceling scheduled removal and ensuring log file is continue "
"to be monitored."
% (
path,
worker_id,
monitor_name,
self.__dynamic_paths.get(path, worker_id),
),
)

self.__remove_log_path_one_worker(monitor_name, path, worker_id)
# Make sure the path isn't already being dynamically monitored
if self.__dynamic_paths.contains(path, worker_id):
elif self.__dynamic_paths.contains(path, worker_id):
log.log(
scalyr_logging.DEBUG_LEVEL_0,
"Tried to add new log file '%s' for monitor '%s', but it is already being monitored by '%s'"
Expand Down Expand Up @@ -861,6 +887,7 @@ def __remove_log_path_one_worker(self, monitor_name, log_path, worker_id):

self._log_matchers[:] = matchers
self.__logs_pending_removal.pop(log_path, worker_id)
self.__logs_matcher_finishing.pop(log_path, worker_id)
self.__logs_pending_reload.pop(log_path, worker_id)
self.__dynamic_paths.pop(log_path, worker_id)

Expand Down Expand Up @@ -1323,10 +1350,7 @@ def __remove_logs_scheduled_for_deletion(self):
# so we can iterate without a lock (remove_log_path also acquires the lock so best
# not to do that while the lock is already acquired)
self.__lock.acquire()
try:
pending_removal = self.__logs_pending_removal.copy()
finally:
self.__lock.release()
pending_removal = self.__logs_pending_removal.copy()

# if we have a log matcher for the path, then set it to finished
for path, worker_id in pending_removal.keys():
Expand All @@ -1336,13 +1360,11 @@ def __remove_logs_scheduled_for_deletion(self):
continue

matcher.finish()
self.__logs_matcher_finishing.set(path, worker_id, matcher)

# remove from list of logs pending removal
self.__lock.acquire()
try:
self.__logs_pending_removal = PathWorkerIdDict()
finally:
self.__lock.release()
self.__logs_pending_removal = PathWorkerIdDict()
self.__lock.release()

def __purge_finished_log_matchers(self):
# type: () -> int
Expand All @@ -1360,6 +1382,7 @@ def __purge_finished_log_matchers(self):
if m.is_finished():
self.__remove_log_path_one_worker(SCHEDULED_DELETION, path, worker_id)
self.__dynamic_matchers.pop(path, worker_id, None)
self.__logs_matcher_finishing.pop(path, worker_id)
removed += 1

return removed
Expand Down

0 comments on commit ea66872

Please sign in to comment.