diff --git a/pghoard/common.py b/pghoard/common.py
index ee444b8c..05eb831a 100644
--- a/pghoard/common.py
+++ b/pghoard/common.py
@@ -24,6 +24,7 @@ from threading import Thread
 from typing import (TYPE_CHECKING, Any, BinaryIO, Callable, Dict, Final, Optional, Protocol, Tuple, cast)
 
+from pydantic import BaseModel, ValidationError
 from rohmu import IO_BLOCK_SIZE, BaseTransfer, rohmufile
 from rohmu.errors import Error, InvalidConfigurationError
 from rohmu.typing import FileLike, HasName
@@ -31,6 +32,7 @@ from pghoard import pgutil
 
 TAR_METADATA_FILENAME: Final[str] = ".pghoard_tar_metadata.json"
+PROGRESS_FILE: Final[str] = "persisted_progress_file.json"
 
 LOG = logging.getLogger("pghoard.common")
 
@@ -100,6 +102,42 @@ class BaseBackupMode(StrEnum):
     pipe = "pipe"
 
 
+class ProgressData(BaseModel):
+    current_progress: float
+    last_updated_time: float
+
+
+class PersistedProgress(BaseModel):
+    progress: Dict[str, ProgressData] = {}
+
+    @classmethod
+    def read_persisted_progress(cls) -> "PersistedProgress":
+        if os.path.exists(PROGRESS_FILE):
+            with open(PROGRESS_FILE, "r") as file:
+                try:
+                    return cls.parse_raw(file.read())
+                except ValidationError as e:
+                    LOG.warning("Failed to validate persisted progress file %r: %s", PROGRESS_FILE, e)
+        return cls()
+
+    def update_persisted_progress(self, key: str, current_progress: float, current_time: float) -> None:
+        self.progress[key] = ProgressData(current_progress=current_progress, last_updated_time=current_time)
+        self.save()
+
+    def reset_persisted_progress(self, key: str) -> None:
+        if key in self.progress:
+            del self.progress[key]
+            self.save()
+
+    def get_persisted_progress_for_key(self, key: str) -> ProgressData:
+        default_progress = ProgressData(current_progress=0, last_updated_time=time.monotonic())
+        return self.progress.get(key, default_progress)
+
+    def save(self) -> None:
+        with open(PROGRESS_FILE, "w") as file:
+            file.write(self.json())
+
+
 def create_pgpass_file(connection_string_or_info):
     """Look up password from the given object which can be a dict or a string and
     write a possible password in a pgpass file;
diff --git a/pghoard/transfer.py b/pghoard/transfer.py
index 1af3daf3..f2d42b8a 100644
--- a/pghoard/transfer.py
+++ b/pghoard/transfer.py
@@ -25,7 +25,7 @@ from rohmu.typing import Metadata
 
 from pghoard.common import (
-    CallbackEvent, CallbackQueue, FileType, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file,
+    CallbackEvent, CallbackQueue, FileType, PersistedProgress, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file,
     get_object_storage_config
 )
 from pghoard.fetcher import FileFetchManager
@@ -147,6 +147,9 @@ def untrack_upload_event(self, file_key: str) -> None:
 
     def increment(self, file_key: str, total_bytes_uploaded: float) -> None:
         metric_data = {}
+        now = time.monotonic()
+        persisted_progress = PersistedProgress.read_persisted_progress()
+
         with self._tracked_events_lock:
             if file_key not in self._tracked_events:
                 raise Exception(f"UploadEvent for {file_key} is not being tracked.")
@@ -155,6 +158,16 @@ def increment(self, file_key: str, total_bytes_uploaded: float) -> None:
             if file_type in (
                 FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
             ):
+                progress_info = persisted_progress.get_persisted_progress_for_key(file_key)
+                last_persisted_progress = progress_info.current_progress
+                last_persisted_epoch = progress_info.last_updated_time
+                if total_bytes_uploaded > last_persisted_progress:
+                    persisted_progress.update_persisted_progress(file_key, total_bytes_uploaded, now)
+                    self.metrics.gauge("basebackup_stalled", 0)
+                elif total_bytes_uploaded <= last_persisted_progress:
+                    stalled_time = now - last_persisted_epoch
+                    self.metrics.gauge("basebackup_stalled", stalled_time)
+                    self.log.warning("Upload for file %s has been stalled for %s seconds.", file_key, stalled_time)
                 metric_data = {
                     "metric": "pghoard.basebackup_bytes_uploaded",
                     "inc_value": total_bytes_uploaded,
@@ -410,6 +423,13 @@ def run_safe(self):
                 time.monotonic() - start_time
             )
 
+            if file_to_transfer.operation in {TransferOperation.Upload} and filetype in (
+                FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
+            ):
+                if result.success:
+                    persisted_progress = PersistedProgress.read_persisted_progress()
+                    persisted_progress.reset_persisted_progress(key)
+
         self.fetch_manager.stop()
         self.log.debug("Quitting TransferAgent")
@@ -513,6 +533,10 @@ def handle_upload(self, site, key, file_to_transfer: UploadEvent):
 
             # Sleep for a bit to avoid busy looping. Increase sleep time if the op fails multiple times
             self.sleep(min(0.5 * 2 ** (file_to_transfer.retry_number - 1), 20))
-
+            if file_to_transfer.file_type in (
+                FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
+            ):
+                persisted_progress = PersistedProgress.read_persisted_progress()
+                persisted_progress.reset_persisted_progress(key)
             self.transfer_queue.put(file_to_transfer)
             return None
diff --git a/test/test_common.py b/test/test_common.py
index d575f79b..8e89e930 100644
--- a/test/test_common.py
+++ b/test/test_common.py
@@ -9,13 +9,14 @@
 import os
 from pathlib import Path
 from typing import Any, Dict
+from unittest.mock import patch
 
 import pytest
 from mock.mock import Mock
 from rohmu.errors import Error
 
 from pghoard.common import (
-    TAR_METADATA_FILENAME, create_pgpass_file, default_json_serialization, download_backup_meta_file,
+    TAR_METADATA_FILENAME, PersistedProgress, create_pgpass_file, default_json_serialization, download_backup_meta_file,
     extract_pg_command_version_string, extract_pghoard_bb_v2_metadata, extract_pghoard_delta_metadata, json_encode,
     pg_major_version, pg_version_string_to_number, write_json_file
 )
@@ -88,6 +89,43 @@ def test_json_serialization(self, tmpdir):
 
         assert ob2 == ob2_
 
+    def test_persisted_progress(self):
+        test_progress_file = "test_progress.json"
+
+        test_data = {
+            "progress": {
+                "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b": {
+                    "current_progress": 100,
+                    "last_updated_time": 1625072042.123456
+                }
+            }
+        }
+
+        with open(test_progress_file, "w") as file:
+            json.dump(test_data, file)
+
+        with patch("pghoard.common.PROGRESS_FILE", test_progress_file):
+            persisted_progress = PersistedProgress.read_persisted_progress()
+            assert "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" in persisted_progress.progress
+            assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
+                                               ].current_progress == 100
+            assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
+                                               ].last_updated_time == 1625072042.123456
+
+            new_progress = 200
+            new_time = 1625072099.123456
+            persisted_progress.update_persisted_progress(
+                "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b", new_progress, new_time
+            )
+
+            updated_progress = PersistedProgress.read_persisted_progress()
+            assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
+                                             ].current_progress == new_progress
+            assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
+                                             ].last_updated_time == new_time
+
+        os.remove(test_progress_file)
+
 
 def test_pg_major_version():
     assert pg_major_version("10") == "10"
diff --git a/test/test_transferagent.py b/test/test_transferagent.py
index 703383fb..fc04d146 100644
--- a/test/test_transferagent.py
+++ b/test/test_transferagent.py
@@ -15,7 +15,9 @@
 from rohmu.errors import FileNotFoundFromStorageError, StorageError
 
 from pghoard import metrics
-from pghoard.common import CallbackEvent, CallbackQueue, FileType, QuitEvent
+from pghoard.common import (
+    PROGRESS_FILE, CallbackEvent, CallbackQueue, FileType, PersistedProgress, ProgressData, QuitEvent
+)
 from pghoard.transfer import (BaseTransferEvent, DownloadEvent, TransferAgent, UploadEvent, UploadEventProgressTracker)
 
 # pylint: disable=attribute-defined-outside-init
@@ -316,3 +318,21 @@ def test_handle_metadata_error(self):
         evt = self.transfer_agent.handle_metadata(self.test_site, "foo", "bar")
         assert evt.success is False
         assert isinstance(evt.exception, Exception)
+
+    def test_handle_upload_with_persisted_progress(self):
+        upload_event = UploadEvent(
+            backup_site_name="test_site",
+            file_type=FileType.Basebackup,
+            file_path=Path(self.foo_basebackup_path),
+            source_data=Path(self.foo_basebackup_path),
+            metadata={},
+            file_size=3,
+            callback_queue=CallbackQueue(),
+            remove_after_upload=True
+        )
+
+        self.transfer_agent.handle_upload("test_site", self.foo_basebackup_path, upload_event)
+        updated_progress = PersistedProgress.read_persisted_progress()
+        assert updated_progress.progress[self.foo_basebackup_path].current_progress == 3
+        if os.path.exists(PROGRESS_FILE):
+            os.remove(PROGRESS_FILE)