Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Daemon concurrent rpc operations #682

Merged
merged 16 commits into from
Apr 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/multipass/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ std::vector<std::string> split(const std::string& string, const std::string& del
std::string generate_mac_address();
std::string timestamp();
bool is_running(const VirtualMachine::State& state);
// TODO: Rename process_vm_events to something more meaningful
void wait_until_ssh_up(VirtualMachine* virtual_machine, std::chrono::milliseconds timeout,
std::function<void()> const& process_vm_events = []() { });

Expand Down
8 changes: 5 additions & 3 deletions include/multipass/virtual_machine.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2017-2018 Canonical, Ltd.
* Copyright (C) 2017-2019 Canonical, Ltd.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
Expand All @@ -13,15 +13,15 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* Authored by: Alberto Aguirre <[email protected]>
*
*/

#ifndef MULTIPASS_VIRTUAL_MACHINE_H
#define MULTIPASS_VIRTUAL_MACHINE_H

#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>

namespace multipass
Expand Down Expand Up @@ -63,6 +63,8 @@ class VirtualMachine
VirtualMachine::State state;
const SSHKeyProvider& key_provider;
const std::string vm_name;
std::condition_variable state_wait;
std::mutex state_mutex;

protected:
VirtualMachine(VirtualMachine::State state, const SSHKeyProvider& key_provider, const std::string& vm_name)
Expand Down
4 changes: 3 additions & 1 deletion include/multipass/vm_status_monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#ifndef MULTIPASS_VM_STATUS_MONITOR_H
#define MULTIPASS_VM_STATUS_MONITOR_H

#include <multipass/virtual_machine.h>

#include <string>

#include <QJsonObject>
Expand All @@ -33,7 +35,7 @@ class VMStatusMonitor
virtual void on_shutdown() = 0;
virtual void on_suspend() = 0;
virtual void on_restart(const std::string& name) = 0;
virtual void persist_state_for(const std::string& name) = 0;
virtual void persist_state_for(const std::string& name, const VirtualMachine::State& state) = 0;
virtual void update_metadata_for(const std::string& name, const QJsonObject& metadata) = 0;
virtual QJsonObject retrieve_metadata_for(const std::string& name) = 0;

Expand Down
2 changes: 1 addition & 1 deletion src/client/cmd/start.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ mp::ReturnCode cmd::Start::run(mp::ArgParser* parser)

auto streaming_callback = [&spinner](mp::StartReply& reply) {
spinner.stop();
spinner.start(reply.start_message());
spinner.start(reply.reply_message());
};

spinner.start(instance_action_message_for(request.instance_names(), "Starting "));
Expand Down
460 changes: 250 additions & 210 deletions src/daemon/daemon.cpp

Large diffs are not rendered by default.

91 changes: 55 additions & 36 deletions src/daemon/daemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@
#include <multipass/virtual_machine.h>
#include <multipass/vm_status_monitor.h>

#include <future>
#include <memory>
#include <unordered_map>
#include <vector>

#include <QFutureWatcher>

namespace multipass
{
Expand Down Expand Up @@ -60,7 +64,7 @@ struct MetricsOptInData
};

struct DaemonConfig;
class Daemon : public QObject, public multipass::Rpc::Service, public multipass::VMStatusMonitor
class Daemon : public QObject, public multipass::VMStatusMonitor
ricab marked this conversation as resolved.
Show resolved Hide resolved
{
Q_OBJECT
public:
Expand All @@ -74,58 +78,58 @@ class Daemon : public QObject, public multipass::Rpc::Service, public multipass:
void on_shutdown() override;
void on_suspend() override;
void on_restart(const std::string& name) override;
void persist_state_for(const std::string& name) override;
void persist_state_for(const std::string& name, const VirtualMachine::State& state) override;
void update_metadata_for(const std::string& name, const QJsonObject& metadata) override;
QJsonObject retrieve_metadata_for(const std::string& name) override;

public slots:
grpc::Status create(grpc::ServerContext* context, const CreateRequest* request,
grpc::ServerWriter<CreateReply>* reply) override;
virtual void create(const CreateRequest* request, grpc::ServerWriter<CreateReply>* reply,
std::promise<grpc::Status>* status_promise);

grpc::Status launch(grpc::ServerContext* context, const LaunchRequest* request,
grpc::ServerWriter<LaunchReply>* reply) override;
virtual void launch(const LaunchRequest* request, grpc::ServerWriter<LaunchReply>* reply,
std::promise<grpc::Status>* status_promise);

grpc::Status purge(grpc::ServerContext* context, const PurgeRequest* request,
grpc::ServerWriter<PurgeReply>* response) override;
virtual void purge(const PurgeRequest* request, grpc::ServerWriter<PurgeReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status find(grpc::ServerContext* context, const FindRequest* request,
grpc::ServerWriter<FindReply>* response) override;
virtual void find(const FindRequest* request, grpc::ServerWriter<FindReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status info(grpc::ServerContext* context, const InfoRequest* request,
grpc::ServerWriter<InfoReply>* response) override;
virtual void info(const InfoRequest* request, grpc::ServerWriter<InfoReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status list(grpc::ServerContext* context, const ListRequest* request,
grpc::ServerWriter<ListReply>* response) override;
virtual void list(const ListRequest* request, grpc::ServerWriter<ListReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status mount(grpc::ServerContext* context, const MountRequest* request,
grpc::ServerWriter<MountReply>* response) override;
virtual void mount(const MountRequest* request, grpc::ServerWriter<MountReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status recover(grpc::ServerContext* context, const RecoverRequest* request,
grpc::ServerWriter<RecoverReply>* response) override;
virtual void recover(const RecoverRequest* request, grpc::ServerWriter<RecoverReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status ssh_info(grpc::ServerContext* context, const SSHInfoRequest* request,
grpc::ServerWriter<SSHInfoReply>* response) override;
virtual void ssh_info(const SSHInfoRequest* request, grpc::ServerWriter<SSHInfoReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status start(grpc::ServerContext* context, const StartRequest* request,
grpc::ServerWriter<StartReply>* response) override;
virtual void start(const StartRequest* request, grpc::ServerWriter<StartReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status stop(grpc::ServerContext* context, const StopRequest* request,
grpc::ServerWriter<StopReply>* response) override;
virtual void stop(const StopRequest* request, grpc::ServerWriter<StopReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status suspend(grpc::ServerContext* context, const SuspendRequest* request,
grpc::ServerWriter<SuspendReply>* response) override;
virtual void suspend(const SuspendRequest* request, grpc::ServerWriter<SuspendReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status restart(grpc::ServerContext* context, const RestartRequest* request,
grpc::ServerWriter<RestartReply>* response) override;
virtual void restart(const RestartRequest* request, grpc::ServerWriter<RestartReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status delet(grpc::ServerContext* context, const DeleteRequest* request,
grpc::ServerWriter<DeleteReply>* response) override;
virtual void delet(const DeleteRequest* request, grpc::ServerWriter<DeleteReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status umount(grpc::ServerContext* context, const UmountRequest* request,
grpc::ServerWriter<UmountReply>* response) override;
virtual void umount(const UmountRequest* request, grpc::ServerWriter<UmountReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status version(grpc::ServerContext* context, const VersionRequest* request,
grpc::ServerWriter<VersionReply>* response) override;
virtual void version(const VersionRequest* request, grpc::ServerWriter<VersionReply>* response,
std::promise<grpc::Status>* status_promise);

private:
void persist_instances();
Expand All @@ -136,14 +140,28 @@ public slots:
void release_resources(const std::string& instance);
std::string check_instance_operational(const std::string& instance_name) const;
std::string check_instance_exists(const std::string& instance_name) const;
grpc::Status create_vm(grpc::ServerContext* context, const CreateRequest* request,
grpc::ServerWriter<CreateReply>* server, bool start);
void create_vm(const CreateRequest* request, grpc::ServerWriter<CreateReply>* server,
std::promise<grpc::Status>* status_promise, bool start);
grpc::Status reboot_vm(VirtualMachine& vm);
grpc::Status shutdown_vm(VirtualMachine& vm, const std::chrono::milliseconds delay);
grpc::Status cancel_vm_shutdown(const VirtualMachine& vm);
grpc::Status cmd_vms(const std::vector<std::string>& tgts, std::function<grpc::Status(VirtualMachine&)> cmd);
void install_sshfs(const VirtualMachine::UPtr& vm, const std::string& name);

struct AsyncOperationStatus
{
grpc::Status status;
std::promise<grpc::Status>* status_promise;
};

grpc::Status async_wait_for_ssh_for(const VirtualMachine::UPtr& vm);
template <typename Reply>
AsyncOperationStatus async_wait_for_ssh_and_start_mounts(grpc::ServerWriter<Reply>* server,
const std::vector<std::string>& vms,
std::promise<grpc::Status>* status_promise);
void finish_async_operation(QFuture<AsyncOperationStatus> async_future);
QFutureWatcher<AsyncOperationStatus>* create_future_watcher();

std::unique_ptr<const DaemonConfig> config;
std::unordered_map<std::string, VMSpecs> vm_instance_specs;
std::unordered_map<std::string, VirtualMachine::UPtr> vm_instances;
Expand All @@ -156,6 +174,7 @@ public slots:
QTimer source_images_maintenance_task;
MetricsProvider metrics_provider;
MetricsOptInData metrics_opt_in;
std::vector<std::unique_ptr<QFutureWatcher<AsyncOperationStatus>>> async_future_watchers;
};
} // namespace multipass
#endif // MULTIPASS_DAEMON_H
60 changes: 43 additions & 17 deletions src/daemon/daemon_rpc.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2017 Canonical, Ltd.
* Copyright (C) 2017-2019 Canonical, Ltd.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -84,6 +84,16 @@ auto make_server(const std::string& server_address, mp::RpcConnectionType conn_t

return server;
}

template <typename OperationSignal>
grpc::Status emit_signal_and_wait_for_result(OperationSignal operation_signal)
ricab marked this conversation as resolved.
Show resolved Hide resolved
{
std::promise<grpc::Status> status_promise;
auto status_future = status_promise.get_future();
emit operation_signal(&status_promise);

return status_future.get();
}
} // namespace

mp::DaemonRpc::DaemonRpc(const std::string& server_address, mp::RpcConnectionType type,
Expand All @@ -97,97 +107,113 @@ mp::DaemonRpc::DaemonRpc(const std::string& server_address, mp::RpcConnectionTyp
grpc::Status mp::DaemonRpc::create(grpc::ServerContext* context, const CreateRequest* request,
grpc::ServerWriter<CreateReply>* reply)
{
return emit on_create(context, request, reply); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_create, this, request, reply, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::launch(grpc::ServerContext* context, const LaunchRequest* request,
grpc::ServerWriter<LaunchReply>* reply)
{
return emit on_launch(context, request, reply); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_launch, this, request, reply, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::purge(grpc::ServerContext* context, const PurgeRequest* request,
grpc::ServerWriter<PurgeReply>* response)
{
return emit on_purge(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_purge, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::find(grpc::ServerContext* context, const FindRequest* request,
grpc::ServerWriter<FindReply>* response)
{
return emit on_find(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_find, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::info(grpc::ServerContext* context, const InfoRequest* request,
grpc::ServerWriter<InfoReply>* response)
{
return emit on_info(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_info, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::list(grpc::ServerContext* context, const ListRequest* request,
grpc::ServerWriter<ListReply>* response)
{
return emit on_list(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_list, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::mount(grpc::ServerContext* context, const MountRequest* request,
grpc::ServerWriter<MountReply>* response)
{
return emit on_mount(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_mount, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::recover(grpc::ServerContext* context, const RecoverRequest* request,
grpc::ServerWriter<RecoverReply>* response)
{
return emit on_recover(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_recover, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::ssh_info(grpc::ServerContext* context, const SSHInfoRequest* request,
grpc::ServerWriter<SSHInfoReply>* response)
{
return emit on_ssh_info(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_ssh_info, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::start(grpc::ServerContext* context, const StartRequest* request,
grpc::ServerWriter<StartReply>* response)
{
return emit on_start(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_start, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::stop(grpc::ServerContext* context, const StopRequest* request,
grpc::ServerWriter<StopReply>* response)
{
return emit on_stop(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_stop, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::suspend(grpc::ServerContext* context, const SuspendRequest* request,
grpc::ServerWriter<SuspendReply>* response)
{
return emit on_suspend(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_suspend, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::restart(grpc::ServerContext* context, const RestartRequest* request,
grpc::ServerWriter<RestartReply>* response)
{
return emit on_restart(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_restart, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::delet(grpc::ServerContext* context, const DeleteRequest* request,
grpc::ServerWriter<DeleteReply>* response)
{
return emit on_delete(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_delete, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::umount(grpc::ServerContext* context, const UmountRequest* request,
grpc::ServerWriter<UmountReply>* response)
{
return emit on_umount(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_umount, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::version(grpc::ServerContext* context, const VersionRequest* request,
grpc::ServerWriter<VersionReply>* response)
{
return emit on_version(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_version, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::ping(grpc::ServerContext* context, const PingRequest* request, PingReply* response)
Expand Down
Loading