Skip to content

Commit

Permalink
Pipeline dataset operations (#311)
Browse files Browse the repository at this point in the history
Add pipelining support for put, get, and copy dataset operations
[ committed by @billschereriii ]
[ reviewed by @mellis13  ]
  • Loading branch information
billschereriii authored Mar 13, 2023
1 parent 188c12b commit 68a7a84
Show file tree
Hide file tree
Showing 8 changed files with 263 additions and 44 deletions.
3 changes: 3 additions & 0 deletions doc/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ This section details changes made in the development branch that have not yet be

Description

- Improved performance of get, put, and copy dataset methods
- Fix a bug which prevented multi-GPU model set in some cases
- Streamline pipelined execution of tasks for backend database
- Enhance code coverage to include all 4 languages supported by SmartRedis
Expand Down Expand Up @@ -39,6 +40,7 @@ Description

Detailed Notes

- Leveraged Redis pipelining to improve performance of get, put, and copy dataset methods (PR311_)
- Redis::set_model_multigpu() will now upload the correct model to all GPUs (PR310_)
- RedisCluster::_run_pipeline() will no longer unconditionally apply a retry wait before returning (PR309_)
- Expand code coverage to all four languages and make the CI/CD more efficent (PR308_)
Expand Down Expand Up @@ -68,6 +70,7 @@ Detailed Notes
- Implemented support for Unix Domain Sockets, including refactorization of server address code, test cases, and check-in tests. (PR252_)
- A new make target `make lib-with-fortran` now compiles the Fortran client and dataset into its own library which applications can link against (PR245_)

.. _PR311: https://github.com/CrayLabs/SmartRedis/pull/311
.. _PR310: https://github.com/CrayLabs/SmartRedis/pull/310
.. _PR309: https://github.com/CrayLabs/SmartRedis/pull/309
.. _PR308: https://github.com/CrayLabs/SmartRedis/pull/308
Expand Down
37 changes: 19 additions & 18 deletions include/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -1304,7 +1304,7 @@ class Client : public SRObject
*/
inline CommandReply _run(AddressAtCommand& cmd)
{
return this->_redis_server->run(cmd);
return _redis_server->run(cmd);
}

/*!
Expand All @@ -1314,7 +1314,7 @@ class Client : public SRObject
*/
inline CommandReply _run(AddressAnyCommand& cmd)
{
return this->_redis_server->run(cmd);
return _redis_server->run(cmd);
}

/*!
Expand All @@ -1324,7 +1324,7 @@ class Client : public SRObject
*/
inline CommandReply _run(SingleKeyCommand& cmd)
{
return this->_redis_server->run(cmd);
return _redis_server->run(cmd);
}

/*!
Expand All @@ -1334,7 +1334,7 @@ class Client : public SRObject
*/
inline CommandReply _run(MultiKeyCommand& cmd)
{
return this->_redis_server->run(cmd);
return _redis_server->run(cmd);
}

/*!
Expand All @@ -1344,7 +1344,7 @@ class Client : public SRObject
*/
inline CommandReply _run(CompoundCommand& cmd)
{
return this->_redis_server->run(cmd);
return _redis_server->run(cmd);
}

/*!
Expand All @@ -1354,7 +1354,7 @@ class Client : public SRObject
*/
inline std::vector<CommandReply> _run(CommandList& cmd_list)
{
return this->_redis_server->run(cmd_list);
return _redis_server->run(cmd_list);
}

/*!
Expand Down Expand Up @@ -1415,20 +1415,21 @@ class Client : public SRObject
const int start_index,
const int end_index);


// Add a retrieved tensor to a dataset
/*!
* \brief Retrieve a tensor and add it to the dataset object
* \param dataset The dataset which will be augmented with the
* retrieved tensor
* \param name The name (not key) of the tensor to retrieve and add
* \brief Add a tensor retrieved via get_tensor() to a dataset
* \param dataset The dataset which will receive the tensor
* \param name The name by which the tensor shall be added
* to the dataset
* \param key The key (not name) of the tensor to retrieve and add
* to the dataset
* \throw SmartRedis::Exception if retrieval or addition
* of tensor fails
*/
inline void _get_and_add_dataset_tensor(DataSet& dataset,
const std::string& name,
const std::string& key);
* \param tensor_data get_tensor command reply containing
* tensor data
* \throw SmartRedis::Exception if addition of tensor fails
*/
inline void _add_dataset_tensor(
DataSet& dataset,
const std::string& name,
CommandReply tensor_data);

/*!
* \brief Retrieve the tensor from the DataSet and return
Expand Down
29 changes: 27 additions & 2 deletions include/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,15 @@ class Redis : public RedisServer
*/
virtual CommandReply get_tensor(const std::string& key);

/*!
* \brief Get a list of Tensor from the server
* \param keys The keys of the tensor to retrieve
* \returns The PipelineReply from executing the get tensor commands
* \throw SmartRedis::Exception if tensor retrieval fails
*/
virtual PipelineReply get_tensors(
const std::vector<std::string>& keys);

/*!
* \brief Rename a tensor in the database
* \param key The original key for the tensor
Expand Down Expand Up @@ -496,6 +505,14 @@ class Redis : public RedisServer
const std::string& key,
const bool reset_stat);

/*!
* \brief Run a CommandList via a Pipeline
* \param cmdlist The list of commands to run
* \returns The PipelineReply with the result of command execution
* \throw SmartRedis::Exception if execution fails
*/
PipelineReply run_in_pipeline(CommandList& cmdlist);

/*!
* \brief Create a string representation of the Redis connection
* \returns A string representation of the Redis connection
Expand All @@ -512,8 +529,7 @@ class Redis : public RedisServer
/*!
* \brief Run a Command on the server
* \param cmd The Command to run
* \returns The CommandReply from the
* command execution
* \returns The CommandReply from the command execution
* \throw SmartRedis::Exception if command execution fails
*/
inline CommandReply _run(const Command& cmd);
Expand All @@ -530,6 +546,15 @@ class Redis : public RedisServer
* \throw SmartRedis::Exception if connection fails
*/
inline void _connect(SRAddress& db_address);

/*!
* \brief Pipeline execute a series of commands
* \param cmds The commands to execute
* \returns Pipeline reply from the command execution
* \throw SmartRedis::Exception if command execution fails
*/
PipelineReply _run_pipeline(std::vector<Command*>& cmds);

};

} // namespace SmartRedis
Expand Down
19 changes: 19 additions & 0 deletions include/rediscluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,16 @@ class RedisCluster : public RedisServer
virtual CommandReply get_tensor(const std::string& key);

/*!
* \brief Get a list of Tensor from the server. All tensors
* must be on the same node
* \param keys The keys of the tensor to retrieve
* \returns The PipelineReply from executing the get tensor commands
* \throw SmartRedis::Exception if tensor retrieval fails
*/
virtual PipelineReply get_tensors(
const std::vector<std::string>& keys);

/*!
* \brief Rename a tensor in the database
* \param key The original key for the tensor
* \param new_key The new key for the tensor
Expand Down Expand Up @@ -518,6 +528,15 @@ class RedisCluster : public RedisServer
const std::string& key,
const bool reset_stat);

/*!
* \brief Run a CommandList via a Pipeline.
* All commands must go to the same shard
* \param cmdlist The list of commands to run
* \returns The PipelineReply with the result of command execution
* \throw SmartRedis::Exception if execution fails
*/
PipelineReply run_in_pipeline(CommandList& cmdlist);

/*!
* \brief Create a string representation of the Redis connection
* \returns A string representation of the Redis connection
Expand Down
19 changes: 19 additions & 0 deletions include/redisserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,16 @@ class RedisServer {
*/
virtual CommandReply get_tensor(const std::string& key) = 0;

/*!
* \brief Get a list of Tensor from the server. For clustered
* servers, all tensors must be on the same node
* \param keys The keys of the tensor to retrieve
* \returns The PipelineReply from executing the get tensor commands
* \throw SmartRedis::Exception if tensor retrieval fails
*/
virtual PipelineReply get_tensors(
const std::vector<std::string>& keys) = 0;

/*!
* \brief Rename a tensor in the database
* \param key The original key for the tensor
Expand Down Expand Up @@ -511,6 +521,15 @@ class RedisServer {
const std::string& key,
const bool reset_stat) = 0;

/*!
* \brief Run a CommandList via a Pipeline. For clustered databases
* all commands must go to the same shard
* \param cmdlist The list of commands to run
* \returns The PipelineReply with the result of command execution
* \throw SmartRedis::Exception if execution fails
*/
virtual PipelineReply run_in_pipeline(CommandList& cmdlist) = 0;

/*!
* \brief Create a string representation of the Redis connection
* \returns A string representation of the Redis connection
Expand Down
51 changes: 30 additions & 21 deletions src/cpp/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
*/

#include <ctype.h>
#include <algorithm>
#include "client.h"
#include "srexception.h"
#include "logger.h"
Expand Down Expand Up @@ -88,7 +89,7 @@ void Client::put_dataset(DataSet& dataset)
_append_dataset_metadata_commands(cmds, dataset);
_append_dataset_tensor_commands(cmds, dataset);
_append_dataset_ack_command(cmds, dataset);
_run(cmds);
_redis_server->run_in_pipeline(cmds);
}

// Retrieve a DataSet object from the database
Expand All @@ -109,15 +110,25 @@ DataSet Client::get_dataset(const std::string& name)
DataSet dataset(name);
_unpack_dataset_metadata(dataset, reply);

// Build the tensor keys
std::vector<std::string> tensor_names = dataset.get_tensor_names();

// Retrieve DataSet tensors and fill the DataSet object
for(size_t i = 0; i < tensor_names.size(); i++) {
// Build the tensor key
std::string tensor_key =
_build_dataset_tensor_key(name, tensor_names[i], true);
// Retrieve tensor and add it to the dataset
_get_and_add_dataset_tensor(dataset, tensor_names[i], tensor_key);
if (tensor_names.size() == 0)
return dataset; // If no tensors, we're done
std::vector<std::string> tensor_keys;
std::transform(
tensor_names.cbegin(),
tensor_names.cend(),
std::back_inserter(tensor_keys),
[this, name](std::string s){
return _build_dataset_tensor_key(name, s, true);
});

// Retrieve DataSet tensors
PipelineReply tensors = _redis_server->get_tensors(tensor_keys);

// Put them into the dataset
for (size_t i = 0; i < tensor_names.size(); i++) {
_add_dataset_tensor(dataset, tensor_names[i], tensors[i]);
}

return dataset;
Expand Down Expand Up @@ -167,7 +178,7 @@ void Client::copy_dataset(const std::string& src_name,
CommandList put_meta_cmds;
_append_dataset_metadata_commands(put_meta_cmds, dataset);
_append_dataset_ack_command(put_meta_cmds, dataset);
(void)_run(put_meta_cmds);
(void)_redis_server->run_in_pipeline(put_meta_cmds);
}

// Delete a DataSet from the database.
Expand Down Expand Up @@ -1422,7 +1433,7 @@ void Client::copy_list(const std::string& src_name,
copy_cmd.add_field_ptr(reply[i].str(), reply[i].str_len());
}

CommandReply copy_reply = this->_run(copy_cmd);
CommandReply copy_reply = _run(copy_cmd);

if (reply.has_error() > 0)
throw SRRuntimeException("Dataset aggregation list copy "
Expand Down Expand Up @@ -1652,18 +1663,16 @@ inline CommandReply Client::_get_dataset_metadata(const std::string& name)
return _run(cmd);
}

// Retrieve a tensor and add it to the dataset
inline void Client::_get_and_add_dataset_tensor(DataSet& dataset,
const std::string& name,
const std::string& key)
// Add a retrieved tensor to a dataset
inline void Client::_add_dataset_tensor(
DataSet& dataset,
const std::string& name,
CommandReply tensor_data)
{
// Run tensor retrieval command
CommandReply reply = _redis_server->get_tensor(key);

// Extract tensor properties from command reply
std::vector<size_t> reply_dims = GetTensorCommand::get_dims(reply);
std::string_view blob = GetTensorCommand::get_data_blob(reply);
SRTensorType type = GetTensorCommand::get_data_type(reply);
std::vector<size_t> reply_dims = GetTensorCommand::get_dims(tensor_data);
std::string_view blob = GetTensorCommand::get_data_blob(tensor_data);
SRTensorType type = GetTensorCommand::get_data_type(tensor_data);

// Add tensor to the dataset
dataset._add_to_tensorpack(name, (void*)blob.data(), reply_dims,
Expand Down
Loading

0 comments on commit 68a7a84

Please sign in to comment.