Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline dataset operations #311

Merged
merged 8 commits into from
Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions include/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -1304,7 +1304,7 @@ class Client : public SRObject
*/
inline CommandReply _run(AddressAtCommand& cmd)
{
return this->_redis_server->run(cmd);
return _redis_server->run(cmd);
}

/*!
Expand All @@ -1314,7 +1314,7 @@ class Client : public SRObject
*/
inline CommandReply _run(AddressAnyCommand& cmd)
{
return this->_redis_server->run(cmd);
return _redis_server->run(cmd);
}

/*!
Expand All @@ -1324,7 +1324,7 @@ class Client : public SRObject
*/
inline CommandReply _run(SingleKeyCommand& cmd)
{
return this->_redis_server->run(cmd);
return _redis_server->run(cmd);
}

/*!
Expand All @@ -1334,7 +1334,7 @@ class Client : public SRObject
*/
inline CommandReply _run(MultiKeyCommand& cmd)
{
return this->_redis_server->run(cmd);
return _redis_server->run(cmd);
}

/*!
Expand All @@ -1344,7 +1344,7 @@ class Client : public SRObject
*/
inline CommandReply _run(CompoundCommand& cmd)
{
return this->_redis_server->run(cmd);
return _redis_server->run(cmd);
}

/*!
Expand All @@ -1354,7 +1354,7 @@ class Client : public SRObject
*/
inline std::vector<CommandReply> _run(CommandList& cmd_list)
{
return this->_redis_server->run(cmd_list);
return _redis_server->run(cmd_list);
}

/*!
Expand Down Expand Up @@ -1415,20 +1415,20 @@ class Client : public SRObject
const int start_index,
const int end_index);


// Add a retrieved tensor to a dataset
/*!
* \brief Retrieve a tensor and add it to the dataset object
* \param dataset The dataset which will be augmented with the
* retrieved tensor
* \param name The name (not key) of the tensor to retrieve and add
* \brief Add a tensor retrieved via get_tensor() to a dataset
* \param dataset The dataset which will receive the tensor
* \param name The name by which the tensor shall be added
* to the dataset
* \param key The key (not name) of the tensor to retrieve and add
* to the dataset
* \throw SmartRedis::Exception if retrieval or addition
* of tensor fails
*/
inline void _get_and_add_dataset_tensor(DataSet& dataset,
const std::string& name,
const std::string& key);
* \param data Result of a get_tensor command containing tensor data
* \throw SmartRedis::Exception if addition of tensor fails
*/
inline void _add_dataset_tensor(
DataSet& dataset,
const std::string& name,
CommandReply data);

/*!
* \brief Retrieve the tensor from the DataSet and return
Expand Down
29 changes: 27 additions & 2 deletions include/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,15 @@ class Redis : public RedisServer
*/
virtual CommandReply get_tensor(const std::string& key);

/*!
* \brief Get a list of Tensor from the server
* \param keys The keys of the tensor to retrieve
* \returns The PipelineReply from executing the get tensor commands
* \throw SmartRedis::Exception if tensor retrieval fails
*/
virtual PipelineReply get_tensors(
const std::vector<std::string>& keys);

/*!
* \brief Rename a tensor in the database
* \param key The original key for the tensor
Expand Down Expand Up @@ -496,6 +505,14 @@ class Redis : public RedisServer
const std::string& key,
const bool reset_stat);

/*!
* \brief Run a CommandList via a Pipeline
* \param cmdlist The list of commands to run
* \returns The PipelineReply with the result of command execution
* \throw SmartRedis::Exception if execution fails
*/
PipelineReply run_in_pipeline(CommandList& cmdlist);

/*!
* \brief Create a string representation of the Redis connection
* \returns A string representation of the Redis connection
Expand All @@ -512,8 +529,7 @@ class Redis : public RedisServer
/*!
* \brief Run a Command on the server
* \param cmd The Command to run
* \returns The CommandReply from the
* command execution
* \returns The CommandReply from the command execution
* \throw SmartRedis::Exception if command execution fails
*/
inline CommandReply _run(const Command& cmd);
Expand All @@ -530,6 +546,15 @@ class Redis : public RedisServer
* \throw SmartRedis::Exception if connection fails
*/
inline void _connect(SRAddress& db_address);

/*!
* \brief Pipeline execute a series of commands
* \param cms The commands to execute
* \returns Pipeline reply from the command execution
* \throw SmartRedis::Exception if command execution fails
*/
PipelineReply _run_pipeline(std::vector<Command*>& cmds);

};

} // namespace SmartRedis
Expand Down
19 changes: 19 additions & 0 deletions include/rediscluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,16 @@ class RedisCluster : public RedisServer
virtual CommandReply get_tensor(const std::string& key);

/*!
* \brief Get a list of Tensor from the server. All tensors
* must be on the same node
* \param keys The keys of the tensor to retrieve
* \returns The PipelineReply from executing the get tensor commands
* \throw SmartRedis::Exception if tensor retrieval fails
*/
virtual PipelineReply get_tensors(
const std::vector<std::string>& keys);

/*!
* \brief Rename a tensor in the database
* \param key The original key for the tensor
* \param new_key The new key for the tensor
Expand Down Expand Up @@ -518,6 +528,15 @@ class RedisCluster : public RedisServer
const std::string& key,
const bool reset_stat);

/*!
* \brief Run a CommandList via a Pipeline.
* All commands must go to the same shard
* \param cmdlist The list of commands to run
* \returns The PipelineReply with the result of command execution
* \throw SmartRedis::Exception if execution fails
*/
PipelineReply run_in_pipeline(CommandList& cmdlist);

/*!
* \brief Create a string representation of the Redis connection
* \returns A string representation of the Redis connection
Expand Down
19 changes: 19 additions & 0 deletions include/redisserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,16 @@ class RedisServer {
*/
virtual CommandReply get_tensor(const std::string& key) = 0;

/*!
* \brief Get a list of Tensor from the server. For clustered
* servers, all tensors must be on the same node
* \param keys The keys of the tensor to retrieve
* \returns The PipelineReply from executing the get tensor commands
* \throw SmartRedis::Exception if tensor retrieval fails
*/
virtual PipelineReply get_tensors(
const std::vector<std::string>& keys) = 0;

/*!
* \brief Rename a tensor in the database
* \param key The original key for the tensor
Expand Down Expand Up @@ -511,6 +521,15 @@ class RedisServer {
const std::string& key,
const bool reset_stat) = 0;

/*!
* \brief Run a CommandList via a Pipeline. For clustered databases
* all commands must go to the same shard
* \param cmdlist The list of commands to run
* \returns The PipelineReply with the result of command execution
* \throw SmartRedis::Exception if execution fails
*/
virtual PipelineReply run_in_pipeline(CommandList& cmdlist) = 0;

/*!
* \brief Create a string representation of the Redis connection
* \returns A string representation of the Redis connection
Expand Down
51 changes: 30 additions & 21 deletions src/cpp/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
*/

#include <ctype.h>
#include <algorithm>
#include "client.h"
#include "srexception.h"
#include "logger.h"
Expand Down Expand Up @@ -88,7 +89,7 @@ void Client::put_dataset(DataSet& dataset)
_append_dataset_metadata_commands(cmds, dataset);
_append_dataset_tensor_commands(cmds, dataset);
_append_dataset_ack_command(cmds, dataset);
_run(cmds);
_redis_server->run_in_pipeline(cmds);
}

// Retrieve a DataSet object from the database
Expand All @@ -109,15 +110,25 @@ DataSet Client::get_dataset(const std::string& name)
DataSet dataset(name);
_unpack_dataset_metadata(dataset, reply);

// Build the tensor keys
std::vector<std::string> tensor_names = dataset.get_tensor_names();

// Retrieve DataSet tensors and fill the DataSet object
for(size_t i = 0; i < tensor_names.size(); i++) {
// Build the tensor key
std::string tensor_key =
_build_dataset_tensor_key(name, tensor_names[i], true);
// Retrieve tensor and add it to the dataset
_get_and_add_dataset_tensor(dataset, tensor_names[i], tensor_key);
if (tensor_names.size() == 0)
return dataset; // If no tensors, we're done
std::vector<std::string> tensor_keys;
std::transform(
tensor_names.cbegin(),
tensor_names.cend(),
std::back_inserter(tensor_keys),
[this, name](std::string s){
return _build_dataset_tensor_key(name, s, true);
});

// Retrieve DataSet tensors
PipelineReply tensors = _redis_server->get_tensors(tensor_keys);

// Put them into the dataset
for (size_t i = 0; i < tensor_names.size(); i++) {
_add_dataset_tensor(dataset, tensor_names[i], tensors[i]);
}

return dataset;
Expand Down Expand Up @@ -167,7 +178,7 @@ void Client::copy_dataset(const std::string& src_name,
CommandList put_meta_cmds;
_append_dataset_metadata_commands(put_meta_cmds, dataset);
_append_dataset_ack_command(put_meta_cmds, dataset);
(void)_run(put_meta_cmds);
(void)_redis_server->run_in_pipeline(put_meta_cmds);
}

// Delete a DataSet from the database.
Expand Down Expand Up @@ -1422,7 +1433,7 @@ void Client::copy_list(const std::string& src_name,
copy_cmd.add_field_ptr(reply[i].str(), reply[i].str_len());
}

CommandReply copy_reply = this->_run(copy_cmd);
CommandReply copy_reply = _run(copy_cmd);

if (reply.has_error() > 0)
throw SRRuntimeException("Dataset aggregation list copy "
Expand Down Expand Up @@ -1652,18 +1663,16 @@ inline CommandReply Client::_get_dataset_metadata(const std::string& name)
return _run(cmd);
}

// Retrieve a tensor and add it to the dataset
inline void Client::_get_and_add_dataset_tensor(DataSet& dataset,
const std::string& name,
const std::string& key)
// Add a retrieved tensor to a dataset
inline void Client::_add_dataset_tensor(
DataSet& dataset,
const std::string& name,
CommandReply data)
{
// Run tensor retrieval command
CommandReply reply = _redis_server->get_tensor(key);

// Extract tensor properties from command reply
std::vector<size_t> reply_dims = GetTensorCommand::get_dims(reply);
std::string_view blob = GetTensorCommand::get_data_blob(reply);
SRTensorType type = GetTensorCommand::get_data_type(reply);
std::vector<size_t> reply_dims = GetTensorCommand::get_dims(data);
std::string_view blob = GetTensorCommand::get_data_blob(data);
SRTensorType type = GetTensorCommand::get_data_type(data);

// Add tensor to the dataset
dataset._add_to_tensorpack(name, (void*)blob.data(), reply_dims,
Expand Down
Loading