From a1516c5ba7010f0ee3d23567b603c5d50de6c3c3 Mon Sep 17 00:00:00 2001 From: Chris Townsend Date: Thu, 7 Mar 2019 16:31:02 -0500 Subject: [PATCH 01/16] daemon/rpc: Use a promise/future model for allowing more asynchronicity This will allow the daemon to spawn off long running operations and then service any new operations requested by the RPC. Fixes #643 --- src/daemon/daemon.cpp | 214 +++++++++++++++++++++----------------- src/daemon/daemon.h | 63 +++++------ src/daemon/daemon_rpc.cpp | 59 ++++++++--- src/daemon/daemon_rpc.h | 65 ++++++------ 4 files changed, 224 insertions(+), 177 deletions(-) diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index 91ac90a3b6d..1b5b8cd3f26 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -415,21 +415,21 @@ auto get_metrics_opt_in(const mp::Path& data_path) auto connect_rpc(mp::DaemonRpc& rpc, mp::Daemon& daemon) { QObject::connect(&rpc, &mp::DaemonRpc::on_create, &daemon, &mp::Daemon::create, Qt::BlockingQueuedConnection); - QObject::connect(&rpc, &mp::DaemonRpc::on_launch, &daemon, &mp::Daemon::launch, Qt::BlockingQueuedConnection); - QObject::connect(&rpc, &mp::DaemonRpc::on_purge, &daemon, &mp::Daemon::purge, Qt::BlockingQueuedConnection); - QObject::connect(&rpc, &mp::DaemonRpc::on_find, &daemon, &mp::Daemon::find, Qt::BlockingQueuedConnection); - QObject::connect(&rpc, &mp::DaemonRpc::on_info, &daemon, &mp::Daemon::info, Qt::BlockingQueuedConnection); - QObject::connect(&rpc, &mp::DaemonRpc::on_list, &daemon, &mp::Daemon::list, Qt::BlockingQueuedConnection); - QObject::connect(&rpc, &mp::DaemonRpc::on_mount, &daemon, &mp::Daemon::mount, Qt::BlockingQueuedConnection); - QObject::connect(&rpc, &mp::DaemonRpc::on_recover, &daemon, &mp::Daemon::recover, Qt::BlockingQueuedConnection); - QObject::connect(&rpc, &mp::DaemonRpc::on_ssh_info, &daemon, &mp::Daemon::ssh_info, Qt::BlockingQueuedConnection); - QObject::connect(&rpc, &mp::DaemonRpc::on_start, &daemon, &mp::Daemon::start, Qt::BlockingQueuedConnection); - QObject::connect(&rpc, &mp::DaemonRpc::on_stop, &daemon, &mp::Daemon::stop, Qt::BlockingQueuedConnection); - QObject::connect(&rpc, &mp::DaemonRpc::on_suspend, &daemon, &mp::Daemon::suspend, Qt::BlockingQueuedConnection); - QObject::connect(&rpc, &mp::DaemonRpc::on_restart, &daemon, &mp::Daemon::restart, Qt::BlockingQueuedConnection); - QObject::connect(&rpc, &mp::DaemonRpc::on_delete, &daemon, &mp::Daemon::delet, Qt::BlockingQueuedConnection); - QObject::connect(&rpc, &mp::DaemonRpc::on_umount, &daemon, &mp::Daemon::umount, Qt::BlockingQueuedConnection); - QObject::connect(&rpc, &mp::DaemonRpc::on_version, &daemon, &mp::Daemon::version, Qt::BlockingQueuedConnection); + QObject::connect(&rpc, &mp::DaemonRpc::on_launch, &daemon, &mp::Daemon::launch); + QObject::connect(&rpc, &mp::DaemonRpc::on_purge, &daemon, &mp::Daemon::purge); + QObject::connect(&rpc, &mp::DaemonRpc::on_find, &daemon, &mp::Daemon::find); + QObject::connect(&rpc, &mp::DaemonRpc::on_info, &daemon, &mp::Daemon::info); + QObject::connect(&rpc, &mp::DaemonRpc::on_list, &daemon, &mp::Daemon::list); + QObject::connect(&rpc, &mp::DaemonRpc::on_mount, &daemon, &mp::Daemon::mount); + QObject::connect(&rpc, &mp::DaemonRpc::on_recover, &daemon, &mp::Daemon::recover); + QObject::connect(&rpc, &mp::DaemonRpc::on_ssh_info, &daemon, &mp::Daemon::ssh_info); + QObject::connect(&rpc, &mp::DaemonRpc::on_start, &daemon, &mp::Daemon::start); + QObject::connect(&rpc, &mp::DaemonRpc::on_stop, &daemon, &mp::Daemon::stop); + QObject::connect(&rpc, &mp::DaemonRpc::on_suspend, &daemon, &mp::Daemon::suspend); + QObject::connect(&rpc, &mp::DaemonRpc::on_restart, &daemon, &mp::Daemon::restart); + QObject::connect(&rpc, &mp::DaemonRpc::on_delete, &daemon, &mp::Daemon::delet); + QObject::connect(&rpc, &mp::DaemonRpc::on_umount, &daemon, &mp::Daemon::umount); + QObject::connect(&rpc, &mp::DaemonRpc::on_version, &daemon, &mp::Daemon::version); } template @@ -665,8 +665,9 @@ catch (const std::exception& e) return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), ""); } -grpc::Status mp::Daemon::launch(grpc::ServerContext* context, const LaunchRequest* request, - grpc::ServerWriter* server) // clang-format off +void mp::Daemon::launch(grpc::ServerContext* context, const LaunchRequest* request, + grpc::ServerWriter* server, + std::promise* status_promise) // clang-format off try // clang-format on { mpl::ClientLogger logger{mpl::level_from(request->verbosity_level()), *config->logger, server}; @@ -681,7 +682,7 @@ try // clang-format on reply.set_metrics_pending(true); server->Write(reply); - return grpc::Status::OK; + return status_promise->set_value(grpc::Status::OK); } persist_metrics_opt_in_data(metrics_opt_in, config->data_directory); @@ -711,15 +712,16 @@ catch (const mp::StartException& e) vm_instances.erase(name); persist_instances(); - return grpc::Status(grpc::StatusCode::ABORTED, e.what(), ""); + status_promise->set_value(grpc::Status(grpc::StatusCode::ABORTED, e.what(), "")); } catch (const std::exception& e) { - return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), ""); + status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -grpc::Status mp::Daemon::purge(grpc::ServerContext* context, const PurgeRequest* request, - grpc::ServerWriter* server) // clang-format off +void mp::Daemon::purge(grpc::ServerContext* context, const PurgeRequest* request, + grpc::ServerWriter* server, + std::promise* status_promise) // clang-format off try // clang-format on { for (const auto& del : deleted_instances) @@ -728,15 +730,15 @@ try // clang-format on deleted_instances.clear(); persist_instances(); - return grpc::Status::OK; + status_promise->set_value(grpc::Status::OK); } catch (const std::exception& e) { - return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), ""); + status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -grpc::Status mp::Daemon::find(grpc::ServerContext* context, const FindRequest* request, - grpc::ServerWriter* server) // clang-format off +void mp::Daemon::find(grpc::ServerContext* context, const FindRequest* request, grpc::ServerWriter* server, + std::promise* status_promise) // clang-format off try // clang-format on { mpl::ClientLogger logger{mpl::level_from(request->verbosity_level()), *config->logger, server}; @@ -886,15 +888,15 @@ try // clang-format on } } server->Write(response); - return grpc::Status::OK; + status_promise->set_value(grpc::Status::OK); } catch (const std::exception& e) { - return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), ""); + status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -grpc::Status mp::Daemon::info(grpc::ServerContext* context, const InfoRequest* request, - grpc::ServerWriter* server) // clang-format off +void mp::Daemon::info(grpc::ServerContext* context, const InfoRequest* request, grpc::ServerWriter* server, + std::promise* status_promise) // clang-format off try // clang-format on { mpl::ClientLogger logger{mpl::level_from(request->verbosity_level()), *config->logger, server}; @@ -1053,15 +1055,15 @@ try // clang-format on if (status.ok()) server->Write(response); - return status; + status_promise->set_value(status); } catch (const std::exception& e) { - return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), ""); + status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -grpc::Status mp::Daemon::list(grpc::ServerContext* context, const ListRequest* request, - grpc::ServerWriter* server) // clang-format off +void mp::Daemon::list(grpc::ServerContext* context, const ListRequest* request, grpc::ServerWriter* server, + std::promise* status_promise) // clang-format off try // clang-format on { mpl::ClientLogger logger{mpl::level_from(request->verbosity_level()), *config->logger, server}; @@ -1131,15 +1133,16 @@ try // clang-format on } server->Write(response); - return grpc::Status::OK; + status_promise->set_value(grpc::Status::OK); } catch (const std::exception& e) { - return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), ""); + status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -grpc::Status mp::Daemon::mount(grpc::ServerContext* context, const MountRequest* request, - grpc::ServerWriter* server) // clang-format off +void mp::Daemon::mount(grpc::ServerContext* context, const MountRequest* request, + grpc::ServerWriter* server, + std::promise* status_promise) // clang-format off try // clang-format on { mpl::ClientLogger logger{mpl::level_from(request->verbosity_level()), *config->logger, server}; @@ -1147,20 +1150,23 @@ try // clang-format on QFileInfo source_dir(QString::fromStdString(request->source_path())); if (!source_dir.exists()) { - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - fmt::format("source \"{}\" does not exist", request->source_path()), ""); + return status_promise->set_value( + grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + fmt::format("source \"{}\" does not exist", request->source_path()), "")); } if (!source_dir.isDir()) { - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - fmt::format("source \"{}\" is not a directory", request->source_path()), ""); + return status_promise->set_value( + grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + fmt::format("source \"{}\" is not a directory", request->source_path()), "")); } if (!source_dir.isReadable()) { - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, - fmt::format("source \"{}\" is not readable", request->source_path()), ""); + return status_promise->set_value( + grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + fmt::format("source \"{}\" is not readable", request->source_path()), "")); } std::unordered_map uid_map{request->mount_maps().uid_map().begin(), @@ -1213,7 +1219,7 @@ try // clang-format on } catch (const mp::SSHFSMissingError&) { - return grpc_status_for_mount_error(name); + return status_promise->set_value(grpc_status_for_mount_error(name)); } } catch (const std::exception& e) @@ -1236,15 +1242,16 @@ try // clang-format on persist_instances(); - return grpc_status_for(errors); + status_promise->set_value(grpc_status_for(errors)); } catch (const std::exception& e) { - return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), ""); + status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -grpc::Status mp::Daemon::recover(grpc::ServerContext* context, const RecoverRequest* request, - grpc::ServerWriter* server) // clang-format off +void mp::Daemon::recover(grpc::ServerContext* context, const RecoverRequest* request, + grpc::ServerWriter* server, + std::promise* status_promise) // clang-format off try // clang-format on { mpl::ClientLogger logger{mpl::level_from(request->verbosity_level()), *config->logger, server}; @@ -1277,15 +1284,16 @@ try // clang-format on persist_instances(); } - return status; + status_promise->set_value(status); } catch (const std::exception& e) { - return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), ""); + status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -grpc::Status mp::Daemon::ssh_info(grpc::ServerContext* context, const SSHInfoRequest* request, - grpc::ServerWriter* server) // clang-format off +void mp::Daemon::ssh_info(grpc::ServerContext* context, const SSHInfoRequest* request, + grpc::ServerWriter* server, + std::promise* status_promise) // clang-format off try // clang-format on { mpl::ClientLogger logger{mpl::level_from(request->verbosity_level()), *config->logger, server}; @@ -1296,25 +1304,27 @@ try // clang-format on auto it = vm_instances.find(name); if (it == vm_instances.end()) { - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, fmt::format("instance \"{}\" does not exist", name), - ""); + return status_promise->set_value(grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + fmt::format("instance \"{}\" does not exist", name), "")); } auto& vm = it->second; if (!mp::utils::is_running(vm->current_state())) { - return grpc::Status(grpc::StatusCode::ABORTED, fmt::format("instance \"{}\" is not running", name)); + return status_promise->set_value( + grpc::Status(grpc::StatusCode::ABORTED, fmt::format("instance \"{}\" is not running", name))); } if (vm->state == VirtualMachine::State::delayed_shutdown) { if (delayed_shutdown_instances[name]->get_time_remaining() <= std::chrono::minutes(1)) { - return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, - fmt::format("\"{}\" is scheduled to shut down in less than a minute, use " - "'multipass stop --cancel {}' to cancel the shutdown.", - name, name), - ""); + return status_promise->set_value( + grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, + fmt::format("\"{}\" is scheduled to shut down in less than a minute, use " + "'multipass stop --cancel {}' to cancel the shutdown.", + name, name), + "")); } } @@ -1327,15 +1337,16 @@ try // clang-format on } server->Write(response); - return grpc::Status::OK; + status_promise->set_value(grpc::Status::OK); } catch (const std::exception& e) { - return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), ""); + status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -grpc::Status mp::Daemon::start(grpc::ServerContext* context, const StartRequest* request, - grpc::ServerWriter* server) // clang-format off +void mp::Daemon::start(grpc::ServerContext* context, const StartRequest* request, + grpc::ServerWriter* server, + std::promise* status_promise) // clang-format off try // clang-format on { mpl::ClientLogger logger{mpl::level_from(request->verbosity_level()), *config->logger, server}; @@ -1354,16 +1365,18 @@ try // clang-format on mp::StartError start_error; start_error.set_error_code(mp::StartError::DOES_NOT_EXIST); start_error.set_instance_name(name); - return grpc::Status(grpc::StatusCode::ABORTED, fmt::format("instance \"{}\" does not exist", name), - start_error.SerializeAsString()); + return status_promise->set_value(grpc::Status(grpc::StatusCode::ABORTED, + fmt::format("instance \"{}\" does not exist", name), + start_error.SerializeAsString())); } else { mp::StartError start_error; start_error.set_error_code(mp::StartError::INSTANCE_DELETED); start_error.set_instance_name(name); - return grpc::Status(grpc::StatusCode::ABORTED, fmt::format("instance \"{}\" is deleted", name), - start_error.SerializeAsString()); + return status_promise->set_value(grpc::Status(grpc::StatusCode::ABORTED, + fmt::format("instance \"{}\" is deleted", name), + start_error.SerializeAsString())); } continue; } @@ -1433,7 +1446,7 @@ try // clang-format on } catch (const mp::SSHFSMissingError&) { - return grpc_status_for_mount_error(name); + status_promise->set_value(grpc_status_for_mount_error(name)); } } catch (const std::exception& e) @@ -1458,15 +1471,15 @@ try // clang-format on server->Write(start_reply); } - return grpc_status_for(errors); + status_promise->set_value(grpc_status_for(errors)); } catch (const std::exception& e) { - return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), ""); + status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -grpc::Status mp::Daemon::stop(grpc::ServerContext* context, const StopRequest* request, - grpc::ServerWriter* server) // clang-format off +void mp::Daemon::stop(grpc::ServerContext* context, const StopRequest* request, grpc::ServerWriter* server, + std::promise* status_promise) // clang-format off try // clang-format on { mpl::ClientLogger logger{mpl::level_from(request->verbosity_level()), *config->logger, server}; @@ -1489,15 +1502,16 @@ try // clang-format on status = cmd_vms(instances, operation); } - return status; + status_promise->set_value(status); } catch (const std::exception& e) { - return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), ""); + status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -grpc::Status mp::Daemon::suspend(grpc::ServerContext* context, const SuspendRequest* request, - grpc::ServerWriter* server) // clang-format off +void mp::Daemon::suspend(grpc::ServerContext* context, const SuspendRequest* request, + grpc::ServerWriter* server, + std::promise* status_promise) // clang-format off try // clang-format on { mpl::ClientLogger logger{mpl::level_from(request->verbosity_level()), *config->logger, server}; @@ -1535,15 +1549,16 @@ try // clang-format on }); } - return status; + status_promise->set_value(status); } catch (const std::exception& e) { - return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), ""); + status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -grpc::Status mp::Daemon::restart(grpc::ServerContext* context, const RestartRequest* request, - grpc::ServerWriter* server) // clang-format off +void mp::Daemon::restart(grpc::ServerContext* context, const RestartRequest* request, + grpc::ServerWriter* server, + std::promise* status_promise) // clang-format off try // clang-format on { mpl::ClientLogger logger{mpl::level_from(request->verbosity_level()), *config->logger, server}; @@ -1570,15 +1585,16 @@ try // clang-format on } } - return status; + status_promise->set_value(status); } catch (const std::exception& e) { - return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), ""); + status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -grpc::Status mp::Daemon::delet(grpc::ServerContext* context, const DeleteRequest* request, - grpc::ServerWriter* server) // clang-format off +void mp::Daemon::delet(grpc::ServerContext* context, const DeleteRequest* request, + grpc::ServerWriter* server, + std::promise* status_promise) // clang-format off try // clang-format on { mpl::ClientLogger logger{mpl::level_from(request->verbosity_level()), *config->logger, server}; @@ -1630,15 +1646,16 @@ try // clang-format on persist_instances(); } - return status; + status_promise->set_value(status); } catch (const std::exception& e) { - return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), ""); + status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -grpc::Status mp::Daemon::umount(grpc::ServerContext* context, const UmountRequest* request, - grpc::ServerWriter* server) // clang-format off +void mp::Daemon::umount(grpc::ServerContext* context, const UmountRequest* request, + grpc::ServerWriter* server, + std::promise* status_promise) // clang-format off try // clang-format on { mpl::ClientLogger logger{mpl::level_from(request->verbosity_level()), *config->logger, server}; @@ -1703,15 +1720,15 @@ try // clang-format on persist_instances(); - return grpc_status_for(errors); + status_promise->set_value(grpc_status_for(errors)); } catch (const std::exception& e) { - return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), ""); + status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -grpc::Status mp::Daemon::version(grpc::ServerContext* context, const VersionRequest* request, - grpc::ServerWriter* server) +void mp::Daemon::version(grpc::ServerContext* context, const VersionRequest* request, + grpc::ServerWriter* server, std::promise* status_promise) { mpl::ClientLogger logger{mpl::level_from(request->verbosity_level()), *config->logger, server}; @@ -1719,7 +1736,7 @@ grpc::Status mp::Daemon::version(grpc::ServerContext* context, const VersionRequ reply.set_version(multipass::version_string); config->update_prompt->populate(reply.mutable_update_info()); server->Write(reply); - return grpc::Status::OK; + status_promise->set_value(grpc::Status::OK); } void mp::Daemon::on_shutdown() @@ -1938,8 +1955,9 @@ grpc::Status mp::Daemon::create_vm(grpc::ServerContext* context, const CreateReq CreateError create_error; create_error.add_error_codes(CreateError::INSTANCE_EXISTS); - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, fmt::format("instance \"{}\" already exists", name), - create_error.SerializeAsString()); + return status_promise->set_value(grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + fmt::format("instance \"{}\" already exists", name), + create_error.SerializeAsString())); } auto query = query_from(request, name); @@ -2023,7 +2041,7 @@ grpc::Status mp::Daemon::create_vm(grpc::ServerContext* context, const CreateReq server->Write(reply); } - return grpc::Status::OK; + status_promise->set_value(grpc::Status::OK); } grpc::Status mp::Daemon::reboot_vm(VirtualMachine& vm) diff --git a/src/daemon/daemon.h b/src/daemon/daemon.h index 6c82271414a..227583c4e6e 100644 --- a/src/daemon/daemon.h +++ b/src/daemon/daemon.h @@ -28,6 +28,7 @@ #include #include +#include #include #include @@ -60,7 +61,7 @@ struct MetricsOptInData }; struct DaemonConfig; -class Daemon : public QObject, public multipass::Rpc::Service, public multipass::VMStatusMonitor +class Daemon : public QObject, public multipass::VMStatusMonitor { Q_OBJECT public: @@ -82,50 +83,50 @@ public slots: grpc::Status create(grpc::ServerContext* context, const CreateRequest* request, grpc::ServerWriter* reply) override; - grpc::Status launch(grpc::ServerContext* context, const LaunchRequest* request, - grpc::ServerWriter* reply) override; + void launch(grpc::ServerContext* context, const LaunchRequest* request, grpc::ServerWriter* reply, + std::promise* status_promise); - grpc::Status purge(grpc::ServerContext* context, const PurgeRequest* request, - grpc::ServerWriter* response) override; + void purge(grpc::ServerContext* context, const PurgeRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - grpc::Status find(grpc::ServerContext* context, const FindRequest* request, - grpc::ServerWriter* response) override; + void find(grpc::ServerContext* context, const FindRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - grpc::Status info(grpc::ServerContext* context, const InfoRequest* request, - grpc::ServerWriter* response) override; + void info(grpc::ServerContext* context, const InfoRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - grpc::Status list(grpc::ServerContext* context, const ListRequest* request, - grpc::ServerWriter* response) override; + void list(grpc::ServerContext* context, const ListRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - grpc::Status mount(grpc::ServerContext* context, const MountRequest* request, - grpc::ServerWriter* response) override; + void mount(grpc::ServerContext* context, const MountRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - grpc::Status recover(grpc::ServerContext* context, const RecoverRequest* request, - grpc::ServerWriter* response) override; + void recover(grpc::ServerContext* context, const RecoverRequest* request, + grpc::ServerWriter* response, std::promise* status_promise); - grpc::Status ssh_info(grpc::ServerContext* context, const SSHInfoRequest* request, - grpc::ServerWriter* response) override; + void ssh_info(grpc::ServerContext* context, const SSHInfoRequest* request, + grpc::ServerWriter* response, std::promise* status_promise); - grpc::Status start(grpc::ServerContext* context, const StartRequest* request, - grpc::ServerWriter* response) override; + void start(grpc::ServerContext* context, const StartRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - grpc::Status stop(grpc::ServerContext* context, const StopRequest* request, - grpc::ServerWriter* response) override; + void stop(grpc::ServerContext* context, const StopRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - grpc::Status suspend(grpc::ServerContext* context, const SuspendRequest* request, - grpc::ServerWriter* response) override; + void suspend(grpc::ServerContext* context, const SuspendRequest* request, + grpc::ServerWriter* response, std::promise* status_promise); - grpc::Status restart(grpc::ServerContext* context, const RestartRequest* request, - grpc::ServerWriter* response) override; + void restart(grpc::ServerContext* context, const RestartRequest* request, + grpc::ServerWriter* response, std::promise* status_promise); - grpc::Status delet(grpc::ServerContext* context, const DeleteRequest* request, - grpc::ServerWriter* response) override; + void delet(grpc::ServerContext* context, const DeleteRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - grpc::Status umount(grpc::ServerContext* context, const UmountRequest* request, - grpc::ServerWriter* response) override; + void umount(grpc::ServerContext* context, const UmountRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - grpc::Status version(grpc::ServerContext* context, const VersionRequest* request, - grpc::ServerWriter* response) override; + void version(grpc::ServerContext* context, const VersionRequest* request, + grpc::ServerWriter* response, std::promise* status_promise); private: void persist_instances(); diff --git a/src/daemon/daemon_rpc.cpp b/src/daemon/daemon_rpc.cpp index 820bf2d23d3..2bfb898c68a 100644 --- a/src/daemon/daemon_rpc.cpp +++ b/src/daemon/daemon_rpc.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017 Canonical, Ltd. + * Copyright (C) 2017-2019 Canonical, Ltd. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -84,6 +84,18 @@ auto make_server(const std::string& server_address, mp::RpcConnectionType conn_t return server; } + +template +grpc::Status emit_signal_and_wait_for_result(OperationSignal operation_signal) +{ + std::promise status_promise; + auto status_future = status_promise.get_future(); + emit operation_signal(&status_promise); + + status_future.wait(); + + return status_future.get(); +} } // namespace mp::DaemonRpc::DaemonRpc(const std::string& server_address, mp::RpcConnectionType type, @@ -103,91 +115,106 @@ grpc::Status mp::DaemonRpc::create(grpc::ServerContext* context, const CreateReq grpc::Status mp::DaemonRpc::launch(grpc::ServerContext* context, const LaunchRequest* request, grpc::ServerWriter* reply) { - return emit on_launch(context, request, reply); // must block until slot returns + return emit_signal_and_wait_for_result( + std::bind(&DaemonRpc::on_launch, this, context, request, reply, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::purge(grpc::ServerContext* context, const PurgeRequest* request, grpc::ServerWriter* response) { - return emit on_purge(context, request, response); // must block until slot returns + return emit_signal_and_wait_for_result( + std::bind(&DaemonRpc::on_purge, this, context, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::find(grpc::ServerContext* context, const FindRequest* request, grpc::ServerWriter* response) { - return emit on_find(context, request, response); // must block until slot returns + return emit_signal_and_wait_for_result( + std::bind(&DaemonRpc::on_find, this, context, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::info(grpc::ServerContext* context, const InfoRequest* request, grpc::ServerWriter* response) { - return emit on_info(context, request, response); // must block until slot returns + return emit_signal_and_wait_for_result( + std::bind(&DaemonRpc::on_info, this, context, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::list(grpc::ServerContext* context, const ListRequest* request, grpc::ServerWriter* response) { - return emit on_list(context, request, response); // must block until slot returns + return emit_signal_and_wait_for_result( + std::bind(&DaemonRpc::on_list, this, context, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::mount(grpc::ServerContext* context, const MountRequest* request, grpc::ServerWriter* response) { - return emit on_mount(context, request, response); // must block until slot returns + return emit_signal_and_wait_for_result( + std::bind(&DaemonRpc::on_mount, this, context, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::recover(grpc::ServerContext* context, const RecoverRequest* request, grpc::ServerWriter* response) { - return emit on_recover(context, request, response); // must block until slot returns + return emit_signal_and_wait_for_result( + std::bind(&DaemonRpc::on_recover, this, context, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::ssh_info(grpc::ServerContext* context, const SSHInfoRequest* request, grpc::ServerWriter* response) { - return emit on_ssh_info(context, request, response); // must block until slot returns + return emit_signal_and_wait_for_result( + std::bind(&DaemonRpc::on_ssh_info, this, context, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::start(grpc::ServerContext* context, const StartRequest* request, grpc::ServerWriter* response) { - return emit on_start(context, request, response); // must block until slot returns + return emit_signal_and_wait_for_result( + std::bind(&DaemonRpc::on_start, this, context, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::stop(grpc::ServerContext* context, const StopRequest* request, grpc::ServerWriter* response) { - return emit on_stop(context, request, response); // must block until slot returns + return emit_signal_and_wait_for_result( + std::bind(&DaemonRpc::on_stop, this, context, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::suspend(grpc::ServerContext* context, const SuspendRequest* request, grpc::ServerWriter* response) { - return emit on_suspend(context, request, response); // must block until slot returns + return emit_signal_and_wait_for_result( + std::bind(&DaemonRpc::on_suspend, this, context, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::restart(grpc::ServerContext* context, const RestartRequest* request, grpc::ServerWriter* response) { - return emit on_restart(context, request, response); // must block until slot returns + return emit_signal_and_wait_for_result( + std::bind(&DaemonRpc::on_restart, this, context, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::delet(grpc::ServerContext* context, const DeleteRequest* request, grpc::ServerWriter* response) { - return emit on_delete(context, request, response); // must block until slot returns + return emit_signal_and_wait_for_result( + std::bind(&DaemonRpc::on_delete, this, context, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::umount(grpc::ServerContext* context, const UmountRequest* request, grpc::ServerWriter* response) { - return emit on_umount(context, request, response); // must block until slot returns + return emit_signal_and_wait_for_result( + std::bind(&DaemonRpc::on_umount, this, context, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::version(grpc::ServerContext* context, const VersionRequest* request, grpc::ServerWriter* response) { - return emit on_version(context, request, response); // must block until slot returns + return emit_signal_and_wait_for_result( + std::bind(&DaemonRpc::on_version, this, context, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::ping(grpc::ServerContext* context, const PingRequest* request, PingReply* response) diff --git a/src/daemon/daemon_rpc.h b/src/daemon/daemon_rpc.h index 49a987f3f9e..815974fad0e 100644 --- a/src/daemon/daemon_rpc.h +++ b/src/daemon/daemon_rpc.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017 Canonical, Ltd. + * Copyright (C) 2017-2019 Canonical, Ltd. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -27,6 +27,8 @@ #include #include + +#include #include namespace multipass @@ -47,39 +49,38 @@ class DaemonRpc : public QObject, public multipass::Rpc::Service DaemonRpc& operator=(const DaemonRpc&) = delete; signals: - // All these signals must be connected to with a BlockingQueuedConnection!!! grpc::Status on_create(grpc::ServerContext* context, const CreateRequest* request, grpc::ServerWriter* reply); - grpc::Status on_launch(grpc::ServerContext* context, const LaunchRequest* request, - grpc::ServerWriter* reply); - grpc::Status on_purge(grpc::ServerContext* context, const PurgeRequest* request, - grpc::ServerWriter* response); - grpc::Status on_find(grpc::ServerContext* context, const FindRequest* request, - grpc::ServerWriter* response); - grpc::Status on_info(grpc::ServerContext* context, const InfoRequest* request, - grpc::ServerWriter* response); - grpc::Status on_list(grpc::ServerContext* context, const ListRequest* request, - grpc::ServerWriter* response); - grpc::Status on_mount(grpc::ServerContext* context, const MountRequest* request, - grpc::ServerWriter* response); - grpc::Status on_recover(grpc::ServerContext* context, const RecoverRequest* request, - grpc::ServerWriter* response); - grpc::Status on_ssh_info(grpc::ServerContext* context, const SSHInfoRequest* request, - grpc::ServerWriter* response); - grpc::Status on_start(grpc::ServerContext* context, const StartRequest* request, - grpc::ServerWriter* response); - grpc::Status on_stop(grpc::ServerContext* context, const StopRequest* request, - grpc::ServerWriter* response); - grpc::Status on_suspend(grpc::ServerContext* context, const SuspendRequest* request, - grpc::ServerWriter* response); - grpc::Status on_restart(grpc::ServerContext* context, const RestartRequest* request, - grpc::ServerWriter* response); - grpc::Status on_delete(grpc::ServerContext* context, const DeleteRequest* request, - grpc::ServerWriter* response); - grpc::Status on_umount(grpc::ServerContext* context, const UmountRequest* request, - grpc::ServerWriter* response); - grpc::Status on_version(grpc::ServerContext* context, const VersionRequest* request, - grpc::ServerWriter* response); + void on_launch(grpc::ServerContext* context, const LaunchRequest* request, grpc::ServerWriter* reply, + std::promise* status_promise); + void on_purge(grpc::ServerContext* context, const PurgeRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); + void on_find(grpc::ServerContext* context, const FindRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); + void on_info(grpc::ServerContext* context, const InfoRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); + void on_list(grpc::ServerContext* context, const ListRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); + void on_mount(grpc::ServerContext* context, const MountRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); + void on_recover(grpc::ServerContext* context, const RecoverRequest* request, + grpc::ServerWriter* response, std::promise* status_promise); + void on_ssh_info(grpc::ServerContext* context, const SSHInfoRequest* request, + grpc::ServerWriter* response, std::promise* status_promise); + void on_start(grpc::ServerContext* context, const StartRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); + void on_stop(grpc::ServerContext* context, const StopRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); + void on_suspend(grpc::ServerContext* context, const SuspendRequest* request, + grpc::ServerWriter* response, std::promise* status_promise); + void on_restart(grpc::ServerContext* context, const RestartRequest* request, + grpc::ServerWriter* response, std::promise* status_promise); + void on_delete(grpc::ServerContext* context, const DeleteRequest* request, + grpc::ServerWriter* response, std::promise* status_promise); + void on_umount(grpc::ServerContext* context, const UmountRequest* request, + grpc::ServerWriter* response, std::promise* status_promise); + void on_version(grpc::ServerContext* context, const VersionRequest* request, + grpc::ServerWriter* response, std::promise* status_promise); private: const std::string server_address; From 3467fe469fcf2ce091499f080e1b0fb5d2a63cb7 Mon Sep 17 00:00:00 2001 From: Chris Townsend Date: Mon, 11 Mar 2019 09:56:04 -0400 Subject: [PATCH 02/16] daemon/rpc: Remove unneeded ServerContext parameter This is due to making the affected functions no longer override the grpc Server class functions. --- src/daemon/daemon.cpp | 42 ++++++++++++++----------------------- src/daemon/daemon.h | 40 +++++++++++++++++------------------ src/daemon/daemon_rpc.cpp | 30 +++++++++++++------------- src/daemon/daemon_rpc.h | 44 +++++++++++++++++++-------------------- 4 files changed, 73 insertions(+), 83 deletions(-) diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index 1b5b8cd3f26..933ffad6a92 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -665,8 +665,7 @@ catch (const std::exception& e) return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), ""); } -void mp::Daemon::launch(grpc::ServerContext* context, const LaunchRequest* request, - grpc::ServerWriter* server, +void mp::Daemon::launch(const LaunchRequest* request, grpc::ServerWriter* server, std::promise* status_promise) // clang-format off try // clang-format on { @@ -719,8 +718,7 @@ catch (const std::exception& e) status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -void mp::Daemon::purge(grpc::ServerContext* context, const PurgeRequest* request, - grpc::ServerWriter* server, +void mp::Daemon::purge(const PurgeRequest* request, grpc::ServerWriter* server, std::promise* status_promise) // clang-format off try // clang-format on { @@ -737,7 +735,7 @@ catch (const std::exception& e) status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -void mp::Daemon::find(grpc::ServerContext* context, const FindRequest* request, grpc::ServerWriter* server, +void mp::Daemon::find(const FindRequest* request, grpc::ServerWriter* server, std::promise* status_promise) // clang-format off try // clang-format on { @@ -895,7 +893,7 @@ catch (const std::exception& e) status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -void mp::Daemon::info(grpc::ServerContext* context, const InfoRequest* request, grpc::ServerWriter* server, +void mp::Daemon::info(const InfoRequest* request, grpc::ServerWriter* server, std::promise* status_promise) // clang-format off try // clang-format on { @@ -1062,7 +1060,7 @@ catch (const std::exception& e) status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -void mp::Daemon::list(grpc::ServerContext* context, const ListRequest* request, grpc::ServerWriter* server, +void mp::Daemon::list(const ListRequest* request, grpc::ServerWriter* server, std::promise* status_promise) // clang-format off try // clang-format on { @@ -1140,8 +1138,7 @@ catch (const std::exception& e) status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -void mp::Daemon::mount(grpc::ServerContext* context, const MountRequest* request, - grpc::ServerWriter* server, +void mp::Daemon::mount(const MountRequest* request, grpc::ServerWriter* server, std::promise* status_promise) // clang-format off try // clang-format on { @@ -1249,8 +1246,7 @@ catch (const std::exception& e) status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -void mp::Daemon::recover(grpc::ServerContext* context, const RecoverRequest* request, - grpc::ServerWriter* server, +void mp::Daemon::recover(const RecoverRequest* request, grpc::ServerWriter* server, std::promise* status_promise) // clang-format off try // clang-format on { @@ -1291,8 +1287,7 @@ catch (const std::exception& e) status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -void mp::Daemon::ssh_info(grpc::ServerContext* context, const SSHInfoRequest* request, - grpc::ServerWriter* server, +void mp::Daemon::ssh_info(const SSHInfoRequest* request, grpc::ServerWriter* server, std::promise* status_promise) // clang-format off try // clang-format on { @@ -1344,8 +1339,7 @@ catch (const std::exception& e) status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -void mp::Daemon::start(grpc::ServerContext* context, const StartRequest* request, - grpc::ServerWriter* server, +void mp::Daemon::start(const StartRequest* request, grpc::ServerWriter* server, std::promise* status_promise) // clang-format off try // clang-format on { @@ -1478,7 +1472,7 @@ catch (const std::exception& e) status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -void mp::Daemon::stop(grpc::ServerContext* context, const StopRequest* request, grpc::ServerWriter* server, +void mp::Daemon::stop(const StopRequest* request, grpc::ServerWriter* server, std::promise* status_promise) // clang-format off try // clang-format on { @@ -1509,8 +1503,7 @@ catch (const std::exception& e) status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -void mp::Daemon::suspend(grpc::ServerContext* context, const SuspendRequest* request, - grpc::ServerWriter* server, +void mp::Daemon::suspend(const SuspendRequest* request, grpc::ServerWriter* server, std::promise* status_promise) // clang-format off try // clang-format on { @@ -1556,8 +1549,7 @@ catch (const std::exception& e) status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -void mp::Daemon::restart(grpc::ServerContext* context, const RestartRequest* request, - grpc::ServerWriter* server, +void mp::Daemon::restart(const RestartRequest* request, grpc::ServerWriter* server, std::promise* status_promise) // clang-format off try // clang-format on { @@ -1592,8 +1584,7 @@ catch (const std::exception& e) status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -void mp::Daemon::delet(grpc::ServerContext* context, const DeleteRequest* request, - grpc::ServerWriter* server, +void mp::Daemon::delet(const DeleteRequest* request, grpc::ServerWriter* server, std::promise* status_promise) // clang-format off try // clang-format on { @@ -1653,8 +1644,7 @@ catch (const std::exception& e) status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -void mp::Daemon::umount(grpc::ServerContext* context, const UmountRequest* request, - grpc::ServerWriter* server, +void mp::Daemon::umount(const UmountRequest* request, grpc::ServerWriter* server, std::promise* status_promise) // clang-format off try // clang-format on { @@ -1727,8 +1717,8 @@ catch (const std::exception& e) status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } -void mp::Daemon::version(grpc::ServerContext* context, const VersionRequest* request, - grpc::ServerWriter* server, std::promise* status_promise) +void mp::Daemon::version(const VersionRequest* request, grpc::ServerWriter* server, + std::promise* status_promise) { mpl::ClientLogger logger{mpl::level_from(request->verbosity_level()), *config->logger, server}; diff --git a/src/daemon/daemon.h b/src/daemon/daemon.h index 227583c4e6e..a0a2a27f41d 100644 --- a/src/daemon/daemon.h +++ b/src/daemon/daemon.h @@ -83,50 +83,50 @@ public slots: grpc::Status create(grpc::ServerContext* context, const CreateRequest* request, grpc::ServerWriter* reply) override; - void launch(grpc::ServerContext* context, const LaunchRequest* request, grpc::ServerWriter* reply, + void launch(const LaunchRequest* request, grpc::ServerWriter* reply, std::promise* status_promise); - void purge(grpc::ServerContext* context, const PurgeRequest* request, grpc::ServerWriter* response, + void purge(const PurgeRequest* request, grpc::ServerWriter* response, std::promise* status_promise); - void find(grpc::ServerContext* context, const FindRequest* request, grpc::ServerWriter* response, + void find(const FindRequest* request, grpc::ServerWriter* response, std::promise* status_promise); - void info(grpc::ServerContext* context, const InfoRequest* request, grpc::ServerWriter* response, + void info(const InfoRequest* request, grpc::ServerWriter* response, std::promise* status_promise); - void list(grpc::ServerContext* context, const ListRequest* request, grpc::ServerWriter* response, + void list(const ListRequest* request, grpc::ServerWriter* response, std::promise* status_promise); - void mount(grpc::ServerContext* context, const MountRequest* request, grpc::ServerWriter* response, + void mount(const MountRequest* request, grpc::ServerWriter* response, std::promise* status_promise); - void recover(grpc::ServerContext* context, const RecoverRequest* request, - grpc::ServerWriter* response, std::promise* status_promise); + void recover(const RecoverRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - void ssh_info(grpc::ServerContext* context, const SSHInfoRequest* request, - grpc::ServerWriter* response, std::promise* status_promise); + void ssh_info(const SSHInfoRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - void start(grpc::ServerContext* context, const StartRequest* request, grpc::ServerWriter* response, + void start(const StartRequest* request, grpc::ServerWriter* response, std::promise* status_promise); - void stop(grpc::ServerContext* context, const StopRequest* request, grpc::ServerWriter* response, + void stop(const StopRequest* request, grpc::ServerWriter* response, std::promise* status_promise); - void suspend(grpc::ServerContext* context, const SuspendRequest* request, - grpc::ServerWriter* response, std::promise* status_promise); + void suspend(const SuspendRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - void restart(grpc::ServerContext* context, const RestartRequest* request, - grpc::ServerWriter* response, std::promise* status_promise); + void restart(const RestartRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - void delet(grpc::ServerContext* context, const DeleteRequest* request, grpc::ServerWriter* response, + void delet(const DeleteRequest* request, grpc::ServerWriter* response, std::promise* status_promise); - void umount(grpc::ServerContext* context, const UmountRequest* request, grpc::ServerWriter* response, + void umount(const UmountRequest* request, grpc::ServerWriter* response, std::promise* status_promise); - void version(grpc::ServerContext* context, const VersionRequest* request, - grpc::ServerWriter* response, std::promise* status_promise); + void version(const VersionRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); private: void persist_instances(); diff --git a/src/daemon/daemon_rpc.cpp b/src/daemon/daemon_rpc.cpp index 2bfb898c68a..df39e1ff33f 100644 --- a/src/daemon/daemon_rpc.cpp +++ b/src/daemon/daemon_rpc.cpp @@ -116,105 +116,105 @@ grpc::Status mp::DaemonRpc::launch(grpc::ServerContext* context, const LaunchReq grpc::ServerWriter* reply) { return emit_signal_and_wait_for_result( - std::bind(&DaemonRpc::on_launch, this, context, request, reply, std::placeholders::_1)); + std::bind(&DaemonRpc::on_launch, this, request, reply, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::purge(grpc::ServerContext* context, const PurgeRequest* request, grpc::ServerWriter* response) { return emit_signal_and_wait_for_result( - std::bind(&DaemonRpc::on_purge, this, context, request, response, std::placeholders::_1)); + std::bind(&DaemonRpc::on_purge, this, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::find(grpc::ServerContext* context, const FindRequest* request, grpc::ServerWriter* response) { return emit_signal_and_wait_for_result( - std::bind(&DaemonRpc::on_find, this, context, request, response, std::placeholders::_1)); + std::bind(&DaemonRpc::on_find, this, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::info(grpc::ServerContext* context, const InfoRequest* request, grpc::ServerWriter* response) { return emit_signal_and_wait_for_result( - std::bind(&DaemonRpc::on_info, this, context, request, response, std::placeholders::_1)); + std::bind(&DaemonRpc::on_info, this, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::list(grpc::ServerContext* context, const ListRequest* request, grpc::ServerWriter* response) { return emit_signal_and_wait_for_result( - std::bind(&DaemonRpc::on_list, this, context, request, response, std::placeholders::_1)); + std::bind(&DaemonRpc::on_list, this, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::mount(grpc::ServerContext* context, const MountRequest* request, grpc::ServerWriter* response) { return emit_signal_and_wait_for_result( - std::bind(&DaemonRpc::on_mount, this, context, request, response, std::placeholders::_1)); + std::bind(&DaemonRpc::on_mount, this, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::recover(grpc::ServerContext* context, const RecoverRequest* request, grpc::ServerWriter* response) { return emit_signal_and_wait_for_result( - std::bind(&DaemonRpc::on_recover, this, context, request, response, std::placeholders::_1)); + std::bind(&DaemonRpc::on_recover, this, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::ssh_info(grpc::ServerContext* context, const SSHInfoRequest* request, grpc::ServerWriter* response) { return emit_signal_and_wait_for_result( - std::bind(&DaemonRpc::on_ssh_info, this, context, request, response, std::placeholders::_1)); + std::bind(&DaemonRpc::on_ssh_info, this, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::start(grpc::ServerContext* context, const StartRequest* request, grpc::ServerWriter* response) { return emit_signal_and_wait_for_result( - std::bind(&DaemonRpc::on_start, this, context, request, response, std::placeholders::_1)); + std::bind(&DaemonRpc::on_start, this, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::stop(grpc::ServerContext* context, const StopRequest* request, grpc::ServerWriter* response) { return emit_signal_and_wait_for_result( - std::bind(&DaemonRpc::on_stop, this, context, request, response, std::placeholders::_1)); + std::bind(&DaemonRpc::on_stop, this, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::suspend(grpc::ServerContext* context, const SuspendRequest* request, grpc::ServerWriter* response) { return emit_signal_and_wait_for_result( - std::bind(&DaemonRpc::on_suspend, this, context, request, response, std::placeholders::_1)); + std::bind(&DaemonRpc::on_suspend, this, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::restart(grpc::ServerContext* context, const RestartRequest* request, grpc::ServerWriter* response) { return emit_signal_and_wait_for_result( - std::bind(&DaemonRpc::on_restart, this, context, request, response, std::placeholders::_1)); + std::bind(&DaemonRpc::on_restart, this, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::delet(grpc::ServerContext* context, const DeleteRequest* request, grpc::ServerWriter* response) { return emit_signal_and_wait_for_result( - std::bind(&DaemonRpc::on_delete, this, context, request, response, std::placeholders::_1)); + std::bind(&DaemonRpc::on_delete, this, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::umount(grpc::ServerContext* context, const UmountRequest* request, grpc::ServerWriter* response) { return emit_signal_and_wait_for_result( - std::bind(&DaemonRpc::on_umount, this, context, request, response, std::placeholders::_1)); + std::bind(&DaemonRpc::on_umount, this, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::version(grpc::ServerContext* context, const VersionRequest* request, grpc::ServerWriter* response) { return emit_signal_and_wait_for_result( - std::bind(&DaemonRpc::on_version, this, context, request, response, std::placeholders::_1)); + std::bind(&DaemonRpc::on_version, this, request, response, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::ping(grpc::ServerContext* context, const PingRequest* request, PingReply* response) diff --git a/src/daemon/daemon_rpc.h b/src/daemon/daemon_rpc.h index 815974fad0e..9b5bee0b8cd 100644 --- a/src/daemon/daemon_rpc.h +++ b/src/daemon/daemon_rpc.h @@ -51,36 +51,36 @@ class DaemonRpc : public QObject, public multipass::Rpc::Service signals: grpc::Status on_create(grpc::ServerContext* context, const CreateRequest* request, grpc::ServerWriter* reply); - void on_launch(grpc::ServerContext* context, const LaunchRequest* request, grpc::ServerWriter* reply, + void on_launch(const LaunchRequest* request, grpc::ServerWriter* reply, std::promise* status_promise); - void on_purge(grpc::ServerContext* context, const PurgeRequest* request, grpc::ServerWriter* response, + void on_purge(const PurgeRequest* request, grpc::ServerWriter* response, std::promise* status_promise); - void on_find(grpc::ServerContext* context, const FindRequest* request, grpc::ServerWriter* response, + void on_find(const FindRequest* request, grpc::ServerWriter* response, std::promise* status_promise); - void on_info(grpc::ServerContext* context, const InfoRequest* request, grpc::ServerWriter* response, + void on_info(const InfoRequest* request, grpc::ServerWriter* response, std::promise* status_promise); - void on_list(grpc::ServerContext* context, const ListRequest* request, grpc::ServerWriter* response, + void on_list(const ListRequest* request, grpc::ServerWriter* response, std::promise* status_promise); - void on_mount(grpc::ServerContext* context, const MountRequest* request, grpc::ServerWriter* response, + void on_mount(const MountRequest* request, grpc::ServerWriter* response, std::promise* status_promise); - void on_recover(grpc::ServerContext* context, const RecoverRequest* request, - grpc::ServerWriter* response, std::promise* status_promise); - void on_ssh_info(grpc::ServerContext* context, const SSHInfoRequest* request, - grpc::ServerWriter* response, std::promise* status_promise); - void on_start(grpc::ServerContext* context, const StartRequest* request, grpc::ServerWriter* response, + void on_recover(const RecoverRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); + void on_ssh_info(const SSHInfoRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); + void on_start(const StartRequest* request, grpc::ServerWriter* response, std::promise* status_promise); - void on_stop(grpc::ServerContext* context, const StopRequest* request, grpc::ServerWriter* response, + void on_stop(const StopRequest* request, grpc::ServerWriter* response, std::promise* status_promise); - void on_suspend(grpc::ServerContext* context, const SuspendRequest* request, - grpc::ServerWriter* response, std::promise* status_promise); - void on_restart(grpc::ServerContext* context, const RestartRequest* request, - grpc::ServerWriter* response, std::promise* status_promise); - void on_delete(grpc::ServerContext* context, const DeleteRequest* request, - grpc::ServerWriter* response, std::promise* status_promise); - void on_umount(grpc::ServerContext* context, const UmountRequest* request, - grpc::ServerWriter* response, std::promise* status_promise); - void on_version(grpc::ServerContext* context, const VersionRequest* request, - grpc::ServerWriter* response, std::promise* status_promise); + void on_suspend(const SuspendRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); + void on_restart(const RestartRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); + void on_delete(const DeleteRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); + void on_umount(const UmountRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); + void on_version(const VersionRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); private: const std::string server_address; From 8fe3fc427efdaf075cce9ec6c17e3f38435dfd51 Mon Sep 17 00:00:00 2001 From: Chris Townsend Date: Wed, 13 Mar 2019 15:50:26 -0400 Subject: [PATCH 03/16] daemon: Make wait_until_ssh_up() async in launch and start Also makes mounting in start async. --- src/daemon/daemon.cpp | 158 +++++++++++++++++++++++++++--------------- src/daemon/daemon.h | 17 +++++ 2 files changed, 119 insertions(+), 56 deletions(-) diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index 933ffad6a92..173c6ebdc1a 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -49,6 +49,7 @@ #include #include #include +#include #include #include @@ -1399,64 +1400,16 @@ try // clang-format on } } - // Start them all first before we go and do a blocking wait_until_ssh_up call for (const auto& name : vms) { auto it = vm_instances.find(name); it->second->start(); } - bool update_instance_db{false}; - fmt::memory_buffer errors; - for (const auto& name : vms) - { - auto it = vm_instances.find(name); - auto& vm = it->second; - auto& mounts = vm_instance_specs[name].mounts; - - vm->wait_until_ssh_up(std::chrono::minutes(2)); - - std::vector invalid_mounts; - for (const auto& mount_entry : mounts) - { - auto& target_path = mount_entry.first; - auto& source_path = mount_entry.second.source_path; - auto& uid_map = mount_entry.second.uid_map; - auto& gid_map = mount_entry.second.gid_map; - - try - { - start_mount(vm, name, source_path, target_path, gid_map, uid_map); - } - catch (const mp::SSHFSMissingError&) - { - try - { - StartReply start_reply; - start_reply.set_start_message("Enabling support for mounting"); - server->Write(start_reply); - install_sshfs(vm, name); - start_mount(vm, name, source_path, target_path, gid_map, uid_map); - } - catch (const mp::SSHFSMissingError&) - { - status_promise->set_value(grpc_status_for_mount_error(name)); - } - } - catch (const std::exception& e) - { - fmt::format_to(errors, "Removing \"{}\": {}", target_path, e.what()); - invalid_mounts.push_back(target_path); - } - } - - update_instance_db = !invalid_mounts.empty(); - for (const auto& invalid_mount : invalid_mounts) - mounts.erase(invalid_mount); - } - - if (update_instance_db) - persist_instances(); + async_future_watchers.emplace_back(std::make_unique>()); + auto& future_watcher = async_future_watchers.back(); + QObject::connect(future_watcher.get(), &QFutureWatcher::finished, + [this, &future_watcher]() { finish_async_operation(future_watcher->future()); }); if (config->update_prompt->is_time_to_show()) { @@ -1465,7 +1418,8 @@ try // clang-format on server->Write(start_reply); } - status_promise->set_value(grpc_status_for(errors)); + future_watcher->setFuture( + QtConcurrent::run(this, &Daemon::async_wait_for_ssh_and_start_mounts, server, vms, status_promise)); } catch (const std::exception& e) { @@ -2024,14 +1978,18 @@ grpc::Status mp::Daemon::create_vm(grpc::ServerContext* context, const CreateReq auto& vm = vm_instances[name]; vm->start(); - vm->wait_until_ssh_up(std::chrono::minutes(5)); reply.set_vm_instance_name(name); config->update_prompt->populate_if_time_to_show(reply.mutable_update_info()); server->Write(reply); - } - status_promise->set_value(grpc::Status::OK); + async_future_watchers.emplace_back(std::make_unique>()); + auto& future_watcher = async_future_watchers.back(); + QObject::connect(future_watcher.get(), &QFutureWatcher::finished, + [this, &future_watcher]() { finish_async_operation(future_watcher->future()); }); + + future_watcher->setFuture(QtConcurrent::run(this, &Daemon::async_wait_for_ssh, std::ref(vm), status_promise)); + } } grpc::Status mp::Daemon::reboot_vm(VirtualMachine& vm) @@ -2144,3 +2102,91 @@ void mp::Daemon::install_sshfs(const VirtualMachine::UPtr& vm, const std::string if (retries > max_install_sshfs_retries) throw mp::SSHFSMissingError(); } + +mp::Daemon::AsyncOperationStatus mp::Daemon::async_wait_for_ssh(const VirtualMachine::UPtr& vm, + std::promise* status_promise) +{ + try + { + vm->wait_until_ssh_up(up_timeout); + } + catch (const std::exception& e) + { + return {grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), ""), status_promise}; + } + + return {grpc::Status::OK, status_promise}; +} + +template +mp::Daemon::AsyncOperationStatus +mp::Daemon::async_wait_for_ssh_and_start_mounts(grpc::ServerWriter* server, const std::vector& vms, + std::promise* status_promise) +{ + fmt::memory_buffer errors; + for (const auto& name : vms) + { + auto it = vm_instances.find(name); + auto& vm = it->second; + auto& mounts = vm_instance_specs[name].mounts; + + async_wait_for_ssh(vm, nullptr); + + std::vector invalid_mounts; + for (const auto& mount_entry : mounts) + { + auto& target_path = mount_entry.first; + auto& source_path = mount_entry.second.source_path; + auto& uid_map = mount_entry.second.uid_map; + auto& gid_map = mount_entry.second.gid_map; + + try + { + start_mount(vm, name, source_path, target_path, gid_map, uid_map); + } + catch (const mp::SSHFSMissingError&) + { + try + { + StartReply start_reply; + start_reply.set_start_message("Enabling support for mounting"); + server->Write(start_reply); + install_sshfs(vm, name); + start_mount(vm, name, source_path, target_path, gid_map, uid_map); + } + catch (const mp::SSHFSMissingError&) + { + fmt::format_to(errors, "Error enabling mount support in '{}'", name); + break; + } + } + catch (const std::exception& e) + { + fmt::format_to(errors, "Removing \"{}\": {}", target_path, e.what()); + invalid_mounts.push_back(target_path); + } + } + } + + return {grpc_status_for(errors), status_promise}; +} + +void mp::Daemon::finish_async_operation(QFuture async_future) +{ + auto it = std::find_if(async_future_watchers.begin(), async_future_watchers.end(), + [&async_future](const std::unique_ptr>& watcher) { + return watcher->future() == async_future; + }); + + if (it != async_future_watchers.end()) + { + async_future_watchers.erase(it); + } + + auto async_op_result = async_future.result(); + + if (!async_op_result.status.ok()) + persist_instances(); + + async_op_result.status_promise->set_value(async_op_result.status); +} diff --git a/src/daemon/daemon.h b/src/daemon/daemon.h index a0a2a27f41d..9e919785824 100644 --- a/src/daemon/daemon.h +++ b/src/daemon/daemon.h @@ -31,6 +31,9 @@ #include #include #include +#include + +#include namespace multipass { @@ -145,6 +148,20 @@ public slots: grpc::Status cmd_vms(const std::vector& tgts, std::function cmd); void install_sshfs(const VirtualMachine::UPtr& vm, const std::string& name); + struct AsyncOperationStatus + { + grpc::Status status; + std::promise* status_promise; + }; + + AsyncOperationStatus async_wait_for_ssh(const VirtualMachine::UPtr& vm, std::promise* status_promise); + template + AsyncOperationStatus async_wait_for_ssh_and_start_mounts(grpc::ServerWriter* server, + const std::vector& vms, + std::promise* status_promise); + void finish_async_operation(QFuture async_future); + std::vector>> async_future_watchers; + std::unique_ptr config; std::unordered_map vm_instance_specs; std::unordered_map vm_instances; From 98b669157f6404887babdc2c36a0275da5ef4feb Mon Sep 17 00:00:00 2001 From: Chris Townsend Date: Thu, 14 Mar 2019 13:13:48 -0400 Subject: [PATCH 04/16] daemon: Make wait_until_ssh_up() async in restart operations Also commonize the creation of the QFutureWatcher. --- src/client/cmd/start.cpp | 2 +- src/daemon/daemon.cpp | 120 ++++++++++++++++++++------------------- src/daemon/daemon.h | 6 +- src/rpc/multipass.proto | 2 +- 4 files changed, 70 insertions(+), 60 deletions(-) diff --git a/src/client/cmd/start.cpp b/src/client/cmd/start.cpp index e7d0804fd09..a10c915d78a 100644 --- a/src/client/cmd/start.cpp +++ b/src/client/cmd/start.cpp @@ -66,7 +66,7 @@ mp::ReturnCode cmd::Start::run(mp::ArgParser* parser) auto streaming_callback = [&spinner](mp::StartReply& reply) { spinner.stop(); - spinner.start(reply.start_message()); + spinner.start(reply.reply_message()); }; spinner.start(instance_action_message_for(request.instance_names(), "Starting ")); diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index 173c6ebdc1a..ab3c5e0aad8 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -1406,11 +1406,6 @@ try // clang-format on it->second->start(); } - async_future_watchers.emplace_back(std::make_unique>()); - auto& future_watcher = async_future_watchers.back(); - QObject::connect(future_watcher.get(), &QFutureWatcher::finished, - [this, &future_watcher]() { finish_async_operation(future_watcher->future()); }); - if (config->update_prompt->is_time_to_show()) { StartReply start_reply; @@ -1418,6 +1413,7 @@ try // clang-format on server->Write(start_reply); } + auto future_watcher = create_future_watcher(); future_watcher->setFuture( QtConcurrent::run(this, &Daemon::async_wait_for_ssh_and_start_mounts, server, vms, status_promise)); } @@ -1522,16 +1518,11 @@ try // clang-format on if (status.ok()) { - status = cmd_vms(instances, [](auto& vm) { - // 2nd pass waits for them (only works because SSH was manually killed before rebooting) - vm.wait_until_ssh_up(up_timeout); - - return grpc::Status::OK; - }); + auto future_watcher = create_future_watcher(); + future_watcher->setFuture( + QtConcurrent::run(this, &Daemon::async_wait_for_ssh_all, instances, status_promise)); } } - - status_promise->set_value(status); } catch (const std::exception& e) { @@ -1701,37 +1692,9 @@ void mp::Daemon::on_suspend() void mp::Daemon::on_restart(const std::string& name) { - auto& vm = vm_instances[name]; - vm->wait_until_ssh_up(std::chrono::minutes(5)); - - auto& mounts = vm_instance_specs[name].mounts; - std::vector invalid_mounts; - - for (const auto& mount_entry : mounts) - { - auto& target_path = mount_entry.first; - auto& source_path = mount_entry.second.source_path; - auto& uid_map = mount_entry.second.uid_map; - auto& gid_map = mount_entry.second.gid_map; - - try - { - start_mount(vm, name, source_path, target_path, gid_map, uid_map); - } - catch (const std::exception& e) - { - mpl::log( - mpl::Level::error, name, - fmt::format("Mount error detected during instance reboot. Removing \"{}\": {}", target_path, e.what())); - invalid_mounts.push_back(target_path); - } - } - - for (const auto& invalid_mount : invalid_mounts) - mounts.erase(invalid_mount); - - if (!invalid_mounts.empty()) - persist_instances(); + auto future_watcher = create_future_watcher(); + future_watcher->setFuture(QtConcurrent::run(this, &Daemon::async_wait_for_ssh_and_start_mounts, nullptr, + std::vector{name}, nullptr)); } void mp::Daemon::persist_state_for(const std::string& name) @@ -1983,12 +1946,8 @@ grpc::Status mp::Daemon::create_vm(grpc::ServerContext* context, const CreateReq config->update_prompt->populate_if_time_to_show(reply.mutable_update_info()); server->Write(reply); - async_future_watchers.emplace_back(std::make_unique>()); - auto& future_watcher = async_future_watchers.back(); - QObject::connect(future_watcher.get(), &QFutureWatcher::finished, - [this, &future_watcher]() { finish_async_operation(future_watcher->future()); }); - - future_watcher->setFuture(QtConcurrent::run(this, &Daemon::async_wait_for_ssh, std::ref(vm), status_promise)); + auto future_watcher = create_future_watcher(); + future_watcher->setFuture(QtConcurrent::run(this, &Daemon::async_wait_for_ssh_for, std::ref(vm), status_promise)); } } @@ -2103,8 +2062,19 @@ void mp::Daemon::install_sshfs(const VirtualMachine::UPtr& vm, const std::string throw mp::SSHFSMissingError(); } -mp::Daemon::AsyncOperationStatus mp::Daemon::async_wait_for_ssh(const VirtualMachine::UPtr& vm, - std::promise* status_promise) +QFutureWatcher* mp::Daemon::create_future_watcher() +{ + async_future_watchers.emplace_back(std::make_unique>()); + + auto future_watcher = async_future_watchers.back().get(); + QObject::connect(future_watcher, &QFutureWatcher::finished, + [this, future_watcher]() { finish_async_operation(future_watcher->future()); }); + + return future_watcher; +} + +mp::Daemon::AsyncOperationStatus mp::Daemon::async_wait_for_ssh_for(const VirtualMachine::UPtr& vm, + std::promise* status_promise) { try { @@ -2118,6 +2088,29 @@ mp::Daemon::AsyncOperationStatus mp::Daemon::async_wait_for_ssh(const VirtualMac return {grpc::Status::OK, status_promise}; } +mp::Daemon::AsyncOperationStatus mp::Daemon::async_wait_for_ssh_all(const std::vector& vms, + std::promise* status_promise) +{ + fmt::memory_buffer errors; + for (const auto& name : vms) + { + auto it = vm_instances.find(name); + auto& vm = it->second; + + try + { + async_wait_for_ssh_for(vm, nullptr); + } + catch (const std::exception& e) + { + fmt::format_to(errors, "Error starting '{}': {}", name, e.what()); + continue; + } + } + + return {grpc_status_for(errors), status_promise}; +} + template mp::Daemon::AsyncOperationStatus mp::Daemon::async_wait_for_ssh_and_start_mounts(grpc::ServerWriter* server, const std::vector& vms, @@ -2130,7 +2123,15 @@ mp::Daemon::async_wait_for_ssh_and_start_mounts(grpc::ServerWriter* serve auto& vm = it->second; auto& mounts = vm_instance_specs[name].mounts; - async_wait_for_ssh(vm, nullptr); + try + { + async_wait_for_ssh_for(vm, nullptr); + } + catch (const std::exception& e) + { + fmt::format_to(errors, "Error starting '{}': {}", name, e.what()); + continue; + } std::vector invalid_mounts; for (const auto& mount_entry : mounts) @@ -2148,9 +2149,13 @@ mp::Daemon::async_wait_for_ssh_and_start_mounts(grpc::ServerWriter* serve { try { - StartReply start_reply; - start_reply.set_start_message("Enabling support for mounting"); - server->Write(start_reply); + if (server) + { + Reply reply; + reply.set_reply_message("Enabling support for mounting"); + server->Write(reply); + } + install_sshfs(vm, name); start_mount(vm, name, source_path, target_path, gid_map, uid_map); } @@ -2188,5 +2193,6 @@ void mp::Daemon::finish_async_operation(QFuture async_futu if (!async_op_result.status.ok()) persist_instances(); - async_op_result.status_promise->set_value(async_op_result.status); + if (async_op_result.status_promise) + async_op_result.status_promise->set_value(async_op_result.status); } diff --git a/src/daemon/daemon.h b/src/daemon/daemon.h index 9e919785824..3246fc4ed8f 100644 --- a/src/daemon/daemon.h +++ b/src/daemon/daemon.h @@ -154,12 +154,16 @@ public slots: std::promise* status_promise; }; - AsyncOperationStatus async_wait_for_ssh(const VirtualMachine::UPtr& vm, std::promise* status_promise); + AsyncOperationStatus async_wait_for_ssh_for(const VirtualMachine::UPtr& vm, + std::promise* status_promise); + AsyncOperationStatus async_wait_for_ssh_all(const std::vector& vms, + std::promise* status_promise); template AsyncOperationStatus async_wait_for_ssh_and_start_mounts(grpc::ServerWriter* server, const std::vector& vms, std::promise* status_promise); void finish_async_operation(QFuture async_future); + QFutureWatcher* create_future_watcher(); std::vector>> async_future_watchers; std::unique_ptr config; diff --git a/src/rpc/multipass.proto b/src/rpc/multipass.proto index b0ce557fb11..2d8620ba730 100644 --- a/src/rpc/multipass.proto +++ b/src/rpc/multipass.proto @@ -287,7 +287,7 @@ message StartRequest { message StartReply { string log_line = 1; - string start_message = 2; + string reply_message = 2; UpdateInfo update_info = 3; } From 42af446592209e6d83b7e49b48bf77ddc6d3fc31 Mon Sep 17 00:00:00 2001 From: Chris Townsend Date: Thu, 14 Mar 2019 16:25:42 -0400 Subject: [PATCH 05/16] backends/qemu: Add state synchronization for starting and shutdown --- include/multipass/virtual_machine.h | 10 ++++++--- src/daemon/daemon.cpp | 18 ++++++---------- .../backends/qemu/qemu_virtual_machine.cpp | 21 ++++++++++++++++++- 3 files changed, 33 insertions(+), 16 deletions(-) diff --git a/include/multipass/virtual_machine.h b/include/multipass/virtual_machine.h index 982476a46e5..e9a31b07a8f 100644 --- a/include/multipass/virtual_machine.h +++ b/include/multipass/virtual_machine.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017-2018 Canonical, Ltd. + * Copyright (C) 2017-2019 Canonical, Ltd. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -13,13 +13,14 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . * - * Authored by: Alberto Aguirre - * */ #ifndef MULTIPASS_VIRTUAL_MACHINE_H #define MULTIPASS_VIRTUAL_MACHINE_H +#include +#include + #include #include #include @@ -71,6 +72,9 @@ class VirtualMachine : VirtualMachine(State::off, key_provider, vm_name){}; VirtualMachine(const VirtualMachine&) = delete; VirtualMachine& operator=(const VirtualMachine&) = delete; + + QWaitCondition state_wait; + QMutex state_mutex; }; } #endif // MULTIPASS_VIRTUAL_MACHINE_H diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index ab3c5e0aad8..56315df13f4 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -2097,13 +2097,10 @@ mp::Daemon::AsyncOperationStatus mp::Daemon::async_wait_for_ssh_all(const std::v auto it = vm_instances.find(name); auto& vm = it->second; - try - { - async_wait_for_ssh_for(vm, nullptr); - } - catch (const std::exception& e) + auto ssh_status = async_wait_for_ssh_for(vm, nullptr); + if (!ssh_status.status.ok()) { - fmt::format_to(errors, "Error starting '{}': {}", name, e.what()); + fmt::format_to(errors, "Error starting '{}': {}", name, ssh_status.status.error_message()); continue; } } @@ -2123,13 +2120,10 @@ mp::Daemon::async_wait_for_ssh_and_start_mounts(grpc::ServerWriter* serve auto& vm = it->second; auto& mounts = vm_instance_specs[name].mounts; - try - { - async_wait_for_ssh_for(vm, nullptr); - } - catch (const std::exception& e) + auto ssh_status = async_wait_for_ssh_for(vm, nullptr); + if (!ssh_status.status.ok()) { - fmt::format_to(errors, "Error starting '{}': {}", name, e.what()); + fmt::format_to(errors, "Error starting '{}': {}", name, ssh_status.status.error_message()); continue; } diff --git a/src/platform/backends/qemu/qemu_virtual_machine.cpp b/src/platform/backends/qemu/qemu_virtual_machine.cpp index 24ec1b14fea..512594d487f 100644 --- a/src/platform/backends/qemu/qemu_virtual_machine.cpp +++ b/src/platform/backends/qemu/qemu_virtual_machine.cpp @@ -230,7 +230,7 @@ mp::QemuVirtualMachine::QemuVirtualMachine(const ProcessFactory* process_factory [this](int exitCode, QProcess::ExitStatus exitStatus) { mpl::log(mpl::Level::info, vm_name, fmt::format("process finished with exit code {}", exitCode)); - if (update_shutdown_status) + if (update_shutdown_status || state == State::starting) { on_shutdown(); } @@ -308,6 +308,9 @@ void mp::QemuVirtualMachine::shutdown() } else { + if (state == State::starting) + update_shutdown_status = false; + vm_process->kill(); vm_process->waitForFinished(); } @@ -365,6 +368,16 @@ void mp::QemuVirtualMachine::on_error() void mp::QemuVirtualMachine::on_shutdown() { + state_mutex.lock(); + + if (state == State::starting) + { + saved_error_msg = "shutdown called while starting"; + state_wait.wait(&state_mutex); + } + + state_mutex.unlock(); + state = State::off; ip = nullopt; update_state(); @@ -389,8 +402,14 @@ void mp::QemuVirtualMachine::on_restart() void mp::QemuVirtualMachine::ensure_vm_is_running() { + state_mutex.lock(); if (vm_process->state() == QProcess::NotRunning) + { + state_wait.notify_all(); + state_mutex.unlock(); throw mp::StartException(vm_name, saved_error_msg); + } + state_mutex.unlock(); } std::string mp::QemuVirtualMachine::ssh_hostname() From 05a86304c1ac9741088bdc03606b5eec8825f8b0 Mon Sep 17 00:00:00 2001 From: Chris Townsend Date: Mon, 25 Mar 2019 11:01:56 -0400 Subject: [PATCH 06/16] tests/client: Add StubDaemonRpc for testing the cli --- tests/stub_daemon_rpc.h | 127 ++++++++++++++++++++++++++++++++++++++ tests/test_cli_client.cpp | 4 +- 2 files changed, 129 insertions(+), 2 deletions(-) create mode 100644 tests/stub_daemon_rpc.h diff --git a/tests/stub_daemon_rpc.h b/tests/stub_daemon_rpc.h new file mode 100644 index 00000000000..82bd4ccc8e8 --- /dev/null +++ b/tests/stub_daemon_rpc.h @@ -0,0 +1,127 @@ +/* + * Copyright (C) 2019 Canonical, Ltd. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 3. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + */ + +#ifndef MULTIPASS_STUB_DAEMON_RPC_H +#define MULTIPASS_STUB_DAEMON_RPC_H + +#include + +namespace multipass +{ +namespace test +{ +struct StubDaemonRpc final : public DaemonRpc +{ + StubDaemonRpc(const std::string& server_address, RpcConnectionType type, const CertProvider& cert_provider, + const CertStore& client_cert_store) + : DaemonRpc{server_address, type, cert_provider, client_cert_store} + { + } + + grpc::Status launch(grpc::ServerContext* context, const LaunchRequest* request, + grpc::ServerWriter* reply) override + { + return grpc::Status::OK; + } + + grpc::Status purge(grpc::ServerContext* context, const PurgeRequest* request, + grpc::ServerWriter* response) override + { + return grpc::Status::OK; + } + + grpc::Status find(grpc::ServerContext* context, const FindRequest* request, + grpc::ServerWriter* response) override + { + return grpc::Status::OK; + } + + grpc::Status info(grpc::ServerContext* context, const InfoRequest* request, + grpc::ServerWriter* response) override + { + return grpc::Status::OK; + } + + grpc::Status list(grpc::ServerContext* context, const ListRequest* request, + grpc::ServerWriter* response) override + { + return grpc::Status::OK; + } + + grpc::Status mount(grpc::ServerContext* context, const MountRequest* request, + grpc::ServerWriter* response) override + { + return grpc::Status::OK; + } + + grpc::Status recover(grpc::ServerContext* context, const RecoverRequest* request, + grpc::ServerWriter* response) override + { + return grpc::Status::OK; + } + + grpc::Status ssh_info(grpc::ServerContext* context, const SSHInfoRequest* request, + grpc::ServerWriter* response) override + { + return grpc::Status::OK; + } + + grpc::Status start(grpc::ServerContext* context, const StartRequest* request, + grpc::ServerWriter* response) override + { + return grpc::Status::OK; + } + + grpc::Status stop(grpc::ServerContext* context, const StopRequest* request, + grpc::ServerWriter* response) override + { + return grpc::Status::OK; + } + + grpc::Status suspend(grpc::ServerContext* context, const SuspendRequest* request, + grpc::ServerWriter* response) override + { + return grpc::Status::OK; + } + + grpc::Status restart(grpc::ServerContext* context, const RestartRequest* request, + grpc::ServerWriter* response) override + { + return grpc::Status::OK; + } + + grpc::Status delet(grpc::ServerContext* context, const DeleteRequest* request, + grpc::ServerWriter* response) override + { + return grpc::Status::OK; + } + + grpc::Status umount(grpc::ServerContext* context, const UmountRequest* request, + grpc::ServerWriter* response) override + { + return grpc::Status::OK; + } + + grpc::Status version(grpc::ServerContext* context, const VersionRequest* request, + grpc::ServerWriter* response) override + { + return grpc::Status::OK; + } +}; +} // namespace test +} // namespace multipass +#endif // MULTIPASS_STUB_DAEMON_RPC_H diff --git a/tests/test_cli_client.cpp b/tests/test_cli_client.cpp index 36e1c065bd6..a03435f9221 100644 --- a/tests/test_cli_client.cpp +++ b/tests/test_cli_client.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017-2018 Canonical, Ltd. + * Copyright (C) 2017-2019 Canonical, Ltd. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -19,11 +19,11 @@ #include "path.h" #include "stub_cert_store.h" #include "stub_certprovider.h" +#include "stub_daemon_rpc.h" #include "stub_terminal.h" #include #include -#include #include #include From b2461efc24ac56dae36da305ac59c0ea2e633740 Mon Sep 17 00:00:00 2001 From: Chris Townsend Date: Mon, 25 Mar 2019 12:59:15 -0400 Subject: [PATCH 07/16] tests/daemon: Change mocks to match base Daemon class Make Daemon command functions virtual for mocking. Add default mock method calls to set the value of the promise. --- src/daemon/daemon.h | 60 ++++++++++++++++----------------- tests/test_daemon.cpp | 77 +++++++++++++++++++++++++++++++++---------- 2 files changed, 89 insertions(+), 48 deletions(-) diff --git a/src/daemon/daemon.h b/src/daemon/daemon.h index 3246fc4ed8f..8cd19ede1a0 100644 --- a/src/daemon/daemon.h +++ b/src/daemon/daemon.h @@ -86,50 +86,50 @@ public slots: grpc::Status create(grpc::ServerContext* context, const CreateRequest* request, grpc::ServerWriter* reply) override; - void launch(const LaunchRequest* request, grpc::ServerWriter* reply, - std::promise* status_promise); + virtual void launch(const LaunchRequest* request, grpc::ServerWriter* reply, + std::promise* status_promise); - void purge(const PurgeRequest* request, grpc::ServerWriter* response, - std::promise* status_promise); + virtual void purge(const PurgeRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - void find(const FindRequest* request, grpc::ServerWriter* response, - std::promise* status_promise); + virtual void find(const FindRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - void info(const InfoRequest* request, grpc::ServerWriter* response, - std::promise* status_promise); + virtual void info(const InfoRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - void list(const ListRequest* request, grpc::ServerWriter* response, - std::promise* status_promise); + virtual void list(const ListRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - void mount(const MountRequest* request, grpc::ServerWriter* response, - std::promise* status_promise); + virtual void mount(const MountRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - void recover(const RecoverRequest* request, grpc::ServerWriter* response, - std::promise* status_promise); + virtual void recover(const RecoverRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - void ssh_info(const SSHInfoRequest* request, grpc::ServerWriter* response, - std::promise* status_promise); + virtual void ssh_info(const SSHInfoRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - void start(const StartRequest* request, grpc::ServerWriter* response, - std::promise* status_promise); + virtual void start(const StartRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - void stop(const StopRequest* request, grpc::ServerWriter* response, - std::promise* status_promise); + virtual void stop(const StopRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - void suspend(const SuspendRequest* request, grpc::ServerWriter* response, - std::promise* status_promise); + virtual void suspend(const SuspendRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - void restart(const RestartRequest* request, grpc::ServerWriter* response, - std::promise* status_promise); + virtual void restart(const RestartRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - void delet(const DeleteRequest* request, grpc::ServerWriter* response, - std::promise* status_promise); + virtual void delet(const DeleteRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - void umount(const UmountRequest* request, grpc::ServerWriter* response, - std::promise* status_promise); + virtual void umount(const UmountRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); - void version(const VersionRequest* request, grpc::ServerWriter* response, - std::promise* status_promise); + virtual void version(const VersionRequest* request, grpc::ServerWriter* response, + std::promise* status_promise); private: void persist_instances(); diff --git a/tests/test_daemon.cpp b/tests/test_daemon.cpp index 409d9e4570c..75acb8d910f 100644 --- a/tests/test_daemon.cpp +++ b/tests/test_daemon.cpp @@ -61,35 +61,76 @@ namespace struct MockDaemon : public mp::Daemon { using mp::Daemon::Daemon; + + MockDaemon(std::unique_ptr config) : Daemon{std::move(config)} + { + ON_CALL(*this, launch(_, _, _)) + .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); + ON_CALL(*this, purge(_, _, _)) + .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); + ON_CALL(*this, find(_, _, _)) + .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); + ON_CALL(*this, info(_, _, _)) + .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); + ON_CALL(*this, list(_, _, _)) + .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); + ON_CALL(*this, mount(_, _, _)) + .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); + ON_CALL(*this, recover(_, _, _)) + .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); + ON_CALL(*this, ssh_info(_, _, _)) + .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); + ON_CALL(*this, start(_, _, _)) + .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); + ON_CALL(*this, stop(_, _, _)) + .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); + ON_CALL(*this, suspend(_, _, _)) + .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); + ON_CALL(*this, restart(_, _, _)) + .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); + ON_CALL(*this, delet(_, _, _)) + .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); + ON_CALL(*this, umount(_, _, _)) + .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); + ON_CALL(*this, version(_, _, _)) + .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); + } + MOCK_METHOD3(create, grpc::Status(grpc::ServerContext*, const mp::CreateRequest*, grpc::ServerWriter*)); MOCK_METHOD3(launch, - grpc::Status(grpc::ServerContext*, const mp::LaunchRequest*, grpc::ServerWriter*)); + void(const mp::LaunchRequest*, grpc::ServerWriter*, std::promise*)); MOCK_METHOD3(purge, - grpc::Status(grpc::ServerContext*, const mp::PurgeRequest*, grpc::ServerWriter*)); - MOCK_METHOD3(find, grpc::Status(grpc::ServerContext* context, const mp::FindRequest* request, - grpc::ServerWriter*)); - MOCK_METHOD3(info, grpc::Status(grpc::ServerContext*, const mp::InfoRequest*, grpc::ServerWriter*)); - MOCK_METHOD3(list, grpc::Status(grpc::ServerContext*, const mp::ListRequest*, grpc::ServerWriter*)); - MOCK_METHOD3(mount, grpc::Status(grpc::ServerContext* context, const mp::MountRequest* request, - grpc::ServerWriter*)); + void(const mp::PurgeRequest*, grpc::ServerWriter*, std::promise*)); + MOCK_METHOD3(find, + void(const mp::FindRequest* request, grpc::ServerWriter*, std::promise*)); + MOCK_METHOD3(info, void(const mp::InfoRequest*, grpc::ServerWriter*, std::promise*)); + MOCK_METHOD3(list, void(const mp::ListRequest*, grpc::ServerWriter*, std::promise*)); + MOCK_METHOD3(mount, void(const mp::MountRequest* request, grpc::ServerWriter*, + std::promise*)); MOCK_METHOD3(recover, - grpc::Status(grpc::ServerContext*, const mp::RecoverRequest*, grpc::ServerWriter*)); + void(const mp::RecoverRequest*, grpc::ServerWriter*, std::promise*)); MOCK_METHOD3(ssh_info, - grpc::Status(grpc::ServerContext*, const mp::SSHInfoRequest*, grpc::ServerWriter*)); + void(const mp::SSHInfoRequest*, grpc::ServerWriter*, std::promise*)); MOCK_METHOD3(start, - grpc::Status(grpc::ServerContext*, const mp::StartRequest*, grpc::ServerWriter*)); - MOCK_METHOD3(stop, grpc::Status(grpc::ServerContext*, const mp::StopRequest*, grpc::ServerWriter*)); + void(const mp::StartRequest*, grpc::ServerWriter*, std::promise*)); + MOCK_METHOD3(stop, void(const mp::StopRequest*, grpc::ServerWriter*, std::promise*)); MOCK_METHOD3(suspend, - grpc::Status(grpc::ServerContext*, const mp::SuspendRequest*, grpc::ServerWriter*)); + void(const mp::SuspendRequest*, grpc::ServerWriter*, std::promise*)); MOCK_METHOD3(restart, - grpc::Status(grpc::ServerContext*, const mp::RestartRequest*, grpc::ServerWriter*)); + void(const mp::RestartRequest*, grpc::ServerWriter*, std::promise*)); MOCK_METHOD3(delet, - grpc::Status(grpc::ServerContext*, const mp::DeleteRequest*, grpc::ServerWriter*)); - MOCK_METHOD3(umount, grpc::Status(grpc::ServerContext* context, const mp::UmountRequest* request, - grpc::ServerWriter* response)); + void(const mp::DeleteRequest*, grpc::ServerWriter*, std::promise*)); + MOCK_METHOD3(umount, + void(const mp::UmountRequest*, grpc::ServerWriter*, std::promise*)); MOCK_METHOD3(version, - grpc::Status(grpc::ServerContext*, const mp::VersionRequest*, grpc::ServerWriter*)); + void(const mp::VersionRequest*, grpc::ServerWriter*, std::promise*)); + + template + void set_promise_value(const Request*, grpc::ServerWriter*, std::promise* status_promise) + { + status_promise->set_value(grpc::Status::OK); + } }; struct StubNameGenerator : public mp::NameGenerator From cf6ec5c5bf5fea2d92b55c7fb44027a240496a53 Mon Sep 17 00:00:00 2001 From: Chris Townsend Date: Mon, 25 Mar 2019 14:47:28 -0400 Subject: [PATCH 08/16] daemon: Account for new `create` command merged in wrt concurrency --- src/daemon/daemon.cpp | 20 ++++++++++---------- src/daemon/daemon.h | 8 ++++---- src/daemon/daemon_rpc.cpp | 3 ++- src/daemon/daemon_rpc.h | 4 ++-- tests/test_cli_client.cpp | 2 +- tests/test_daemon.cpp | 4 +++- 6 files changed, 22 insertions(+), 19 deletions(-) diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index 56315df13f4..c0377c7942d 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -415,7 +415,7 @@ auto get_metrics_opt_in(const mp::Path& data_path) auto connect_rpc(mp::DaemonRpc& rpc, mp::Daemon& daemon) { - QObject::connect(&rpc, &mp::DaemonRpc::on_create, &daemon, &mp::Daemon::create, Qt::BlockingQueuedConnection); + QObject::connect(&rpc, &mp::DaemonRpc::on_create, &daemon, &mp::Daemon::create); QObject::connect(&rpc, &mp::DaemonRpc::on_launch, &daemon, &mp::Daemon::launch); QObject::connect(&rpc, &mp::DaemonRpc::on_purge, &daemon, &mp::Daemon::purge); QObject::connect(&rpc, &mp::DaemonRpc::on_find, &daemon, &mp::Daemon::find); @@ -654,16 +654,16 @@ mp::Daemon::Daemon(std::unique_ptr the_config) source_images_maintenance_task.start(config->image_refresh_timer); } -grpc::Status mp::Daemon::create(grpc::ServerContext* context, const CreateRequest* request, - grpc::ServerWriter* server) // clang-format off +void mp::Daemon::create(const CreateRequest* request, grpc::ServerWriter* server, + std::promise* status_promise) // clang-format off try // clang-format on { mpl::ClientLogger logger{mpl::level_from(request->verbosity_level()), *config->logger, server}; - return create_vm(context, request, server, /*start=*/false); + return create_vm(request, server, status_promise, /*start=*/false); } catch (const std::exception& e) { - return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), ""); + status_promise->set_value(grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), "")); } void mp::Daemon::launch(const LaunchRequest* request, grpc::ServerWriter* server, @@ -702,7 +702,7 @@ try // clang-format on if (metrics_opt_in.opt_in_status == OptInStatus::ACCEPTED) metrics_provider.send_metrics(); - return create_vm(context, request, server, /*start=*/true); + return create_vm(request, server, status_promise, /*start=*/true); } catch (const mp::StartException& e) { @@ -1844,15 +1844,15 @@ std::string mp::Daemon::check_instance_exists(const std::string& instance_name) return {}; } -grpc::Status mp::Daemon::create_vm(grpc::ServerContext* context, const CreateRequest* request, - grpc::ServerWriter* server, bool start) +void mp::Daemon::create_vm(const CreateRequest* request, grpc::ServerWriter* server, + std::promise* status_promise, bool start) { auto checked_args = validate_create_arguments(request); if (!checked_args.option_errors.error_codes().empty()) { - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Invalid arguments supplied", - checked_args.option_errors.SerializeAsString()); + return status_promise->set_value(grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Invalid arguments supplied", + checked_args.option_errors.SerializeAsString())); } auto name = name_from(checked_args.instance_name, *config->name_generator, vm_instances); diff --git a/src/daemon/daemon.h b/src/daemon/daemon.h index 8cd19ede1a0..f49258f5684 100644 --- a/src/daemon/daemon.h +++ b/src/daemon/daemon.h @@ -83,8 +83,8 @@ class Daemon : public QObject, public multipass::VMStatusMonitor QJsonObject retrieve_metadata_for(const std::string& name) override; public slots: - grpc::Status create(grpc::ServerContext* context, const CreateRequest* request, - grpc::ServerWriter* reply) override; + virtual void create(const CreateRequest* request, grpc::ServerWriter* reply, + std::promise* status_promise); virtual void launch(const LaunchRequest* request, grpc::ServerWriter* reply, std::promise* status_promise); @@ -140,8 +140,8 @@ public slots: void release_resources(const std::string& instance); std::string check_instance_operational(const std::string& instance_name) const; std::string check_instance_exists(const std::string& instance_name) const; - grpc::Status create_vm(grpc::ServerContext* context, const CreateRequest* request, - grpc::ServerWriter* server, bool start); + void create_vm(const CreateRequest* request, grpc::ServerWriter* server, + std::promise* status_promise, bool start); grpc::Status reboot_vm(VirtualMachine& vm); grpc::Status shutdown_vm(VirtualMachine& vm, const std::chrono::milliseconds delay); grpc::Status cancel_vm_shutdown(const VirtualMachine& vm); diff --git a/src/daemon/daemon_rpc.cpp b/src/daemon/daemon_rpc.cpp index df39e1ff33f..939f7467b33 100644 --- a/src/daemon/daemon_rpc.cpp +++ b/src/daemon/daemon_rpc.cpp @@ -109,7 +109,8 @@ mp::DaemonRpc::DaemonRpc(const std::string& server_address, mp::RpcConnectionTyp grpc::Status mp::DaemonRpc::create(grpc::ServerContext* context, const CreateRequest* request, grpc::ServerWriter* reply) { - return emit on_create(context, request, reply); // must block until slot returns + return emit_signal_and_wait_for_result( + std::bind(&DaemonRpc::on_create, this, request, reply, std::placeholders::_1)); } grpc::Status mp::DaemonRpc::launch(grpc::ServerContext* context, const LaunchRequest* request, diff --git a/src/daemon/daemon_rpc.h b/src/daemon/daemon_rpc.h index 9b5bee0b8cd..1796e39ee0e 100644 --- a/src/daemon/daemon_rpc.h +++ b/src/daemon/daemon_rpc.h @@ -49,8 +49,8 @@ class DaemonRpc : public QObject, public multipass::Rpc::Service DaemonRpc& operator=(const DaemonRpc&) = delete; signals: - grpc::Status on_create(grpc::ServerContext* context, const CreateRequest* request, - grpc::ServerWriter* reply); + void on_create(const CreateRequest* request, grpc::ServerWriter* reply, + std::promise* status_promise); void on_launch(const LaunchRequest* request, grpc::ServerWriter* reply, std::promise* status_promise); void on_purge(const PurgeRequest* request, grpc::ServerWriter* response, diff --git a/tests/test_cli_client.cpp b/tests/test_cli_client.cpp index a03435f9221..2533ae54e3c 100644 --- a/tests/test_cli_client.cpp +++ b/tests/test_cli_client.cpp @@ -19,11 +19,11 @@ #include "path.h" #include "stub_cert_store.h" #include "stub_certprovider.h" -#include "stub_daemon_rpc.h" #include "stub_terminal.h" #include #include +#include #include #include diff --git a/tests/test_daemon.cpp b/tests/test_daemon.cpp index 75acb8d910f..1bcf72cd356 100644 --- a/tests/test_daemon.cpp +++ b/tests/test_daemon.cpp @@ -64,6 +64,8 @@ struct MockDaemon : public mp::Daemon MockDaemon(std::unique_ptr config) : Daemon{std::move(config)} { + ON_CALL(*this, create(_, _, _)) + .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); ON_CALL(*this, launch(_, _, _)) .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); ON_CALL(*this, purge(_, _, _)) @@ -97,7 +99,7 @@ struct MockDaemon : public mp::Daemon } MOCK_METHOD3(create, - grpc::Status(grpc::ServerContext*, const mp::CreateRequest*, grpc::ServerWriter*)); + void(const mp::CreateRequest*, grpc::ServerWriter*, std::promise*)); MOCK_METHOD3(launch, void(const mp::LaunchRequest*, grpc::ServerWriter*, std::promise*)); MOCK_METHOD3(purge, From 1d5edab4f358c26e50b78a12dfa116d1b9af0f46 Mon Sep 17 00:00:00 2001 From: Chris Townsend Date: Mon, 25 Mar 2019 16:19:15 -0400 Subject: [PATCH 09/16] tests: Fix the last of the tests regarding 'create' and concurrency --- src/daemon/daemon.cpp | 4 ++ tests/stub_daemon_rpc.h | 127 -------------------------------------- tests/test_cli_client.cpp | 2 +- tests/test_daemon.cpp | 84 ++++++++++--------------- 4 files changed, 37 insertions(+), 180 deletions(-) delete mode 100644 tests/stub_daemon_rpc.h diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index c0377c7942d..b08578406e3 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -1949,6 +1949,10 @@ void mp::Daemon::create_vm(const CreateRequest* request, grpc::ServerWritersetFuture(QtConcurrent::run(this, &Daemon::async_wait_for_ssh_for, std::ref(vm), status_promise)); } + else + { + status_promise->set_value(grpc::Status::OK); + } } grpc::Status mp::Daemon::reboot_vm(VirtualMachine& vm) diff --git a/tests/stub_daemon_rpc.h b/tests/stub_daemon_rpc.h deleted file mode 100644 index 82bd4ccc8e8..00000000000 --- a/tests/stub_daemon_rpc.h +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright (C) 2019 Canonical, Ltd. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; version 3. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - * - */ - -#ifndef MULTIPASS_STUB_DAEMON_RPC_H -#define MULTIPASS_STUB_DAEMON_RPC_H - -#include - -namespace multipass -{ -namespace test -{ -struct StubDaemonRpc final : public DaemonRpc -{ - StubDaemonRpc(const std::string& server_address, RpcConnectionType type, const CertProvider& cert_provider, - const CertStore& client_cert_store) - : DaemonRpc{server_address, type, cert_provider, client_cert_store} - { - } - - grpc::Status launch(grpc::ServerContext* context, const LaunchRequest* request, - grpc::ServerWriter* reply) override - { - return grpc::Status::OK; - } - - grpc::Status purge(grpc::ServerContext* context, const PurgeRequest* request, - grpc::ServerWriter* response) override - { - return grpc::Status::OK; - } - - grpc::Status find(grpc::ServerContext* context, const FindRequest* request, - grpc::ServerWriter* response) override - { - return grpc::Status::OK; - } - - grpc::Status info(grpc::ServerContext* context, const InfoRequest* request, - grpc::ServerWriter* response) override - { - return grpc::Status::OK; - } - - grpc::Status list(grpc::ServerContext* context, const ListRequest* request, - grpc::ServerWriter* response) override - { - return grpc::Status::OK; - } - - grpc::Status mount(grpc::ServerContext* context, const MountRequest* request, - grpc::ServerWriter* response) override - { - return grpc::Status::OK; - } - - grpc::Status recover(grpc::ServerContext* context, const RecoverRequest* request, - grpc::ServerWriter* response) override - { - return grpc::Status::OK; - } - - grpc::Status ssh_info(grpc::ServerContext* context, const SSHInfoRequest* request, - grpc::ServerWriter* response) override - { - return grpc::Status::OK; - } - - grpc::Status start(grpc::ServerContext* context, const StartRequest* request, - grpc::ServerWriter* response) override - { - return grpc::Status::OK; - } - - grpc::Status stop(grpc::ServerContext* context, const StopRequest* request, - grpc::ServerWriter* response) override - { - return grpc::Status::OK; - } - - grpc::Status suspend(grpc::ServerContext* context, const SuspendRequest* request, - grpc::ServerWriter* response) override - { - return grpc::Status::OK; - } - - grpc::Status restart(grpc::ServerContext* context, const RestartRequest* request, - grpc::ServerWriter* response) override - { - return grpc::Status::OK; - } - - grpc::Status delet(grpc::ServerContext* context, const DeleteRequest* request, - grpc::ServerWriter* response) override - { - return grpc::Status::OK; - } - - grpc::Status umount(grpc::ServerContext* context, const UmountRequest* request, - grpc::ServerWriter* response) override - { - return grpc::Status::OK; - } - - grpc::Status version(grpc::ServerContext* context, const VersionRequest* request, - grpc::ServerWriter* response) override - { - return grpc::Status::OK; - } -}; -} // namespace test -} // namespace multipass -#endif // MULTIPASS_STUB_DAEMON_RPC_H diff --git a/tests/test_cli_client.cpp b/tests/test_cli_client.cpp index 2533ae54e3c..36e1c065bd6 100644 --- a/tests/test_cli_client.cpp +++ b/tests/test_cli_client.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017-2019 Canonical, Ltd. + * Copyright (C) 2017-2018 Canonical, Ltd. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by diff --git a/tests/test_daemon.cpp b/tests/test_daemon.cpp index 1bcf72cd356..7d15243fb1f 100644 --- a/tests/test_daemon.cpp +++ b/tests/test_daemon.cpp @@ -62,42 +62,6 @@ struct MockDaemon : public mp::Daemon { using mp::Daemon::Daemon; - MockDaemon(std::unique_ptr config) : Daemon{std::move(config)} - { - ON_CALL(*this, create(_, _, _)) - .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); - ON_CALL(*this, launch(_, _, _)) - .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); - ON_CALL(*this, purge(_, _, _)) - .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); - ON_CALL(*this, find(_, _, _)) - .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); - ON_CALL(*this, info(_, _, _)) - .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); - ON_CALL(*this, list(_, _, _)) - .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); - ON_CALL(*this, mount(_, _, _)) - .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); - ON_CALL(*this, recover(_, _, _)) - .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); - ON_CALL(*this, ssh_info(_, _, _)) - .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); - ON_CALL(*this, start(_, _, _)) - .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); - ON_CALL(*this, stop(_, _, _)) - .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); - ON_CALL(*this, suspend(_, _, _)) - .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); - ON_CALL(*this, restart(_, _, _)) - .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); - ON_CALL(*this, delet(_, _, _)) - .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); - ON_CALL(*this, umount(_, _, _)) - .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); - ON_CALL(*this, version(_, _, _)) - .WillByDefault(Invoke(this, &MockDaemon::set_promise_value)); - } - MOCK_METHOD3(create, void(const mp::CreateRequest*, grpc::ServerWriter*, std::promise*)); MOCK_METHOD3(launch, @@ -311,22 +275,38 @@ TEST_F(Daemon, receives_commands) { MockDaemon daemon{config_builder.build()}; - EXPECT_CALL(daemon, create(_, _, _)); - EXPECT_CALL(daemon, launch(_, _, _)); - EXPECT_CALL(daemon, purge(_, _, _)); - EXPECT_CALL(daemon, find(_, _, _)); - EXPECT_CALL(daemon, ssh_info(_, _, _)); - EXPECT_CALL(daemon, info(_, _, _)); - EXPECT_CALL(daemon, list(_, _, _)); - EXPECT_CALL(daemon, recover(_, _, _)); - EXPECT_CALL(daemon, start(_, _, _)); - EXPECT_CALL(daemon, stop(_, _, _)); - EXPECT_CALL(daemon, suspend(_, _, _)); - EXPECT_CALL(daemon, restart(_, _, _)); - EXPECT_CALL(daemon, delet(_, _, _)); - EXPECT_CALL(daemon, version(_, _, _)); - EXPECT_CALL(daemon, mount(_, _, _)); - EXPECT_CALL(daemon, umount(_, _, _)); + EXPECT_CALL(daemon, create(_, _, _)) + .WillOnce(Invoke(&daemon, &MockDaemon::set_promise_value)); + EXPECT_CALL(daemon, launch(_, _, _)) + .WillOnce(Invoke(&daemon, &MockDaemon::set_promise_value)); + EXPECT_CALL(daemon, purge(_, _, _)) + .WillOnce(Invoke(&daemon, &MockDaemon::set_promise_value)); + EXPECT_CALL(daemon, find(_, _, _)) + .WillOnce(Invoke(&daemon, &MockDaemon::set_promise_value)); + EXPECT_CALL(daemon, ssh_info(_, _, _)) + .WillOnce(Invoke(&daemon, &MockDaemon::set_promise_value)); + EXPECT_CALL(daemon, info(_, _, _)) + .WillOnce(Invoke(&daemon, &MockDaemon::set_promise_value)); + EXPECT_CALL(daemon, list(_, _, _)) + .WillOnce(Invoke(&daemon, &MockDaemon::set_promise_value)); + EXPECT_CALL(daemon, recover(_, _, _)) + .WillOnce(Invoke(&daemon, &MockDaemon::set_promise_value)); + EXPECT_CALL(daemon, start(_, _, _)) + .WillOnce(Invoke(&daemon, &MockDaemon::set_promise_value)); + EXPECT_CALL(daemon, stop(_, _, _)) + .WillOnce(Invoke(&daemon, &MockDaemon::set_promise_value)); + EXPECT_CALL(daemon, suspend(_, _, _)) + .WillOnce(Invoke(&daemon, &MockDaemon::set_promise_value)); + EXPECT_CALL(daemon, restart(_, _, _)) + .WillOnce(Invoke(&daemon, &MockDaemon::set_promise_value)); + EXPECT_CALL(daemon, delet(_, _, _)) + .WillOnce(Invoke(&daemon, &MockDaemon::set_promise_value)); + EXPECT_CALL(daemon, version(_, _, _)) + .WillOnce(Invoke(&daemon, &MockDaemon::set_promise_value)); + EXPECT_CALL(daemon, mount(_, _, _)) + .WillOnce(Invoke(&daemon, &MockDaemon::set_promise_value)); + EXPECT_CALL(daemon, umount(_, _, _)) + .WillOnce(Invoke(&daemon, &MockDaemon::set_promise_value)); send_commands({{"test_create", "foo"}, {"launch", "foo"}, From 27518b1ade17b89984f92c47899e10d02a56e8e5 Mon Sep 17 00:00:00 2001 From: Chris Townsend Date: Tue, 26 Mar 2019 14:20:24 -0400 Subject: [PATCH 10/16] backends/libvirt: Add critical section locks --- include/multipass/virtual_machine.h | 10 ++++----- .../libvirt/libvirt_virtual_machine.cpp | 22 ++++++++++++++++++- .../libvirt/libvirt_virtual_machine.h | 4 +++- .../backends/qemu/qemu_virtual_machine.cpp | 16 +++++--------- src/utils/utils.cpp | 2 ++ 5 files changed, 35 insertions(+), 19 deletions(-) diff --git a/include/multipass/virtual_machine.h b/include/multipass/virtual_machine.h index e9a31b07a8f..da607642d45 100644 --- a/include/multipass/virtual_machine.h +++ b/include/multipass/virtual_machine.h @@ -18,11 +18,10 @@ #ifndef MULTIPASS_VIRTUAL_MACHINE_H #define MULTIPASS_VIRTUAL_MACHINE_H -#include -#include - #include +#include #include +#include #include namespace multipass @@ -64,6 +63,8 @@ class VirtualMachine VirtualMachine::State state; const SSHKeyProvider& key_provider; const std::string vm_name; + std::condition_variable state_wait; + std::mutex state_mutex; protected: VirtualMachine(VirtualMachine::State state, const SSHKeyProvider& key_provider, const std::string& vm_name) @@ -72,9 +73,6 @@ class VirtualMachine : VirtualMachine(State::off, key_provider, vm_name){}; VirtualMachine(const VirtualMachine&) = delete; VirtualMachine& operator=(const VirtualMachine&) = delete; - - QWaitCondition state_wait; - QMutex state_mutex; }; } #endif // MULTIPASS_VIRTUAL_MACHINE_H diff --git a/src/platform/backends/libvirt/libvirt_virtual_machine.cpp b/src/platform/backends/libvirt/libvirt_virtual_machine.cpp index 903158c0988..5b393190a20 100644 --- a/src/platform/backends/libvirt/libvirt_virtual_machine.cpp +++ b/src/platform/backends/libvirt/libvirt_virtual_machine.cpp @@ -17,6 +17,7 @@ #include "libvirt_virtual_machine.h" +#include #include #include #include @@ -287,17 +288,26 @@ void mp::LibVirtVirtualMachine::stop() void mp::LibVirtVirtualMachine::shutdown() { + std::unique_lock lock{state_mutex}; if (state == State::running || state == State::delayed_shutdown) { virDomainShutdown(domain.get()); state = State::off; update_state(); } + else if (state == State::starting) + { + virDomainDestroy(domain.get()); + state = State::off; + update_state(); + state_wait.wait(lock); + } else if (state == State::suspended) { mpl::log(mpl::Level::info, vm_name, fmt::format("Ignoring shutdown issued while suspended")); } + lock.unlock(); monitor->on_shutdown(); } @@ -332,6 +342,16 @@ int mp::LibVirtVirtualMachine::ssh_port() return 22; } +void mp::LibVirtVirtualMachine::ensure_vm_is_running() +{ + std::lock_guard lock{state_mutex}; + if (domain_state_for(domain.get()) != VirtualMachine::State::running) + { + state_wait.notify_all(); + throw mp::StartException(vm_name, "Instance shutdown during start"); + } +} + std::string mp::LibVirtVirtualMachine::ssh_hostname() { auto action = [this] { @@ -378,7 +398,7 @@ std::string mp::LibVirtVirtualMachine::ipv6() void mp::LibVirtVirtualMachine::wait_until_ssh_up(std::chrono::milliseconds timeout) { - mp::utils::wait_until_ssh_up(this, timeout); + mp::utils::wait_until_ssh_up(this, timeout, std::bind(&LibVirtVirtualMachine::ensure_vm_is_running, this)); } void mp::LibVirtVirtualMachine::update_state() diff --git a/src/platform/backends/libvirt/libvirt_virtual_machine.h b/src/platform/backends/libvirt/libvirt_virtual_machine.h index f66b8d141c2..06dbad2dd08 100644 --- a/src/platform/backends/libvirt/libvirt_virtual_machine.h +++ b/src/platform/backends/libvirt/libvirt_virtual_machine.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2018 Canonical, Ltd. + * Copyright (C) 2018-2019 Canonical, Ltd. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -53,6 +53,8 @@ class LibVirtVirtualMachine final : public VirtualMachine void update_state() override; private: + void ensure_vm_is_running(); + virConnectPtr connection; DomainUPtr domain; const std::string mac_addr; diff --git a/src/platform/backends/qemu/qemu_virtual_machine.cpp b/src/platform/backends/qemu/qemu_virtual_machine.cpp index 512594d487f..fa2631bb7c7 100644 --- a/src/platform/backends/qemu/qemu_virtual_machine.cpp +++ b/src/platform/backends/qemu/qemu_virtual_machine.cpp @@ -368,19 +368,17 @@ void mp::QemuVirtualMachine::on_error() void mp::QemuVirtualMachine::on_shutdown() { - state_mutex.lock(); - + std::unique_lock lock{state_mutex}; if (state == State::starting) { saved_error_msg = "shutdown called while starting"; - state_wait.wait(&state_mutex); + state_wait.wait(lock); } - state_mutex.unlock(); - state = State::off; ip = nullopt; update_state(); + lock.unlock(); monitor->on_shutdown(); } @@ -402,14 +400,12 @@ void mp::QemuVirtualMachine::on_restart() void mp::QemuVirtualMachine::ensure_vm_is_running() { - state_mutex.lock(); + std::lock_guard lock{state_mutex}; if (vm_process->state() == QProcess::NotRunning) { state_wait.notify_all(); - state_mutex.unlock(); throw mp::StartException(vm_name, saved_error_msg); } - state_mutex.unlock(); } std::string mp::QemuVirtualMachine::ssh_hostname() @@ -462,9 +458,7 @@ std::string mp::QemuVirtualMachine::ipv6() void mp::QemuVirtualMachine::wait_until_ssh_up(std::chrono::milliseconds timeout) { - auto process_vm_events = [this] { ensure_vm_is_running(); }; - - mp::utils::wait_until_ssh_up(this, timeout, process_vm_events); + mp::utils::wait_until_ssh_up(this, timeout, std::bind(&QemuVirtualMachine::ensure_vm_is_running, this)); if (delete_memory_snapshot) { diff --git a/src/utils/utils.cpp b/src/utils/utils.cpp index 3a42b944111..bcdc112df80 100644 --- a/src/utils/utils.cpp +++ b/src/utils/utils.cpp @@ -143,6 +143,7 @@ void mp::utils::wait_until_ssh_up(VirtualMachine* virtual_machine, std::chrono:: process_vm_events(); try { + std::lock_guardstate_mutex)> lock{virtual_machine->state_mutex}; mp::SSHSession session{virtual_machine->ssh_hostname(), virtual_machine->ssh_port()}; virtual_machine->state = VirtualMachine::State::running; virtual_machine->update_state(); @@ -154,6 +155,7 @@ void mp::utils::wait_until_ssh_up(VirtualMachine* virtual_machine, std::chrono:: } }; auto on_timeout = [virtual_machine] { + std::lock_guardstate_mutex)> lock{virtual_machine->state_mutex}; virtual_machine->state = VirtualMachine::State::unknown; virtual_machine->update_state(); throw std::runtime_error("timed out waiting for instance to respond"); From 718cdf939788b7a2b79c66df0839f17329bc02b9 Mon Sep 17 00:00:00 2001 From: Chris Townsend Date: Wed, 27 Mar 2019 10:59:22 -0400 Subject: [PATCH 11/16] daemon: Pass state into persist_state_for() to avoid unnecessary callback into vm object --- include/multipass/vm_status_monitor.h | 4 +++- src/daemon/daemon.cpp | 5 ++--- src/daemon/daemon.h | 2 +- src/platform/backends/libvirt/libvirt_virtual_machine.cpp | 2 +- src/platform/backends/qemu/qemu_virtual_machine.cpp | 2 +- tests/mock_status_monitor.h | 2 +- tests/stub_status_monitor.h | 2 +- tests/test_libvirt_backend.cpp | 6 +++--- tests/test_qemu_backend.cpp | 8 ++++---- 9 files changed, 17 insertions(+), 16 deletions(-) diff --git a/include/multipass/vm_status_monitor.h b/include/multipass/vm_status_monitor.h index a0df313f92e..43948594ab4 100644 --- a/include/multipass/vm_status_monitor.h +++ b/include/multipass/vm_status_monitor.h @@ -18,6 +18,8 @@ #ifndef MULTIPASS_VM_STATUS_MONITOR_H #define MULTIPASS_VM_STATUS_MONITOR_H +#include + #include #include @@ -33,7 +35,7 @@ class VMStatusMonitor virtual void on_shutdown() = 0; virtual void on_suspend() = 0; virtual void on_restart(const std::string& name) = 0; - virtual void persist_state_for(const std::string& name) = 0; + virtual void persist_state_for(const std::string& name, const VirtualMachine::State& state) = 0; virtual void update_metadata_for(const std::string& name, const QJsonObject& metadata) = 0; virtual QJsonObject retrieve_metadata_for(const std::string& name) = 0; diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index b08578406e3..8e3904fa766 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -1697,10 +1697,9 @@ void mp::Daemon::on_restart(const std::string& name) std::vector{name}, nullptr)); } -void mp::Daemon::persist_state_for(const std::string& name) +void mp::Daemon::persist_state_for(const std::string& name, const VirtualMachine::State& state) { - auto& vm = vm_instances[name]; - vm_instance_specs[name].state = vm->current_state(); + vm_instance_specs[name].state = state; persist_instances(); } diff --git a/src/daemon/daemon.h b/src/daemon/daemon.h index f49258f5684..a8febf4d54f 100644 --- a/src/daemon/daemon.h +++ b/src/daemon/daemon.h @@ -78,7 +78,7 @@ class Daemon : public QObject, public multipass::VMStatusMonitor void on_shutdown() override; void on_suspend() override; void on_restart(const std::string& name) override; - void persist_state_for(const std::string& name) override; + void persist_state_for(const std::string& name, const VirtualMachine::State& state) override; void update_metadata_for(const std::string& name, const QJsonObject& metadata) override; QJsonObject retrieve_metadata_for(const std::string& name) override; diff --git a/src/platform/backends/libvirt/libvirt_virtual_machine.cpp b/src/platform/backends/libvirt/libvirt_virtual_machine.cpp index 5b393190a20..e041b702f88 100644 --- a/src/platform/backends/libvirt/libvirt_virtual_machine.cpp +++ b/src/platform/backends/libvirt/libvirt_virtual_machine.cpp @@ -403,5 +403,5 @@ void mp::LibVirtVirtualMachine::wait_until_ssh_up(std::chrono::milliseconds time void mp::LibVirtVirtualMachine::update_state() { - monitor->persist_state_for(vm_name); + monitor->persist_state_for(vm_name, state); } diff --git a/src/platform/backends/qemu/qemu_virtual_machine.cpp b/src/platform/backends/qemu/qemu_virtual_machine.cpp index fa2631bb7c7..e2f22fe2bda 100644 --- a/src/platform/backends/qemu/qemu_virtual_machine.cpp +++ b/src/platform/backends/qemu/qemu_virtual_machine.cpp @@ -350,7 +350,7 @@ int mp::QemuVirtualMachine::ssh_port() void mp::QemuVirtualMachine::update_state() { - monitor->persist_state_for(vm_name); + monitor->persist_state_for(vm_name, state); } void mp::QemuVirtualMachine::on_started() diff --git a/tests/mock_status_monitor.h b/tests/mock_status_monitor.h index cf29e5dcee7..479ad7b6d80 100644 --- a/tests/mock_status_monitor.h +++ b/tests/mock_status_monitor.h @@ -32,7 +32,7 @@ struct MockVMStatusMonitor : public VMStatusMonitor MOCK_METHOD0(on_shutdown, void()); MOCK_METHOD0(on_suspend, void()); MOCK_METHOD1(on_restart, void(const std::string&)); - MOCK_METHOD1(persist_state_for, void(const std::string&)); + MOCK_METHOD2(persist_state_for, void(const std::string&, const VirtualMachine::State&)); MOCK_METHOD2(update_metadata_for, void(const std::string&, const QJsonObject&)); MOCK_METHOD1(retrieve_metadata_for, QJsonObject(const std::string&)); }; diff --git a/tests/stub_status_monitor.h b/tests/stub_status_monitor.h index 5a582062d7d..f9b3290c1ec 100644 --- a/tests/stub_status_monitor.h +++ b/tests/stub_status_monitor.h @@ -31,7 +31,7 @@ struct StubVMStatusMonitor : public multipass::VMStatusMonitor void on_shutdown() override{}; void on_suspend() override{}; void on_restart(const std::string& name) override{}; - void persist_state_for(const std::string& name) override{}; + void persist_state_for(const std::string& name, const VirtualMachine::State& state) override{}; void update_metadata_for(const std::string& name, const QJsonObject& metadata) override{}; QJsonObject retrieve_metadata_for(const std::string& name) override { diff --git a/tests/test_libvirt_backend.cpp b/tests/test_libvirt_backend.cpp index 2380dbcae99..9a6e7d36506 100644 --- a/tests/test_libvirt_backend.cpp +++ b/tests/test_libvirt_backend.cpp @@ -193,7 +193,7 @@ TEST_F(LibVirtBackend, machine_persists_and_sets_state_on_start) NiceMock mock_monitor; auto machine = backend.create_virtual_machine(default_description, mock_monitor); - EXPECT_CALL(mock_monitor, persist_state_for(_)); + EXPECT_CALL(mock_monitor, persist_state_for(_, _)); machine->start(); EXPECT_THAT(machine->current_state(), Eq(mp::VirtualMachine::State::starting)); @@ -224,7 +224,7 @@ TEST_F(LibVirtBackend, machine_persists_and_sets_state_on_shutdown) auto machine = backend.create_virtual_machine(default_description, mock_monitor); machine->state = mp::VirtualMachine::State::running; - EXPECT_CALL(mock_monitor, persist_state_for(_)); + EXPECT_CALL(mock_monitor, persist_state_for(_, _)); machine->shutdown(); EXPECT_THAT(machine->current_state(), Eq(mp::VirtualMachine::State::off)); @@ -255,7 +255,7 @@ TEST_F(LibVirtBackend, machine_persists_and_sets_state_on_suspend) auto machine = backend.create_virtual_machine(default_description, mock_monitor); machine->state = mp::VirtualMachine::State::running; - EXPECT_CALL(mock_monitor, persist_state_for(_)); + EXPECT_CALL(mock_monitor, persist_state_for(_, _)); machine->suspend(); EXPECT_THAT(machine->current_state(), Eq(mp::VirtualMachine::State::suspended)); diff --git a/tests/test_qemu_backend.cpp b/tests/test_qemu_backend.cpp index adc8504ba7b..710d31be655 100644 --- a/tests/test_qemu_backend.cpp +++ b/tests/test_qemu_backend.cpp @@ -68,13 +68,13 @@ TEST_F(QemuBackend, machine_start_shutdown_sends_monitoring_events) auto machine = backend.create_virtual_machine(default_description, mock_monitor); - EXPECT_CALL(mock_monitor, persist_state_for(_)); + EXPECT_CALL(mock_monitor, persist_state_for(_, _)); EXPECT_CALL(mock_monitor, on_resume()); machine->start(); machine->state = mp::VirtualMachine::State::running; - EXPECT_CALL(mock_monitor, persist_state_for(_)); + EXPECT_CALL(mock_monitor, persist_state_for(_, _)); EXPECT_CALL(mock_monitor, on_shutdown()); machine->shutdown(); } @@ -86,14 +86,14 @@ TEST_F(QemuBackend, machine_start_suspend_sends_monitoring_event) auto machine = backend.create_virtual_machine(default_description, mock_monitor); - EXPECT_CALL(mock_monitor, persist_state_for(_)); + EXPECT_CALL(mock_monitor, persist_state_for(_, _)); EXPECT_CALL(mock_monitor, on_resume()); machine->start(); machine->state = mp::VirtualMachine::State::running; EXPECT_CALL(mock_monitor, on_suspend()); - EXPECT_CALL(mock_monitor, persist_state_for(_)); + EXPECT_CALL(mock_monitor, persist_state_for(_, _)); machine->suspend(); } From 52b313365c16db480e3a25efbe6d44776596c7e6 Mon Sep 17 00:00:00 2001 From: Chris Townsend Date: Wed, 27 Mar 2019 11:34:35 -0400 Subject: [PATCH 12/16] daemon: Small fixes/changes based on review feedback --- src/daemon/daemon.cpp | 3 ++- src/daemon/daemon.h | 2 +- src/daemon/daemon_rpc.cpp | 2 -- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index 8e3904fa766..91c5c806f13 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -1946,7 +1946,8 @@ void mp::Daemon::create_vm(const CreateRequest* request, grpc::ServerWriterWrite(reply); auto future_watcher = create_future_watcher(); - future_watcher->setFuture(QtConcurrent::run(this, &Daemon::async_wait_for_ssh_for, std::ref(vm), status_promise)); + future_watcher->setFuture( + QtConcurrent::run(this, &Daemon::async_wait_for_ssh_for, std::ref(vm), status_promise)); } else { diff --git a/src/daemon/daemon.h b/src/daemon/daemon.h index a8febf4d54f..ddf9e13ff68 100644 --- a/src/daemon/daemon.h +++ b/src/daemon/daemon.h @@ -164,7 +164,6 @@ public slots: std::promise* status_promise); void finish_async_operation(QFuture async_future); QFutureWatcher* create_future_watcher(); - std::vector>> async_future_watchers; std::unique_ptr config; std::unordered_map vm_instance_specs; @@ -178,6 +177,7 @@ public slots: QTimer source_images_maintenance_task; MetricsProvider metrics_provider; MetricsOptInData metrics_opt_in; + std::vector>> async_future_watchers; }; } // namespace multipass #endif // MULTIPASS_DAEMON_H diff --git a/src/daemon/daemon_rpc.cpp b/src/daemon/daemon_rpc.cpp index 939f7467b33..5e120454138 100644 --- a/src/daemon/daemon_rpc.cpp +++ b/src/daemon/daemon_rpc.cpp @@ -92,8 +92,6 @@ grpc::Status emit_signal_and_wait_for_result(OperationSignal operation_signal) auto status_future = status_promise.get_future(); emit operation_signal(&status_promise); - status_future.wait(); - return status_future.get(); } } // namespace From e59f17874c0566a0a975caa8998d04d4204be894 Mon Sep 17 00:00:00 2001 From: Chris Townsend Date: Fri, 29 Mar 2019 10:11:40 -0400 Subject: [PATCH 13/16] utils: Move lock in wait_until_ssh_up() to avoid deadlock --- src/utils/utils.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/utils/utils.cpp b/src/utils/utils.cpp index bcdc112df80..c90471d9a49 100644 --- a/src/utils/utils.cpp +++ b/src/utils/utils.cpp @@ -143,8 +143,9 @@ void mp::utils::wait_until_ssh_up(VirtualMachine* virtual_machine, std::chrono:: process_vm_events(); try { - std::lock_guardstate_mutex)> lock{virtual_machine->state_mutex}; mp::SSHSession session{virtual_machine->ssh_hostname(), virtual_machine->ssh_port()}; + + std::lock_guardstate_mutex)> lock{virtual_machine->state_mutex}; virtual_machine->state = VirtualMachine::State::running; virtual_machine->update_state(); return mp::utils::TimeoutAction::done; From 779e221643e82ed8ba1993fd6c232455bfae188f Mon Sep 17 00:00:00 2001 From: Chris Townsend Date: Thu, 4 Apr 2019 14:06:42 -0400 Subject: [PATCH 14/16] daemon: Simplify async by using just one commom method --- include/multipass/utils.h | 1 + src/daemon/daemon.cpp | 41 ++++++++++----------------------------- src/daemon/daemon.h | 5 +---- src/rpc/multipass.proto | 2 ++ 4 files changed, 14 insertions(+), 35 deletions(-) diff --git a/include/multipass/utils.h b/include/multipass/utils.h index 0bcca46cc70..e9cb969fb4e 100644 --- a/include/multipass/utils.h +++ b/include/multipass/utils.h @@ -62,6 +62,7 @@ std::vector split(const std::string& string, const std::string& del std::string generate_mac_address(); std::string timestamp(); bool is_running(const VirtualMachine::State& state); +// TODO: Rename process_vm_events to something more meaningful void wait_until_ssh_up(VirtualMachine* virtual_machine, std::chrono::milliseconds timeout, std::function const& process_vm_events = []() { }); diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index 91c5c806f13..53892e4abff 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -1519,8 +1519,8 @@ try // clang-format on if (status.ok()) { auto future_watcher = create_future_watcher(); - future_watcher->setFuture( - QtConcurrent::run(this, &Daemon::async_wait_for_ssh_all, instances, status_promise)); + future_watcher->setFuture(QtConcurrent::run( + this, &Daemon::async_wait_for_ssh_and_start_mounts, server, instances, status_promise)); } } } @@ -1946,8 +1946,8 @@ void mp::Daemon::create_vm(const CreateRequest* request, grpc::ServerWriterWrite(reply); auto future_watcher = create_future_watcher(); - future_watcher->setFuture( - QtConcurrent::run(this, &Daemon::async_wait_for_ssh_for, std::ref(vm), status_promise)); + future_watcher->setFuture(QtConcurrent::run(this, &Daemon::async_wait_for_ssh_and_start_mounts, + server, std::vector{name}, status_promise)); } else { @@ -2077,8 +2077,7 @@ QFutureWatcher* mp::Daemon::create_future_watc return future_watcher; } -mp::Daemon::AsyncOperationStatus mp::Daemon::async_wait_for_ssh_for(const VirtualMachine::UPtr& vm, - std::promise* status_promise) +grpc::Status mp::Daemon::async_wait_for_ssh_for(const VirtualMachine::UPtr& vm) { try { @@ -2086,30 +2085,10 @@ mp::Daemon::AsyncOperationStatus mp::Daemon::async_wait_for_ssh_for(const Virtua } catch (const std::exception& e) { - return {grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), ""), status_promise}; + return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, e.what(), ""); } - return {grpc::Status::OK, status_promise}; -} - -mp::Daemon::AsyncOperationStatus mp::Daemon::async_wait_for_ssh_all(const std::vector& vms, - std::promise* status_promise) -{ - fmt::memory_buffer errors; - for (const auto& name : vms) - { - auto it = vm_instances.find(name); - auto& vm = it->second; - - auto ssh_status = async_wait_for_ssh_for(vm, nullptr); - if (!ssh_status.status.ok()) - { - fmt::format_to(errors, "Error starting '{}': {}", name, ssh_status.status.error_message()); - continue; - } - } - - return {grpc_status_for(errors), status_promise}; + return grpc::Status::OK; } template @@ -2124,10 +2103,10 @@ mp::Daemon::async_wait_for_ssh_and_start_mounts(grpc::ServerWriter* serve auto& vm = it->second; auto& mounts = vm_instance_specs[name].mounts; - auto ssh_status = async_wait_for_ssh_for(vm, nullptr); - if (!ssh_status.status.ok()) + auto status = async_wait_for_ssh_for(vm); + if (!status.ok()) { - fmt::format_to(errors, "Error starting '{}': {}", name, ssh_status.status.error_message()); + fmt::format_to(errors, "Error starting '{}': {}", name, status.error_message()); continue; } diff --git a/src/daemon/daemon.h b/src/daemon/daemon.h index ddf9e13ff68..ccc0d28701b 100644 --- a/src/daemon/daemon.h +++ b/src/daemon/daemon.h @@ -154,10 +154,7 @@ public slots: std::promise* status_promise; }; - AsyncOperationStatus async_wait_for_ssh_for(const VirtualMachine::UPtr& vm, - std::promise* status_promise); - AsyncOperationStatus async_wait_for_ssh_all(const std::vector& vms, - std::promise* status_promise); + grpc::Status async_wait_for_ssh_for(const VirtualMachine::UPtr& vm); template AsyncOperationStatus async_wait_for_ssh_and_start_mounts(grpc::ServerWriter* server, const std::vector& vms, diff --git a/src/rpc/multipass.proto b/src/rpc/multipass.proto index 2d8620ba730..41ea4e45a6e 100644 --- a/src/rpc/multipass.proto +++ b/src/rpc/multipass.proto @@ -112,6 +112,7 @@ message LaunchReply { MetricsShowInfo metrics_show_info = 5; string log_line = 6; UpdateInfo update_info = 7; + string reply_message = 8; } message PurgeRequest { @@ -318,6 +319,7 @@ message RestartRequest { message RestartReply { string log_line = 1; + string reply_message = 2; } message DeleteRequest { From 8dc849913004066b9c09c87e1f0f24fbadb916e1 Mon Sep 17 00:00:00 2001 From: Chris Townsend Date: Fri, 5 Apr 2019 09:45:05 -0400 Subject: [PATCH 15/16] backends: Use cond var wait predicate to handle spurious wakes --- src/platform/backends/libvirt/libvirt_virtual_machine.cpp | 6 ++++-- src/platform/backends/qemu/qemu_virtual_machine.cpp | 5 ++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/platform/backends/libvirt/libvirt_virtual_machine.cpp b/src/platform/backends/libvirt/libvirt_virtual_machine.cpp index e041b702f88..aec7889c4e0 100644 --- a/src/platform/backends/libvirt/libvirt_virtual_machine.cpp +++ b/src/platform/backends/libvirt/libvirt_virtual_machine.cpp @@ -298,9 +298,8 @@ void mp::LibVirtVirtualMachine::shutdown() else if (state == State::starting) { virDomainDestroy(domain.get()); - state = State::off; + state_wait.wait(lock, [this] { return state == State::off; }); update_state(); - state_wait.wait(lock); } else if (state == State::suspended) { @@ -347,6 +346,9 @@ void mp::LibVirtVirtualMachine::ensure_vm_is_running() std::lock_guard lock{state_mutex}; if (domain_state_for(domain.get()) != VirtualMachine::State::running) { + // Have to set 'off' here so there is an actual state change to compare to for + // the cond var's predicate + state = State::off; state_wait.notify_all(); throw mp::StartException(vm_name, "Instance shutdown during start"); } diff --git a/src/platform/backends/qemu/qemu_virtual_machine.cpp b/src/platform/backends/qemu/qemu_virtual_machine.cpp index e2f22fe2bda..365f9da9bfe 100644 --- a/src/platform/backends/qemu/qemu_virtual_machine.cpp +++ b/src/platform/backends/qemu/qemu_virtual_machine.cpp @@ -372,7 +372,7 @@ void mp::QemuVirtualMachine::on_shutdown() if (state == State::starting) { saved_error_msg = "shutdown called while starting"; - state_wait.wait(lock); + state_wait.wait(lock, [this] { return state == State::off; }); } state = State::off; @@ -403,6 +403,9 @@ void mp::QemuVirtualMachine::ensure_vm_is_running() std::lock_guard lock{state_mutex}; if (vm_process->state() == QProcess::NotRunning) { + // Have to set 'off' here so there is an actual state change to compare to for + // the cond var's predicate + state = State::off; state_wait.notify_all(); throw mp::StartException(vm_name, saved_error_msg); } From 25b22610afdad6544aa8c4e8facbfa6e56ba58bc Mon Sep 17 00:00:00 2001 From: Chris Townsend Date: Fri, 5 Apr 2019 12:09:51 -0400 Subject: [PATCH 16/16] daemon/async: Change where the check for update is done for 'start' --- src/daemon/daemon.cpp | 17 ++++++++++------- src/rpc/multipass.proto | 1 + 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index 53892e4abff..b50413a4716 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -1406,13 +1406,6 @@ try // clang-format on it->second->start(); } - if (config->update_prompt->is_time_to_show()) - { - StartReply start_reply; - config->update_prompt->populate(start_reply.mutable_update_info()); - server->Write(start_reply); - } - auto future_watcher = create_future_watcher(); future_watcher->setFuture( QtConcurrent::run(this, &Daemon::async_wait_for_ssh_and_start_mounts, server, vms, status_promise)); @@ -2150,6 +2143,16 @@ mp::Daemon::async_wait_for_ssh_and_start_mounts(grpc::ServerWriter* serve } } + if (server && std::is_same::value) + { + if (config->update_prompt->is_time_to_show()) + { + Reply reply; + config->update_prompt->populate(reply.mutable_update_info()); + server->Write(reply); + } + } + return {grpc_status_for(errors), status_promise}; } diff --git a/src/rpc/multipass.proto b/src/rpc/multipass.proto index 41ea4e45a6e..5c955aba2d0 100644 --- a/src/rpc/multipass.proto +++ b/src/rpc/multipass.proto @@ -320,6 +320,7 @@ message RestartRequest { message RestartReply { string log_line = 1; string reply_message = 2; + UpdateInfo update_info = 3; } message DeleteRequest {