Skip to content

Commit

Permalink
add state pagination for postgres
Browse files Browse the repository at this point in the history
  • Loading branch information
Yurii Koba authored and Yurii Koba committed Dec 19, 2023
1 parent 81e76ab commit e9c6b3b
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 69 deletions.
2 changes: 2 additions & 0 deletions database/src/base/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub struct AdditionalDatabaseOptions {
pub database_name: Option<String>,
}

pub type PageToken = Option<String>;

#[async_trait::async_trait]
pub trait BaseDbManager {
async fn new(
Expand Down
4 changes: 2 additions & 2 deletions database/src/base/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ pub trait ReaderDbManager {
async fn get_state_keys_by_page(
&self,
account_id: &near_primitives::types::AccountId,
page_state: Option<String>,
) -> anyhow::Result<(Vec<readnode_primitives::StateKey>, Option<String>)>;
page_token: crate::PageToken,
) -> anyhow::Result<(Vec<readnode_primitives::StateKey>, crate::PageToken)>;

/// Returns state keys for the given account id filtered by the given prefix
async fn get_state_keys_by_prefix(
Expand Down
2 changes: 1 addition & 1 deletion database/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
mod base;

pub use crate::base::AdditionalDatabaseOptions;
use crate::base::BaseDbManager;
pub use crate::base::ReaderDbManager;
pub use crate::base::StateIndexerDbManager;
pub use crate::base::TxIndexerDbManager;
pub use crate::base::{AdditionalDatabaseOptions, PageToken};

#[cfg(feature = "scylla_db")]
pub mod scylladb;
Expand Down
57 changes: 57 additions & 0 deletions database/src/postgres/models.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::schema::*;
use borsh::{BorshDeserialize, BorshSerialize};
use diesel::prelude::*;
use diesel_async::RunQueryDsl;

Expand Down Expand Up @@ -296,6 +297,28 @@ impl Chunk {
}
}

/// Offset-based pagination cursor for `account_state` queries.
///
/// The value is borsh-serialized and hex-encoded to build the opaque page
/// token returned to callers, and rebuilt from that token on the next call.
/// Because a token travels through the client, both fields must be treated
/// as untrusted once decoded.
#[derive(borsh::BorshSerialize, borsh::BorshDeserialize, Clone, Debug)]
struct PageState {
    /// Number of rows requested per page (passed to the query as `LIMIT`).
    page_size: i64,
    /// Number of rows skipped before the page starts (passed as `OFFSET`).
    offset: i64,
}

impl PageState {
    /// Creates the cursor for the first page: `offset` is 0.
    fn new(page_size: i64) -> Self {
        Self {
            page_size,
            offset: 0,
        }
    }

    /// Returns the cursor for the page that follows this one.
    ///
    /// The offset is advanced with a saturating add: the fields may come from
    /// a decoded, client-supplied token, so a plain `+` could overflow
    /// (panicking in debug builds and wrapping to a negative offset in
    /// release builds).
    fn next_page(&self) -> Self {
        Self {
            page_size: self.page_size,
            offset: self.offset.saturating_add(self.page_size),
        }
    }
}

#[derive(Insertable, Queryable, Selectable)]
#[diesel(table_name = account_state)]
pub struct AccountState {
Expand Down Expand Up @@ -323,6 +346,7 @@ impl AccountState {
let response = account_state::table
.filter(account_state::account_id.eq(account_id))
.select(Self::as_select())
.limit(25000)
.load(&mut conn)
.await?;

Expand All @@ -332,6 +356,39 @@ impl AccountState {
.collect())
}

/// Returns one page of state keys for `account_id` and the token for the next page.
///
/// `page_token` is the hex-encoded, borsh-serialized `PageState` produced by a
/// previous call. `None` requests the first page (1000 keys per page).
/// The returned token is `None` when fewer rows than a full page were fetched,
/// which signals the last page.
///
/// # Errors
/// Fails when the token is not valid hex, cannot be deserialized, carries an
/// out-of-range `page_size`/`offset`, or when the database query fails.
pub async fn get_state_keys_by_page(
    mut conn: crate::postgres::PgAsyncConn,
    account_id: &str,
    page_token: crate::PageToken,
) -> anyhow::Result<(Vec<String>, crate::PageToken)> {
    // Page size used for the first page; also the largest size accepted from a token.
    const MAX_PAGE_SIZE: i64 = 1000;

    let page_state = match page_token {
        Some(page_state_token) => PageState::try_from_slice(&hex::decode(page_state_token)?)?,
        None => PageState::new(MAX_PAGE_SIZE),
    };

    // The token round-trips through the client, so its contents are untrusted:
    // - a huge `page_size` would defeat pagination and load the whole state,
    // - `page_size == 0` would yield a "next" token that never advances,
    // - negative values are rejected by PostgreSQL and break the `as usize` cast below.
    anyhow::ensure!(
        (1..=MAX_PAGE_SIZE).contains(&page_state.page_size) && page_state.offset >= 0,
        "invalid page token: page_size={}, offset={}",
        page_state.page_size,
        page_state.offset
    );

    let response = account_state::table
        .filter(account_state::account_id.eq(account_id))
        .select(Self::as_select())
        // LIMIT/OFFSET pagination is only stable with a deterministic order;
        // without ORDER BY PostgreSQL may return rows in a different order on
        // every call, so pages could overlap or skip keys.
        .order(account_state::data_key.asc())
        .limit(page_state.page_size)
        .offset(page_state.offset)
        .load(&mut conn)
        .await?;

    let state_keys = response
        .into_iter()
        .map(|account_state_key| account_state_key.data_key)
        .collect::<Vec<String>>();

    // `page_size` was validated to be positive above, so the cast is lossless.
    if state_keys.len() < page_state.page_size as usize {
        // Short page: nothing left to fetch.
        Ok((state_keys, None))
    } else {
        Ok((
            state_keys,
            Some(hex::encode(page_state.next_page().try_to_vec()?)),
        ))
    }
}

pub async fn get_state_keys_by_prefix(
mut conn: crate::postgres::PgAsyncConn,
account_id: &str,
Expand Down
18 changes: 14 additions & 4 deletions database/src/postgres/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,20 @@ impl crate::ReaderDbManager for PostgresDBManager {

async fn get_state_keys_by_page(
&self,
_account_id: &near_primitives::types::AccountId,
_page_state: Option<String>,
) -> anyhow::Result<(Vec<readnode_primitives::StateKey>, Option<String>)> {
todo!()
account_id: &near_primitives::types::AccountId,
page_token: crate::PageToken,
) -> anyhow::Result<(Vec<readnode_primitives::StateKey>, crate::PageToken)> {
let (state_keys, next_page_token) = crate::models::AccountState::get_state_keys_by_page(
Self::get_connection(&self.pg_pool).await?,
account_id,
page_token,
)
.await?;

let keys = state_keys
.into_iter()
.filter_map(|key| hex::decode(key).ok());
Ok((keys.collect(), next_page_token))
}

async fn get_state_key_value(
Expand Down
32 changes: 20 additions & 12 deletions database/src/scylladb/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,11 @@ impl crate::ReaderDbManager for ScyllaDBManager {
})
}

/// Returns all state keys for the given account id
/// Returns at most 25000 state keys for the given account id.
/// The number of returned state keys is limited for the following reasons:
/// 1. The query is very slow and takes a long time to execute
/// 2. The contract state could be very large and we don't want to return all of it at once
/// To get all state keys, use the `get_state_keys_by_page` method.
async fn get_state_keys_all(
&self,
account_id: &near_primitives::types::AccountId,
Expand All @@ -187,25 +191,29 @@ impl crate::ReaderDbManager for ScyllaDBManager {
.execute_iter(paged_query, (account_id.to_string(),))
.await?
.into_typed::<(String,)>();
let mut stata_keys = vec![];
let mut state_keys = vec![];
while let Some(next_row_res) = rows_stream.next().await {
let (value,): (String,) = next_row_res?;
stata_keys.push(hex::decode(value)?);
state_keys.push(hex::decode(value)?);
}
Ok(stata_keys)
Ok(state_keys)
}

/// Return contract state keys by page
/// The page size is 1000 keys
/// The page_token is a hex string of the scylla page_state
/// On the first call the page_token should be None
/// On the last page the page_token will be None
async fn get_state_keys_by_page(
&self,
account_id: &near_primitives::types::AccountId,
page_state: Option<String>,
) -> anyhow::Result<(Vec<readnode_primitives::StateKey>, Option<String>)> {
page_token: crate::PageToken,
) -> anyhow::Result<(Vec<readnode_primitives::StateKey>, crate::PageToken)> {
let mut paged_query = self.get_all_state_keys.clone();
paged_query.set_page_size(1000);

let result = if let Some(page_state) = page_state {
let page_bytes = hex::decode(page_state)?;
let page_state = bytes::Bytes::from(page_bytes);
let result = if let Some(page_state) = page_token {
let page_state = bytes::Bytes::from(hex::decode(page_state)?);
self.scylla_session
.execute_paged(&paged_query, (account_id.to_string(),), Some(page_state))
.await?
Expand All @@ -218,13 +226,13 @@ impl crate::ReaderDbManager for ScyllaDBManager {
.await?
};

let new_page_state = result.paging_state.as_ref().map(hex::encode);
let new_page_token = result.paging_state.as_ref().map(hex::encode);

let stata_keys = result
let state_keys = result
.rows_typed::<(String,)>()?
.filter_map(|row| row.ok().and_then(|(value,)| hex::decode(value).ok()));

Ok((stata_keys.collect(), new_page_state))
Ok((state_keys.collect(), new_page_token))
}

/// Returns state keys for the given account id filtered by the given prefix
Expand Down
14 changes: 6 additions & 8 deletions rpc-server/src/modules/queries/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,13 @@ pub async fn get_state_keys_from_db(
};
match result {
Ok(state_keys) => {
// 3 nodes * 8 cpus * 100 = 2400
// TODO: 2400 is hardcoded value. Make it configurable.
for state_keys_chunk in state_keys.chunks(2400) {
// 3 nodes * 8 cpus * 100 = 2400
// TODO: 2400 is hardcoded value. Make it configurable.
let mut tasks = futures::stream::FuturesUnordered::new();
for state_key in state_keys_chunk {
let state_value_result_future =
db_manager.get_state_key_value(account_id, block_height, state_key.clone());
tasks.push(state_value_result_future);
}
let futures = state_keys_chunk.iter().map(|state_key| {
db_manager.get_state_key_value(account_id, block_height, state_key.clone())
});
let mut tasks = futures::stream::FuturesUnordered::from_iter(futures);
while let Some((state_key, state_value)) = tasks.next().await {
if !state_value.is_empty() {
data.insert(state_key, state_value);
Expand Down
8 changes: 4 additions & 4 deletions rpc-server/src/modules/state/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ pub async fn view_state_paginated(
.await
.map_err(near_jsonrpc_primitives::errors::RpcError::from)?;

let state_value = get_state_keys_from_db_paginated(
let state_values = get_state_keys_from_db_paginated(
&data.db_manager,
&params.account_id,
block.block_height,
params.next_page,
params.next_page_token,
)
.await;

Ok(crate::modules::state::RpcViewStatePaginatedResponse {
values: state_value.values,
next_page: state_value.next_page,
values: state_values.values,
next_page_token: state_values.next_page_token,
block_height: block.block_height,
block_hash: block.block_hash,
})
Expand Down
7 changes: 4 additions & 3 deletions rpc-server/src/modules/state/mod.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
pub mod methods;
pub mod utils;

#[derive(Default, Debug)]
pub struct PageStateValues {
pub values: Vec<near_primitives::views::StateItem>,
pub next_page: Option<String>,
pub next_page_token: database::PageToken,
}

#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub struct RpcViewStatePaginatedRequest {
pub account_id: near_primitives::types::AccountId,
pub block_id: near_primitives::types::BlockId,
pub next_page: Option<String>,
pub next_page_token: database::PageToken,
}

#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub struct RpcViewStatePaginatedResponse {
pub values: Vec<near_primitives::views::StateItem>,
pub block_height: near_primitives::types::BlockHeight,
pub block_hash: near_primitives::hash::CryptoHash,
pub next_page: Option<String>,
pub next_page_token: database::PageToken,
}
64 changes: 29 additions & 35 deletions rpc-server/src/modules/state/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,47 +9,41 @@ pub async fn get_state_keys_from_db_paginated(
db_manager: &std::sync::Arc<Box<dyn database::ReaderDbManager + Sync + Send + 'static>>,
account_id: &near_primitives::types::AccountId,
block_height: near_primitives::types::BlockHeight,
page_state: Option<String>,
page_token: database::PageToken,
) -> crate::modules::state::PageStateValues {
tracing::debug!(
"`get_state_keys_from_db_paginated` call. AccountId {}, block {}, page_state {:?}",
"`get_state_keys_from_db_paginated` call. AccountId {}, block {}, page_token {:?}",
account_id,
block_height,
page_state,
page_token,
);
let mut data: HashMap<readnode_primitives::StateKey, readnode_primitives::StateValue> =
HashMap::new();
let mut new_page_state = None;
let result = db_manager
.get_state_keys_by_page(account_id, page_state)
.await;
if let Ok((state_keys, page_state)) = result {
new_page_state = page_state;
for state_keys_chunk in state_keys.chunks(1000) {
// 3 nodes * 8 cpus * 100 = 2400
// TODO: 2400 is hardcoded value. Make it configurable.
let mut tasks = futures::stream::FuturesUnordered::new();
for state_key in state_keys_chunk {
let state_value_result_future =
db_manager.get_state_key_value(account_id, block_height, state_key.clone());
tasks.push(state_value_result_future);
}
while let Some((state_key, state_value)) = tasks.next().await {
if !state_value.is_empty() {
data.insert(state_key, state_value);
}
if let Ok((state_keys, next_page_token)) = db_manager
.get_state_keys_by_page(account_id, page_token)
.await
{
let futures = state_keys.iter().map(|state_key| {
db_manager.get_state_key_value(account_id, block_height, state_key.clone())
});
let mut tasks = futures::stream::FuturesUnordered::from_iter(futures);
let mut data: HashMap<readnode_primitives::StateKey, readnode_primitives::StateValue> =
HashMap::new();
while let Some((state_key, state_value)) = tasks.next().await {
if !state_value.is_empty() {
data.insert(state_key, state_value);
}
}
}
let values = data
.into_iter()
.map(|(key, value)| near_primitives::views::StateItem {
key: key.into(),
value: value.into(),
})
.collect();
crate::modules::state::PageStateValues {
values,
next_page: new_page_state,
let values = data
.into_iter()
.map(|(key, value)| near_primitives::views::StateItem {
key: key.into(),
value: value.into(),
})
.collect();
crate::modules::state::PageStateValues {
values,
next_page_token,
}
} else {
crate::modules::state::PageStateValues::default()
}
}

0 comments on commit e9c6b3b

Please sign in to comment.