From 7ca931eac8af3801aedf0c956abde415bd566d45 Mon Sep 17 00:00:00 2001 From: John Demme Date: Tue, 11 Jun 2024 23:07:22 +0000 Subject: [PATCH 1/4] [ESI Runtime] Rip out all Cap'n Proto RPC stuff After spending a truely obnoxious amount of time fighting capnp and libkj, we made the decision to switch to another RPC system. We're no longer modeling and serializing message types in Capnp and we don't need the performance which capnp/libkj RPC promises, so there's really no need for the additional complexity. A slower system which is thread safe should work fine. This commit breaks the build in a pretty horrible way and is not intended to be merged on its own. It simply breaks up the diff. --- lib/Dialect/ESI/runtime/CMakeLists.txt | 50 +--- lib/Dialect/ESI/runtime/cosim/CMakeLists.txt | 35 --- lib/Dialect/ESI/runtime/cosim/CosimDpi.capnp | 60 ---- .../cosim/include/cosim/CapnpThreads.h | 121 -------- .../runtime/cosim/include/cosim/Endpoint.h | 149 ---------- .../runtime/cosim/include/cosim/LowLevel.h | 41 --- .../ESI/runtime/cosim/lib/CapnpThreads.cpp | 58 ---- lib/Dialect/ESI/runtime/cosim/lib/Client.cpp | 160 ---------- .../ESI/runtime/cosim/lib/Endpoint.cpp | 70 ----- lib/Dialect/ESI/runtime/cosim/lib/Server.cpp | 279 ------------------ 10 files changed, 5 insertions(+), 1018 deletions(-) delete mode 100644 lib/Dialect/ESI/runtime/cosim/CosimDpi.capnp delete mode 100644 lib/Dialect/ESI/runtime/cosim/include/cosim/CapnpThreads.h delete mode 100644 lib/Dialect/ESI/runtime/cosim/include/cosim/Endpoint.h delete mode 100644 lib/Dialect/ESI/runtime/cosim/include/cosim/LowLevel.h delete mode 100644 lib/Dialect/ESI/runtime/cosim/lib/CapnpThreads.cpp delete mode 100644 lib/Dialect/ESI/runtime/cosim/lib/Client.cpp delete mode 100644 lib/Dialect/ESI/runtime/cosim/lib/Endpoint.cpp delete mode 100644 lib/Dialect/ESI/runtime/cosim/lib/Server.cpp diff --git a/lib/Dialect/ESI/runtime/CMakeLists.txt b/lib/Dialect/ESI/runtime/CMakeLists.txt index 123a0d7aecc9..66308e2a68bd 100644 --- a/lib/Dialect/ESI/runtime/CMakeLists.txt +++ b/lib/Dialect/ESI/runtime/CMakeLists.txt @@ -98,45 +98,6 @@ if(ESI_COSIM) # backends should only need to be linked in. # TODO: Once the hack in the python bindings is remidied, remove this. add_compile_definitions(ESI_COSIM) - - # Try to find Cap'nProto. If the user has set CAPNP_PATH, use that. - if(DEFINED CAPNP_PATH) - set(ENV{PKG_CONFIG_PATH} - "${CAPNP_PATH}/lib/pkgconfig:$ENV{PKG_CONFIG_PATH}") - find_package(CapnProto CONFIG PATHS ${CAPNP_PATH}) - else() - set(ENV{PKG_CONFIG_PATH} - "${CMAKE_CURRENT_SOURCE_DIR}/ext/lib/pkgconfig:$ENV{PKG_CONFIG_PATH}") - find_package(CapnProto CONFIG PATHS "${CMAKE_CURRENT_SOURCE_DIR}/ext") - endif() - - # If Cap'nProto has been found, generate the headers and definitions. - if(CapnProto_FOUND) - message("-- ESI cosim enabled") - - message(STATUS "Found Cap'nProto at ${CapnProto_DIR}.") - add_subdirectory(cosim) - - set(ESIRuntimeSources - ${ESIRuntimeSources} - ${CMAKE_CURRENT_SOURCE_DIR}/cpp/lib/backends/Cosim.cpp - ) - set(ESIRuntimeBackendHeaders - ${ESIRuntimeBackendHeaders} - ${CMAKE_CURRENT_SOURCE_DIR}/cpp/include/esi/backends/Cosim.h - ) - set(ESIRuntimeLinkLibraries - ${ESIRuntimeLinkLibraries} - EsiCosimCapnp - ) - set(ESIRuntimeIncludeDirs - ${ESIRuntimeIncludeDirs} - ${CMAKE_CURRENT_SOURCE_DIR}/cosim/include - ) - else() - message(FATAL_ERROR "ESI cosimulation requires Cap'nProto. Either install - Cap'nProto or disable ESI cosim with -DESI_COSIM=OFF.") - endif() else() message("-- ESI cosim disabled") endif() @@ -198,12 +159,11 @@ install(IMPORTED_RUNTIME_ARTIFACTS ESIRuntime DESTINATION lib COMPONENT ESIRuntime ) -install(RUNTIME_DEPENDENCY_SET ESIRuntime_RUNTIME_DEPS - DESTINATION lib - PRE_EXCLUDE_REGEXES .* - PRE_INCLUDE_REGEXES capnp kj - COMPONENT ESIRuntime -) +# install(RUNTIME_DEPENDENCY_SET ESIRuntime_RUNTIME_DEPS +# DESTINATION lib +# PRE_EXCLUDE_REGEXES .* +# COMPONENT ESIRuntime +# ) install(FILES ${ESIRuntimeHeaders} DESTINATION include/esi COMPONENT ESIRuntime-dev diff --git a/lib/Dialect/ESI/runtime/cosim/CMakeLists.txt b/lib/Dialect/ESI/runtime/cosim/CMakeLists.txt index 5c55946ca82f..c7a178bae603 100644 --- a/lib/Dialect/ESI/runtime/cosim/CMakeLists.txt +++ b/lib/Dialect/ESI/runtime/cosim/CMakeLists.txt @@ -2,41 +2,6 @@ ## ##===----------------------------------------------------------------------===// -# Compile Capnp file. -add_definitions(${CAPNP_DEFINITIONS}) -set(CAPNPC_OUTPUT_DIR ${CMAKE_CURRENT_BINARY_DIR}/../cpp/include/backends) -include_directories(${CAPNPC_OUTPUT_DIR}) -file(MAKE_DIRECTORY ${CAPNPC_OUTPUT_DIR}) -capnp_generate_cpp( - COSIM_CAPNP_SRCS COSIM_CANPN_HDRS - "CosimDpi.capnp" -) - -# Compile a library for ESI cosim capnp for both the API runtime backend and the -# cosim DPI server to use. -add_library(EsiCosimCapnp SHARED - ${COSIM_CAPNP_HDRS} - ${COSIM_CAPNP_SRCS} - ${COSIM_SCHEMA_HDR} - - lib/CapnpThreads.cpp - lib/Client.cpp - lib/Endpoint.cpp - lib/Server.cpp -) -target_include_directories(EsiCosimCapnp PUBLIC ${CAPNPC_OUTPUT_DIR}) -target_include_directories(EsiCosimCapnp PUBLIC ${CAPNP_INCLUDE_DIRS}) -target_include_directories(EsiCosimCapnp PUBLIC - ${CMAKE_CURRENT_SOURCE_DIR}/../cpp/include) -target_link_libraries(EsiCosimCapnp PUBLIC - CapnProto::kj CapnProto::kj-async CapnProto::kj-gzip - CapnProto::capnp CapnProto::capnp-rpc -) -install(TARGETS EsiCosimCapnp - DESTINATION lib - COMPONENT ESIRuntime -) - set(cosim_collateral Cosim_DpiPkg.sv Cosim_Endpoint.sv diff --git a/lib/Dialect/ESI/runtime/cosim/CosimDpi.capnp b/lib/Dialect/ESI/runtime/cosim/CosimDpi.capnp deleted file mode 100644 index 9f61dca05be5..000000000000 --- a/lib/Dialect/ESI/runtime/cosim/CosimDpi.capnp +++ /dev/null @@ -1,60 +0,0 @@ -##===- CosimDpi.capnp - ESI cosim RPC schema ------------------*- CAPNP -*-===// -## -## Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. -## See https://llvm.org/LICENSE.txt for license information. -## SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -## -##===----------------------------------------------------------------------===// -## -## The ESI cosimulation RPC Cap'nProto schema. Documentation is in -## docs/ESI/cosim.md. TL;DR: Run the simulation, then connect to its RPC server -## with a client generated by the Cap'nProto implementation for your language of -## choice! (https://capnproto.org/otherlang.html) -## -##===----------------------------------------------------------------------===// - -@0x9fd65fec6e2d2779; - -# The primary interface exposed by an ESI cosim simulation. -interface CosimDpiServer @0xe3d7f70c7065c46a { - # List all the registered endpoints. - list @0 () -> (ifaces :List(EsiDpiInterfaceDesc)); - # Open one of them. Specify both the send and recv data types if want type - # safety and your language supports it. - open @1 (iface :EsiDpiInterfaceDesc) -> (endpoint :EsiDpiEndpoint); - - # Get the zlib-compressed JSON system manifest. - getCompressedManifest @2 () -> (version :Int32, compressedManifest :Data); - - # Create a low level interface into the simulation. - openLowLevel @3 () -> (lowLevel :EsiLowLevel); -} - -# Description of a registered endpoint. -struct EsiDpiInterfaceDesc @0xd2584d2506f01c8c { - # Capn'Proto ID of the struct type being sent _to_ the simulator. - fromHostType @0 :Text; - # Capn'Proto ID of the struct type being sent _from_ the simulator. - toHostType @1 :Text; - # Numerical identifier of the endpoint. Defined in the design. - endpointID @2 :Text; -} - -# Interactions with an open endpoint. Optionally typed. -interface EsiDpiEndpoint @0xfb0a36bf859be47b { - # Send a message to the endpoint. - sendFromHost @0 (msg :Data); - # Recieve a message from the endpoint. Non-blocking. - recvToHost @1 () -> (hasData :Bool, resp :Data); - # Close the connect to this endpoint. - close @2 (); -} - -# A low level interface simply provides MMIO and host memory access. In all -# cases, hardware errors become exceptions. -interface EsiLowLevel @0xae716100ef82f6d6 { - # Write to an MMIO register. - writeMMIO @0 (address :UInt32, data :UInt32) -> (); - # Read from an MMIO register. - readMMIO @1 (address :UInt32) -> (data :UInt32); -} diff --git a/lib/Dialect/ESI/runtime/cosim/include/cosim/CapnpThreads.h b/lib/Dialect/ESI/runtime/cosim/include/cosim/CapnpThreads.h deleted file mode 100644 index 736a90443d2c..000000000000 --- a/lib/Dialect/ESI/runtime/cosim/include/cosim/CapnpThreads.h +++ /dev/null @@ -1,121 +0,0 @@ -//===- CapnpThreads.h - ESI cosim RPC ---------------------------*- C++ -*-===// -// -// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. -// See https://llvm.org/LICENSE.txt for license information. -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -// -//===----------------------------------------------------------------------===// -// -// Various classes used to implement the RPC server classes generated by -// CapnProto. Capnp C++ RPC servers are based on 'libkj' and its asynchrony -// model, which is very foreign. This is what the 'kj' namespace is along with -// alternate collections and other utility code. -// -//===----------------------------------------------------------------------===// - -#ifndef COSIM_SERVER_H -#define COSIM_SERVER_H - -#include "cosim/Endpoint.h" -#include "cosim/LowLevel.h" - -#include -#include - -namespace kj { -class WaitScope; -} // namespace kj - -namespace esi { -namespace cosim { - -/// Since Capnp is not thread-safe, client and server must be run in their own -/// threads and communicate with the outside world through thread safe channels. -class CapnpCosimThread { -public: - CapnpCosimThread(); - ~CapnpCosimThread(); - - /// Stop the thread. This is a blocking call -- it will not return until the - /// capnp thread has stopped. - void stop(); - - // Get an endpoint by its ID. - Endpoint *getEndpoint(std::string epId); - // Get the low level bridge. - LowLevel *getLowLevel() { return &lowLevelBridge; } - - // Get the ESI version and compressed manifest. Returns false if the manifest - // has yet to be loaded. - bool getCompressedManifest(unsigned int &esiVersion, - std::vector &manifest) { - esiVersion = this->esiVersion; - manifest = compressedManifest; - return this->esiVersion >= 0; - } - -protected: - /// Start capnp polling loop. Does not return until stop() is called. Must be - /// called in the same thread the RPC server/client was created. 'poll' is - /// called on each iteration of the loop. - void loop(kj::WaitScope &waitScope, std::function poll); - - using Lock = std::lock_guard; - - EndpointRegistry endpoints; - LowLevel lowLevelBridge; - - std::thread *myThread; - volatile bool stopSig; - std::mutex m; - - unsigned int esiVersion = -1; - std::vector compressedManifest; -}; - -/// The main RpcServer. Does not implement any capnp RPC interfaces but contains -/// the capnp main RPC server. We run the capnp server in its own thread to be -/// more responsive to network traffic and so as to not slow down the -/// simulation. -class RpcServer : public CapnpCosimThread { -public: - /// Start and stop the server thread. - void run(uint16_t port); - - void setManifest(unsigned int esiVersion, - const std::vector &manifest) { - this->esiVersion = esiVersion; - compressedManifest = manifest; - } - - bool registerEndpoint(std::string epId, std::string fromHostTypeId, - std::string toHostTypeId) { - return endpoints.registerEndpoint(epId, fromHostTypeId, toHostTypeId); - } - -private: - /// The thread's main loop function. Exits on shutdown. - void mainLoop(uint16_t port); -}; - -/// The Capnp RpcClient. -class RpcClient : public CapnpCosimThread { - // To hide the ugly details of the capnp headers. - struct Impl; - friend struct Impl; - -public: - /// Start client thread. - void run(std::string host, uint16_t port); - -private: - void mainLoop(std::string host, uint16_t port); - - /// The 'capnp' sets this to true when it is ready to go. - std::atomic started; -}; - -} // namespace cosim -} // namespace esi - -#endif diff --git a/lib/Dialect/ESI/runtime/cosim/include/cosim/Endpoint.h b/lib/Dialect/ESI/runtime/cosim/include/cosim/Endpoint.h deleted file mode 100644 index 21d36b87a464..000000000000 --- a/lib/Dialect/ESI/runtime/cosim/include/cosim/Endpoint.h +++ /dev/null @@ -1,149 +0,0 @@ -//===- Endpoint.h - Cosim endpoint server -----------------------*- C++ -*-===// -// -// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. -// See https://llvm.org/LICENSE.txt for license information. -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -// -//===----------------------------------------------------------------------===// -// -// Declare the class which is used to model DPI endpoints. -// -//===----------------------------------------------------------------------===// - -#ifndef COSIM_ENDPOINT_H -#define COSIM_ENDPOINT_H - -#include "esi/Common.h" - -#include -#include -#include -#include -#include -#include - -namespace esi { -namespace cosim { - -/// Implements a bi-directional, thread-safe bridge between the RPC server and -/// DPI functions. -/// -/// Several of the methods below are inline with the declaration to make them -/// candidates for inlining during compilation. This is particularly important -/// on the simulation side since polling happens at each clock and we do not -/// want to slow down the simulation any more than necessary. -class Endpoint { -public: - /// Representing messages as shared pointers to vectors may be a performance - /// issue in the future but it is the easiest way to ensure memory - /// correctness. - using MessageDataPtr = std::unique_ptr; - - /// Construct an endpoint which knows and the type IDs in both directions. - Endpoint(std::string fromHostTypeId, std::string toHostTypeId); - ~Endpoint(); - /// Disallow copying. There is only ONE endpoint object per logical endpoint - /// so copying is almost always a bug. - Endpoint(const Endpoint &) = delete; - - std::string getSendTypeId() const { return fromHostTypeId; } - std::string getRecvTypeId() const { return toHostTypeId; } - - /// These two are used to set and unset the inUse flag, to ensure that an open - /// endpoint is not opened again. - bool setInUse(); - void returnForUse(); - - /// Queue message to the simulation. - void pushMessageToSim(MessageDataPtr msg) { - Lock g(m); - toCosim.push(std::move(msg)); - } - - /// Pop from the to-simulator queue. Return true if there was a message in the - /// queue. - bool getMessageToSim(MessageDataPtr &msg) { - Lock g(m); - if (toCosim.empty()) - return false; - msg = std::move(toCosim.front()); - toCosim.pop(); - return true; - } - - /// Queue message to the RPC client. - void pushMessageToClient(MessageDataPtr msg) { - Lock g(m); - toClient.push(std::move(msg)); - } - - /// Pop from the to-RPC-client queue. Return true if there was a message in - /// the queue. - bool getMessageToClient(MessageDataPtr &msg) { - Lock g(m); - if (toClient.empty()) - return false; - msg = std::move(toClient.front()); - toClient.pop(); - return true; - } - -private: - const std::string fromHostTypeId; - const std::string toHostTypeId; - bool inUse; - - using Lock = std::lock_guard; - - /// This class needs to be thread-safe. All of the mutable member variables - /// are protected with this object-wide lock. This may be a performance issue - /// in the future. - std::mutex m; - /// Message queue from RPC client to the simulation. - std::queue toCosim; - /// Message queue to RPC client from the simulation. - std::queue toClient; -}; - -/// The Endpoint registry is where Endpoints report their existence (register) -/// and they are looked up by RPC clients. -class EndpointRegistry { -public: - /// Register an Endpoint. Creates the Endpoint object and owns it. Returns - /// false if unsuccessful. - bool registerEndpoint(std::string epId, std::string fromHostTypeId, - std::string toHostTypeId); - - /// Get the specified endpoint. Return nullptr if it does not exist. This - /// method is defined inline so it can be inlined at compile time. Performance - /// is important here since this method is used in the polling call from the - /// simulator. Returns nullptr if the endpoint cannot be found. - Endpoint *operator[](const std::string &epId) { - Lock g(m); - auto it = endpoints.find(epId); - if (it == endpoints.end()) - return nullptr; - return &it->second; - } - - /// Iterate over the list of endpoints, calling the provided function for each - /// endpoint. - void iterateEndpoints( - const std::function &f) const; - /// Return the number of endpoints. - size_t size() const; - -private: - using Lock = std::lock_guard; - - /// This object needs to be thread-safe. An object-wide mutex is sufficient. - std::mutex m; - - /// Endpoint ID to object pointer mapping. - std::map endpoints; -}; - -} // namespace cosim -} // namespace esi - -#endif diff --git a/lib/Dialect/ESI/runtime/cosim/include/cosim/LowLevel.h b/lib/Dialect/ESI/runtime/cosim/include/cosim/LowLevel.h deleted file mode 100644 index 390651fee666..000000000000 --- a/lib/Dialect/ESI/runtime/cosim/include/cosim/LowLevel.h +++ /dev/null @@ -1,41 +0,0 @@ -//===- LowLevel.h - Cosim low level implementation --------------*- C++ -*-===// -// -// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. -// See https://llvm.org/LICENSE.txt for license information. -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -// -//===----------------------------------------------------------------------===// - -#ifndef COSIM_LOWLEVEL_H -#define COSIM_LOWLEVEL_H - -#include - -#include "cosim/Utils.h" - -namespace esi { -namespace cosim { - -// Implements a bi-directional, thread-safe bridge between the RPC server and -// DPI functions for low level functionality. -class LowLevel { -public: - LowLevel() = default; - ~LowLevel() = default; - /// Disallow copying. There is only ONE low level object per RPC server, so - /// copying is almost always a bug. - LowLevel(const LowLevel &) = delete; - - TSQueue readReqs; - TSQueue> readResps; - std::atomic readsOutstanding = 0; - - TSQueue> writeReqs; - TSQueue writeResps; - std::atomic writesOutstanding = 0; -}; - -} // namespace cosim -} // namespace esi - -#endif diff --git a/lib/Dialect/ESI/runtime/cosim/lib/CapnpThreads.cpp b/lib/Dialect/ESI/runtime/cosim/lib/CapnpThreads.cpp deleted file mode 100644 index ac4a40d5eeb4..000000000000 --- a/lib/Dialect/ESI/runtime/cosim/lib/CapnpThreads.cpp +++ /dev/null @@ -1,58 +0,0 @@ -//===- CapnpThreads.cpp - Cosim RPC common code -----------------*- C++ -*-===// -// -// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. -// See https://llvm.org/LICENSE.txt for license information. -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -// -//===----------------------------------------------------------------------===// - -#include "cosim/CapnpThreads.h" -#include "CosimDpi.capnp.h" -#include -#include -#ifdef _WIN32 -#include -#else -#include -#endif - -using namespace capnp; -using namespace esi::cosim; - -CapnpCosimThread::CapnpCosimThread() : myThread(nullptr), stopSig(false) {} -CapnpCosimThread::~CapnpCosimThread() { stop(); } - -void CapnpCosimThread::loop(kj::WaitScope &waitScope, - std::function poll) { - // OK, this is uber hacky, but it unblocks me and isn't _too_ inefficient. The - // problem is that I can't figure out how read the stop signal from libkj - // asyncrony land. - // - // IIRC the main libkj wait loop uses `select()` (or something similar on - // Windows) on its FDs. As a result, any code which checks the stop variable - // doesn't run until there is some I/O. Probably the right way is to set up a - // pipe to deliver a shutdown signal. - // - // TODO: Figure out how to do this properly, if possible. - while (!stopSig) { - waitScope.poll(); - poll(); - waitScope.poll(); - std::this_thread::sleep_for(std::chrono::microseconds(100)); - } -} - -/// Signal the RPC server thread to stop. Wait for it to exit. -void CapnpCosimThread::stop() { - Lock g(m); - if (myThread == nullptr) { - fprintf(stderr, "CapnpCosimThread not Run()\n"); - } else if (!stopSig) { - stopSig = true; - myThread->join(); - } -} - -Endpoint *CapnpCosimThread::getEndpoint(std::string epId) { - return endpoints[epId]; -} diff --git a/lib/Dialect/ESI/runtime/cosim/lib/Client.cpp b/lib/Dialect/ESI/runtime/cosim/lib/Client.cpp deleted file mode 100644 index 27f3ffd4d20d..000000000000 --- a/lib/Dialect/ESI/runtime/cosim/lib/Client.cpp +++ /dev/null @@ -1,160 +0,0 @@ -//===- Client.cpp - Cosim RPC client ----------------------------*- C++ -*-===// -// -// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. -// See https://llvm.org/LICENSE.txt for license information. -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -// -//===----------------------------------------------------------------------===// - -#include "CosimDpi.capnp.h" -#include "cosim/CapnpThreads.h" -#include - -#include -#include -#ifdef _WIN32 -#include -#else -#include -#endif - -using namespace capnp; -using namespace esi::cosim; - -/// Internal implementation to hide all the capnp details. -struct esi::cosim::RpcClient::Impl { - - Impl(RpcClient &client, capnp::EzRpcClient &rpcClient) - : client(client), waitScope(rpcClient.getWaitScope()), cosim(nullptr), - lowLevel(nullptr) { - // Get the main interface. - cosim = rpcClient.getMain(); - - // Grab a reference to the low level interface. - auto llReq = cosim.openLowLevelRequest(); - auto llPromise = llReq.send(); - lowLevel = llPromise.wait(waitScope).getLowLevel(); - - // Get the ESI version and compressed manifest. - auto maniResp = cosim.getCompressedManifestRequest().send().wait(waitScope); - capnp::Data::Reader data = maniResp.getCompressedManifest(); - client.esiVersion = maniResp.getVersion(); - client.compressedManifest = std::vector(data.begin(), data.end()); - - // Iterate through the endpoints and register them. - auto capnpEndpointsResp = cosim.listRequest().send().wait(waitScope); - for (const auto &capnpEndpoint : capnpEndpointsResp.getIfaces()) { - assert(capnpEndpoint.hasEndpointID() && - "Response did not contain endpoint ID not found!"); - std::string fromHostType, toHostType; - if (capnpEndpoint.hasFromHostType()) - fromHostType = capnpEndpoint.getFromHostType(); - if (capnpEndpoint.hasToHostType()) - toHostType = capnpEndpoint.getToHostType(); - bool rc = client.endpoints.registerEndpoint(capnpEndpoint.getEndpointID(), - fromHostType, toHostType); - assert(rc && "Endpoint ID already exists!"); - Endpoint *ep = client.endpoints[capnpEndpoint.getEndpointID()]; - // TODO: delay opening until client calls connect(). - auto openReq = cosim.openRequest(); - openReq.setIface(capnpEndpoint); - EsiDpiEndpoint::Client dpiEp = - openReq.send().wait(waitScope).getEndpoint(); - endpointMap.emplace(ep, dpiEp); - } - } - - RpcClient &client; - kj::WaitScope &waitScope; - CosimDpiServer::Client cosim; - EsiLowLevel::Client lowLevel; - std::map endpointMap; - - /// Called from the event loop periodically. - // TODO: try to reduce work in here. Ideally, eliminate polling altogether - // though I can't figure out how with libkj's event loop. - void pollInternal(); -}; - -void esi::cosim::RpcClient::Impl::pollInternal() { - // Iterate through the endpoints checking for messages. - for (auto &[ep, capnpEp] : endpointMap) { - // Process writes to the simulation. - Endpoint::MessageDataPtr msg; - if (!ep->getSendTypeId().empty() && ep->getMessageToSim(msg)) { - auto req = capnpEp.sendFromHostRequest(); - req.setMsg(capnp::Data::Reader(msg->getBytes(), msg->getSize())); - req.send().detach([](kj::Exception &&e) -> void { - throw std::runtime_error("Error sending message to simulation: " + - std::string(e.getDescription().cStr())); - }); - } - - // Process reads from the simulation. - // TODO: polling for a response is horribly slow and inefficient. Rework - // the capnp protocol to avoid it. - if (!ep->getRecvTypeId().empty()) { - auto resp = capnpEp.recvToHostRequest().send().wait(waitScope); - if (resp.getHasData()) { - auto data = resp.getResp(); - ep->pushMessageToClient( - std::make_unique(data.begin(), data.size())); - } - } - } - - // Process MMIO read requests. - if (auto readReq = client.lowLevelBridge.readReqs.pop()) { - auto req = lowLevel.readMMIORequest(); - req.setAddress(*readReq); - auto respPromise = req.send(); - respPromise - .then([&](auto resp) -> void { - client.lowLevelBridge.readResps.push( - std::make_pair(resp.getData(), 0)); - }) - .detach([&](kj::Exception &&e) -> void { - client.lowLevelBridge.readResps.push(std::make_pair(0, 1)); - }); - } - - // Process MMIO write requests. - if (auto writeReq = client.lowLevelBridge.writeReqs.pop()) { - auto req = lowLevel.writeMMIORequest(); - req.setAddress(writeReq->first); - req.setData(writeReq->second); - req.send() - .then([&](auto resp) -> void { - client.lowLevelBridge.writeResps.push(0); - }) - .detach([&](kj::Exception &&e) -> void { - client.lowLevelBridge.writeResps.push(1); - }); - } -} - -void RpcClient::mainLoop(std::string host, uint16_t port) { - capnp::EzRpcClient rpcClient(host, port); - kj::WaitScope &waitScope = rpcClient.getWaitScope(); - Impl impl(*this, rpcClient); - - // Signal that we're good to go. - started.store(true); - - // Start the event loop. Does not return until stop() is called. - loop(waitScope, [&]() { impl.pollInternal(); }); -} - -/// Start the client if not already started. -void RpcClient::run(std::string host, uint16_t port) { - Lock g(m); - if (myThread == nullptr) { - started.store(false); - myThread = new std::thread(&RpcClient::mainLoop, this, host, port); - // Spin until the capnp thread is started and ready to go. - while (!started.load()) - std::this_thread::sleep_for(std::chrono::microseconds(10)); - } else { - fprintf(stderr, "Warning: cannot Run() RPC client more than once!"); - } -} diff --git a/lib/Dialect/ESI/runtime/cosim/lib/Endpoint.cpp b/lib/Dialect/ESI/runtime/cosim/lib/Endpoint.cpp deleted file mode 100644 index cd1c871c52b6..000000000000 --- a/lib/Dialect/ESI/runtime/cosim/lib/Endpoint.cpp +++ /dev/null @@ -1,70 +0,0 @@ -//===- EndPoint.cpp - Definitions for EndPointRegistry ----------*- C++ -*-===// -// -// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. -// See https://llvm.org/LICENSE.txt for license information. -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -// -//===----------------------------------------------------------------------===// -// -// Definitions for Cosim EndPoint and EndPointRegistry. -// -//===----------------------------------------------------------------------===// - -#include "cosim/Endpoint.h" - -using namespace esi::cosim; - -Endpoint::Endpoint(std::string fromHostTypeId, std::string toHostTypeId) - : fromHostTypeId(fromHostTypeId), toHostTypeId(toHostTypeId), inUse(false) { -} -Endpoint::~Endpoint() {} - -bool Endpoint::setInUse() { - Lock g(m); - if (inUse) - return false; - inUse = true; - return true; -} - -void Endpoint::returnForUse() { - Lock g(m); - if (!inUse) - fprintf(stderr, "Warning: Returning an endpoint which was not in use.\n"); - inUse = false; -} - -bool EndpointRegistry::registerEndpoint(std::string epId, - std::string fromHostTypeId, - std::string toHostTypeId) { - Lock g(m); - if (endpoints.find(epId) != endpoints.end()) { - fprintf(stderr, "Endpoint ID already exists!\n"); - return false; - } - // The following ugliness adds an Endpoint to the map of Endpoints. The - // Endpoint class has its copy constructor deleted, thus the metaprogramming. - endpoints.emplace(std::piecewise_construct, - // Map key. - std::forward_as_tuple(epId), - // Endpoint constructor args. - std::forward_as_tuple(fromHostTypeId, toHostTypeId)); - return true; -} - -void EndpointRegistry::iterateEndpoints( - const std::function &f) const { - // This function is logically const, but modification is needed to obtain a - // lock. - Lock g(const_cast(this)->m); - for (const auto &ep : endpoints) { - f(ep.first, ep.second); - } -} - -size_t EndpointRegistry::size() const { - // This function is logically const, but modification is needed to obtain a - // lock. - Lock g(const_cast(this)->m); - return endpoints.size(); -} diff --git a/lib/Dialect/ESI/runtime/cosim/lib/Server.cpp b/lib/Dialect/ESI/runtime/cosim/lib/Server.cpp deleted file mode 100644 index 57c34204c7c8..000000000000 --- a/lib/Dialect/ESI/runtime/cosim/lib/Server.cpp +++ /dev/null @@ -1,279 +0,0 @@ -//===- Server.cpp - Cosim RPC server ----------------------------*- C++ -*-===// -// -// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. -// See https://llvm.org/LICENSE.txt for license information. -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -// -//===----------------------------------------------------------------------===// -// -// Definitions for the RPC server class. Capnp C++ RPC servers are based on -// 'libkj' and its asyncrony model plus the capnp C++ API, both of which feel -// very foreign. In general, both RPC arguments and returns are passed as a C++ -// object. In order to return data, the capnp message must be constructed inside -// that object. -// -// A [capnp encoded message](https://capnproto.org/encoding.html) can have -// multiple 'segments', which is a pain to deal with. (See comments below.) -// -//===----------------------------------------------------------------------===// - -#include "CosimDpi.capnp.h" -#include "cosim/CapnpThreads.h" -#include -#include -#ifdef _WIN32 -#include -#else -#include -#endif - -using namespace capnp; -using namespace esi::cosim; - -namespace { -/// Implements the `EsiDpiEndpoint` interface from the RPC schema. Mostly a -/// wrapper around an `Endpoint` object. Whereas the `Endpoint`s are long-lived -/// (associated with the HW endpoint), this class is constructed/destructed -/// when the client open()s it. -class EndpointServer final : public EsiDpiEndpoint::Server { - /// The wrapped endpoint. - Endpoint &endpoint; - /// Signals that this endpoint has been opened by a client and hasn't been - /// closed by said client. - bool open; - -public: - EndpointServer(Endpoint &ep); - /// Release the Endpoint should the client disconnect without properly closing - /// it. - ~EndpointServer(); - /// Disallow copying as the 'open' variable needs to track the endpoint. - EndpointServer(const EndpointServer &) = delete; - - /// Implement the EsiDpiEndpoint RPC interface. - kj::Promise sendFromHost(SendFromHostContext) override; - kj::Promise recvToHost(RecvToHostContext) override; - kj::Promise close(CloseContext) override; -}; - -/// Implement the low level cosim RPC protocol. -class LowLevelServer final : public EsiLowLevel::Server { - // Queues to and from the simulation. - LowLevel &bridge; - - // Functions which poll for responses without blocking the main loop. Polling - // ain't great, but it's the only way (AFAICT) to do inter-thread - // communication between a libkj concurrent thread and other threads. There is - // a non-polling way to do it by setting up a queue over a OS-level pipe - // (since the libkj event loop uses 'select'). - kj::Promise pollReadResp(ReadMMIOContext context); - kj::Promise pollWriteResp(WriteMMIOContext context); - -public: - LowLevelServer(LowLevel &bridge); - /// Release the Endpoint should the client disconnect without properly closing - /// it. - ~LowLevelServer(); - /// Disallow copying as the 'open' variable needs to track the endpoint. - LowLevelServer(const LowLevelServer &) = delete; - - // Implement the protocol methods. - kj::Promise readMMIO(ReadMMIOContext) override; - kj::Promise writeMMIO(WriteMMIOContext) override; -}; - -/// Implements the `CosimDpiServer` interface from the RPC schema. -class CosimServer final : public CosimDpiServer::Server { - /// The registry of endpoints. The RpcServer class owns this. - EndpointRegistry ® - LowLevel &lowLevelBridge; - const unsigned int &esiVersion; - const std::vector &compressedManifest; - -public: - CosimServer(EndpointRegistry ®, LowLevel &lowLevelBridge, - const unsigned int &esiVersion, - const std::vector &compressedManifest); - - /// List all the registered interfaces. - kj::Promise list(ListContext ctxt) override; - /// Open a specific interface, locking it in the process. - kj::Promise open(OpenContext ctxt) override; - - kj::Promise - getCompressedManifest(GetCompressedManifestContext) override; - - kj::Promise openLowLevel(OpenLowLevelContext ctxt) override; -}; -} // anonymous namespace - -/// ------ EndpointServer definitions. - -EndpointServer::EndpointServer(Endpoint &ep) : endpoint(ep), open(true) {} -EndpointServer::~EndpointServer() { - if (open) - endpoint.returnForUse(); -} - -/// This is the client polling for a message. If one is available, send it. -/// TODO: implement a blocking call with a timeout. -kj::Promise EndpointServer::recvToHost(RecvToHostContext context) { - KJ_REQUIRE(open, "EndPoint closed already"); - - // Try to pop a message. - Endpoint::MessageDataPtr blob; - auto msgPresent = endpoint.getMessageToClient(blob); - context.getResults().setHasData(msgPresent); - if (msgPresent) { - Data::Builder data(const_cast(blob->getBytes()), blob->getSize()); - context.getResults().setResp(data.asReader()); - } - return kj::READY_NOW; -} - -/// 'Send' is from the client perspective, so this is a message we are -/// recieving. The only way I could figure out to copy the raw message is a -/// double copy. I was have issues getting libkj's arrays to play nice with -/// others. -kj::Promise EndpointServer::sendFromHost(SendFromHostContext context) { - KJ_REQUIRE(open, "EndPoint closed already"); - KJ_REQUIRE(context.getParams().hasMsg(), "Send request must have a message."); - kj::ArrayPtr data = context.getParams().getMsg().asBytes(); - Endpoint::MessageDataPtr blob = std::make_unique( - (const uint8_t *)data.begin(), data.size()); - endpoint.pushMessageToSim(std::move(blob)); - return kj::READY_NOW; -} - -kj::Promise EndpointServer::close(CloseContext context) { - KJ_REQUIRE(open, "EndPoint closed already"); - open = false; - endpoint.returnForUse(); - return kj::READY_NOW; -} - -/// ------ LowLevelServer definitions. - -LowLevelServer::LowLevelServer(LowLevel &bridge) : bridge(bridge) {} -LowLevelServer::~LowLevelServer() {} - -kj::Promise LowLevelServer::pollReadResp(ReadMMIOContext context) { - auto respMaybe = bridge.readResps.pop(); - if (!respMaybe.has_value()) { - return kj::evalLast( - [this, KJ_CPCAP(context)]() mutable { return pollReadResp(context); }); - } - auto resp = respMaybe.value(); - KJ_REQUIRE(resp.second == 0, "Read MMIO register encountered an error"); - context.getResults().setData(resp.first); - return kj::READY_NOW; -} - -kj::Promise LowLevelServer::readMMIO(ReadMMIOContext context) { - bridge.readReqs.push(context.getParams().getAddress()); - return kj::evalLast( - [this, KJ_CPCAP(context)]() mutable { return pollReadResp(context); }); -} - -kj::Promise LowLevelServer::pollWriteResp(WriteMMIOContext context) { - auto respMaybe = bridge.writeResps.pop(); - if (!respMaybe.has_value()) { - return kj::evalLast( - [this, KJ_CPCAP(context)]() mutable { return pollWriteResp(context); }); - } - auto resp = respMaybe.value(); - KJ_REQUIRE(resp == 0, "write MMIO register encountered an error"); - return kj::READY_NOW; -} - -kj::Promise LowLevelServer::writeMMIO(WriteMMIOContext context) { - bridge.writeReqs.push(context.getParams().getAddress(), - context.getParams().getData()); - return kj::evalLast( - [this, KJ_CPCAP(context)]() mutable { return pollWriteResp(context); }); -} - -/// ----- CosimServer definitions. - -CosimServer::CosimServer(EndpointRegistry ®, LowLevel &lowLevelBridge, - const unsigned int &esiVersion, - const std::vector &compressedManifest) - : reg(reg), lowLevelBridge(lowLevelBridge), esiVersion(esiVersion), - compressedManifest(compressedManifest) { - printf("version: %d\n", esiVersion); -} - -kj::Promise CosimServer::list(ListContext context) { - auto ifaces = context.getResults().initIfaces((unsigned int)reg.size()); - unsigned int ctr = 0u; - reg.iterateEndpoints([&](std::string id, const Endpoint &ep) { - ifaces[ctr].setEndpointID(id); - ifaces[ctr].setFromHostType(ep.getSendTypeId()); - ifaces[ctr].setToHostType(ep.getRecvTypeId()); - ++ctr; - }); - return kj::READY_NOW; -} - -kj::Promise CosimServer::open(OpenContext ctxt) { - Endpoint *ep = reg[ctxt.getParams().getIface().getEndpointID()]; - KJ_REQUIRE(ep != nullptr, "Could not find endpoint"); - - auto gotLock = ep->setInUse(); - KJ_REQUIRE(gotLock, "Endpoint in use"); - - ctxt.getResults().setEndpoint( - EsiDpiEndpoint::Client(kj::heap(*ep))); - return kj::READY_NOW; -} - -kj::Promise -CosimServer::getCompressedManifest(GetCompressedManifestContext ctxt) { - ctxt.getResults().setVersion(esiVersion); - ctxt.getResults().setCompressedManifest( - Data::Reader(compressedManifest.data(), compressedManifest.size())); - return kj::READY_NOW; -} - -kj::Promise CosimServer::openLowLevel(OpenLowLevelContext ctxt) { - ctxt.getResults().setLowLevel(kj::heap(lowLevelBridge)); - return kj::READY_NOW; -} - -/// ----- RpcServer definitions. - -/// Write the port number to a file. Necessary when we allow 'EzRpcServer' to -/// select its own port. We can't use stdout/stderr because the flushing -/// semantics are undefined (as in `flush()` doesn't work on all simulators). -static void writePort(uint16_t port) { - // "cosim.cfg" since we may want to include other info in the future. - FILE *fd = fopen("cosim.cfg", "w"); - fprintf(fd, "port: %u\n", (unsigned int)port); - fclose(fd); -} - -void RpcServer::mainLoop(uint16_t port) { - capnp::EzRpcServer rpcServer(kj::heap(endpoints, lowLevelBridge, - esiVersion, - compressedManifest), - /* bindAddress */ "*", port); - auto &waitScope = rpcServer.getWaitScope(); - // If port is 0, ExRpcSever selects one and we have to wait to get the port. - if (port == 0) { - auto portPromise = rpcServer.getPort(); - port = portPromise.wait(waitScope); - } - writePort(port); - printf("[COSIM] Listening on port: %u\n", (unsigned int)port); - loop(waitScope, []() {}); -} - -/// Start the server if not already started. -void RpcServer::run(uint16_t port) { - Lock g(m); - if (myThread == nullptr) { - myThread = new std::thread(&RpcServer::mainLoop, this, port); - } else { - fprintf(stderr, "Warning: cannot Run() RPC server more than once!"); - } -} From 39cbd72f7154e766a204364b171f9239fda5adaa Mon Sep 17 00:00:00 2001 From: John Demme Date: Wed, 12 Jun 2024 23:31:18 +0000 Subject: [PATCH 2/4] [ESI Runtime] Replace Cap'nProto with gRPC MUCH simpler --- frontends/PyCDE/integration_test/esitester.py | 2 +- .../Dialect/ESI/runtime/basic_mmio.sv | 1 + lib/Dialect/ESI/runtime/CMakeLists.txt | 27 ++ lib/Dialect/ESI/runtime/cosim/CMakeLists.txt | 30 +- lib/Dialect/ESI/runtime/cosim/Cosim_DpiPkg.sv | 2 +- lib/Dialect/ESI/runtime/cosim/cosim.proto | 62 ++++ .../cosim/cosim_dpi_server/CMakeLists.txt | 11 +- .../cosim/cosim_dpi_server/DpiEntryPoints.cpp | 156 +++++---- .../ESI/runtime/cosim/cosim_dpi_server/dpi.h | 2 +- .../ESI/runtime/cosim/include/cosim/Utils.h | 50 --- .../cosim/include/esi/cosim/RpcServer.h | 47 +++ .../ESI/runtime/cosim/lib/RpcServer.cpp | 309 ++++++++++++++++++ .../ESI/runtime/cpp/include/esi/Utils.h | 58 ++++ .../runtime/cpp/include/esi/backends/Cosim.h | 16 +- .../ESI/runtime/cpp/lib/backends/Cosim.cpp | 287 ++++++++++------ .../ESI/runtime/cpp/tools/esitester.cpp | 2 + utils/get-capnp.sh | 42 --- utils/get-grpc.sh | 36 ++ 18 files changed, 870 insertions(+), 270 deletions(-) create mode 100644 lib/Dialect/ESI/runtime/cosim/cosim.proto delete mode 100644 lib/Dialect/ESI/runtime/cosim/include/cosim/Utils.h create mode 100644 lib/Dialect/ESI/runtime/cosim/include/esi/cosim/RpcServer.h create mode 100644 lib/Dialect/ESI/runtime/cosim/lib/RpcServer.cpp delete mode 100755 utils/get-capnp.sh create mode 100755 utils/get-grpc.sh diff --git a/frontends/PyCDE/integration_test/esitester.py b/frontends/PyCDE/integration_test/esitester.py index 95fea9cc1ffe..c7bce36538bb 100644 --- a/frontends/PyCDE/integration_test/esitester.py +++ b/frontends/PyCDE/integration_test/esitester.py @@ -17,7 +17,7 @@ # RUN: rm -rf %t # RUN: mkdir %t && cd %t # RUN: %PYTHON% %s %t 2>&1 -# RUN: esi-cosim.py -- esitester cosim env | FileCheck %s +# RUN: esi-cosim.py --source %t -- esitester cosim env wait | FileCheck %s import pycde from pycde import AppID, Clock, Module, Reset, generator diff --git a/integration_test/Dialect/ESI/runtime/basic_mmio.sv b/integration_test/Dialect/ESI/runtime/basic_mmio.sv index 51e0e0f8e4d1..91456b678e7b 100644 --- a/integration_test/Dialect/ESI/runtime/basic_mmio.sv +++ b/integration_test/Dialect/ESI/runtime/basic_mmio.sv @@ -1,4 +1,5 @@ // REQUIRES: esi-cosim, esi-runtime, rtl-sim +// XFAIL: * // RUN: rm -rf %t && mkdir %t && cp %s %t // RUN: esi-cosim.py --source %t --top top -- %python %s.py cosim env diff --git a/lib/Dialect/ESI/runtime/CMakeLists.txt b/lib/Dialect/ESI/runtime/CMakeLists.txt index 66308e2a68bd..4cdf9bc9249b 100644 --- a/lib/Dialect/ESI/runtime/CMakeLists.txt +++ b/lib/Dialect/ESI/runtime/CMakeLists.txt @@ -94,10 +94,37 @@ ENDIF(MSVC) option(ESI_COSIM "Enable ESI cosimulation." ON) if(ESI_COSIM) + # gRPC for cosimulation. Local install required. + option(GRPC_PATH "Location of gRPC install.") + if (${GRPC_PATH}) + find_package(Protobuf REQUIRED CONFIG HINTS ${GRPC_PATH}) + find_package(gRPC REQUIRED CONFIG HINTS ${GRPC_PATH}) + else() + find_package(Protobuf REQUIRED CONFIG) + find_package(gRPC REQUIRED CONFIG) + endif() + + add_subdirectory(cosim) # Inform the runtime code that Cosimulation is enabled. Kinda hacky since all # backends should only need to be linked in. # TODO: Once the hack in the python bindings is remidied, remove this. add_compile_definitions(ESI_COSIM) + set(ESIRuntimeSources + ${ESIRuntimeSources} + ${CMAKE_CURRENT_SOURCE_DIR}/cpp/lib/backends/Cosim.cpp + ) + set(ESIRuntimeBackendHeaders + ${ESIRuntimeBackendHeaders} + ${CMAKE_CURRENT_SOURCE_DIR}/cpp/include/esi/backends/Cosim.h + ) + set(ESIRuntimeLinkLibraries + ${ESIRuntimeLinkLibraries} + EsiCosimGRPC + ) + set(ESIRuntimeIncludeDirs + ${ESIRuntimeIncludeDirs} + ${CMAKE_CURRENT_SOURCE_DIR}/cosim/include + ) else() message("-- ESI cosim disabled") endif() diff --git a/lib/Dialect/ESI/runtime/cosim/CMakeLists.txt b/lib/Dialect/ESI/runtime/cosim/CMakeLists.txt index c7a178bae603..0572f350a5d4 100644 --- a/lib/Dialect/ESI/runtime/cosim/CMakeLists.txt +++ b/lib/Dialect/ESI/runtime/cosim/CMakeLists.txt @@ -38,13 +38,31 @@ install(FILES COMPONENT ESIRuntime ) -# Cap'nProto MUST be built with exceptions enabled. -if (MSVC) - target_compile_options(EsiCosimCapnp PRIVATE /EHsc) -else() - target_compile_options(EsiCosimCapnp PRIVATE -fexceptions) -endif() +add_library(EsiCosimGRPC OBJECT "${CMAKE_CURRENT_LIST_DIR}/cosim.proto") +target_link_libraries(EsiCosimGRPC PUBLIC protobuf::libprotobuf gRPC::grpc++) +set(PROTO_BINARY_DIR "${CMAKE_CURRENT_BINARY_DIR}/generated") +target_include_directories(EsiCosimGRPC PUBLIC "$") + +protobuf_generate( + TARGET EsiCosimGRPC + PROTOC_OUT_DIR "${PROTO_BINARY_DIR}") +protobuf_generate( + TARGET EsiCosimGRPC + LANGUAGE grpc + GENERATE_EXTENSIONS .grpc.pb.h .grpc.pb.cc + PLUGIN "protoc-gen-grpc=\$" + PROTOC_OUT_DIR "${PROTO_BINARY_DIR}") include_directories("${CMAKE_CURRENT_SOURCE_DIR}/include") + +add_library(CosimRpcServer + lib/RpcServer.cpp +) +target_link_libraries(CosimRpcServer + PUBLIC + EsiCosimGRPC + ESIRuntime +) + add_subdirectory(cosim_dpi_server) add_subdirectory(MtiPliStub) diff --git a/lib/Dialect/ESI/runtime/cosim/Cosim_DpiPkg.sv b/lib/Dialect/ESI/runtime/cosim/Cosim_DpiPkg.sv index e352afb781b7..0612ab50a9c9 100644 --- a/lib/Dialect/ESI/runtime/cosim/Cosim_DpiPkg.sv +++ b/lib/Dialect/ESI/runtime/cosim/Cosim_DpiPkg.sv @@ -78,7 +78,7 @@ import "DPI-C" sv2cCosimserverEpTryGet = import "DPI-C" sv2cCosimserverSetManifest = function void cosim_set_manifest( - input int unsigned esi_version, + input int signed esi_version, input byte unsigned compressed_manifest[] ); diff --git a/lib/Dialect/ESI/runtime/cosim/cosim.proto b/lib/Dialect/ESI/runtime/cosim/cosim.proto new file mode 100644 index 000000000000..1f14a9163df3 --- /dev/null +++ b/lib/Dialect/ESI/runtime/cosim/cosim.proto @@ -0,0 +1,62 @@ +//===- DpiEntryPoints.cpp - ESI cosim DPI calls -----------------*- C++ -*-===// +// +// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +// See https://llvm.org/LICENSE.txt for license information. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//===----------------------------------------------------------------------===// +// +// Cosim DPI function implementations. Mostly C-C++ gaskets to the C++ +// RpcServer. +// +// These function signatures were generated by an HW simulator (see dpi.h) so +// we don't change them to be more rational here. The resulting code gets +// dynamically linked in and I'm concerned about maintaining binary +// compatibility with all the simulators. +// +//===----------------------------------------------------------------------===// + +syntax = "proto3"; + +package esi.cosim; + +message ChannelDesc { + string name = 1; + + enum Direction { + TO_SERVER = 0; + TO_CLIENT = 1; + } + Direction dir = 2; + + string type = 3; +} + +message ListOfChannels { repeated ChannelDesc channels = 1; } + +message VoidMessage {} + +message Manifest { + int32 esi_version = 1; + bytes compressed_manifest = 2; +} + +message Message { bytes data = 1; } +message AddressedMessage { + string channel_name = 1; + Message message = 2; +} + +// message MessageStream { +// oneof PayloadOrHeader { +// string channel_name = 1; +// Message message = 2; +// } +// } + +service ChannelServer { + rpc GetManifest(VoidMessage) returns (Manifest) {} + rpc ListChannels(VoidMessage) returns (ListOfChannels) {} + rpc SendToServer(AddressedMessage) returns (VoidMessage) {} + rpc ConnectToClientChannel(ChannelDesc) returns (stream Message) {} +} diff --git a/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/CMakeLists.txt b/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/CMakeLists.txt index 562f05fec9fc..46a7d7bc16b1 100644 --- a/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/CMakeLists.txt +++ b/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/CMakeLists.txt @@ -13,8 +13,15 @@ set_target_properties(EsiCosimDpiServer RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib CXX_VISIBILITY_PRESET "default" ) -add_dependencies(EsiCosimDpiServer EsiCosimCapnp MtiPli) -target_link_libraries(EsiCosimDpiServer PRIVATE EsiCosimCapnp MtiPli) +add_dependencies(EsiCosimDpiServer ESIRuntime MtiPli) +target_link_libraries(EsiCosimDpiServer + PRIVATE + EsiCosimCapnp + ESIRuntime + CosimRpcServer + MtiPli +) + install(TARGETS EsiCosimDpiServer DESTINATION lib COMPONENT ESIRuntime diff --git a/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/DpiEntryPoints.cpp b/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/DpiEntryPoints.cpp index c7170228142e..9e1c017508bc 100644 --- a/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/DpiEntryPoints.cpp +++ b/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/DpiEntryPoints.cpp @@ -16,32 +16,33 @@ // //===----------------------------------------------------------------------===// -#include "cosim/CapnpThreads.h" #include "dpi.h" +#include "esi/Ports.h" +#include "esi/cosim/RpcServer.h" #include #include #include +using namespace esi; using namespace esi::cosim; /// If non-null, log to this file. Protected by 'serverMutex`. static FILE *logFile; -static RpcServer *server = nullptr; +static std::unique_ptr server = nullptr; static std::mutex serverMutex; // ---- Helper functions ---- /// Emit the contents of 'msg' to the log file in hex. -static void log(char *epId, bool toClient, - const Endpoint::MessageDataPtr &msg) { +static void log(char *epId, bool toClient, const MessageData &msg) { std::lock_guard g(serverMutex); if (!logFile) return; fprintf(logFile, "[ep: %50s to: %4s]", epId, toClient ? "host" : "sim"); - size_t msgSize = msg->getSize(); - auto bytes = msg->getBytes(); + size_t msgSize = msg.getSize(); + auto bytes = msg.getBytes(); for (size_t i = 0; i < msgSize; ++i) { auto b = bytes[i]; // Separate 32-bit words. @@ -61,7 +62,8 @@ static void log(char *epId, bool toClient, static int findPort() { const char *portEnv = getenv("COSIM_PORT"); if (portEnv == nullptr) { - printf("[COSIM] RPC server port not found. Letting CapnpRPC select one\n"); + printf( + "[COSIM] RPC server port not found. Letting RPC server select one\n"); return 0; } printf("[COSIM] Opening RPC server on port %s\n", portEnv); @@ -101,17 +103,39 @@ static int validateSvOpenArray(const svOpenArrayHandle data, // ---- Traditional cosim DPI entry points ---- +std::map readPorts; +std::map writePorts; + // Register simulated device endpoints. // - return 0 on success, non-zero on failure (duplicate EP registered). -DPI int sv2cCosimserverEpRegister(char *endpointId, char *fromHostTypeId, - int fromHostTypeSize, char *toHostTypeId, +DPI int sv2cCosimserverEpRegister(char *endpointId, char *fromHostTypeIdC, + int fromHostTypeSize, char *toHostTypeIdC, int toHostTypeSize) { // Ensure the server has been constructed. sv2cCosimserverInit(); - // Then register with it. - if (server->registerEndpoint(endpointId, fromHostTypeId, toHostTypeId)) - return 0; - return -1; + std::string fromHostTypeId(fromHostTypeIdC), toHostTypeId(toHostTypeIdC); + + // Both only one type allowed. + if (!fromHostTypeId.empty() && !toHostTypeId.empty()) { + printf("ERROR: Only one of fromHostTypeId and toHostTypeId can be set!\n"); + return -1; + } + if (fromHostTypeId.empty() && toHostTypeId.empty()) { + printf("ERROR: One of fromHostTypeId and toHostTypeId must be set!\n"); + return -2; + } + if (readPorts.contains(endpointId)) { + printf("ERROR: Endpoint already registered!\n"); + return -3; + } + + if (!fromHostTypeId.empty()) + readPorts.emplace(endpointId, + server->registerReadPort(endpointId, fromHostTypeId)); + else + writePorts.emplace(endpointId, + server->registerWritePort(endpointId, toHostTypeId)); + return 0; } // Attempt to recieve data from a client. @@ -127,25 +151,25 @@ DPI int sv2cCosimserverEpTryGet(char *endpointId, if (server == nullptr) return -1; - Endpoint *ep = server->getEndpoint(endpointId); - if (!ep) { + auto portF = readPorts.find(endpointId); + if (portF == readPorts.end()) { fprintf(stderr, "Endpoint not found in registry!\n"); return -4; } - Endpoint::MessageDataPtr msg; + ReadChannelPort &port = portF->second; + MessageData msg; // Poll for a message. - if (!ep->getMessageToSim(msg)) { + if (!port.read(msg)) { // No message. *dataSize = 0; return 0; } + log(endpointId, false, msg); + // Do the validation only if there's a message available. Since the // simulator is going to poll up to every tick and there's not going to be // a message most of the time, this is important for performance. - - log(endpointId, false, msg); - if (validateSvOpenArray(data, sizeof(int8_t)) != 0) { printf("ERROR: DPI-func=%s line=%d event=invalid-sv-array\n", __func__, __LINE__); @@ -161,7 +185,7 @@ DPI int sv2cCosimserverEpTryGet(char *endpointId, return -3; } // Verify it'll fit. - size_t msgSize = msg->getSize(); + size_t msgSize = msg.getSize(); if (msgSize > *dataSize) { printf("ERROR: Message size too big to fit in HW buffer\n"); return -5; @@ -169,7 +193,7 @@ DPI int sv2cCosimserverEpTryGet(char *endpointId, // Copy the message data. size_t i; - auto bytes = msg->getBytes(); + auto bytes = msg.getBytes(); for (i = 0; i < msgSize; ++i) { auto b = bytes[i]; *(char *)svGetArrElemPtr1(data, i) = b; @@ -179,7 +203,7 @@ DPI int sv2cCosimserverEpTryGet(char *endpointId, *(char *)svGetArrElemPtr1(data, i) = 0; } // Set the output data size. - *dataSize = msg->getSize(); + *dataSize = msg.getSize(); return 0; } @@ -213,16 +237,17 @@ DPI int sv2cCosimserverEpTryPut(char *endpointId, for (int i = 0; i < dataSize; ++i) { dataVec[i] = *(char *)svGetArrElemPtr1(data, i); } - Endpoint::MessageDataPtr blob = std::make_unique(dataVec); + auto blob = std::make_unique(dataVec); // Queue the blob. - Endpoint *ep = server->getEndpoint(endpointId); - if (!ep) { + auto portF = writePorts.find(endpointId); + if (portF == writePorts.end()) { fprintf(stderr, "Endpoint not found in registry!\n"); return -4; } - log(endpointId, true, blob); - ep->pushMessageToClient(std::move(blob)); + log(endpointId, true, *blob); + WriteChannelPort &port = portF->second; + port.write(*blob); return 0; } @@ -254,7 +279,7 @@ DPI int sv2cCosimserverInit() { // Find the port and run. printf("[cosim] Starting RPC server.\n"); - server = new RpcServer(); + server = std::make_unique(); server->run(findPort()); } return 0; @@ -263,7 +288,7 @@ DPI int sv2cCosimserverInit() { // ---- Manifest DPI entry points ---- DPI void -sv2cCosimserverSetManifest(unsigned int esiVersion, +sv2cCosimserverSetManifest(int esiVersion, const svOpenArrayHandle compressedManifest) { if (server == nullptr) sv2cCosimserverInit(); @@ -299,47 +324,50 @@ DPI int sv2cCosimserverMMIORegister() { } DPI int sv2cCosimserverMMIOReadTryGet(uint32_t *address) { - assert(server); - LowLevel *ll = server->getLowLevel(); - std::optional reqAddress = ll->readReqs.pop(); - if (!reqAddress.has_value()) - return -1; - *address = reqAddress.value(); - ll->readsOutstanding++; - return 0; + // assert(server); + // LowLevel *ll = server->getLowLevel(); + // std::optional reqAddress = ll->readReqs.pop(); + // if (!reqAddress.has_value()) + return -1; + // *address = reqAddress.value(); + // ll->readsOutstanding++; + // return 0; } DPI void sv2cCosimserverMMIOReadRespond(uint32_t data, char error) { - assert(server); - LowLevel *ll = server->getLowLevel(); - if (ll->readsOutstanding == 0) { - printf("ERROR: More read responses than requests! Not queuing response.\n"); - return; - } - ll->readsOutstanding--; - ll->readResps.push(data, error); + assert(false && "unimplemented"); + // assert(server); + // LowLevel *ll = server->getLowLevel(); + // if (ll->readsOutstanding == 0) { + // printf("ERROR: More read responses than requests! Not queuing + // response.\n"); return; + // } + // ll->readsOutstanding--; + // ll->readResps.push(data, error); } DPI void sv2cCosimserverMMIOWriteRespond(char error) { - assert(server); - LowLevel *ll = server->getLowLevel(); - if (ll->writesOutstanding == 0) { - printf( - "ERROR: More write responses than requests! Not queuing response.\n"); - return; - } - ll->writesOutstanding--; - ll->writeResps.push(error); + assert(false && "unimplemented"); + // assert(server); + // LowLevel *ll = server->getLowLevel(); + // if (ll->writesOutstanding == 0) { + // printf( + // "ERROR: More write responses than requests! Not queuing + // response.\n"); + // return; + // } + // ll->writesOutstanding--; + // ll->writeResps.push(error); } DPI int sv2cCosimserverMMIOWriteTryGet(uint32_t *address, uint32_t *data) { - assert(server); - LowLevel *ll = server->getLowLevel(); - auto req = ll->writeReqs.pop(); - if (!req.has_value()) - return -1; - *address = req.value().first; - *data = req.value().second; - ll->writesOutstanding++; - return 0; + // assert(server); + // LowLevel *ll = server->getLowLevel(); + // auto req = ll->writeReqs.pop(); + // if (!req.has_value()) + return -1; + // *address = req.value().first; + // *data = req.value().second; + // ll->writesOutstanding++; + // return 0; } diff --git a/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/dpi.h b/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/dpi.h index 9ebf4146c35b..2713746527d6 100644 --- a/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/dpi.h +++ b/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/dpi.h @@ -47,7 +47,7 @@ DPI int sv2cCosimserverInit(); DPI void sv2cCosimserverFinish(); /// Set the system zlib-compressed manifest. -DPI void sv2cCosimserverSetManifest(unsigned int esiVersion, +DPI void sv2cCosimserverSetManifest(int esiVersion, const svOpenArrayHandle compressedManifest); /// Register an MMIO module. Just checks that there is only one instantiated. diff --git a/lib/Dialect/ESI/runtime/cosim/include/cosim/Utils.h b/lib/Dialect/ESI/runtime/cosim/include/cosim/Utils.h deleted file mode 100644 index 4ef3c84d5539..000000000000 --- a/lib/Dialect/ESI/runtime/cosim/include/cosim/Utils.h +++ /dev/null @@ -1,50 +0,0 @@ -//===- Utils.h - utility code for cosim -------------------------*- C++ -*-===// -// -// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. -// See https://llvm.org/LICENSE.txt for license information. -// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -// -//===----------------------------------------------------------------------===// - -#ifndef COSIM_UTILS_H -#define COSIM_UTILS_H - -#include -#include -#include - -namespace esi { -namespace cosim { - -/// Thread safe queue. Just wraps std::queue protected with a lock. -template -class TSQueue { - using Lock = std::lock_guard; - - std::mutex m; - std::queue q; - -public: - /// Push onto the queue. - template - void push(E... t) { - Lock l(m); - q.emplace(t...); - } - - /// Pop something off the queue but return nullopt if the queue is empty. Why - /// doesn't std::queue have anything like this? - std::optional pop() { - Lock l(m); - if (q.size() == 0) - return std::nullopt; - auto t = q.front(); - q.pop(); - return t; - } -}; - -} // namespace cosim -} // namespace esi - -#endif diff --git a/lib/Dialect/ESI/runtime/cosim/include/esi/cosim/RpcServer.h b/lib/Dialect/ESI/runtime/cosim/include/esi/cosim/RpcServer.h new file mode 100644 index 000000000000..017f739b679b --- /dev/null +++ b/lib/Dialect/ESI/runtime/cosim/include/esi/cosim/RpcServer.h @@ -0,0 +1,47 @@ +//===- Server.h - Run a cosim server ----------------------------*- C++ -*-===// +// +// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +// See https://llvm.org/LICENSE.txt for license information. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//===----------------------------------------------------------------------===// +// +// Setup and run a server accepting connections via the 'cosim' RPC protocol. +// Then, one can request ports to and from the clients. +// +// Abstract this out to support multi-party communication in the future. +// +//===----------------------------------------------------------------------===// + +#ifndef ESI_COSIM_RPCSERVER_H +#define ESI_COSIM_RPCSERVER_H + +#include "esi/Ports.h" + +namespace esi { +namespace cosim { + +class RpcServer { +public: + ~RpcServer(); + + void setManifest(int esiVersion, std::vector compressedManifest); + + ReadChannelPort ®isterReadPort(const std::string &name, + const std::string &type); + WriteChannelPort ®isterWritePort(const std::string &name, + const std::string &type); + + void stop(); + void run(int port); + + class Impl; + +private: + Impl *impl; +}; + +} // namespace cosim +} // namespace esi + +#endif // ESI_COSIM_RPCSERVER_H diff --git a/lib/Dialect/ESI/runtime/cosim/lib/RpcServer.cpp b/lib/Dialect/ESI/runtime/cosim/lib/RpcServer.cpp new file mode 100644 index 000000000000..d93984247cd5 --- /dev/null +++ b/lib/Dialect/ESI/runtime/cosim/lib/RpcServer.cpp @@ -0,0 +1,309 @@ +//===- RpcServer.cpp - Run a cosim server ---------------------------------===// +// +// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +// See https://llvm.org/LICENSE.txt for license information. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// +//===----------------------------------------------------------------------===// + +#include "esi/cosim/RpcServer.h" +#include "esi/Utils.h" + +#include "cosim.grpc.pb.h" + +#include +#include +#include +#include +#include + +#include +#include +#include + +using namespace esi; +using namespace esi::cosim; + +using grpc::CallbackServerContext; +using grpc::Server; +using grpc::ServerBuilder; +using grpc::Status; + +/// Write the port number to a file. Necessary when we allow 'EzRpcServer' to +/// select its own port. We can't use stdout/stderr because the flushing +/// semantics are undefined (as in `flush()` doesn't work on all simulators). +static void writePort(uint16_t port) { + // "cosim.cfg" since we may want to include other info in the future. + FILE *fd = fopen("cosim.cfg", "w"); + fprintf(fd, "port: %u\n", (unsigned int)port); + fclose(fd); +} + +namespace { +class RpcServerReadPort; +class RpcServerWritePort; +} // namespace + +class esi::cosim::RpcServer::Impl + : public esi::cosim::ChannelServer::CallbackService { +public: + Impl(int port); + ~Impl(); + + //===--------------------------------------------------------------------===// + // Internal API + //===--------------------------------------------------------------------===// + + void setManifest(int esiVersion, std::vector compressedManifest) { + this->compressedManifest = std::move(compressedManifest); + this->esiVersion = esiVersion; + } + + ReadChannelPort ®isterReadPort(const std::string &name, + const std::string &type); + WriteChannelPort ®isterWritePort(const std::string &name, + const std::string &type); + + void stop(); + + //===--------------------------------------------------------------------===// + // RPC API implementations. + //===--------------------------------------------------------------------===// + + grpc::ServerUnaryReactor *GetManifest(CallbackServerContext *context, + const VoidMessage *, + Manifest *response) override { + printf("GetManifest\n"); + fflush(stdout); + response->set_esi_version(esiVersion); + response->set_compressed_manifest(compressedManifest.data(), + compressedManifest.size()); + auto reactor = context->DefaultReactor(); + reactor->Finish(Status::OK); + return reactor; + } + + grpc::ServerUnaryReactor *ListChannels(CallbackServerContext *, + const VoidMessage *, + ListOfChannels *channelsOut) override; + + grpc::ServerWriteReactor * + ConnectToClientChannel(CallbackServerContext *context, + const ChannelDesc *request) override; + grpc::ServerUnaryReactor * + SendToServer(CallbackServerContext *context, + const esi::cosim::AddressedMessage *request, + esi::cosim::VoidMessage *response) override; + +private: + int esiVersion; + std::vector compressedManifest; + std::map readPorts; + std::map writePorts; + + std::unique_ptr server; +}; +using Impl = esi::cosim::RpcServer::Impl; + +RpcServer::~RpcServer() { + if (impl) + delete impl; +} + +void RpcServer::setManifest(int esiVersion, + std::vector compressedManifest) { + impl->setManifest(esiVersion, std::move(compressedManifest)); +} +ReadChannelPort &RpcServer::registerReadPort(const std::string &name, + const std::string &type) { + return impl->registerReadPort(name, type); +} +WriteChannelPort &RpcServer::registerWritePort(const std::string &name, + const std::string &type) { + return impl->registerWritePort(name, type); +} +void RpcServer::run(int port) { impl = new Impl(port); } +void RpcServer::stop() { + assert(impl && "Server not running"); + impl->stop(); +} + +namespace { +class RpcServerReadPort : public ReadChannelPort { +public: + RpcServerReadPort(Type *type) : ReadChannelPort(type) {} + + bool read(MessageData &data) override { + std::optional msg = readQueue.pop(); + if (!msg) + return false; + data = std::move(*msg); + return true; + } + void gotMessage(MessageData &data) { readQueue.push(std::move(data)); } + +private: + utils::TSQueue readQueue; +}; + +class RpcServerWritePort : public WriteChannelPort { +public: + RpcServerWritePort(Type *type) : WriteChannelPort(type) {} + + void write(const MessageData &data) override { + writeQueue.push(data); + printf("pushed message\n"); + } + + utils::TSQueue writeQueue; +}; + +class RpcServerWriteReactor + : public grpc::ServerWriteReactor { +public: + RpcServerWriteReactor(RpcServerWritePort *writePort) + : writePort(writePort), sentSuccessfully(false), shutdown(false) { + myThread = std::thread(&RpcServerWriteReactor::threadLoop, this); + } + void OnDone() override { delete this; } + void OnWriteDone(bool ok) override { + printf("on write done\n"); + std::scoped_lock lock(msgMutex); + sentSuccessfully = ok; + sentSuccessfullyCV.notify_one(); + } + void OnCancel() override { + printf("on cancel\n"); + std::scoped_lock lock(msgMutex); + sentSuccessfully = false; + sentSuccessfullyCV.notify_one(); + } + + void threadLoop(); + + RpcServerWritePort *writePort; + std::thread myThread; + + std::mutex msgMutex; + esi::cosim::Message msg; + std::condition_variable sentSuccessfullyCV; + std::atomic sentSuccessfully; + std::atomic shutdown; +}; + +} // namespace + +Impl::Impl(int port) : esiVersion(-1) { + ServerBuilder builder; + std::string server_address("127.0.0.1:" + std::to_string(port)); + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials(), + &port); + builder.RegisterService(this); + server = builder.BuildAndStart(); + if (!server) + throw std::runtime_error("Failed to start server on " + server_address); + writePort(port); + std::cout << "Server listening on 127.0.0.1:" << port << std::endl; +} + +Impl::~Impl() { + for (auto &port : readPorts) + delete port.second; + for (auto &port : writePorts) + delete port.second; +} + +ReadChannelPort &Impl::registerReadPort(const std::string &name, + const std::string &type) { + auto port = new RpcServerReadPort(new Type(type)); + readPorts.emplace(name, port); + return *port; +} +WriteChannelPort &Impl::registerWritePort(const std::string &name, + const std::string &type) { + auto port = new RpcServerWritePort(new Type(type)); + writePorts.emplace(name, port); + return *port; +} + +void Impl::stop() { + server->Shutdown(); + server->Wait(); +} + +grpc::ServerUnaryReactor *Impl::ListChannels(CallbackServerContext *context, + const VoidMessage *, + ListOfChannels *channelsOut) { + for (auto [name, port] : readPorts) { + auto *channel = channelsOut->add_channels(); + channel->set_name(name); + channel->set_type(port->getType()->getID()); + channel->set_dir(ChannelDesc::Direction::ChannelDesc_Direction_TO_SERVER); + } + for (auto [name, port] : writePorts) { + auto *channel = channelsOut->add_channels(); + channel->set_name(name); + channel->set_type(port->getType()->getID()); + channel->set_dir(ChannelDesc::Direction::ChannelDesc_Direction_TO_CLIENT); + } + auto reactor = context->DefaultReactor(); + reactor->Finish(Status::OK); + return reactor; +} + +grpc::ServerWriteReactor * +Impl::ConnectToClientChannel(CallbackServerContext *context, + const ChannelDesc *request) { + printf("connect to client channel\n"); + auto it = writePorts.find(request->name()); + if (it == writePorts.end()) { + auto reactor = new RpcServerWriteReactor(nullptr); + reactor->Finish(Status(grpc::StatusCode::NOT_FOUND, "Unknown channel")); + return reactor; + } + return new RpcServerWriteReactor(it->second); +} + +void RpcServerWriteReactor::threadLoop() { + printf("thread loop\n"); + while (!shutdown) { + // TODO: adapt this to a new notification mechanism which is forthcoming. + if (writePort->writeQueue.empty()) + std::this_thread::sleep_for(std::chrono::microseconds(100)); + else + printf("queue not empty\n"); + + writePort->writeQueue.pop([this](const MessageData &data) -> bool { + printf("attempting to send message\n"); + std::unique_lock lock(msgMutex); + msg.set_data(reinterpret_cast(data.getBytes()), + data.getSize()); + sentSuccessfully = false; + StartWrite(&msg); + sentSuccessfullyCV.wait(lock); + bool ret = sentSuccessfully; + lock.unlock(); + printf("pop'd message\n"); + return ret; + }); + } +} + +grpc::ServerUnaryReactor * +Impl::SendToServer(CallbackServerContext *context, + const esi::cosim::AddressedMessage *request, + esi::cosim::VoidMessage *response) { + auto reactor = context->DefaultReactor(); + auto it = readPorts.find(request->channel_name()); + if (it == readPorts.end()) { + reactor->Finish(Status(grpc::StatusCode::NOT_FOUND, "Unknown channel")); + return reactor; + } + + std::string msgDataString = request->message().data(); + MessageData data(reinterpret_cast(msgDataString.data()), + msgDataString.size()); + it->second->gotMessage(data); + reactor->Finish(Status::OK); + return reactor; +} diff --git a/lib/Dialect/ESI/runtime/cpp/include/esi/Utils.h b/lib/Dialect/ESI/runtime/cpp/include/esi/Utils.h index fe59c806f39e..3bbef62c967f 100644 --- a/lib/Dialect/ESI/runtime/cpp/include/esi/Utils.h +++ b/lib/Dialect/ESI/runtime/cpp/include/esi/Utils.h @@ -17,12 +17,70 @@ #define ESI_UTILS_H #include +#include +#include +#include +#include #include namespace esi { namespace utils { // Very basic base64 encoding. void encodeBase64(const void *data, size_t size, std::string &out); + +/// Thread safe queue. Just wraps std::queue protected with a lock. Long term, +/// we need to avoid copying data. It has a lot of data copies currently. +template +class TSQueue { + using Lock = std::lock_guard; + + mutable std::mutex m; + std::queue q; + +public: + /// Push onto the queue. + template + void push(E... t) { + Lock l(m); + q.emplace(t...); + } + + /// Pop something off the queue but return nullopt if the queue is empty. Why + /// doesn't std::queue have anything like this? + std::optional pop() { + Lock l(m); + if (q.size() == 0) + return std::nullopt; + auto t = q.front(); + q.pop(); + return t; + } + + /// Call the callback for the front of the queue (if anything is there). Only + /// pop it off the queue if the callback returns true. + void pop(std::function callback) { + // TODO: since we need to unlock the mutex to call the callback, the queue + // could be pushed on to and its memory layout could thusly change, + // invalidating the reference returned by `.front()`. The easy solution here + // is to copy the data. Avoid copying the data. + T t; + { + Lock l(m); + if (q.size() == 0) + return; + t = q.front(); + } + if (callback(t)) { + Lock l(m); + q.pop(); + } + } + + bool empty() const { + Lock l(m); + return q.empty(); + } +}; } // namespace utils } // namespace esi diff --git a/lib/Dialect/ESI/runtime/cpp/include/esi/backends/Cosim.h b/lib/Dialect/ESI/runtime/cpp/include/esi/backends/Cosim.h index 14610a726a60..5256277dc936 100644 --- a/lib/Dialect/ESI/runtime/cpp/include/esi/backends/Cosim.h +++ b/lib/Dialect/ESI/runtime/cpp/include/esi/backends/Cosim.h @@ -31,8 +31,8 @@ namespace esi { namespace cosim { -class RpcClient; -} // namespace cosim +class ChannelDesc; +} namespace backends { namespace cosim { @@ -41,6 +41,8 @@ namespace cosim { class CosimAccelerator : public esi::AcceleratorConnection { public: CosimAccelerator(Context &, std::string hostname, uint16_t port); + ~CosimAccelerator(); + static std::unique_ptr connect(Context &, std::string connectionString); @@ -58,6 +60,10 @@ class CosimAccelerator : public esi::AcceleratorConnection { virtual std::map requestChannelsFor(AppIDPath, const BundleType *) override; + // C++ doesn't have a mechanism to forward declare a nested class and we don't + // want to include the generated header here + struct StubContainer; + protected: virtual Service *createService(Service::Type service, AppIDPath path, std::string implName, @@ -65,7 +71,11 @@ class CosimAccelerator : public esi::AcceleratorConnection { const HWClientDetails &clients) override; private: - std::unique_ptr rpcClient; + std::unique_ptr rpcClient; + + /// Get the type ID for a channel name. + bool getChannelDesc(const std::string &channelName, + esi::cosim::ChannelDesc &desc); // We own all channels connected to rpcClient since their lifetime is tied to // rpcClient. diff --git a/lib/Dialect/ESI/runtime/cpp/lib/backends/Cosim.cpp b/lib/Dialect/ESI/runtime/cpp/lib/backends/Cosim.cpp index 8ec6493be790..4ab56a414d9d 100644 --- a/lib/Dialect/ESI/runtime/cpp/lib/backends/Cosim.cpp +++ b/lib/Dialect/ESI/runtime/cpp/lib/backends/Cosim.cpp @@ -15,8 +15,15 @@ #include "esi/backends/Cosim.h" #include "esi/Services.h" +#include "esi/Utils.h" -#include "cosim/CapnpThreads.h" +#include "cosim.grpc.pb.h" + +#include +#include +#include +#include +#include #include #include @@ -25,9 +32,29 @@ using namespace std; using namespace esi; +using namespace esi::cosim; using namespace esi::services; using namespace esi::backends::cosim; +using grpc::Channel; +using grpc::ClientContext; +using grpc::ClientReader; +using grpc::ClientReaderWriter; +using grpc::ClientWriter; +using grpc::Status; + +static void checkStatus(Status s, string msg) { + if (!s.ok()) + throw runtime_error(msg + ". Code " + to_string(s.error_code()) + ": " + + s.error_message() + " (" + s.error_details() + ")"); +} + +struct esi::backends::cosim::CosimAccelerator::StubContainer { + StubContainer(std::unique_ptr stub) + : stub(std::move(stub)) {} + std::unique_ptr stub; +}; + /// Parse the connection string and instantiate the accelerator. Support the /// traditional 'host:port' syntax and a path to 'cosim.cfg' which is output by /// the cosimulation when it starts (which is useful when it chooses its own @@ -80,142 +107,180 @@ CosimAccelerator::CosimAccelerator(Context &ctxt, string hostname, uint16_t port) : AcceleratorConnection(ctxt) { // Connect to the simulation. - rpcClient = std::make_unique(); - rpcClient->run(hostname, port); + auto channel = grpc::CreateChannel(hostname + ":" + to_string(port), + grpc::InsecureChannelCredentials()); + rpcClient = std::make_unique(ChannelServer::NewStub(channel)); } - -namespace { -class CosimMMIO : public MMIO { -public: - CosimMMIO(esi::cosim::LowLevel *lowLevel) : lowLevel(lowLevel) {} - - // Push the read request into the LowLevel capnp bridge and wait for the - // response. - uint32_t read(uint32_t addr) const override { - lowLevel->readReqs.push(addr); - - std::optional> resp; - while (resp = lowLevel->readResps.pop(), !resp.has_value()) - std::this_thread::sleep_for(std::chrono::microseconds(10)); - if (resp->second != 0) - throw runtime_error("MMIO read error" + to_string(resp->second)); - return resp->first; - } - - // Push the write request into the LowLevel capnp bridge and wait for the ack - // or error. - void write(uint32_t addr, uint32_t data) override { - lowLevel->writeReqs.push(make_pair(addr, data)); - - std::optional resp; - while (resp = lowLevel->writeResps.pop(), !resp.has_value()) - std::this_thread::sleep_for(std::chrono::microseconds(10)); - if (*resp != 0) - throw runtime_error("MMIO write error" + to_string(*resp)); - } - -private: - esi::cosim::LowLevel *lowLevel; -}; -} // namespace +CosimAccelerator::~CosimAccelerator() { channels.clear(); } + +// TODO: Fix MMIO! +// namespace { +// class CosimMMIO : public MMIO { +// public: +// CosimMMIO(esi::cosim::LowLevel *lowLevel) : lowLevel(lowLevel) {} + +// // Push the read request into the LowLevel capnp bridge and wait for the +// // response. +// uint32_t read(uint32_t addr) const override { +// lowLevel->readReqs.push(addr); + +// std::optional> resp; +// while (resp = lowLevel->readResps.pop(), !resp.has_value()) +// std::this_thread::sleep_for(std::chrono::microseconds(10)); +// if (resp->second != 0) +// throw runtime_error("MMIO read error" + to_string(resp->second)); +// return resp->first; +// } + +// // Push the write request into the LowLevel capnp bridge and wait for the +// ack +// // or error. +// void write(uint32_t addr, uint32_t data) override { +// lowLevel->writeReqs.push(make_pair(addr, data)); + +// std::optional resp; +// while (resp = lowLevel->writeResps.pop(), !resp.has_value()) +// std::this_thread::sleep_for(std::chrono::microseconds(10)); +// if (*resp != 0) +// throw runtime_error("MMIO write error" + to_string(*resp)); +// } + +// private: +// esi::cosim::LowLevel *lowLevel; +// }; +// } // namespace namespace { class CosimSysInfo : public SysInfo { public: - CosimSysInfo(const std::unique_ptr &rpcClient) + CosimSysInfo( + const std::unique_ptr &rpcClient) : rpcClient(rpcClient) {} uint32_t getEsiVersion() const override { - unsigned int esiVersion; - std::vector compressedManifest; - if (!rpcClient->getCompressedManifest(esiVersion, compressedManifest)) - throw runtime_error("Could not get ESI version from cosim"); - return esiVersion; + ::esi::cosim::Manifest response = getManifest(); + return response.esi_version(); } vector getCompressedManifest() const override { - unsigned int esiVersion; - std::vector compressedManifest; - if (!rpcClient->getCompressedManifest(esiVersion, compressedManifest)) - throw runtime_error("Could not get ESI version from cosim"); - return compressedManifest; + ::esi::cosim::Manifest response = getManifest(); + std::string compressedManifestStr = response.compressed_manifest(); + return std::vector(compressedManifestStr.begin(), + compressedManifestStr.end()); } private: - const std::unique_ptr &rpcClient; + ::esi::cosim::Manifest getManifest() const { + ::esi::cosim::Manifest response; + do { + ClientContext context; + VoidMessage arg; + Status s = rpcClient->GetManifest(&context, arg, &response); + checkStatus(s, "Failed to get manifest"); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } while (response.esi_version() < 0); + return response; + } + + const std::unique_ptr &rpcClient; }; } // namespace namespace { class WriteCosimChannelPort : public WriteChannelPort { public: - WriteCosimChannelPort(esi::cosim::Endpoint *ep, const Type *type, string name) - : WriteChannelPort(type), ep(ep), name(name) {} - virtual ~WriteCosimChannelPort() = default; - - // TODO: Replace this with a request to connect to the capnp thread. - virtual void connect() override { - if (!ep) - throw runtime_error("Could not find channel '" + name + - "' in cosimulation"); - if (ep->getSendTypeId() == "") - throw runtime_error("Channel '" + name + "' is not a read channel"); - if (ep->getSendTypeId() != getType()->getID()) + WriteCosimChannelPort(ChannelServer::Stub *rpcClient, const ChannelDesc &desc, + const Type *type, string name) + : WriteChannelPort(type), rpcClient(rpcClient), desc(desc), name(name) {} + ~WriteCosimChannelPort() = default; + + void connect() override { + WriteChannelPort::connect(); + if (desc.type() != getType()->getID()) throw runtime_error("Channel '" + name + "' has wrong type. Expected " + - getType()->getID() + ", got " + ep->getSendTypeId()); - ep->setInUse(); + getType()->getID() + ", got " + desc.type()); + if (desc.dir() != ChannelDesc::Direction::ChannelDesc_Direction_TO_SERVER) + throw runtime_error("Channel '" + name + "' is not a to server channel"); + assert(desc.name() == name); } - virtual void disconnect() override { - if (ep) - ep->returnForUse(); + + void write(const MessageData &data) override { + ClientContext context; + AddressedMessage msg; + msg.set_channel_name(name); + msg.mutable_message()->set_data(data.getBytes(), data.getSize()); + VoidMessage response; + grpc::Status sendStatus = rpcClient->SendToServer(&context, msg, &response); + if (!sendStatus.ok()) + throw runtime_error("Failed to write to channel '" + name + + "': " + sendStatus.error_message() + + ". Details: " + sendStatus.error_details()); } - virtual void write(const MessageData &) override; protected: - esi::cosim::Endpoint *ep; + ChannelServer::Stub *rpcClient; + ChannelDesc desc; string name; }; } // namespace -void WriteCosimChannelPort::write(const MessageData &data) { - ep->pushMessageToSim(make_unique(data)); -} - namespace { -class ReadCosimChannelPort : public ReadChannelPort { +class ReadCosimChannelPort + : public ReadChannelPort, + public grpc::ClientReadReactor { public: - ReadCosimChannelPort(esi::cosim::Endpoint *ep, const Type *type, string name) - : ReadChannelPort(type), ep(ep), name(name) {} - virtual ~ReadCosimChannelPort() = default; + ReadCosimChannelPort(ChannelServer::Stub *rpcClient, const ChannelDesc &desc, + const Type *type, string name) + : ReadChannelPort(type), rpcClient(rpcClient), desc(desc), name(name), + context(nullptr) {} + virtual ~ReadCosimChannelPort() { disconnect(); } - // TODO: Replace this with a request to connect to the capnp thread. virtual void connect() override { - if (!ep) - throw runtime_error("Could not find channel '" + name + - "' in cosimulation"); - if (ep->getRecvTypeId() == "") - throw runtime_error("Channel '" + name + "' is not a read channel"); - if (ep->getRecvTypeId() != getType()->getID()) + if (desc.type() != getType()->getID()) throw runtime_error("Channel '" + name + "' has wrong type. Expected " + - getType()->getID() + ", got " + ep->getRecvTypeId()); - ep->setInUse(); + getType()->getID() + ", got " + desc.type()); + if (desc.dir() != ChannelDesc::Direction::ChannelDesc_Direction_TO_CLIENT) + throw runtime_error("Channel '" + name + "' is not a to server channel"); + assert(desc.name() == name); + + context = new ClientContext(); + rpcClient->async()->ConnectToClientChannel(context, &desc, this); + StartCall(); + StartRead(&incomingMessage); + } + void OnReadDone(bool ok) override { + if (!ok) + return; + const std::string &messageString = incomingMessage.data(); + MessageData data(reinterpret_cast(messageString.data()), + messageString.size()); + messageQueue.push(data); + StartRead(&incomingMessage); } - virtual void disconnect() override { - if (ep) - ep->returnForUse(); + void disconnect() override { + if (!context) + return; + context->TryCancel(); + delete context; + context = nullptr; } - virtual bool read(MessageData &) override; + bool read(MessageData &) override; protected: - esi::cosim::Endpoint *ep; + ChannelServer::Stub *rpcClient; + ChannelDesc desc; string name; + + ClientContext *context; + esi::cosim::Message incomingMessage; + esi::utils::TSQueue messageQueue; }; } // namespace bool ReadCosimChannelPort::read(MessageData &data) { - esi::cosim::Endpoint::MessageDataPtr msg; - if (!ep->getMessageToClient(msg)) + std::optional msg = messageQueue.pop(); + if (!msg.has_value()) return false; data = *msg; return true; @@ -242,18 +307,40 @@ CosimAccelerator::requestChannelsFor(AppIDPath idPath, // Get the endpoint, which may or may not exist. Construct the port. // Everything is validated when the client calls 'connect()' on the port. - esi::cosim::Endpoint *ep = rpcClient->getEndpoint(channelName); + ChannelDesc chDesc; + if (!getChannelDesc(channelName, chDesc)) + throw runtime_error("Could not find channel '" + channelName + + "' in cosimulation"); + ChannelPort *port; - if (BundlePort::isWrite(dir)) - port = new WriteCosimChannelPort(ep, type, channelName); - else - port = new ReadCosimChannelPort(ep, type, channelName); + if (BundlePort::isWrite(dir)) { + port = new WriteCosimChannelPort(rpcClient->stub.get(), chDesc, type, + channelName); + } else { + port = new ReadCosimChannelPort(rpcClient->stub.get(), chDesc, type, + channelName); + } channels.emplace(port); channelResults.emplace(name, *port); } return channelResults; } +bool CosimAccelerator::getChannelDesc(const string &channelName, + ChannelDesc &desc) { + ClientContext context; + VoidMessage arg; + ListOfChannels response; + Status s = rpcClient->stub->ListChannels(&context, arg, &response); + checkStatus(s, "Failed to list channels"); + for (const auto &channel : response.channels()) + if (channel.name() == channelName) { + desc = channel; + return true; + } + return false; +} + Service *CosimAccelerator::createService(Service::Type svcType, AppIDPath idPath, std::string implName, const ServiceImplDetails &details, @@ -277,11 +364,11 @@ Service *CosimAccelerator::createService(Service::Type svcType, } if (svcType == typeid(services::MMIO)) { - return new CosimMMIO(rpcClient->getLowLevel()); + // return new CosimMMIO(rpcClient->getLowLevel()); } else if (svcType == typeid(SysInfo)) { switch (manifestMethod) { case ManifestMethod::Cosim: - return new CosimSysInfo(rpcClient); + return new CosimSysInfo(rpcClient->stub); case ManifestMethod::MMIO: return new MMIOSysInfo(getService()); } diff --git a/lib/Dialect/ESI/runtime/cpp/tools/esitester.cpp b/lib/Dialect/ESI/runtime/cpp/tools/esitester.cpp index 1d4fc64df4f9..f54af9f27ffd 100644 --- a/lib/Dialect/ESI/runtime/cpp/tools/esitester.cpp +++ b/lib/Dialect/ESI/runtime/cpp/tools/esitester.cpp @@ -60,6 +60,8 @@ int main(int argc, const char *argv[]) { while (true) { this_thread::sleep_for(chrono::milliseconds(100)); } + } else if (cmd == "wait") { + this_thread::sleep_for(chrono::seconds(1)); } acc->disconnect(); diff --git a/utils/get-capnp.sh b/utils/get-capnp.sh deleted file mode 100755 index 18cf72248423..000000000000 --- a/utils/get-capnp.sh +++ /dev/null @@ -1,42 +0,0 @@ -#!/usr/bin/env bash -##===- utils/get-capnp.sh - Install CapnProto ----------------*- Script -*-===## -# -# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. -# See https://llvm.org/LICENSE.txt for license information. -# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -# -##===----------------------------------------------------------------------===## -# -# This script downloads, compiles, and installs CapnProto into $/ext. -# Cap'nProto is use by ESI (Elastic Silicon Interfaces) cosimulation as a -# message format and RPC client/server. -# -# It will also optionally install pycapnp, which is used for testing. -# -##===----------------------------------------------------------------------===## - -echo "Do you wish to install pycapnp? Cosim integration tests require pycapnp." -read -p "Yes to confirm: " yn -case $yn in - [Yy]* ) pip3 install pycapnp;; - * ) echo "Skipping.";; -esac - -mkdir -p "$(dirname "$BASH_SOURCE[0]")/../ext" -EXT_DIR=$(cd "$(dirname "$BASH_SOURCE[0]")/../ext" && pwd) -CAPNP_VER=0.9.1 -echo "Installing capnproto..." - -echo $EXT_DIR -cd $EXT_DIR - -wget https://capnproto.org/capnproto-c++-$CAPNP_VER.tar.gz -tar -zxf capnproto-c++-$CAPNP_VER.tar.gz -cd capnproto-c++-$CAPNP_VER -./configure --prefix=$EXT_DIR -make -j$(nproc) -make install -cd ../ -rm -r capnproto-c++-$CAPNP_VER capnproto-c++-$CAPNP_VER.tar.gz - -echo "Done." diff --git a/utils/get-grpc.sh b/utils/get-grpc.sh new file mode 100755 index 000000000000..d2c1313e1ed2 --- /dev/null +++ b/utils/get-grpc.sh @@ -0,0 +1,36 @@ +#!/usr/bin/env bash +##===- utils/get-grpc.sh - Install gRPC (for ESI runtime) ----*- Script -*-===## +# +# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. +# See https://llvm.org/LICENSE.txt for license information. +# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +# +##===----------------------------------------------------------------------===## +# +# +##===----------------------------------------------------------------------===## + +mkdir -p "$(dirname "$BASH_SOURCE[0]")/../ext" +EXT_DIR=$(cd "$(dirname "$BASH_SOURCE[0]")/../ext" && pwd) +GRPC_VER=1.64.2 +echo "Installing gRPC..." + +echo $EXT_DIR +cd $EXT_DIR + +if [ ! -d grpc ]; then + git clone --recurse-submodules -b v$GRPC_VER https://github.com/grpc/grpc +fi +cd grpc +mkdir -p cmake/build +cd cmake/build +cmake -S ../.. -B . -DCMAKE_INSTALL_PREFIX=$EXT_DIR \ + -DgRPC_INSTALL=ON \ + -DCMAKE_BUILD_TYPE=Debug +make -j$(nproc) +make install + +cd ../../../ +rm -rf grpc + +echo "Done." From 6adaaeb301b1e4edfd8ae7b75842fc550582ef47 Mon Sep 17 00:00:00 2001 From: John Demme Date: Fri, 21 Jun 2024 05:00:21 +0000 Subject: [PATCH 3/4] self-review --- .github/workflows/nightlyIntegrationTests.yml | 2 +- .github/workflows/shortIntegrationTests.yml | 2 +- lib/Dialect/ESI/runtime/CMakeLists.txt | 20 +- lib/Dialect/ESI/runtime/cosim/cosim.proto | 23 +- .../cosim/cosim_dpi_server/CMakeLists.txt | 1 - .../cosim/cosim_dpi_server/DpiEntryPoints.cpp | 8 + .../cosim/include/esi/cosim/RpcServer.h | 6 + .../ESI/runtime/cosim/lib/RpcServer.cpp | 288 ++++++++++-------- .../ESI/runtime/cpp/include/esi/Utils.h | 26 +- .../runtime/cpp/include/esi/backends/Cosim.h | 5 +- .../ESI/runtime/cpp/lib/backends/Cosim.cpp | 63 +++- utils/get-grpc.sh | 3 +- 12 files changed, 278 insertions(+), 169 deletions(-) diff --git a/.github/workflows/nightlyIntegrationTests.yml b/.github/workflows/nightlyIntegrationTests.yml index 478299f63099..f8967c10426a 100644 --- a/.github/workflows/nightlyIntegrationTests.yml +++ b/.github/workflows/nightlyIntegrationTests.yml @@ -19,7 +19,7 @@ jobs: # John and re-run the job. runs-on: ["self-hosted", "1ES.Pool=1ES-CIRCT-builds", "linux"] container: - image: ghcr.io/circt/images/circt-integration-test:v13.1 + image: ghcr.io/circt/images/circt-integration-test:v15.0 volumes: - /mnt:/__w/circt strategy: diff --git a/.github/workflows/shortIntegrationTests.yml b/.github/workflows/shortIntegrationTests.yml index 0af6f7e252f8..78b85ed60db5 100644 --- a/.github/workflows/shortIntegrationTests.yml +++ b/.github/workflows/shortIntegrationTests.yml @@ -29,7 +29,7 @@ jobs: # John and re-run the job. runs-on: ["self-hosted", "1ES.Pool=1ES-CIRCT-builds", "linux"] container: - image: ghcr.io/circt/images/circt-integration-test:v13.1 + image: ghcr.io/circt/images/circt-integration-test:v15.0 volumes: - /mnt:/__w/circt strategy: diff --git a/lib/Dialect/ESI/runtime/CMakeLists.txt b/lib/Dialect/ESI/runtime/CMakeLists.txt index 4cdf9bc9249b..7756371f2d0d 100644 --- a/lib/Dialect/ESI/runtime/CMakeLists.txt +++ b/lib/Dialect/ESI/runtime/CMakeLists.txt @@ -37,7 +37,7 @@ find_package(ZLIB REQUIRED) # JSON parser for the manifest. if (NOT TARGET nlohmann_json) - message(" -- ESI runtime pulling down json") + message("-- ESI runtime pulling down json") FetchContent_Declare(json GIT_REPOSITORY https://github.com/nlohmann/json.git GIT_TAG v3.11.3 @@ -163,6 +163,14 @@ if (XRT_PATH) ${ESIRuntimeLibDirs} ${XRT_PATH}/lib ) + get_filename_component(XRT_SO ${XRT_PATH}/lib/libxrt_coreutil.so REALPATH) + install(FILES + ${XRT_SO} + ${XRT_PATH}/lib/libxrt_coreutil.so.2 + ${XRT_PATH}/lib/libxrt_coreutil.so + DESTINATION lib + COMPONENT ESIRuntime + ) endif() # The core API. For now, compile the backends into it directly. @@ -181,16 +189,6 @@ install(TARGETS ESIRuntime DESTINATION lib COMPONENT ESIRuntime ) -install(IMPORTED_RUNTIME_ARTIFACTS ESIRuntime - RUNTIME_DEPENDENCY_SET ESIRuntime_RUNTIME_DEPS - DESTINATION lib - COMPONENT ESIRuntime -) -# install(RUNTIME_DEPENDENCY_SET ESIRuntime_RUNTIME_DEPS -# DESTINATION lib -# PRE_EXCLUDE_REGEXES .* -# COMPONENT ESIRuntime -# ) install(FILES ${ESIRuntimeHeaders} DESTINATION include/esi COMPONENT ESIRuntime-dev diff --git a/lib/Dialect/ESI/runtime/cosim/cosim.proto b/lib/Dialect/ESI/runtime/cosim/cosim.proto index 1f14a9163df3..5bbde44077e5 100644 --- a/lib/Dialect/ESI/runtime/cosim/cosim.proto +++ b/lib/Dialect/ESI/runtime/cosim/cosim.proto @@ -20,6 +20,7 @@ syntax = "proto3"; package esi.cosim; +// Description of a channel that can be connected to by the client. message ChannelDesc { string name = 1; @@ -32,31 +33,39 @@ message ChannelDesc { string type = 3; } +// List of channels that the client can connect to. message ListOfChannels { repeated ChannelDesc channels = 1; } +// Empty message since gRPC only supports exactly one argument and return. message VoidMessage {} +// The manifest package. message Manifest { int32 esi_version = 1; bytes compressed_manifest = 2; } +// An ESI message. message Message { bytes data = 1; } + +// An ESI message and the channel to which is should be directed. message AddressedMessage { string channel_name = 1; Message message = 2; } -// message MessageStream { -// oneof PayloadOrHeader { -// string channel_name = 1; -// Message message = 2; -// } -// } - +// The server interface provided by the ESI cosim server. service ChannelServer { + // Get the manifest embedded in the accelertor. rpc GetManifest(VoidMessage) returns (Manifest) {} + + // List the channels that the client can connect to. rpc ListChannels(VoidMessage) returns (ListOfChannels) {} + + // Send a message to the server. rpc SendToServer(AddressedMessage) returns (VoidMessage) {} + + // Connect to a client channel and return a stream of messages coming from + // that channel. rpc ConnectToClientChannel(ChannelDesc) returns (stream Message) {} } diff --git a/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/CMakeLists.txt b/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/CMakeLists.txt index 46a7d7bc16b1..bea992f0ca62 100644 --- a/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/CMakeLists.txt +++ b/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/CMakeLists.txt @@ -16,7 +16,6 @@ set_target_properties(EsiCosimDpiServer add_dependencies(EsiCosimDpiServer ESIRuntime MtiPli) target_link_libraries(EsiCosimDpiServer PRIVATE - EsiCosimCapnp ESIRuntime CosimRpcServer MtiPli diff --git a/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/DpiEntryPoints.cpp b/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/DpiEntryPoints.cpp index 9e1c017508bc..cd11af5bd1c8 100644 --- a/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/DpiEntryPoints.cpp +++ b/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/DpiEntryPoints.cpp @@ -103,11 +103,15 @@ static int validateSvOpenArray(const svOpenArrayHandle data, // ---- Traditional cosim DPI entry points ---- +// Lookups for registered ports. As a future optimization, change the DPI API to +// return a handle when registering wherein said handle is a pointer to a port. std::map readPorts; std::map writePorts; // Register simulated device endpoints. // - return 0 on success, non-zero on failure (duplicate EP registered). +// TODO: Change this by breaking it in two functions, one for read and one for +// write. Also return the pointer as a handle. DPI int sv2cCosimserverEpRegister(char *endpointId, char *fromHostTypeIdC, int fromHostTypeSize, char *toHostTypeIdC, int toHostTypeSize) { @@ -312,6 +316,10 @@ sv2cCosimserverSetManifest(int esiVersion, // ---- Low-level cosim DPI entry points ---- +// TODO: These had the shit broken outta them in the gRPC conversion. We're not +// actively using them at the moment, but they'll have to be revived again in +// the future. + static bool mmioRegistered = false; DPI int sv2cCosimserverMMIORegister() { if (mmioRegistered) { diff --git a/lib/Dialect/ESI/runtime/cosim/include/esi/cosim/RpcServer.h b/lib/Dialect/ESI/runtime/cosim/include/esi/cosim/RpcServer.h index 017f739b679b..7d3a54c18e87 100644 --- a/lib/Dialect/ESI/runtime/cosim/include/esi/cosim/RpcServer.h +++ b/lib/Dialect/ESI/runtime/cosim/include/esi/cosim/RpcServer.h @@ -25,8 +25,13 @@ class RpcServer { public: ~RpcServer(); + /// Set the manifest and version. There is a race condition here in that the + /// RPC server can be started and a connection from the client could happen + /// before the manifest is set. TODO: rework the DPI API to require that the + /// manifest gets set first. void setManifest(int esiVersion, std::vector compressedManifest); + /// Register a read or write port which communicates over RPC. ReadChannelPort ®isterReadPort(const std::string &name, const std::string &type); WriteChannelPort ®isterWritePort(const std::string &name, @@ -35,6 +40,7 @@ class RpcServer { void stop(); void run(int port); + /// Hide the implementation details from this header file. class Impl; private: diff --git a/lib/Dialect/ESI/runtime/cosim/lib/RpcServer.cpp b/lib/Dialect/ESI/runtime/cosim/lib/RpcServer.cpp index d93984247cd5..d13500c57d9d 100644 --- a/lib/Dialect/ESI/runtime/cosim/lib/RpcServer.cpp +++ b/lib/Dialect/ESI/runtime/cosim/lib/RpcServer.cpp @@ -26,12 +26,14 @@ using namespace esi::cosim; using grpc::CallbackServerContext; using grpc::Server; -using grpc::ServerBuilder; +using grpc::ServerUnaryReactor; +using grpc::ServerWriteReactor; using grpc::Status; +using grpc::StatusCode; -/// Write the port number to a file. Necessary when we allow 'EzRpcServer' to -/// select its own port. We can't use stdout/stderr because the flushing -/// semantics are undefined (as in `flush()` doesn't work on all simulators). +/// Write the port number to a file. Necessary when we are allowed to select our +/// own port. We can't use stdout/stderr because the flushing semantics are +/// undefined (as in `flush()` doesn't work on all simulators). static void writePort(uint16_t port) { // "cosim.cfg" since we may want to include other info in the future. FILE *fd = fopen("cosim.cfg", "w"); @@ -67,33 +69,20 @@ class esi::cosim::RpcServer::Impl void stop(); //===--------------------------------------------------------------------===// - // RPC API implementations. + // RPC API implementations. See the .proto file for the API documentation. //===--------------------------------------------------------------------===// - grpc::ServerUnaryReactor *GetManifest(CallbackServerContext *context, - const VoidMessage *, - Manifest *response) override { - printf("GetManifest\n"); - fflush(stdout); - response->set_esi_version(esiVersion); - response->set_compressed_manifest(compressedManifest.data(), - compressedManifest.size()); - auto reactor = context->DefaultReactor(); - reactor->Finish(Status::OK); - return reactor; - } - - grpc::ServerUnaryReactor *ListChannels(CallbackServerContext *, - const VoidMessage *, - ListOfChannels *channelsOut) override; - - grpc::ServerWriteReactor * + ServerUnaryReactor *GetManifest(CallbackServerContext *context, + const VoidMessage *, + Manifest *response) override; + ServerUnaryReactor *ListChannels(CallbackServerContext *, const VoidMessage *, + ListOfChannels *channelsOut) override; + ServerWriteReactor * ConnectToClientChannel(CallbackServerContext *context, const ChannelDesc *request) override; - grpc::ServerUnaryReactor * - SendToServer(CallbackServerContext *context, - const esi::cosim::AddressedMessage *request, - esi::cosim::VoidMessage *response) override; + ServerUnaryReactor *SendToServer(CallbackServerContext *context, + const esi::cosim::AddressedMessage *request, + esi::cosim::VoidMessage *response) override; private: int esiVersion; @@ -105,30 +94,15 @@ class esi::cosim::RpcServer::Impl }; using Impl = esi::cosim::RpcServer::Impl; -RpcServer::~RpcServer() { - if (impl) - delete impl; -} - -void RpcServer::setManifest(int esiVersion, - std::vector compressedManifest) { - impl->setManifest(esiVersion, std::move(compressedManifest)); -} -ReadChannelPort &RpcServer::registerReadPort(const std::string &name, - const std::string &type) { - return impl->registerReadPort(name, type); -} -WriteChannelPort &RpcServer::registerWritePort(const std::string &name, - const std::string &type) { - return impl->registerWritePort(name, type); -} -void RpcServer::run(int port) { impl = new Impl(port); } -void RpcServer::stop() { - assert(impl && "Server not running"); - impl->stop(); -} +//===----------------------------------------------------------------------===// +// Read and write ports +// +// Implemented as simple queues which the RPC server writes to and reads from. +//===----------------------------------------------------------------------===// namespace { +/// Implements a simple read queue. The RPC server will push messages into this +/// as appropriate. class RpcServerReadPort : public ReadChannelPort { public: RpcServerReadPort(Type *type) : ReadChannelPort(type) {} @@ -140,62 +114,34 @@ class RpcServerReadPort : public ReadChannelPort { data = std::move(*msg); return true; } - void gotMessage(MessageData &data) { readQueue.push(std::move(data)); } -private: utils::TSQueue readQueue; }; +/// Implements a simple write queue. The RPC server will pull messages from this +/// as appropriate. Note that this could be more performant if a callback is +/// used. This would have more complexity as when a client disconnects the +/// outstanding messages will need somewhere to be held until the next client +/// connects. For now, it's simpler to just have the server poll the queue. class RpcServerWritePort : public WriteChannelPort { public: RpcServerWritePort(Type *type) : WriteChannelPort(type) {} - - void write(const MessageData &data) override { - writeQueue.push(data); - printf("pushed message\n"); - } + void write(const MessageData &data) override { writeQueue.push(data); } utils::TSQueue writeQueue; }; - -class RpcServerWriteReactor - : public grpc::ServerWriteReactor { -public: - RpcServerWriteReactor(RpcServerWritePort *writePort) - : writePort(writePort), sentSuccessfully(false), shutdown(false) { - myThread = std::thread(&RpcServerWriteReactor::threadLoop, this); - } - void OnDone() override { delete this; } - void OnWriteDone(bool ok) override { - printf("on write done\n"); - std::scoped_lock lock(msgMutex); - sentSuccessfully = ok; - sentSuccessfullyCV.notify_one(); - } - void OnCancel() override { - printf("on cancel\n"); - std::scoped_lock lock(msgMutex); - sentSuccessfully = false; - sentSuccessfullyCV.notify_one(); - } - - void threadLoop(); - - RpcServerWritePort *writePort; - std::thread myThread; - - std::mutex msgMutex; - esi::cosim::Message msg; - std::condition_variable sentSuccessfullyCV; - std::atomic sentSuccessfully; - std::atomic shutdown; -}; - } // namespace +//===----------------------------------------------------------------------===// +// RPC server implementations +//===----------------------------------------------------------------------===// + +/// Start a server on the given port. -1 means to let the OS pick a port. Impl::Impl(int port) : esiVersion(-1) { - ServerBuilder builder; + grpc::ServerBuilder builder; std::string server_address("127.0.0.1:" + std::to_string(port)); + // TODO: use secure credentials. Not so bad for now since we only accept + // connections on localhost. builder.AddListeningPort(server_address, grpc::InsecureServerCredentials(), &port); builder.RegisterService(this); @@ -206,7 +152,16 @@ Impl::Impl(int port) : esiVersion(-1) { std::cout << "Server listening on 127.0.0.1:" << port << std::endl; } +void Impl::stop() { + /// Shutdown the server and wait for it to finish. + server->Shutdown(); + server->Wait(); + server = nullptr; +} + Impl::~Impl() { + if (server) + stop(); for (auto &port : readPorts) delete port.second; for (auto &port : writePorts) @@ -226,14 +181,20 @@ WriteChannelPort &Impl::registerWritePort(const std::string &name, return *port; } -void Impl::stop() { - server->Shutdown(); - server->Wait(); +ServerUnaryReactor *Impl::GetManifest(CallbackServerContext *context, + const VoidMessage *, Manifest *response) { + response->set_esi_version(esiVersion); + response->set_compressed_manifest(compressedManifest.data(), + compressedManifest.size()); + ServerUnaryReactor *reactor = context->DefaultReactor(); + reactor->Finish(Status::OK); + return reactor; } -grpc::ServerUnaryReactor *Impl::ListChannels(CallbackServerContext *context, - const VoidMessage *, - ListOfChannels *channelsOut) { +/// Load the list of channels into the response and fire it off. +ServerUnaryReactor *Impl::ListChannels(CallbackServerContext *context, + const VoidMessage *, + ListOfChannels *channelsOut) { for (auto [name, port] : readPorts) { auto *channel = channelsOut->add_channels(); channel->set_name(name); @@ -246,64 +207,149 @@ grpc::ServerUnaryReactor *Impl::ListChannels(CallbackServerContext *context, channel->set_type(port->getType()->getID()); channel->set_dir(ChannelDesc::Direction::ChannelDesc_Direction_TO_CLIENT); } + + // The default reactor is basically to just finish the RPC call as if we're + // implementing the RPC function as a blocking call. auto reactor = context->DefaultReactor(); reactor->Finish(Status::OK); return reactor; } -grpc::ServerWriteReactor * -Impl::ConnectToClientChannel(CallbackServerContext *context, - const ChannelDesc *request) { - printf("connect to client channel\n"); - auto it = writePorts.find(request->name()); - if (it == writePorts.end()) { - auto reactor = new RpcServerWriteReactor(nullptr); - reactor->Finish(Status(grpc::StatusCode::NOT_FOUND, "Unknown channel")); - return reactor; +namespace { +/// When a client connects to a read port (on its end, a write port on this +/// end), construct one of these to poll the corresponding write port on this +/// side and forward the messages. +class RpcServerWriteReactor : public ServerWriteReactor { +public: + RpcServerWriteReactor(RpcServerWritePort *writePort) + : writePort(writePort), sentSuccessfully(SendStatus::UnknownStatus), + shutdown(false) { + myThread = std::thread(&RpcServerWriteReactor::threadLoop, this); } - return new RpcServerWriteReactor(it->second); -} + ~RpcServerWriteReactor() { + shutdown = true; + myThread.join(); + } + + void OnDone() override { delete this; } + void OnWriteDone(bool ok) override { + std::scoped_lock lock(sentMutex); + sentSuccessfully = ok ? SendStatus::Success : SendStatus::Failure; + sentSuccessfullyCV.notify_one(); + } + void OnCancel() override { + std::scoped_lock lock(sentMutex); + sentSuccessfully = SendStatus::Disconnect; + sentSuccessfullyCV.notify_one(); + } + +private: + /// The polling loop. + void threadLoop(); + /// The polling thread. + std::thread myThread; + + /// Assoicated write port on this side. (Read port on the client side.) + RpcServerWritePort *writePort; + + /// Mutex to protect the sentSuccessfully flag. + std::mutex sentMutex; + enum SendStatus { UnknownStatus, Success, Failure, Disconnect }; + volatile SendStatus sentSuccessfully; + std::condition_variable sentSuccessfullyCV; + + volatile bool shutdown; +}; + +} // namespace void RpcServerWriteReactor::threadLoop() { - printf("thread loop\n"); - while (!shutdown) { + while (!shutdown && sentSuccessfully != SendStatus::Disconnect) { // TODO: adapt this to a new notification mechanism which is forthcoming. if (writePort->writeQueue.empty()) std::this_thread::sleep_for(std::chrono::microseconds(100)); - else - printf("queue not empty\n"); + // This lambda will get called with the message at the front of the queue. + // If the send is successful, return true to pop it. We don't know, however, + // if the message was sent successfully in this thread. It's only when the + // `OnWriteDone` method is called by gRPC that we know. Use locking and + // condition variables to orchestrate this confirmation. writePort->writeQueue.pop([this](const MessageData &data) -> bool { - printf("attempting to send message\n"); - std::unique_lock lock(msgMutex); + esi::cosim::Message msg; msg.set_data(reinterpret_cast(data.getBytes()), data.getSize()); - sentSuccessfully = false; + + // Get a lock, reset the flag, start sending the message, and wait for the + // write to complete or fail. Be mindful of the shutdown flag. + std::unique_lock lock(sentMutex); + sentSuccessfully = SendStatus::UnknownStatus; StartWrite(&msg); - sentSuccessfullyCV.wait(lock); - bool ret = sentSuccessfully; + while (!shutdown && sentSuccessfully == SendStatus::UnknownStatus) + sentSuccessfullyCV.wait_for(lock, std::chrono::milliseconds(10)); + bool ret = sentSuccessfully == SendStatus::Success; lock.unlock(); - printf("pop'd message\n"); return ret; }); } } -grpc::ServerUnaryReactor * +/// When a client sends a message to a read port (write port on this end), start +/// streaming messages until the client calls uncle and requests a cancellation. +ServerWriteReactor * +Impl::ConnectToClientChannel(CallbackServerContext *context, + const ChannelDesc *request) { + printf("connect to client channel\n"); + auto it = writePorts.find(request->name()); + if (it == writePorts.end()) { + auto reactor = new RpcServerWriteReactor(nullptr); + reactor->Finish(Status(StatusCode::NOT_FOUND, "Unknown channel")); + return reactor; + } + return new RpcServerWriteReactor(it->second); +} + +/// When a client sends a message to a write port (a read port on this end), +/// simply locate the associated port, and write that message into its queue. +ServerUnaryReactor * Impl::SendToServer(CallbackServerContext *context, const esi::cosim::AddressedMessage *request, esi::cosim::VoidMessage *response) { auto reactor = context->DefaultReactor(); auto it = readPorts.find(request->channel_name()); if (it == readPorts.end()) { - reactor->Finish(Status(grpc::StatusCode::NOT_FOUND, "Unknown channel")); + reactor->Finish(Status(StatusCode::NOT_FOUND, "Unknown channel")); return reactor; } std::string msgDataString = request->message().data(); MessageData data(reinterpret_cast(msgDataString.data()), msgDataString.size()); - it->second->gotMessage(data); + it->second->readQueue.push(std::move(data)); reactor->Finish(Status::OK); return reactor; } + +//===----------------------------------------------------------------------===// +// RpcServer pass throughs to the actual implementations above. +//===----------------------------------------------------------------------===// +RpcServer::~RpcServer() { + if (impl) + delete impl; +} +void RpcServer::setManifest(int esiVersion, + std::vector compressedManifest) { + impl->setManifest(esiVersion, std::move(compressedManifest)); +} +ReadChannelPort &RpcServer::registerReadPort(const std::string &name, + const std::string &type) { + return impl->registerReadPort(name, type); +} +WriteChannelPort &RpcServer::registerWritePort(const std::string &name, + const std::string &type) { + return impl->registerWritePort(name, type); +} +void RpcServer::run(int port) { impl = new Impl(port); } +void RpcServer::stop() { + assert(impl && "Server not running"); + impl->stop(); +} diff --git a/lib/Dialect/ESI/runtime/cpp/include/esi/Utils.h b/lib/Dialect/ESI/runtime/cpp/include/esi/Utils.h index 3bbef62c967f..465cfb6c9f63 100644 --- a/lib/Dialect/ESI/runtime/cpp/include/esi/Utils.h +++ b/lib/Dialect/ESI/runtime/cpp/include/esi/Utils.h @@ -34,21 +34,29 @@ template class TSQueue { using Lock = std::lock_guard; - mutable std::mutex m; + /// The queue and its mutex. + mutable std::mutex qM; std::queue q; + /// A mutex to ensure that only one 'pop' operation is happening at a time. It + /// is critical that locks be obtained on this and `qM` same order in both pop + /// methods. This lock should be obtained first since one of the pop methods + /// must unlock `qM` then relock it. + mutable std::mutex popM; + public: /// Push onto the queue. template void push(E... t) { - Lock l(m); + Lock l(qM); q.emplace(t...); } /// Pop something off the queue but return nullopt if the queue is empty. Why /// doesn't std::queue have anything like this? std::optional pop() { - Lock l(m); + Lock pl(popM); + Lock ql(qM); if (q.size() == 0) return std::nullopt; auto t = q.front(); @@ -59,25 +67,27 @@ class TSQueue { /// Call the callback for the front of the queue (if anything is there). Only /// pop it off the queue if the callback returns true. void pop(std::function callback) { - // TODO: since we need to unlock the mutex to call the callback, the queue + // Since we need to unlock the mutex to call the callback, the queue // could be pushed on to and its memory layout could thusly change, // invalidating the reference returned by `.front()`. The easy solution here - // is to copy the data. Avoid copying the data. + // is to copy the data. TODO: Avoid copying the data. + Lock pl(popM); T t; { - Lock l(m); + Lock l(qM); if (q.size() == 0) return; t = q.front(); } if (callback(t)) { - Lock l(m); + Lock l(qM); q.pop(); } } + /// Is the queue empty? bool empty() const { - Lock l(m); + Lock l(qM); return q.empty(); } }; diff --git a/lib/Dialect/ESI/runtime/cpp/include/esi/backends/Cosim.h b/lib/Dialect/ESI/runtime/cpp/include/esi/backends/Cosim.h index 5256277dc936..e78ff4cb5559 100644 --- a/lib/Dialect/ESI/runtime/cpp/include/esi/backends/Cosim.h +++ b/lib/Dialect/ESI/runtime/cpp/include/esi/backends/Cosim.h @@ -61,7 +61,8 @@ class CosimAccelerator : public esi::AcceleratorConnection { requestChannelsFor(AppIDPath, const BundleType *) override; // C++ doesn't have a mechanism to forward declare a nested class and we don't - // want to include the generated header here + // want to include the generated header here. So we have to wrap it in a + // forward-declared struct we write ourselves. struct StubContainer; protected: @@ -71,7 +72,7 @@ class CosimAccelerator : public esi::AcceleratorConnection { const HWClientDetails &clients) override; private: - std::unique_ptr rpcClient; + StubContainer *rpcClient; /// Get the type ID for a channel name. bool getChannelDesc(const std::string &channelName, diff --git a/lib/Dialect/ESI/runtime/cpp/lib/backends/Cosim.cpp b/lib/Dialect/ESI/runtime/cpp/lib/backends/Cosim.cpp index 4ab56a414d9d..b4d9cc391736 100644 --- a/lib/Dialect/ESI/runtime/cpp/lib/backends/Cosim.cpp +++ b/lib/Dialect/ESI/runtime/cpp/lib/backends/Cosim.cpp @@ -49,6 +49,7 @@ static void checkStatus(Status s, string msg) { s.error_message() + " (" + s.error_details() + ")"); } +/// Hack around C++ not having a way to forward declare a nested class. struct esi::backends::cosim::CosimAccelerator::StubContainer { StubContainer(std::unique_ptr stub) : stub(std::move(stub)) {} @@ -109,9 +110,13 @@ CosimAccelerator::CosimAccelerator(Context &ctxt, string hostname, // Connect to the simulation. auto channel = grpc::CreateChannel(hostname + ":" + to_string(port), grpc::InsecureChannelCredentials()); - rpcClient = std::make_unique(ChannelServer::NewStub(channel)); + rpcClient = new StubContainer(ChannelServer::NewStub(channel)); +} +CosimAccelerator::~CosimAccelerator() { + if (rpcClient) + delete rpcClient; + channels.clear(); } -CosimAccelerator::~CosimAccelerator() { channels.clear(); } // TODO: Fix MMIO! // namespace { @@ -153,9 +158,7 @@ CosimAccelerator::~CosimAccelerator() { channels.clear(); } namespace { class CosimSysInfo : public SysInfo { public: - CosimSysInfo( - const std::unique_ptr &rpcClient) - : rpcClient(rpcClient) {} + CosimSysInfo(ChannelServer::Stub *rpcClient) : rpcClient(rpcClient) {} uint32_t getEsiVersion() const override { ::esi::cosim::Manifest response = getManifest(); @@ -172,6 +175,8 @@ class CosimSysInfo : public SysInfo { private: ::esi::cosim::Manifest getManifest() const { ::esi::cosim::Manifest response; + // To get around the a race condition where the manifest may not be set yet, + // loop until it is. TODO: fix this with the DPI API change. do { ClientContext context; VoidMessage arg; @@ -182,11 +187,12 @@ class CosimSysInfo : public SysInfo { return response; } - const std::unique_ptr &rpcClient; + esi::cosim::ChannelServer::Stub *rpcClient; }; } // namespace namespace { +/// Cosim client implementation of a write channel port. class WriteCosimChannelPort : public WriteChannelPort { public: WriteCosimChannelPort(ChannelServer::Stub *rpcClient, const ChannelDesc &desc, @@ -204,6 +210,7 @@ class WriteCosimChannelPort : public WriteChannelPort { assert(desc.name() == name); } + /// Send a write message to the server. void write(const MessageData &data) override { ClientContext context; AddressedMessage msg; @@ -219,12 +226,16 @@ class WriteCosimChannelPort : public WriteChannelPort { protected: ChannelServer::Stub *rpcClient; + /// The channel description as provided by the server. ChannelDesc desc; + /// The name of the channel from the manifest. string name; }; } // namespace namespace { +/// Cosim client implementation of a read channel port. Since gRPC read protocol +/// streams messages back, this implementation is quite complex. class ReadCosimChannelPort : public ReadChannelPort, public grpc::ClientReadReactor { @@ -236,6 +247,8 @@ class ReadCosimChannelPort virtual ~ReadCosimChannelPort() { disconnect(); } virtual void connect() override { + // Sanity checking. + ReadChannelPort::connect(); if (desc.type() != getType()->getID()) throw runtime_error("Channel '" + name + "' has wrong type. Expected " + getType()->getID() + ", got " + desc.type()); @@ -243,20 +256,31 @@ class ReadCosimChannelPort throw runtime_error("Channel '" + name + "' is not a to server channel"); assert(desc.name() == name); + // Initiate a stream of messages from the server. context = new ClientContext(); rpcClient->async()->ConnectToClientChannel(context, &desc, this); StartCall(); StartRead(&incomingMessage); } + + /// Gets called when there's a new message from the server. It'll be stored in + /// `incomingMessage`. void OnReadDone(bool ok) override { if (!ok) + // TODO: should we do something here? return; + + // Read the delivered message and push it onto the queue. const std::string &messageString = incomingMessage.data(); MessageData data(reinterpret_cast(messageString.data()), messageString.size()); messageQueue.push(data); + + // Initiate the next read. StartRead(&incomingMessage); } + + /// Disconnect this channel from the server. void disconnect() override { if (!context) return; @@ -264,28 +288,32 @@ class ReadCosimChannelPort delete context; context = nullptr; } - bool read(MessageData &) override; + + /// Poll the queue. + bool read(MessageData &data) override { + std::optional msg = messageQueue.pop(); + if (!msg.has_value()) + return false; + data = std::move(*msg); + return true; + } protected: ChannelServer::Stub *rpcClient; + /// The channel description as provided by the server. ChannelDesc desc; + /// The name of the channel from the manifest. string name; ClientContext *context; + /// Storage location for the incoming message. esi::cosim::Message incomingMessage; + /// Queue of messages read from the server. esi::utils::TSQueue messageQueue; }; } // namespace -bool ReadCosimChannelPort::read(MessageData &data) { - std::optional msg = messageQueue.pop(); - if (!msg.has_value()) - return false; - data = *msg; - return true; -} - map CosimAccelerator::requestChannelsFor(AppIDPath idPath, const BundleType *bundleType) { @@ -326,6 +354,9 @@ CosimAccelerator::requestChannelsFor(AppIDPath idPath, return channelResults; } +/// Get the channel description for a channel name. Iterate through the list +/// each time. Since this will only be called a small number of times on a small +/// list, it's not worth doing anything fancy. bool CosimAccelerator::getChannelDesc(const string &channelName, ChannelDesc &desc) { ClientContext context; @@ -368,7 +399,7 @@ Service *CosimAccelerator::createService(Service::Type svcType, } else if (svcType == typeid(SysInfo)) { switch (manifestMethod) { case ManifestMethod::Cosim: - return new CosimSysInfo(rpcClient->stub); + return new CosimSysInfo(rpcClient->stub.get()); case ManifestMethod::MMIO: return new MMIOSysInfo(getService()); } diff --git a/utils/get-grpc.sh b/utils/get-grpc.sh index d2c1313e1ed2..c7391ba32bbc 100755 --- a/utils/get-grpc.sh +++ b/utils/get-grpc.sh @@ -12,7 +12,8 @@ mkdir -p "$(dirname "$BASH_SOURCE[0]")/../ext" EXT_DIR=$(cd "$(dirname "$BASH_SOURCE[0]")/../ext" && pwd) -GRPC_VER=1.64.2 +# v1.54.2 is the version in Ubuntu 22.04 +GRPC_VER=1.54.2 echo "Installing gRPC..." echo $EXT_DIR From e00212f25260633683e3ad3c6354a520f2eed584 Mon Sep 17 00:00:00 2001 From: John Demme Date: Fri, 21 Jun 2024 20:00:27 +0000 Subject: [PATCH 4/4] Morten's feedback --- docs/Dialects/ESI/cosim.md | 4 +- lib/Dialect/ESI/runtime/CMakeLists.txt | 8 ---- lib/Dialect/ESI/runtime/cosim/cosim.proto | 13 +++--- .../cosim/cosim_dpi_server/DpiEntryPoints.cpp | 18 +++----- .../cosim/include/esi/cosim/RpcServer.h | 5 ++- .../ESI/runtime/cosim/lib/RpcServer.cpp | 44 ++++++++++++------- .../ESI/runtime/cpp/include/esi/Utils.h | 2 +- .../ESI/runtime/cpp/lib/backends/Cosim.cpp | 15 ++++--- 8 files changed, 54 insertions(+), 55 deletions(-) diff --git a/docs/Dialects/ESI/cosim.md b/docs/Dialects/ESI/cosim.md index 2c0ac2b03906..2cdf1c1ac232 100644 --- a/docs/Dialects/ESI/cosim.md +++ b/docs/Dialects/ESI/cosim.md @@ -15,7 +15,9 @@ variety of languages: the Cap'nProto website lists C++, C#, Erlang, Go, Haskell, JavaScript, Ocaml, Python, and Rust as languages which support messages and RPC! -Status: **prototype** +Status: **production** +Documentation status: **out of date** needs to be updated to reflect the new RPC +interface and library. ## Usage diff --git a/lib/Dialect/ESI/runtime/CMakeLists.txt b/lib/Dialect/ESI/runtime/CMakeLists.txt index 7756371f2d0d..8b240f627995 100644 --- a/lib/Dialect/ESI/runtime/CMakeLists.txt +++ b/lib/Dialect/ESI/runtime/CMakeLists.txt @@ -163,14 +163,6 @@ if (XRT_PATH) ${ESIRuntimeLibDirs} ${XRT_PATH}/lib ) - get_filename_component(XRT_SO ${XRT_PATH}/lib/libxrt_coreutil.so REALPATH) - install(FILES - ${XRT_SO} - ${XRT_PATH}/lib/libxrt_coreutil.so.2 - ${XRT_PATH}/lib/libxrt_coreutil.so - DESTINATION lib - COMPONENT ESIRuntime - ) endif() # The core API. For now, compile the backends into it directly. diff --git a/lib/Dialect/ESI/runtime/cosim/cosim.proto b/lib/Dialect/ESI/runtime/cosim/cosim.proto index 5bbde44077e5..67683cd5d13f 100644 --- a/lib/Dialect/ESI/runtime/cosim/cosim.proto +++ b/lib/Dialect/ESI/runtime/cosim/cosim.proto @@ -1,4 +1,4 @@ -//===- DpiEntryPoints.cpp - ESI cosim DPI calls -----------------*- C++ -*-===// +//===- cosim.proto - ESI cosim RPC definitions ------------------*- C++ -*-===// // // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. // See https://llvm.org/LICENSE.txt for license information. @@ -6,13 +6,10 @@ // //===----------------------------------------------------------------------===// // -// Cosim DPI function implementations. Mostly C-C++ gaskets to the C++ -// RpcServer. -// -// These function signatures were generated by an HW simulator (see dpi.h) so -// we don't change them to be more rational here. The resulting code gets -// dynamically linked in and I'm concerned about maintaining binary -// compatibility with all the simulators. +// The ESI cosimulation gRPC schema. If something (client or server) wants to +// talk to an ESI runtime, it mergely needs to implement this schema. If +// possible, however, it is encouraged to use the C++ esiruntime API as that is +// expected to be more portable and supports more than just cosim. // //===----------------------------------------------------------------------===// diff --git a/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/DpiEntryPoints.cpp b/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/DpiEntryPoints.cpp index cd11af5bd1c8..bad8afefa05b 100644 --- a/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/DpiEntryPoints.cpp +++ b/lib/Dialect/ESI/runtime/cosim/cosim_dpi_server/DpiEntryPoints.cpp @@ -120,12 +120,8 @@ DPI int sv2cCosimserverEpRegister(char *endpointId, char *fromHostTypeIdC, std::string fromHostTypeId(fromHostTypeIdC), toHostTypeId(toHostTypeIdC); // Both only one type allowed. - if (!fromHostTypeId.empty() && !toHostTypeId.empty()) { + if (!(fromHostTypeId.empty() ^ toHostTypeId.empty())) { printf("ERROR: Only one of fromHostTypeId and toHostTypeId can be set!\n"); - return -1; - } - if (fromHostTypeId.empty() && toHostTypeId.empty()) { - printf("ERROR: One of fromHostTypeId and toHostTypeId must be set!\n"); return -2; } if (readPorts.contains(endpointId)) { @@ -155,13 +151,13 @@ DPI int sv2cCosimserverEpTryGet(char *endpointId, if (server == nullptr) return -1; - auto portF = readPorts.find(endpointId); - if (portF == readPorts.end()) { + auto portIt = readPorts.find(endpointId); + if (portIt == readPorts.end()) { fprintf(stderr, "Endpoint not found in registry!\n"); return -4; } - ReadChannelPort &port = portF->second; + ReadChannelPort &port = portIt->second; MessageData msg; // Poll for a message. if (!port.read(msg)) { @@ -244,13 +240,13 @@ DPI int sv2cCosimserverEpTryPut(char *endpointId, auto blob = std::make_unique(dataVec); // Queue the blob. - auto portF = writePorts.find(endpointId); - if (portF == writePorts.end()) { + auto portIt = writePorts.find(endpointId); + if (portIt == writePorts.end()) { fprintf(stderr, "Endpoint not found in registry!\n"); return -4; } log(endpointId, true, *blob); - WriteChannelPort &port = portF->second; + WriteChannelPort &port = portIt->second; port.write(*blob); return 0; } diff --git a/lib/Dialect/ESI/runtime/cosim/include/esi/cosim/RpcServer.h b/lib/Dialect/ESI/runtime/cosim/include/esi/cosim/RpcServer.h index 7d3a54c18e87..c92671641c47 100644 --- a/lib/Dialect/ESI/runtime/cosim/include/esi/cosim/RpcServer.h +++ b/lib/Dialect/ESI/runtime/cosim/include/esi/cosim/RpcServer.h @@ -1,4 +1,4 @@ -//===- Server.h - Run a cosim server ----------------------------*- C++ -*-===// +//===- RpcServer.h - Run a cosim server -------------------------*- C++ -*-===// // // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. // See https://llvm.org/LICENSE.txt for license information. @@ -29,7 +29,8 @@ class RpcServer { /// RPC server can be started and a connection from the client could happen /// before the manifest is set. TODO: rework the DPI API to require that the /// manifest gets set first. - void setManifest(int esiVersion, std::vector compressedManifest); + void setManifest(int esiVersion, + const std::vector &compressedManifest); /// Register a read or write port which communicates over RPC. ReadChannelPort ®isterReadPort(const std::string &name, diff --git a/lib/Dialect/ESI/runtime/cosim/lib/RpcServer.cpp b/lib/Dialect/ESI/runtime/cosim/lib/RpcServer.cpp index d13500c57d9d..dd1658ce740b 100644 --- a/lib/Dialect/ESI/runtime/cosim/lib/RpcServer.cpp +++ b/lib/Dialect/ESI/runtime/cosim/lib/RpcServer.cpp @@ -37,7 +37,7 @@ using grpc::StatusCode; static void writePort(uint16_t port) { // "cosim.cfg" since we may want to include other info in the future. FILE *fd = fopen("cosim.cfg", "w"); - fprintf(fd, "port: %u\n", (unsigned int)port); + fprintf(fd, "port: %u\n", static_cast(port)); fclose(fd); } @@ -56,8 +56,9 @@ class esi::cosim::RpcServer::Impl // Internal API //===--------------------------------------------------------------------===// - void setManifest(int esiVersion, std::vector compressedManifest) { - this->compressedManifest = std::move(compressedManifest); + void setManifest(int esiVersion, + const std::vector &compressedManifest) { + this->compressedManifest = compressedManifest; this->esiVersion = esiVersion; } @@ -87,8 +88,8 @@ class esi::cosim::RpcServer::Impl private: int esiVersion; std::vector compressedManifest; - std::map readPorts; - std::map writePorts; + std::map> readPorts; + std::map> writePorts; std::unique_ptr server; }; @@ -162,10 +163,6 @@ void Impl::stop() { Impl::~Impl() { if (server) stop(); - for (auto &port : readPorts) - delete port.second; - for (auto &port : writePorts) - delete port.second; } ReadChannelPort &Impl::registerReadPort(const std::string &name, @@ -195,13 +192,13 @@ ServerUnaryReactor *Impl::GetManifest(CallbackServerContext *context, ServerUnaryReactor *Impl::ListChannels(CallbackServerContext *context, const VoidMessage *, ListOfChannels *channelsOut) { - for (auto [name, port] : readPorts) { + for (auto &[name, port] : readPorts) { auto *channel = channelsOut->add_channels(); channel->set_name(name); channel->set_type(port->getType()->getID()); channel->set_dir(ChannelDesc::Direction::ChannelDesc_Direction_TO_SERVER); } - for (auto [name, port] : writePorts) { + for (auto &[name, port] : writePorts) { auto *channel = channelsOut->add_channels(); channel->set_name(name); channel->set_type(port->getType()->getID()); @@ -228,9 +225,21 @@ class RpcServerWriteReactor : public ServerWriteReactor { } ~RpcServerWriteReactor() { shutdown = true; + // Wake up the potentially sleeping thread. + sentSuccessfullyCV.notify_one(); myThread.join(); } + // Deleting 'this' from within a callback is safe since this is how gRPC tells + // us that it's released the reference. This pattern lets gRPC manage this + // object. (Though a shared pointer would be better.) It was actually copied + // from one of the gRPC examples: + // https://github.com/grpc/grpc/blob/4795c5e69b25e8c767b498bea784da0ef8c96fd5/examples/cpp/route_guide/route_guide_callback_server.cc#L120 + // The alternative is to have something else (e.g. Impl) manage this object + // and have this method tell it that gRPC is done with it and it should be + // deleted. As of now, there's no specific need for that and it adds + // additional complexity. If there is at some point in the future, change + // this. void OnDone() override { delete this; } void OnWriteDone(bool ok) override { std::scoped_lock lock(sentMutex); @@ -258,7 +267,7 @@ class RpcServerWriteReactor : public ServerWriteReactor { volatile SendStatus sentSuccessfully; std::condition_variable sentSuccessfullyCV; - volatile bool shutdown; + std::atomic shutdown; }; } // namespace @@ -284,8 +293,9 @@ void RpcServerWriteReactor::threadLoop() { std::unique_lock lock(sentMutex); sentSuccessfully = SendStatus::UnknownStatus; StartWrite(&msg); - while (!shutdown && sentSuccessfully == SendStatus::UnknownStatus) - sentSuccessfullyCV.wait_for(lock, std::chrono::milliseconds(10)); + sentSuccessfullyCV.wait(lock, [&]() { + return shutdown || sentSuccessfully != SendStatus::UnknownStatus; + }); bool ret = sentSuccessfully == SendStatus::Success; lock.unlock(); return ret; @@ -305,7 +315,7 @@ Impl::ConnectToClientChannel(CallbackServerContext *context, reactor->Finish(Status(StatusCode::NOT_FOUND, "Unknown channel")); return reactor; } - return new RpcServerWriteReactor(it->second); + return new RpcServerWriteReactor(it->second.get()); } /// When a client sends a message to a write port (a read port on this end), @@ -337,8 +347,8 @@ RpcServer::~RpcServer() { delete impl; } void RpcServer::setManifest(int esiVersion, - std::vector compressedManifest) { - impl->setManifest(esiVersion, std::move(compressedManifest)); + const std::vector &compressedManifest) { + impl->setManifest(esiVersion, compressedManifest); } ReadChannelPort &RpcServer::registerReadPort(const std::string &name, const std::string &type) { diff --git a/lib/Dialect/ESI/runtime/cpp/include/esi/Utils.h b/lib/Dialect/ESI/runtime/cpp/include/esi/Utils.h index 465cfb6c9f63..0101095c84f5 100644 --- a/lib/Dialect/ESI/runtime/cpp/include/esi/Utils.h +++ b/lib/Dialect/ESI/runtime/cpp/include/esi/Utils.h @@ -42,7 +42,7 @@ class TSQueue { /// is critical that locks be obtained on this and `qM` same order in both pop /// methods. This lock should be obtained first since one of the pop methods /// must unlock `qM` then relock it. - mutable std::mutex popM; + std::mutex popM; public: /// Push onto the queue. diff --git a/lib/Dialect/ESI/runtime/cpp/lib/backends/Cosim.cpp b/lib/Dialect/ESI/runtime/cpp/lib/backends/Cosim.cpp index b4d9cc391736..9f6c75a4c614 100644 --- a/lib/Dialect/ESI/runtime/cpp/lib/backends/Cosim.cpp +++ b/lib/Dialect/ESI/runtime/cpp/lib/backends/Cosim.cpp @@ -43,7 +43,7 @@ using grpc::ClientReaderWriter; using grpc::ClientWriter; using grpc::Status; -static void checkStatus(Status s, string msg) { +static void checkStatus(Status s, const string &msg) { if (!s.ok()) throw runtime_error(msg + ". Code " + to_string(s.error_code()) + ": " + s.error_message() + " (" + s.error_details() + ")"); @@ -257,8 +257,8 @@ class ReadCosimChannelPort assert(desc.name() == name); // Initiate a stream of messages from the server. - context = new ClientContext(); - rpcClient->async()->ConnectToClientChannel(context, &desc, this); + context = std::make_unique(); + rpcClient->async()->ConnectToClientChannel(context.get(), &desc, this); StartCall(); StartRead(&incomingMessage); } @@ -266,9 +266,11 @@ class ReadCosimChannelPort /// Gets called when there's a new message from the server. It'll be stored in /// `incomingMessage`. void OnReadDone(bool ok) override { - if (!ok) + if (!ok) { // TODO: should we do something here? + std::cerr << "Internal error: read failed due to not `ok`." << std::endl; return; + } // Read the delivered message and push it onto the queue. const std::string &messageString = incomingMessage.data(); @@ -285,8 +287,7 @@ class ReadCosimChannelPort if (!context) return; context->TryCancel(); - delete context; - context = nullptr; + context.reset(); } /// Poll the queue. @@ -305,7 +306,7 @@ class ReadCosimChannelPort /// The name of the channel from the manifest. string name; - ClientContext *context; + std::unique_ptr context; /// Storage location for the incoming message. esi::cosim::Message incomingMessage; /// Queue of messages read from the server.