diff --git a/doc/changelog.rst b/doc/changelog.rst index a0d41bf60..73cb7fe0b 100644 --- a/doc/changelog.rst +++ b/doc/changelog.rst @@ -12,6 +12,7 @@ This section details changes made in the development branch that have not yet be Description +- Improved performance of get, put, and copy dataset methods - Fix a bug which prevented multi-GPU model set in some cases - Streamline pipelined execution of tasks for backend database - Enhance code coverage to include all 4 languages supported by SmartRedis @@ -39,6 +40,7 @@ Description Detailed Notes +- Leveraged Redis pipelining to improve performance of get, put, and copy dataset methods (PR311_) - Redis::set_model_multigpu() will now upload the correct model to all GPUs (PR310_) - RedisCluster::_run_pipeline() will no longer unconditionally apply a retry wait before returning (PR309_) - Expand code coverage to all four languages and make the CI/CD more efficent (PR308_) @@ -68,6 +70,7 @@ Detailed Notes - Implemented support for Unix Domain Sockets, including refactorization of server address code, test cases, and check-in tests. (PR252_) - A new make target `make lib-with-fortran` now compiles the Fortran client and dataset into its own library which applications can link against (PR245_) +.. _PR311: https://github.com/CrayLabs/SmartRedis/pull/311 .. _PR310: https://github.com/CrayLabs/SmartRedis/pull/310 .. _PR309: https://github.com/CrayLabs/SmartRedis/pull/309 .. _PR308: https://github.com/CrayLabs/SmartRedis/pull/308 diff --git a/include/client.h b/include/client.h index ef787e660..59747c33d 100644 --- a/include/client.h +++ b/include/client.h @@ -1304,7 +1304,7 @@ class Client : public SRObject */ inline CommandReply _run(AddressAtCommand& cmd) { - return this->_redis_server->run(cmd); + return _redis_server->run(cmd); } /*! @@ -1314,7 +1314,7 @@ class Client : public SRObject */ inline CommandReply _run(AddressAnyCommand& cmd) { - return this->_redis_server->run(cmd); + return _redis_server->run(cmd); } /*! @@ -1324,7 +1324,7 @@ class Client : public SRObject */ inline CommandReply _run(SingleKeyCommand& cmd) { - return this->_redis_server->run(cmd); + return _redis_server->run(cmd); } /*! @@ -1334,7 +1334,7 @@ class Client : public SRObject */ inline CommandReply _run(MultiKeyCommand& cmd) { - return this->_redis_server->run(cmd); + return _redis_server->run(cmd); } /*! @@ -1344,7 +1344,7 @@ class Client : public SRObject */ inline CommandReply _run(CompoundCommand& cmd) { - return this->_redis_server->run(cmd); + return _redis_server->run(cmd); } /*! @@ -1354,7 +1354,7 @@ class Client : public SRObject */ inline std::vector _run(CommandList& cmd_list) { - return this->_redis_server->run(cmd_list); + return _redis_server->run(cmd_list); } /*! @@ -1415,20 +1415,21 @@ class Client : public SRObject const int start_index, const int end_index); + + // Add a retrieved tensor to a dataset /*! - * \brief Retrieve a tensor and add it to the dataset object - * \param dataset The dataset which will be augmented with the - * retrieved tensor - * \param name The name (not key) of the tensor to retrieve and add + * \brief Add a tensor retrieved via get_tensor() to a dataset + * \param dataset The dataset which will receive the tensor + * \param name The name by which the tensor shall be added * to the dataset - * \param key The key (not name) of the tensor to retrieve and add - * to the dataset - * \throw SmartRedis::Exception if retrieval or addition - * of tensor fails - */ - inline void _get_and_add_dataset_tensor(DataSet& dataset, - const std::string& name, - const std::string& key); + * \param tensor_data get_tensor command reply containing + * tensor data + * \throw SmartRedis::Exception if addition of tensor fails + */ + inline void _add_dataset_tensor( + DataSet& dataset, + const std::string& name, + CommandReply tensor_data); /*! * \brief Retrieve the tensor from the DataSet and return diff --git a/include/redis.h b/include/redis.h index a31869c63..2a8efc25a 100644 --- a/include/redis.h +++ b/include/redis.h @@ -220,6 +220,15 @@ class Redis : public RedisServer */ virtual CommandReply get_tensor(const std::string& key); + /*! + * \brief Get a list of Tensor from the server + * \param keys The keys of the tensor to retrieve + * \returns The PipelineReply from executing the get tensor commands + * \throw SmartRedis::Exception if tensor retrieval fails + */ + virtual PipelineReply get_tensors( + const std::vector& keys); + /*! * \brief Rename a tensor in the database * \param key The original key for the tensor @@ -496,6 +505,14 @@ class Redis : public RedisServer const std::string& key, const bool reset_stat); + /*! + * \brief Run a CommandList via a Pipeline + * \param cmdlist The list of commands to run + * \returns The PipelineReply with the result of command execution + * \throw SmartRedis::Exception if execution fails + */ + PipelineReply run_in_pipeline(CommandList& cmdlist); + /*! * \brief Create a string representation of the Redis connection * \returns A string representation of the Redis connection @@ -512,8 +529,7 @@ class Redis : public RedisServer /*! * \brief Run a Command on the server * \param cmd The Command to run - * \returns The CommandReply from the - * command execution + * \returns The CommandReply from the command execution * \throw SmartRedis::Exception if command execution fails */ inline CommandReply _run(const Command& cmd); @@ -530,6 +546,15 @@ class Redis : public RedisServer * \throw SmartRedis::Exception if connection fails */ inline void _connect(SRAddress& db_address); + + /*! + * \brief Pipeline execute a series of commands + * \param cmds The commands to execute + * \returns Pipeline reply from the command execution + * \throw SmartRedis::Exception if command execution fails + */ + PipelineReply _run_pipeline(std::vector& cmds); + }; } // namespace SmartRedis diff --git a/include/rediscluster.h b/include/rediscluster.h index feefff9da..bf390472d 100644 --- a/include/rediscluster.h +++ b/include/rediscluster.h @@ -231,6 +231,16 @@ class RedisCluster : public RedisServer virtual CommandReply get_tensor(const std::string& key); /*! + * \brief Get a list of Tensor from the server. All tensors + * must be on the same node + * \param keys The keys of the tensor to retrieve + * \returns The PipelineReply from executing the get tensor commands + * \throw SmartRedis::Exception if tensor retrieval fails + */ + virtual PipelineReply get_tensors( + const std::vector& keys); + + /*! * \brief Rename a tensor in the database * \param key The original key for the tensor * \param new_key The new key for the tensor @@ -518,6 +528,15 @@ class RedisCluster : public RedisServer const std::string& key, const bool reset_stat); + /*! + * \brief Run a CommandList via a Pipeline. + * All commands must go to the same shard + * \param cmdlist The list of commands to run + * \returns The PipelineReply with the result of command execution + * \throw SmartRedis::Exception if execution fails + */ + PipelineReply run_in_pipeline(CommandList& cmdlist); + /*! * \brief Create a string representation of the Redis connection * \returns A string representation of the Redis connection diff --git a/include/redisserver.h b/include/redisserver.h index ec4d525a0..0701c28f2 100644 --- a/include/redisserver.h +++ b/include/redisserver.h @@ -211,6 +211,16 @@ class RedisServer { */ virtual CommandReply get_tensor(const std::string& key) = 0; + /*! + * \brief Get a list of Tensor from the server. For clustered + * servers, all tensors must be on the same node + * \param keys The keys of the tensor to retrieve + * \returns The PipelineReply from executing the get tensor commands + * \throw SmartRedis::Exception if tensor retrieval fails + */ + virtual PipelineReply get_tensors( + const std::vector& keys) = 0; + /*! * \brief Rename a tensor in the database * \param key The original key for the tensor @@ -511,6 +521,15 @@ class RedisServer { const std::string& key, const bool reset_stat) = 0; + /*! + * \brief Run a CommandList via a Pipeline. For clustered databases + * all commands must go to the same shard + * \param cmdlist The list of commands to run + * \returns The PipelineReply with the result of command execution + * \throw SmartRedis::Exception if execution fails + */ + virtual PipelineReply run_in_pipeline(CommandList& cmdlist) = 0; + /*! * \brief Create a string representation of the Redis connection * \returns A string representation of the Redis connection diff --git a/src/cpp/client.cpp b/src/cpp/client.cpp index dc7172346..0f64895a7 100644 --- a/src/cpp/client.cpp +++ b/src/cpp/client.cpp @@ -27,6 +27,7 @@ */ #include +#include #include "client.h" #include "srexception.h" #include "logger.h" @@ -88,7 +89,7 @@ void Client::put_dataset(DataSet& dataset) _append_dataset_metadata_commands(cmds, dataset); _append_dataset_tensor_commands(cmds, dataset); _append_dataset_ack_command(cmds, dataset); - _run(cmds); + _redis_server->run_in_pipeline(cmds); } // Retrieve a DataSet object from the database @@ -109,15 +110,25 @@ DataSet Client::get_dataset(const std::string& name) DataSet dataset(name); _unpack_dataset_metadata(dataset, reply); + // Build the tensor keys std::vector tensor_names = dataset.get_tensor_names(); - - // Retrieve DataSet tensors and fill the DataSet object - for(size_t i = 0; i < tensor_names.size(); i++) { - // Build the tensor key - std::string tensor_key = - _build_dataset_tensor_key(name, tensor_names[i], true); - // Retrieve tensor and add it to the dataset - _get_and_add_dataset_tensor(dataset, tensor_names[i], tensor_key); + if (tensor_names.size() == 0) + return dataset; // If no tensors, we're done + std::vector tensor_keys; + std::transform( + tensor_names.cbegin(), + tensor_names.cend(), + std::back_inserter(tensor_keys), + [this, name](std::string s){ + return _build_dataset_tensor_key(name, s, true); + }); + + // Retrieve DataSet tensors + PipelineReply tensors = _redis_server->get_tensors(tensor_keys); + + // Put them into the dataset + for (size_t i = 0; i < tensor_names.size(); i++) { + _add_dataset_tensor(dataset, tensor_names[i], tensors[i]); } return dataset; @@ -167,7 +178,7 @@ void Client::copy_dataset(const std::string& src_name, CommandList put_meta_cmds; _append_dataset_metadata_commands(put_meta_cmds, dataset); _append_dataset_ack_command(put_meta_cmds, dataset); - (void)_run(put_meta_cmds); + (void)_redis_server->run_in_pipeline(put_meta_cmds); } // Delete a DataSet from the database. @@ -1422,7 +1433,7 @@ void Client::copy_list(const std::string& src_name, copy_cmd.add_field_ptr(reply[i].str(), reply[i].str_len()); } - CommandReply copy_reply = this->_run(copy_cmd); + CommandReply copy_reply = _run(copy_cmd); if (reply.has_error() > 0) throw SRRuntimeException("Dataset aggregation list copy " @@ -1652,18 +1663,16 @@ inline CommandReply Client::_get_dataset_metadata(const std::string& name) return _run(cmd); } -// Retrieve a tensor and add it to the dataset -inline void Client::_get_and_add_dataset_tensor(DataSet& dataset, - const std::string& name, - const std::string& key) +// Add a retrieved tensor to a dataset +inline void Client::_add_dataset_tensor( + DataSet& dataset, + const std::string& name, + CommandReply tensor_data) { - // Run tensor retrieval command - CommandReply reply = _redis_server->get_tensor(key); - // Extract tensor properties from command reply - std::vector reply_dims = GetTensorCommand::get_dims(reply); - std::string_view blob = GetTensorCommand::get_data_blob(reply); - SRTensorType type = GetTensorCommand::get_data_type(reply); + std::vector reply_dims = GetTensorCommand::get_dims(tensor_data); + std::string_view blob = GetTensorCommand::get_data_blob(tensor_data); + SRTensorType type = GetTensorCommand::get_data_type(tensor_data); // Add tensor to the dataset dataset._add_to_tensorpack(name, (void*)blob.data(), reply_dims, diff --git a/src/cpp/redis.cpp b/src/cpp/redis.cpp index e6f56aa04..22302312c 100644 --- a/src/cpp/redis.cpp +++ b/src/cpp/redis.cpp @@ -192,6 +192,22 @@ CommandReply Redis::get_tensor(const std::string& key) return run(cmd); } +// Get a list of Tensor from the server +PipelineReply Redis::get_tensors(const std::vector& keys) +{ + // Build up the commands to get the tensors + CommandList cmdlist; // This just holds the memory + std::vector cmds; + for (auto it = keys.begin(); it != keys.end(); ++it) { + GetTensorCommand* cmd = cmdlist.add_command(); + (*cmd) << "AI.TENSORGET" << Keyfield(*it) << "META" << "BLOB"; + cmds.push_back(cmd); + } + + // Run them via pipeline + return _run_pipeline(cmds); +} + // Rename a tensor in the database CommandReply Redis::rename_tensor(const std::string& key, const std::string& new_key) @@ -725,6 +741,93 @@ inline void Redis::_connect(SRAddress& db_address) std::to_string(_connection_attempts) + "tries"); } +// Run a CommandList via a Pipeline +PipelineReply Redis::run_in_pipeline(CommandList& cmdlist) +{ + // Convert from CommandList to vector + std::vector cmds; + for (auto it = cmdlist.begin(); it != cmdlist.end(); ++it) { + cmds.push_back(*it); + } + + // Run the commands + return _run_pipeline(cmds); +} + +// Build and run unordered pipeline +PipelineReply Redis::_run_pipeline(std::vector& cmds) +{ + PipelineReply reply; + for (int i = 1; i <= _command_attempts; i++) { + try { + // Get pipeline object for shard (no new connection) + auto pipeline = _redis->pipeline(false); + + // Loop over all commands and add to the pipeline + for (size_t i = 0; i < cmds.size(); i++) { + // Add the commands to the pipeline + pipeline.command(cmds[i]->cbegin(), cmds[i]->cend()); + } + + // Execute the pipeline + reply = pipeline.exec(); + + // Check the replies + if (reply.has_error()) { + throw SRRuntimeException("Redis failed to execute the pipeline"); + } + + // If we get here, it all worked + return reply; + } + catch (SmartRedis::Exception& e) { + // Exception is already prepared, just propagate it + throw; + } + catch (sw::redis::IoError &e) { + // For an error from Redis, retry unless we're out of chances + if (i == _command_attempts) { + throw SRDatabaseException( + std::string("Redis IO error when executing the pipeline: ") + + e.what()); + } + // else, Fall through for a retry + } + catch (sw::redis::ClosedError &e) { + // For an error from Redis, retry unless we're out of chances + if (i == _command_attempts) { + throw SRDatabaseException( + std::string("Redis Closed error when executing the "\ + "pipeline: ") + e.what()); + } + // else, Fall through for a retry + } + catch (sw::redis::Error &e) { + // For other errors from Redis, report them immediately + throw SRRuntimeException( + std::string("Redis error when executing the pipeline: ") + + e.what()); + } + catch (std::exception& e) { + // Should never hit this, so bail immediately if we do + throw SRInternalException( + std::string("Unexpected exception executing the pipeline: ") + + e.what()); + } + catch (...) { + // Should never hit this, so bail immediately if we do + throw SRInternalException( + "Non-standard exception encountered executing the pipeline"); + } + + // Sleep before the next attempt + std::this_thread::sleep_for(std::chrono::milliseconds(_command_interval)); + } + + // If we get here, we've run out of retry attempts + throw SRTimeoutException("Unable to execute pipeline"); +} + // Create a string representation of the Redis connection std::string Redis::to_string() const { diff --git a/src/cpp/rediscluster.cpp b/src/cpp/rediscluster.cpp index dea2ff719..04c3e76a1 100644 --- a/src/cpp/rediscluster.cpp +++ b/src/cpp/rediscluster.cpp @@ -386,6 +386,26 @@ CommandReply RedisCluster::get_tensor(const std::string& key) return run(cmd); } +// Get a list of Tensor from the server +PipelineReply RedisCluster::get_tensors(const std::vector& keys) +{ + // Build up the commands to get the tensors + CommandList cmdlist; // This just holds the memory + std::vector cmds; + for (auto it = keys.begin(); it != keys.end(); ++it) { + GetTensorCommand* cmd = cmdlist.add_command(); + (*cmd) << "AI.TENSORGET" << Keyfield(*it) << "META" << "BLOB"; + cmds.push_back(cmd); + } + + // Get the shard index for the first key + size_t db_index = _get_db_node_index(keys[0]); + std::string shard_prefix = _db_nodes[db_index].prefix; + + // Run them via pipeline + return _run_pipeline(cmds, shard_prefix); +} + // Rename a tensor in the database CommandReply RedisCluster::rename_tensor(const std::string& key, const std::string& new_key) @@ -1344,10 +1364,30 @@ DBNode* RedisCluster::_get_model_script_db(const std::string& name, return db; } +// Run a CommandList via a Pipeline +PipelineReply RedisCluster::run_in_pipeline(CommandList& cmdlist) +{ + // Convert from CommandList to vector and grab the shard along + // the way + std::vector cmds; + std::string shard_prefix = _db_nodes[0].prefix; + bool shard_found = false; + for (auto it = cmdlist.begin(); it != cmdlist.end(); ++it) { + cmds.push_back(*it); + if (!shard_found && (*it)->has_keys()) { + shard_prefix = _get_db_node_prefix(*(*it)); + shard_found = true; + } + } + + // Run the commands + return _run_pipeline(cmds, shard_prefix); +} + // Build and run unordered pipeline -PipelineReply -RedisCluster::_run_pipeline(std::vector& cmds, - std::string& shard_prefix) +PipelineReply RedisCluster::_run_pipeline( + std::vector& cmds, + std::string& shard_prefix) { PipelineReply reply; for (int i = 1; i <= _command_attempts; i++) {