Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve backup and restore 2 #1

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions src/Backups/ArchiveBackup.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#include <Backups/ArchiveBackup.h>
#include <Disks/IDisk.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/Archives/IArchiveReader.h>
#include <IO/Archives/IArchiveWriter.h>
#include <IO/Archives/createArchiveReader.h>
#include <IO/Archives/createArchiveWriter.h>


namespace DB
{
ArchiveBackup::ArchiveBackup(
const String & backup_name_,
const DiskPtr & disk_,
const String & path_,
const ContextPtr & context_,
const std::optional<BackupInfo> & base_backup_info_)
: BackupImpl(backup_name_, context_, base_backup_info_), disk(disk_), path(path_)
{
}

ArchiveBackup::~ArchiveBackup()
{
close();
}

bool ArchiveBackup::backupExists() const
{
return disk ? disk->exists(path) : fs::exists(path);
}

void ArchiveBackup::openImpl(OpenMode open_mode_)
{
/// mutex is already locked
if (open_mode_ == OpenMode::WRITE)
{
if (disk)
writer = createArchiveWriter(path, disk->writeFile(path));
else
writer = createArchiveWriter(path);

writer->setCompression(compression_method, compression_level);
writer->setPassword(password);
}
else if (open_mode_ == OpenMode::READ)
{
if (disk)
{
auto archive_read_function = [d = disk, p = path]() -> std::unique_ptr<SeekableReadBuffer> { return d->readFile(p); };
size_t archive_size = disk->getFileSize(path);
reader = createArchiveReader(path, archive_read_function, archive_size);
}
else
reader = createArchiveReader(path);

reader->setPassword(password);
}
}

void ArchiveBackup::closeImpl(bool writing_finalized_)
{
/// mutex is already locked
if (writer && writer->isWritingFile())
throw Exception("There is some writing unfinished on close", ErrorCodes::LOGICAL_ERROR);

writer.reset();
reader.reset();

if ((getOpenModeNoLock() == OpenMode::WRITE) && !writing_finalized_)
fs::remove(path);
}

std::unique_ptr<ReadBuffer> ArchiveBackup::readFileImpl(const String & file_name) const
{
/// mutex is already locked
return reader->readFile(file_name);
}

std::unique_ptr<WriteBuffer> ArchiveBackup::writeFileImpl(const String & file_name)
{
/// mutex is already locked
return writer->writeFile(file_name);
}

void ArchiveBackup::setCompression(const String & compression_method_, int compression_level_)
{
std::lock_guard lock{mutex};
compression_method = compression_method_;
compression_level = compression_level_;
if (writer)
writer->setCompression(compression_method, compression_level);
}

void ArchiveBackup::setPassword(const String & password_)
{
std::lock_guard lock{mutex};
password = password_;
if (writer)
writer->setPassword(password);
if (reader)
reader->setPassword(password);
}

}
52 changes: 52 additions & 0 deletions src/Backups/ArchiveBackup.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#pragma once

#include <Backups/BackupImpl.h>


namespace DB
{
class IDisk;
using DiskPtr = std::shared_ptr<IDisk>;
class IArchiveReader;
class IArchiveWriter;

/// Stores a backup as a single .zip file.
class ArchiveBackup : public BackupImpl
{
public:
/// `disk`_ is allowed to be nullptr and that means the `path_` is a path in the local filesystem.
ArchiveBackup(
const String & backup_name_,
const DiskPtr & disk_,
const String & path_,
const ContextPtr & context_,
const std::optional<BackupInfo> & base_backup_info_ = {});

~ArchiveBackup() override;

static constexpr const int kDefaultCompressionLevel = -1;

/// Sets compression method and level.
void setCompression(const String & compression_method_, int compression_level_ = kDefaultCompressionLevel);

/// Sets password.
void setPassword(const String & password_);

private:
bool backupExists() const override;
void openImpl(OpenMode open_mode_) override;
void closeImpl(bool writing_finalized_) override;
bool supportsWritingInMultipleThreads() const override { return false; }
std::unique_ptr<ReadBuffer> readFileImpl(const String & file_name) const override;
std::unique_ptr<WriteBuffer> writeFileImpl(const String & file_name) override;

const DiskPtr disk;
const String path;
std::shared_ptr<IArchiveReader> reader;
std::shared_ptr<IArchiveWriter> writer;
String compression_method;
int compression_level = kDefaultCompressionLevel;
String password;
};

}
28 changes: 0 additions & 28 deletions src/Backups/BackupEntryConcat.cpp

This file was deleted.

30 changes: 0 additions & 30 deletions src/Backups/BackupEntryConcat.h

This file was deleted.

2 changes: 1 addition & 1 deletion src/Backups/BackupEntryFromAppendOnlyFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ BackupEntryFromAppendOnlyFile::BackupEntryFromAppendOnlyFile(
std::unique_ptr<ReadBuffer> BackupEntryFromAppendOnlyFile::getReadBuffer() const
{
auto buf = BackupEntryFromImmutableFile::getReadBuffer();
return std::make_unique<LimitReadBuffer>(std::move(buf), limit, true);
return std::make_unique<LimitReadBuffer>(std::move(buf), limit, false);
}

}
31 changes: 0 additions & 31 deletions src/Backups/BackupEntryFromCallback.h

This file was deleted.

11 changes: 9 additions & 2 deletions src/Backups/BackupFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ BackupMutablePtr BackupFactory::createBackup(const CreateParams & params) const
auto it = creators.find(engine_name);
if (it == creators.end())
throw Exception(ErrorCodes::BACKUP_ENGINE_NOT_FOUND, "Not found backup engine {}", engine_name);
return (it->second)(params);
BackupMutablePtr backup = (it->second)(params);
backup->open(params.open_mode);
return backup;
}

void BackupFactory::registerBackupEngine(const String & engine_name, const CreatorFn & creator_fn)
Expand All @@ -31,7 +33,12 @@ void BackupFactory::registerBackupEngine(const String & engine_name, const Creat
creators[engine_name] = creator_fn;
}

void registerBackupEngines(BackupFactory & factory);
void registerBackupEnginesFileAndDisk(BackupFactory &);

void registerBackupEngines(BackupFactory & factory)
{
registerBackupEnginesFileAndDisk(factory);
}

BackupFactory::BackupFactory()
{
Expand Down
3 changes: 3 additions & 0 deletions src/Backups/BackupFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ class BackupFactory : boost::noncopyable
OpenMode open_mode = OpenMode::WRITE;
BackupInfo backup_info;
std::optional<BackupInfo> base_backup_info;
String compression_method;
int compression_level = -1;
String password;
ContextPtr context;
};

Expand Down
Loading