Skip to content

Commit

Permalink
Merge #682
Browse files Browse the repository at this point in the history
682: Daemon concurrent rpc operations r=gerboland a=townsend2010

Behavior of this branch (so far);
1. The check for ssh up in `launch`, `start`, and `restart`, should be asynchronous.
2. Issuing a `multipass delete -p <instance_name>` on a starting instance should be safe when using the qemu & libvirt backends.
3. Installing `sshfs` during `start` should be asynchronous like when defining a mount for the first time on a stopped instance.

Left to do:
1. Make installing `sshfs` during the `mount` command asynchronous.
3. Making other operations asynchronous like preparing an image, downloading an image, uncompressing an image.

Co-authored-by: Chris Townsend <[email protected]>
  • Loading branch information
bors[bot] and Chris Townsend committed Apr 5, 2019
2 parents b47a7af + 25b2261 commit e956c92
Show file tree
Hide file tree
Showing 18 changed files with 515 additions and 355 deletions.
1 change: 1 addition & 0 deletions include/multipass/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ std::vector<std::string> split(const std::string& string, const std::string& del
std::string generate_mac_address();
std::string timestamp();
bool is_running(const VirtualMachine::State& state);
// TODO: Rename process_vm_events to something more meaningful
void wait_until_ssh_up(VirtualMachine* virtual_machine, std::chrono::milliseconds timeout,
std::function<void()> const& process_vm_events = []() { });

Expand Down
8 changes: 5 additions & 3 deletions include/multipass/virtual_machine.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2017-2018 Canonical, Ltd.
* Copyright (C) 2017-2019 Canonical, Ltd.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
Expand All @@ -13,15 +13,15 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* Authored by: Alberto Aguirre <[email protected]>
*
*/

#ifndef MULTIPASS_VIRTUAL_MACHINE_H
#define MULTIPASS_VIRTUAL_MACHINE_H

#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>

namespace multipass
Expand Down Expand Up @@ -63,6 +63,8 @@ class VirtualMachine
VirtualMachine::State state;
const SSHKeyProvider& key_provider;
const std::string vm_name;
std::condition_variable state_wait;
std::mutex state_mutex;

protected:
VirtualMachine(VirtualMachine::State state, const SSHKeyProvider& key_provider, const std::string& vm_name)
Expand Down
4 changes: 3 additions & 1 deletion include/multipass/vm_status_monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#ifndef MULTIPASS_VM_STATUS_MONITOR_H
#define MULTIPASS_VM_STATUS_MONITOR_H

#include <multipass/virtual_machine.h>

#include <string>

#include <QJsonObject>
Expand All @@ -33,7 +35,7 @@ class VMStatusMonitor
virtual void on_shutdown() = 0;
virtual void on_suspend() = 0;
virtual void on_restart(const std::string& name) = 0;
virtual void persist_state_for(const std::string& name) = 0;
virtual void persist_state_for(const std::string& name, const VirtualMachine::State& state) = 0;
virtual void update_metadata_for(const std::string& name, const QJsonObject& metadata) = 0;
virtual QJsonObject retrieve_metadata_for(const std::string& name) = 0;

Expand Down
2 changes: 1 addition & 1 deletion src/client/cmd/start.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ mp::ReturnCode cmd::Start::run(mp::ArgParser* parser)

auto streaming_callback = [&spinner](mp::StartReply& reply) {
spinner.stop();
spinner.start(reply.start_message());
spinner.start(reply.reply_message());
};

spinner.start(instance_action_message_for(request.instance_names(), "Starting "));
Expand Down
460 changes: 250 additions & 210 deletions src/daemon/daemon.cpp

Large diffs are not rendered by default.

91 changes: 55 additions & 36 deletions src/daemon/daemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@
#include <multipass/virtual_machine.h>
#include <multipass/vm_status_monitor.h>

#include <future>
#include <memory>
#include <unordered_map>
#include <vector>

#include <QFutureWatcher>

namespace multipass
{
Expand Down Expand Up @@ -60,7 +64,7 @@ struct MetricsOptInData
};

struct DaemonConfig;
class Daemon : public QObject, public multipass::Rpc::Service, public multipass::VMStatusMonitor
class Daemon : public QObject, public multipass::VMStatusMonitor
{
Q_OBJECT
public:
Expand All @@ -74,58 +78,58 @@ class Daemon : public QObject, public multipass::Rpc::Service, public multipass:
void on_shutdown() override;
void on_suspend() override;
void on_restart(const std::string& name) override;
void persist_state_for(const std::string& name) override;
void persist_state_for(const std::string& name, const VirtualMachine::State& state) override;
void update_metadata_for(const std::string& name, const QJsonObject& metadata) override;
QJsonObject retrieve_metadata_for(const std::string& name) override;

public slots:
grpc::Status create(grpc::ServerContext* context, const CreateRequest* request,
grpc::ServerWriter<CreateReply>* reply) override;
virtual void create(const CreateRequest* request, grpc::ServerWriter<CreateReply>* reply,
std::promise<grpc::Status>* status_promise);

grpc::Status launch(grpc::ServerContext* context, const LaunchRequest* request,
grpc::ServerWriter<LaunchReply>* reply) override;
virtual void launch(const LaunchRequest* request, grpc::ServerWriter<LaunchReply>* reply,
std::promise<grpc::Status>* status_promise);

grpc::Status purge(grpc::ServerContext* context, const PurgeRequest* request,
grpc::ServerWriter<PurgeReply>* response) override;
virtual void purge(const PurgeRequest* request, grpc::ServerWriter<PurgeReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status find(grpc::ServerContext* context, const FindRequest* request,
grpc::ServerWriter<FindReply>* response) override;
virtual void find(const FindRequest* request, grpc::ServerWriter<FindReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status info(grpc::ServerContext* context, const InfoRequest* request,
grpc::ServerWriter<InfoReply>* response) override;
virtual void info(const InfoRequest* request, grpc::ServerWriter<InfoReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status list(grpc::ServerContext* context, const ListRequest* request,
grpc::ServerWriter<ListReply>* response) override;
virtual void list(const ListRequest* request, grpc::ServerWriter<ListReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status mount(grpc::ServerContext* context, const MountRequest* request,
grpc::ServerWriter<MountReply>* response) override;
virtual void mount(const MountRequest* request, grpc::ServerWriter<MountReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status recover(grpc::ServerContext* context, const RecoverRequest* request,
grpc::ServerWriter<RecoverReply>* response) override;
virtual void recover(const RecoverRequest* request, grpc::ServerWriter<RecoverReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status ssh_info(grpc::ServerContext* context, const SSHInfoRequest* request,
grpc::ServerWriter<SSHInfoReply>* response) override;
virtual void ssh_info(const SSHInfoRequest* request, grpc::ServerWriter<SSHInfoReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status start(grpc::ServerContext* context, const StartRequest* request,
grpc::ServerWriter<StartReply>* response) override;
virtual void start(const StartRequest* request, grpc::ServerWriter<StartReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status stop(grpc::ServerContext* context, const StopRequest* request,
grpc::ServerWriter<StopReply>* response) override;
virtual void stop(const StopRequest* request, grpc::ServerWriter<StopReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status suspend(grpc::ServerContext* context, const SuspendRequest* request,
grpc::ServerWriter<SuspendReply>* response) override;
virtual void suspend(const SuspendRequest* request, grpc::ServerWriter<SuspendReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status restart(grpc::ServerContext* context, const RestartRequest* request,
grpc::ServerWriter<RestartReply>* response) override;
virtual void restart(const RestartRequest* request, grpc::ServerWriter<RestartReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status delet(grpc::ServerContext* context, const DeleteRequest* request,
grpc::ServerWriter<DeleteReply>* response) override;
virtual void delet(const DeleteRequest* request, grpc::ServerWriter<DeleteReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status umount(grpc::ServerContext* context, const UmountRequest* request,
grpc::ServerWriter<UmountReply>* response) override;
virtual void umount(const UmountRequest* request, grpc::ServerWriter<UmountReply>* response,
std::promise<grpc::Status>* status_promise);

grpc::Status version(grpc::ServerContext* context, const VersionRequest* request,
grpc::ServerWriter<VersionReply>* response) override;
virtual void version(const VersionRequest* request, grpc::ServerWriter<VersionReply>* response,
std::promise<grpc::Status>* status_promise);

private:
void persist_instances();
Expand All @@ -136,14 +140,28 @@ public slots:
void release_resources(const std::string& instance);
std::string check_instance_operational(const std::string& instance_name) const;
std::string check_instance_exists(const std::string& instance_name) const;
grpc::Status create_vm(grpc::ServerContext* context, const CreateRequest* request,
grpc::ServerWriter<CreateReply>* server, bool start);
void create_vm(const CreateRequest* request, grpc::ServerWriter<CreateReply>* server,
std::promise<grpc::Status>* status_promise, bool start);
grpc::Status reboot_vm(VirtualMachine& vm);
grpc::Status shutdown_vm(VirtualMachine& vm, const std::chrono::milliseconds delay);
grpc::Status cancel_vm_shutdown(const VirtualMachine& vm);
grpc::Status cmd_vms(const std::vector<std::string>& tgts, std::function<grpc::Status(VirtualMachine&)> cmd);
void install_sshfs(const VirtualMachine::UPtr& vm, const std::string& name);

struct AsyncOperationStatus
{
grpc::Status status;
std::promise<grpc::Status>* status_promise;
};

grpc::Status async_wait_for_ssh_for(const VirtualMachine::UPtr& vm);
template <typename Reply>
AsyncOperationStatus async_wait_for_ssh_and_start_mounts(grpc::ServerWriter<Reply>* server,
const std::vector<std::string>& vms,
std::promise<grpc::Status>* status_promise);
void finish_async_operation(QFuture<AsyncOperationStatus> async_future);
QFutureWatcher<AsyncOperationStatus>* create_future_watcher();

std::unique_ptr<const DaemonConfig> config;
std::unordered_map<std::string, VMSpecs> vm_instance_specs;
std::unordered_map<std::string, VirtualMachine::UPtr> vm_instances;
Expand All @@ -156,6 +174,7 @@ public slots:
QTimer source_images_maintenance_task;
MetricsProvider metrics_provider;
MetricsOptInData metrics_opt_in;
std::vector<std::unique_ptr<QFutureWatcher<AsyncOperationStatus>>> async_future_watchers;
};
} // namespace multipass
#endif // MULTIPASS_DAEMON_H
60 changes: 43 additions & 17 deletions src/daemon/daemon_rpc.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2017 Canonical, Ltd.
* Copyright (C) 2017-2019 Canonical, Ltd.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -84,6 +84,16 @@ auto make_server(const std::string& server_address, mp::RpcConnectionType conn_t

return server;
}

template <typename OperationSignal>
grpc::Status emit_signal_and_wait_for_result(OperationSignal operation_signal)
{
std::promise<grpc::Status> status_promise;
auto status_future = status_promise.get_future();
emit operation_signal(&status_promise);

return status_future.get();
}
} // namespace

mp::DaemonRpc::DaemonRpc(const std::string& server_address, mp::RpcConnectionType type,
Expand All @@ -97,97 +107,113 @@ mp::DaemonRpc::DaemonRpc(const std::string& server_address, mp::RpcConnectionTyp
grpc::Status mp::DaemonRpc::create(grpc::ServerContext* context, const CreateRequest* request,
grpc::ServerWriter<CreateReply>* reply)
{
return emit on_create(context, request, reply); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_create, this, request, reply, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::launch(grpc::ServerContext* context, const LaunchRequest* request,
grpc::ServerWriter<LaunchReply>* reply)
{
return emit on_launch(context, request, reply); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_launch, this, request, reply, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::purge(grpc::ServerContext* context, const PurgeRequest* request,
grpc::ServerWriter<PurgeReply>* response)
{
return emit on_purge(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_purge, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::find(grpc::ServerContext* context, const FindRequest* request,
grpc::ServerWriter<FindReply>* response)
{
return emit on_find(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_find, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::info(grpc::ServerContext* context, const InfoRequest* request,
grpc::ServerWriter<InfoReply>* response)
{
return emit on_info(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_info, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::list(grpc::ServerContext* context, const ListRequest* request,
grpc::ServerWriter<ListReply>* response)
{
return emit on_list(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_list, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::mount(grpc::ServerContext* context, const MountRequest* request,
grpc::ServerWriter<MountReply>* response)
{
return emit on_mount(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_mount, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::recover(grpc::ServerContext* context, const RecoverRequest* request,
grpc::ServerWriter<RecoverReply>* response)
{
return emit on_recover(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_recover, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::ssh_info(grpc::ServerContext* context, const SSHInfoRequest* request,
grpc::ServerWriter<SSHInfoReply>* response)
{
return emit on_ssh_info(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_ssh_info, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::start(grpc::ServerContext* context, const StartRequest* request,
grpc::ServerWriter<StartReply>* response)
{
return emit on_start(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_start, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::stop(grpc::ServerContext* context, const StopRequest* request,
grpc::ServerWriter<StopReply>* response)
{
return emit on_stop(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_stop, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::suspend(grpc::ServerContext* context, const SuspendRequest* request,
grpc::ServerWriter<SuspendReply>* response)
{
return emit on_suspend(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_suspend, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::restart(grpc::ServerContext* context, const RestartRequest* request,
grpc::ServerWriter<RestartReply>* response)
{
return emit on_restart(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_restart, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::delet(grpc::ServerContext* context, const DeleteRequest* request,
grpc::ServerWriter<DeleteReply>* response)
{
return emit on_delete(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_delete, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::umount(grpc::ServerContext* context, const UmountRequest* request,
grpc::ServerWriter<UmountReply>* response)
{
return emit on_umount(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_umount, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::version(grpc::ServerContext* context, const VersionRequest* request,
grpc::ServerWriter<VersionReply>* response)
{
return emit on_version(context, request, response); // must block until slot returns
return emit_signal_and_wait_for_result(
std::bind(&DaemonRpc::on_version, this, request, response, std::placeholders::_1));
}

grpc::Status mp::DaemonRpc::ping(grpc::ServerContext* context, const PingRequest* request, PingReply* response)
Expand Down
Loading

0 comments on commit e956c92

Please sign in to comment.