From 6af5e1796b3ac693aa66823feadbc5bc6dec1466 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karol=20Bary=C5=82a?=
Date: Tue, 18 Feb 2025 13:10:18 +0100
Subject: [PATCH 01/13] RowSerializationContext: remove `column_by_name`

This method is not used anywhere, and it is not useful for
serialization because columns must be serialized in the correct order
anyway.
---
 scylla-cql/src/serialize/row.rs | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/scylla-cql/src/serialize/row.rs b/scylla-cql/src/serialize/row.rs
index d586370913..c7b966a00f 100644
--- a/scylla-cql/src/serialize/row.rs
+++ b/scylla-cql/src/serialize/row.rs
@@ -52,13 +52,6 @@ impl<'a> RowSerializationContext<'a> {
     pub fn columns(&self) -> &'a [ColumnSpec] {
         self.columns
     }
-
-    /// Looks up and returns a column/bind marker by name.
-    // TODO: change RowSerializationContext to make this faster
-    #[inline]
-    pub fn column_by_name(&self, target: &str) -> Option<&ColumnSpec> {
-        self.columns.iter().find(|&c| c.name() == target)
-    }
 }
 
 /// Represents a set of values that can be sent along a CQL statement.

From 369ee35c87cc323bbf8ac2cde48ee2b408d4dd7a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karol=20Bary=C5=82a?=
Date: Wed, 5 Feb 2025 16:32:35 +0100
Subject: [PATCH 02/13] SerializedValues::from_serializable: Relax default
 `Sized` bound.

This function only accepts `T` through a reference, so `T` itself
doesn't need to have a known size.
---
 scylla-cql/src/serialize/row.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scylla-cql/src/serialize/row.rs b/scylla-cql/src/serialize/row.rs
index c7b966a00f..d4f266113c 100644
--- a/scylla-cql/src/serialize/row.rs
+++ b/scylla-cql/src/serialize/row.rs
@@ -507,7 +507,7 @@ impl SerializedValues {
     pub const EMPTY: &'static SerializedValues = &SerializedValues::new();
 
     /// Constructs `SerializedValues` from given [`SerializeRow`] object.
-    pub fn from_serializable<T: SerializeRow>(
+    pub fn from_serializable<T: SerializeRow + ?Sized>(
         ctx: &RowSerializationContext,
         row: &T,
     ) -> Result<Self, SerializationError> {

From cbf73f54946d628e57f155750e591da4be285c18 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karol=20Bary=C5=82a?=
Date: Sat, 1 Mar 2025 17:45:50 +0100
Subject: [PATCH 03/13] metadata.rs: Use new error type for errors of single
 keyspace

In further commits we will introduce another error condition that
doesn't fail the whole metadata fetch, but only a single keyspace.
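For illustration, this is roughly how the two error layers are meant to
read at a call site once the series lands (sketch only: `fetch_metadata()`
is a stand-in name, not a real API, and `Metadata` with its `keyspaces`
field is crate-private):

    // Outer layer: talking to the cluster failed -> the whole fetch errors out.
    let metadata: Metadata = fetch_metadata().await?; // Result<_, MetadataError>
    for (name, ks_result) in &metadata.keyspaces {
        match ks_result {
            // Keyspace metadata was fetched and is fully consistent.
            Ok(ks) => println!("{name}: {} tables", ks.tables.len()),
            // Inner layer: only this keyspace gets dropped; the rest stay usable.
            Err(err) => eprintln!("skipping keyspace {name}: {err}"),
        }
    }
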
---
 scylla/src/cluster/metadata.rs | 52 +++++++++++++++++++++++++---------
 1 file changed, 38 insertions(+), 14 deletions(-)

diff --git a/scylla/src/cluster/metadata.rs b/scylla/src/cluster/metadata.rs
index b62d605d07..db90676f95 100644
--- a/scylla/src/cluster/metadata.rs
+++ b/scylla/src/cluster/metadata.rs
@@ -67,6 +67,18 @@ type PerTable<T> = HashMap<String, T>;
 type PerKsTable<T> = HashMap<(String, String), T>;
 type PerKsTableResult<T, E> = PerKsTable<Result<T, E>>;
 
+/// Indicates that reading metadata failed, but in a way
+/// that we can handle, by throwing out data for a keyspace.
+/// It is possible that some of the errors could be handled in an even
+/// more granular way (e.g. throwing out a single table), but keyspace
+/// granularity seems like a good choice given how independent keyspaces
+/// are from each other.
+#[derive(Clone, Debug, Error)]
+pub(crate) enum SingleKeyspaceMetadataError {
+    #[error(transparent)]
+    MissingUDT(MissingUserDefinedType),
+}
+
 /// Allows to read current metadata from the cluster
 pub(crate) struct MetadataReader {
     control_connection_pool_config: PoolConfig,
@@ -92,7 +104,7 @@ pub(crate) struct MetadataReader {
 /// Describes all metadata retrieved from the cluster
 pub(crate) struct Metadata {
     pub(crate) peers: Vec<Peer>,
-    pub(crate) keyspaces: HashMap<String, Result<Keyspace, MissingUserDefinedType>>,
+    pub(crate) keyspaces: HashMap<String, Result<Keyspace, SingleKeyspaceMetadataError>>,
 }
 
 #[non_exhaustive] // <- so that we can add more fields in a backwards-compatible way
@@ -902,7 +914,7 @@ async fn query_keyspaces(
     conn: &Arc<Connection>,
     keyspaces_to_fetch: &[String],
     fetch_schema: bool,
-) -> Result<PerKeyspaceResult<Keyspace, MissingUserDefinedType>, MetadataError> {
+) -> Result<PerKeyspaceResult<Keyspace, SingleKeyspaceMetadataError>, MetadataError> {
     let rows = query_filter_keyspace_name::<(String, HashMap<String, String>)>(
         conn,
         "select keyspace_name, replication from system_schema.keyspaces",
@@ -953,15 +965,20 @@ async fn query_keyspaces(
     // As you can notice, in this file we generally operate on two layers of errors:
     // - Outer (MetadataError) if something went wrong with querying the cluster.
-    // - Inner (currently MissingUserDefinedType, possibly other variants in the future) if the fetched metadata
-    //   turned out to not be fully consistent.
+    // - Inner (SingleKeyspaceMetadataError) if the fetched metadata turned out to not be fully consistent.
     // If there is an inner error, we want to drop metadata for the whole keyspace.
-    // This logic checks if either tables views or UDTs have such inner error, and returns it if so.
+    // This logic checks if either tables, views, or UDTs have such inner error, and returns it if so.
     // Notice that in the error branch, return value is wrapped in `Ok` - but this is the
     // outer error, so it just means there was no error while querying the cluster.
     let (tables, views, user_defined_types) = match (tables, views, user_defined_types) {
         (Ok(t), Ok(v), Ok(u)) => (t, v, u),
-        (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)) => return Ok((keyspace_name, Err(e))),
+        (Err(e), _, _) | (_, Err(e), _) => return Ok((keyspace_name, Err(e))),
+        (_, _, Err(e)) => {
+            return Ok((
+                keyspace_name,
+                Err(SingleKeyspaceMetadataError::MissingUDT(e)),
+            ))
+        }
     };
 
     let keyspace = Keyspace {
@@ -1364,8 +1381,8 @@ mod toposort_tests {
 async fn query_tables(
     conn: &Arc<Connection>,
     keyspaces_to_fetch: &[String],
-    tables: &mut PerKsTableResult<Table, MissingUserDefinedType>,
-) -> Result<PerKeyspaceResult<PerTable<Table>, MissingUserDefinedType>, MetadataError> {
+    tables: &mut PerKsTableResult<Table, SingleKeyspaceMetadataError>,
+) -> Result<PerKeyspaceResult<PerTable<Table>, SingleKeyspaceMetadataError>, MetadataError> {
     let rows = query_filter_keyspace_name::<(String, String)>(
         conn,
         "SELECT keyspace_name, table_name FROM system_schema.tables",
@@ -1409,8 +1426,9 @@ async fn query_tables(
 async fn query_views(
     conn: &Arc<Connection>,
     keyspaces_to_fetch: &[String],
-    tables: &mut PerKsTableResult<Table, MissingUserDefinedType>,
-) -> Result<PerKeyspaceResult<PerTable<MaterializedView>, MissingUserDefinedType>, MetadataError> {
+    tables: &mut PerKsTableResult<Table, SingleKeyspaceMetadataError>,
+) -> Result<PerKeyspaceResult<PerTable<MaterializedView>, SingleKeyspaceMetadataError>, MetadataError>
+{
     let rows = query_filter_keyspace_name::<(String, String, String)>(
         conn,
         "SELECT keyspace_name, view_name, base_table_name FROM system_schema.views",
@@ -1465,7 +1483,7 @@ async fn query_tables_schema(
     conn: &Arc<Connection>,
     keyspaces_to_fetch: &[String],
     udts: &PerKeyspaceResult<PerTable<Arc<UserDefinedType<'static>>>, MissingUserDefinedType>,
-) -> Result<PerKsTableResult<Table, MissingUserDefinedType>, MetadataError> {
+) -> Result<PerKsTableResult<Table, SingleKeyspaceMetadataError>, MetadataError> {
     // Upon migration from thrift to CQL, Cassandra internally creates a surrogate column "value" of
     // type EmptyType for dense tables. This resolves into this CQL type name.
     // This column shouldn't be exposed to the user but is currently exposed in system tables.
@@ -1484,7 +1502,7 @@ async fn query_tables_schema(
 
     let empty_ok_map = Ok(HashMap::new());
 
-    let mut tables_schema: HashMap<_, Result<_, MissingUserDefinedType>> = HashMap::new();
+    let mut tables_schema: HashMap<_, Result<_, SingleKeyspaceMetadataError>> = HashMap::new();
 
     rows.map(|row_result| {
         let (keyspace_name, table_name, column_name, kind, position, type_) = row_result?;
@@ -1518,7 +1536,10 @@ async fn query_tables_schema(
                     // is minor enough to ignore. Note that the first issue also applies to
                     // solution 1: but the keyspace won't be present in the result at all,
                     // which is arguably worse.
-                    tables_schema.insert((keyspace_name, table_name), Err(e.clone()));
+                    tables_schema.insert(
+                        (keyspace_name, table_name),
+                        Err(SingleKeyspaceMetadataError::MissingUDT(e.clone())),
+                    );
                     return Ok::<_, MetadataError>(());
                 }
             };
@@ -1532,7 +1553,10 @@ async fn query_tables_schema(
         let cql_type = match pre_cql_type.into_cql_type(&keyspace_name, keyspace_udts) {
             Ok(t) => t,
             Err(e) => {
-                tables_schema.insert((keyspace_name, table_name), Err(e));
+                tables_schema.insert(
+                    (keyspace_name, table_name),
+                    Err(SingleKeyspaceMetadataError::MissingUDT(e)),
+                );
                 return Ok::<_, MetadataError>(());
             }
         };

From cdf7f84510099bf47ee5a61e4ea38ca850cd71b7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karol=20Bary=C5=82a?=
Date: Sat, 1 Mar 2025 18:11:31 +0100
Subject: [PATCH 04/13] Metadata: Use Vecs for intermediate representation of
 pk and ck
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

A HashMap was used before, but it provides no benefit over a Vec here.
A Vec will also make it easier to verify that we received all of the pk
and ck columns, which the next commits will do.

Co-authored-by: Wojciech Przytuła
---
 scylla/src/cluster/metadata.rs | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/scylla/src/cluster/metadata.rs b/scylla/src/cluster/metadata.rs
index db90676f95..8130b9a903 100644
--- a/scylla/src/cluster/metadata.rs
+++ b/scylla/src/cluster/metadata.rs
@@ -1573,8 +1573,8 @@ async fn query_tables_schema(
             .entry((keyspace_name, table_name))
             .or_insert(Ok((
                 HashMap::new(), // columns
-                HashMap::new(), // partition key
-                HashMap::new(), // clustering key
+                Vec::new(),     // partition key
+                Vec::new(),     // clustering key
             )))
         else {
             // This table was previously marked as broken, no way to insert anything.
@@ -1582,12 +1582,12 @@ async fn query_tables_schema(
         if kind == ColumnKind::PartitionKey || kind == ColumnKind::Clustering {
-            let key_map = if kind == ColumnKind::PartitionKey {
+            let key_list: &mut Vec<(i32, String)> = if kind == ColumnKind::PartitionKey {
                 entry.1.borrow_mut()
             } else {
                 entry.2.borrow_mut()
             };
-            key_map.insert(position, column_name.clone());
+            key_list.push((position, column_name.clone()));
         }
 
         entry.0.insert(
@@ -1609,7 +1609,12 @@ async fn query_tables_schema(
     for ((keyspace_name, table_name), table_result) in tables_schema {
         let keyspace_and_table_name = (keyspace_name, table_name);
 
-        let (columns, partition_key_columns, clustering_key_columns) = match table_result {
+        #[allow(clippy::type_complexity)]
+        let (columns, partition_key_columns, clustering_key_columns): (
+            HashMap<String, Column>,
+            Vec<(i32, String)>,
+            Vec<(i32, String)>,
+        ) = match table_result {
             Ok(table) => table,
             Err(e) => {
                 let _ = result.insert(keyspace_and_table_name, Err(e));

From da4a2478d9e3bd9cbe30c2c4e19b5d7607ef17a7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karol=20Bary=C5=82a?=
Date: Sat, 1 Mar 2025 18:16:10 +0100
Subject: [PATCH 05/13] Metadata: Verify that there are no gaps in pk and ck
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Previously it was possible for some positions of `partition_key` and
`clustering_key` to remain unfilled and thus stay empty strings. The
probability was very low - Scylla would have to return very weird data
- but the possibility was there.

This commit verifies that this is not happening, and returns an error
if it is.
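To make the invariant concrete: `validate_key_columns` (added below)
accepts the key columns only if their positions form a gapless `0..n`
sequence, and otherwise reports the first missing position. Illustrative
asserts, not part of the patch:

    // Gapless positions -> Ok, with names ordered by position.
    assert_eq!(
        validate_key_columns(vec![(1, "b".to_string()), (0, "a".to_string())]),
        Ok(vec!["a".to_string(), "b".to_string()])
    );
    // Position 1 is missing -> Err(1), and the table is marked broken.
    assert_eq!(
        validate_key_columns(vec![(0, "a".to_string()), (2, "c".to_string())]),
        Err(1)
    );
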
Co-authored-by: Wojciech Przytuła
---
 scylla/src/cluster/metadata.rs | 56 +++++++++++++++++++++++++++++-----
 1 file changed, 48 insertions(+), 8 deletions(-)

diff --git a/scylla/src/cluster/metadata.rs b/scylla/src/cluster/metadata.rs
index 8130b9a903..9ae89e1a2f 100644
--- a/scylla/src/cluster/metadata.rs
+++ b/scylla/src/cluster/metadata.rs
@@ -77,6 +77,10 @@ type PerKsTableResult<T, E> = PerKsTable<Result<T, E>>;
 pub(crate) enum SingleKeyspaceMetadataError {
     #[error(transparent)]
     MissingUDT(MissingUserDefinedType),
+    #[error("Partition key column with position {0} is missing from metadata")]
+    IncompletePartitionKey(i32),
+    #[error("Clustering key column with position {0} is missing from metadata")]
+    IncompleteClusteringKey(i32),
 }
 
 /// Allows to read current metadata from the cluster
@@ -1606,7 +1610,7 @@ async fn query_tables_schema(
     let mut all_partitioners = query_table_partitioners(conn).await?;
     let mut result = HashMap::new();
 
-    for ((keyspace_name, table_name), table_result) in tables_schema {
+    'tables_loop: for ((keyspace_name, table_name), table_result) in tables_schema {
         let keyspace_and_table_name = (keyspace_name, table_name);
 
         #[allow(clippy::type_complexity)]
@@ -1621,16 +1625,52 @@ async fn query_tables_schema(
                 continue;
             }
         };
-        let mut partition_key = vec!["".to_string(); partition_key_columns.len()];
-        for (position, column_name) in partition_key_columns {
-            partition_key[position as usize] = column_name;
-        }
-
-        let mut clustering_key = vec!["".to_string(); clustering_key_columns.len()];
-        for (position, column_name) in clustering_key_columns {
-            clustering_key[position as usize] = column_name;
+        fn validate_key_columns(mut key_columns: Vec<(i32, String)>) -> Result<Vec<String>, i32> {
+            key_columns.sort_unstable_by_key(|(position, _)| *position);
+
+            key_columns
+                .into_iter()
+                .enumerate()
+                .map(|(idx, (position, column_name))| {
+                    // unwrap: I don't see the point of handling the scenario of fetching over
+                    // 2 * 10^9 columns.
+                    let idx: i32 = idx.try_into().unwrap();
+                    if idx == position {
+                        Ok(column_name)
+                    } else {
+                        Err(idx)
+                    }
+                })
+                .collect::<Result<Vec<_>, _>>()
         }
 
+        let partition_key = match validate_key_columns(partition_key_columns) {
+            Ok(partition_key_columns) => partition_key_columns,
+            Err(position) => {
+                result.insert(
+                    keyspace_and_table_name,
+                    Err(SingleKeyspaceMetadataError::IncompletePartitionKey(
+                        position,
+                    )),
+                );
+                continue 'tables_loop;
+            }
+        };
+
+        let clustering_key = match validate_key_columns(clustering_key_columns) {
+            Ok(clustering_key_columns) => clustering_key_columns,
+            Err(position) => {
+                result.insert(
+                    keyspace_and_table_name,
+                    Err(SingleKeyspaceMetadataError::IncompleteClusteringKey(
+                        position,
+                    )),
+                );
+                continue 'tables_loop;
+            }
+        };
+
         let partitioner = all_partitioners
             .remove(&keyspace_and_table_name)
             .unwrap_or_default();

From 211089fb0b19e807fb05f285a02d7944f50928e8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karol=20Bary=C5=82a?=
Date: Fri, 28 Feb 2025 19:39:37 +0100
Subject: [PATCH 06/13] Table: Describe guarantee about pk and ck column names

Now that we verify there are no gaps in pk and ck, we can explicitly
provide this guarantee to users.
---
 scylla/src/cluster/metadata.rs | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/scylla/src/cluster/metadata.rs b/scylla/src/cluster/metadata.rs
index 9ae89e1a2f..a9d8305775 100644
--- a/scylla/src/cluster/metadata.rs
+++ b/scylla/src/cluster/metadata.rs
@@ -197,7 +197,11 @@ pub struct Keyspace {
 #[derive(Clone, Debug, PartialEq, Eq)]
 pub struct Table {
     pub columns: HashMap<String, Column>,
+    /// Names of the columns of the partition key.
+    /// All of the names are guaranteed to be present in the `columns` field.
     pub partition_key: Vec<String>,
+    /// Names of the columns of the clustering key.
+    /// All of the names are guaranteed to be present in the `columns` field.
     pub clustering_key: Vec<String>,
     pub partitioner: Option<String>,
 }

From 4cdaba5f13354b1b9c1b88e6f096b556ef57cf51 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karol=20Bary=C5=82a?=
Date: Sat, 1 Mar 2025 17:38:45 +0100
Subject: [PATCH 07/13] Table: Add Vec of primary key column specs
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

We want to move the token calculation methods on ClusterState to accept
SerializeRow. This in turn means we need to serialize those values, so
we have to create a RowSerializationContext from the Table struct.

RowSerializationContext currently needs a slice of ColumnSpec. Table
has no such slice. Instead it has a hashmap from column name to
ColumnSpec, and a Vec of primary key column names.

We have three options:
- Add a field with the required slice to the Table struct.
- Modify RowSerializationContext somehow so that it can be created from
  the data we already have in Table. I'm not sure how to do that; ideas
  would be appreciated.
- Hybrid: Modify both Table and RowSerializationContext to make them
  work together.

This commit takes the first approach because it seemed to be the
easiest one. Doing it a different way is of course open for discussion.
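To illustrate the first option: with the new field in place, later commits
can build the serialization context straight from table metadata, roughly
like this (sketch; `table` stands for a `&Table` fetched from metadata):

    // The stored specs make this a cheap slice borrow - no rebuilding of
    // ColumnSpecs out of the `columns` hashmap on every token computation.
    let ctx = RowSerializationContext::from_specs(table.pk_column_specs.as_slice());
    let values = SerializedValues::from_serializable(&ctx, &partition_key)?;
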
Co-authored-by: Wojciech Przytuła
---
 scylla/src/cluster/metadata.rs | 23 +++++++++++++++++++++--
 1 file changed, 21 insertions(+), 2 deletions(-)

diff --git a/scylla/src/cluster/metadata.rs b/scylla/src/cluster/metadata.rs
index a9d8305775..242102e25e 100644
--- a/scylla/src/cluster/metadata.rs
+++ b/scylla/src/cluster/metadata.rs
@@ -35,6 +35,7 @@ use futures::Stream;
 use itertools::Itertools;
 use rand::seq::{IndexedRandom, SliceRandom};
 use rand::{rng, Rng};
+use scylla_cql::frame::response::result::{ColumnSpec, TableSpec};
 use scylla_macros::DeserializeRow;
 use std::borrow::BorrowMut;
 use std::cell::Cell;
@@ -197,13 +198,14 @@ pub struct Keyspace {
 #[derive(Clone, Debug, PartialEq, Eq)]
 pub struct Table {
     pub columns: HashMap<String, Column>,
-    /// Names of the columns of the partition key. 
+    /// Names of the columns of the partition key.
     /// All of the names are guaranteed to be present in the `columns` field.
     pub partition_key: Vec<String>,
-    /// Names of the columns of the clustering key. 
+    /// Names of the columns of the clustering key.
     /// All of the names are guaranteed to be present in the `columns` field.
     pub clustering_key: Vec<String>,
     pub partitioner: Option<String>,
+    pub(crate) pk_column_specs: Vec<ColumnSpec<'static>>,
 }
 
 #[derive(Clone, Debug, PartialEq, Eq)]
@@ -1410,6 +1412,7 @@ async fn query_tables(
             partition_key: vec![],
             clustering_key: vec![],
             partitioner: None,
+            pk_column_specs: vec![],
         }));
 
         let mut entry = result
@@ -1461,6 +1464,7 @@ async fn query_views(
                 partition_key: vec![],
                 clustering_key: vec![],
                 partitioner: None,
+                pk_column_specs: vec![],
             }))
             .map(|table| MaterializedView {
                 view_metadata: table,
@@ -1679,6 +1683,20 @@ async fn query_tables_schema(
             .remove(&keyspace_and_table_name)
             .unwrap_or_default();
 
+        // unwrap of get() result: all column names in `partition_key` are at this
+        // point guaranteed to be present in `columns`. See the construction of `partition_key`.
+        let pk_column_specs = partition_key
+            .iter()
+            .map(|column_name| (column_name, columns.get(column_name).unwrap().clone().typ))
+            .map(|(name, typ)| {
+                let table_spec = TableSpec::owned(
+                    keyspace_and_table_name.0.clone(),
+                    keyspace_and_table_name.1.clone(),
+                );
+                ColumnSpec::owned(name.to_owned(), typ, table_spec)
+            })
+            .collect();
+
         result.insert(
             keyspace_and_table_name,
             Ok(Table {
@@ -1686,6 +1704,7 @@ async fn query_tables_schema(
                 partition_key,
                 clustering_key,
                 partitioner,
+                pk_column_specs,
             }),
         );
     }

From 48966f1922b20ffe8f1aa97305153262b91ab27b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karol=20Bary=C5=82a?=
Date: Fri, 28 Feb 2025 20:07:52 +0100
Subject: [PATCH 08/13] ClusterState: Accept `&dyn SerializeRow` instead of
 `SerializedValues`

There is no easy way for users to create SerializedValues, which makes
the current APIs cumbersome to use. Instead, these methods should
accept `&dyn SerializeRow` and perform serialization based on table
metadata.

This change means that these methods can now also return
SerializationError, and they also need to handle a table missing from
metadata, preferably by returning an error in that case too.
No existing error type fits here, so we either need to extend an
existing one or create a new one. The first idea was extending
PartitionKeyError, but that type needs to be convertible to
ExecutionError, which has no use for the new variants. For that reason
I introduced a new error type for these methods, called
ClusterStateTokenError.
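After this change, a call site reads roughly like this (sketch; the
keyspace, table, and key values are made up):

    // Any SerializeRow value works now - no manual SerializedValues step.
    let state = session.get_cluster_state();
    match state.compute_token("my_ks", "my_table", &(42_i32,)) {
        Ok(token) => println!("token: {token:?}"),
        // New error type: serialization failure, token calculation failure,
        // or the table being absent from the locally held metadata.
        Err(e) => eprintln!("token computation failed: {e}"),
    }
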
---
 scylla/src/client/session_test.rs | 13 +++----------
 scylla/src/cluster/state.rs       | 33 ++++++++++++++++++++-----------
 scylla/src/errors.rs              | 18 +++++++++++++++++
 3 files changed, 43 insertions(+), 21 deletions(-)

diff --git a/scylla/src/client/session_test.rs b/scylla/src/client/session_test.rs
index a8227727b6..483016e7df 100644
--- a/scylla/src/client/session_test.rs
+++ b/scylla/src/client/session_test.rs
@@ -218,9 +218,6 @@ async fn test_prepared_statement() {
         .unwrap();
 
     let values = (17_i32, 16_i32, "I'm prepared!!!");
-    let serialized_values_complex_pk = prepared_complex_pk_statement
-        .serialize_values(&values)
-        .unwrap();
 
     session
         .execute_unpaged(&prepared_statement, &values)
@@ -245,12 +242,9 @@ async fn test_prepared_statement() {
         let prepared_token = Murmur3Partitioner
             .hash_one(&prepared_statement.compute_partition_key(&values).unwrap());
         assert_eq!(token, prepared_token);
-        let mut pk = SerializedValues::new();
-        pk.add_value(&17_i32, &ColumnType::Native(NativeType::Int))
-            .unwrap();
         let cluster_state_token = session
             .get_cluster_state()
-            .compute_token(&ks, "t2", &pk)
+            .compute_token(&ks, "t2", &(values.0,))
             .unwrap();
         assert_eq!(token, cluster_state_token);
     }
@@ -272,7 +266,7 @@ async fn test_prepared_statement() {
         assert_eq!(token, prepared_token);
         let cluster_state_token = session
             .get_cluster_state()
-            .compute_token(&ks, "complex_pk", &serialized_values_complex_pk)
+            .compute_token(&ks, "complex_pk", &values)
             .unwrap();
         assert_eq!(token, cluster_state_token);
     }
@@ -608,7 +602,6 @@ async fn test_token_calculation() {
             s.push('a');
         }
         let values = (&s,);
-        let serialized_values = prepared_statement.serialize_values(&values).unwrap();
         session
             .execute_unpaged(&prepared_statement, &values)
             .await
@@ -631,7 +624,7 @@ async fn test_token_calculation() {
         assert_eq!(token, prepared_token);
         let cluster_state_token = session
             .get_cluster_state()
-            .compute_token(&ks, "t3", &serialized_values)
+            .compute_token(&ks, "t3", &values)
             .unwrap();
         assert_eq!(token, cluster_state_token);
     }
diff --git a/scylla/src/cluster/state.rs b/scylla/src/cluster/state.rs
index 97d55301ab..0cfe448d4b 100644
--- a/scylla/src/cluster/state.rs
+++ b/scylla/src/cluster/state.rs
@@ -1,15 +1,14 @@
-use crate::errors::ConnectionPoolError;
+use crate::errors::{ClusterStateTokenError, ConnectionPoolError};
 use crate::network::{Connection, PoolConfig, VerifiedKeyspaceName};
 use crate::policies::host_filter::HostFilter;
 use crate::routing::locator::tablets::{RawTablet, Tablet, TabletsInfo};
 use crate::routing::locator::ReplicaLocator;
 use crate::routing::partitioner::{calculate_token_for_partition_key, PartitionerName};
 use crate::routing::{Shard, Token};
-use crate::statement::prepared::TokenCalculationError;
 use itertools::Itertools;
 use scylla_cql::frame::response::result::TableSpec;
-use scylla_cql::serialize::row::SerializedValues;
+use scylla_cql::serialize::row::{RowSerializationContext, SerializeRow, SerializedValues};
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use tracing::{debug, warn};
@@ -203,17 +202,29 @@ impl ClusterState {
         &self,
         keyspace: &str,
         table: &str,
-        partition_key: &SerializedValues,
-    ) -> Result<Token, TokenCalculationError> {
-        let partitioner = self
+        partition_key: &dyn SerializeRow,
+    ) -> Result<Token, ClusterStateTokenError> {
+        let Some(table) = self
             .keyspaces
             .get(keyspace)
             .and_then(|k| k.tables.get(table))
-            .and_then(|t| t.partitioner.as_deref())
+        else {
+            return Err(ClusterStateTokenError::UnknownTable {
+                keyspace: keyspace.to_owned(),
+                table: table.to_owned(),
+            });
+        };
+        let values = SerializedValues::from_serializable(
+            &RowSerializationContext::from_specs(table.pk_column_specs.as_slice()),
+            partition_key,
+        )?;
+        let partitioner = table
+            .partitioner
+            .as_deref()
             .and_then(PartitionerName::from_str)
             .unwrap_or_default();
-
-        calculate_token_for_partition_key(partition_key, &partitioner)
+        calculate_token_for_partition_key(&values, &partitioner)
+            .map_err(ClusterStateTokenError::TokenCalculation)
     }
 
     /// Access to replicas owning a given token
@@ -250,8 +261,8 @@ impl ClusterState {
         &self,
         keyspace: &str,
         table: &str,
-        partition_key: &SerializedValues,
-    ) -> Result<Vec<(Arc<Node>, Shard)>, TokenCalculationError> {
+        partition_key: &dyn SerializeRow,
+    ) -> Result<Vec<(Arc<Node>, Shard)>, ClusterStateTokenError> {
         let token = self.compute_token(keyspace, table, partition_key)?;
         Ok(self.get_token_endpoints(keyspace, table, token))
     }
diff --git a/scylla/src/errors.rs b/scylla/src/errors.rs
index 366016480d..bb36fc55a5 100644
--- a/scylla/src/errors.rs
+++ b/scylla/src/errors.rs
@@ -12,6 +12,7 @@ use crate::frame::response;
 
 // Re-export error types from pager module.
 pub use crate::client::pager::{NextPageError, NextRowError};
+use crate::statement::prepared::TokenCalculationError;
 // Re-export error types from query_result module.
 pub use crate::response::query_result::{
     FirstRowError, IntoRowsResultError, MaybeFirstRowError, ResultNotRowsError, RowsError,
@@ -934,6 +935,23 @@ pub(crate) enum ResponseParseError {
     CqlResponseParseError(#[from] CqlResponseParseError),
 }
 
+/// Error returned from [ClusterState](crate::cluster::ClusterState) APIs.
+#[derive(Clone, Debug, Error)]
+#[non_exhaustive]
+pub enum ClusterStateTokenError {
+    /// Failed to calculate token.
+    #[error(transparent)]
+    TokenCalculation(#[from] TokenCalculationError),
+
+    /// Failed to serialize values required to compute partition key.
+    #[error(transparent)]
+    Serialization(#[from] SerializationError),
+
+    /// ClusterState doesn't currently have metadata for the requested table.
+    #[error("Can't find metadata for requested table ({keyspace}.{table}).")]
+    UnknownTable { keyspace: String, table: String },
+}
+
 #[cfg(test)]
 mod tests {
     use scylla_cql::Consistency;

From c4c5eccb76e1fa6d3f1fede361e7c1fb5b017489 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karol=20Bary=C5=82a?=
Date: Wed, 26 Feb 2025 17:06:38 +0100
Subject: [PATCH 09/13] benchmark: Add "unstable-testing" feature

We may sometimes need to benchmark or integration-test some private
stuff. There is no good way to do this. The least bad way seems to be
to re-export such private APIs under an unstable feature, and mark
tests or benchmarks that require them as depending on this feature.

It's worth noting that using the "required-features" cargo attribute
won't automatically enable the feature - it will just prevent the
target from existing unless the feature is enabled. This means that
after this commit `cargo bench` won't run this benchmark. Instead you
need to run "cargo bench --features unstable-testing" or
"cargo bench --all-features".
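The shape of the re-export this enables looks roughly like this (patch 10
below adds the real version; the function name here is a placeholder):

    // Compiled only with `--features unstable-testing`. A wrapper fn is
    // needed because `pub use` of a pub(crate) item would not compile.
    #[cfg(feature = "unstable-testing")]
    pub mod internal_testing {
        pub fn exposed_for_benchmarks(/* ... */) {
            // delegate to the private API here
        }
    }
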
---
 scylla/Cargo.toml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/scylla/Cargo.toml b/scylla/Cargo.toml
index 18a35efec0..32f70be0ba 100644
--- a/scylla/Cargo.toml
+++ b/scylla/Cargo.toml
@@ -39,6 +39,7 @@ full-serialization = [
     "num-bigint-04",
     "bigdecimal-04",
 ]
+unstable-testing = []
 
 [dependencies]
 scylla-macros = { version = "0.7.0", path = "../scylla-macros" }
@@ -96,6 +97,7 @@ time = "0.3"
 [[bench]]
 name = "benchmark"
 harness = false
+required-features = ["unstable-testing"]
 
 [lints.rust]
 unnameable_types = "warn"

From 47fd32d69b27509f0faf47af2b8237a0c96ec50a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karol=20Bary=C5=82a?=
Date: Fri, 28 Feb 2025 20:16:01 +0100
Subject: [PATCH 10/13] scylla benchmark: Use
 calculate_token_for_partition_key behind feature flag

This will allow us to unpub the original
calculate_token_for_partition_key, which is the last step towards
eliminating SerializedValues from the public API.
---
 scylla/benches/benchmark.rs |  3 ++-
 scylla/src/lib.rs           | 19 +++++++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/scylla/benches/benchmark.rs b/scylla/benches/benchmark.rs
index 0bb83df7fe..322f832d14 100644
--- a/scylla/benches/benchmark.rs
+++ b/scylla/benches/benchmark.rs
@@ -1,7 +1,8 @@
 use criterion::{criterion_group, criterion_main, Criterion};
 
 use bytes::BytesMut;
-use scylla::routing::partitioner::{calculate_token_for_partition_key, PartitionerName};
+use scylla::internal_testing::calculate_token_for_partition_key;
+use scylla::routing::partitioner::PartitionerName;
 use scylla_cql::frame::response::result::{ColumnType, NativeType};
 use scylla_cql::frame::types;
 use scylla_cql::serialize::row::SerializedValues;
diff --git a/scylla/src/lib.rs b/scylla/src/lib.rs
index bc66116504..f096dee24b 100644
--- a/scylla/src/lib.rs
+++ b/scylla/src/lib.rs
@@ -257,3 +257,22 @@ pub(crate) mod utils;
 
 #[cfg(test)]
 pub(crate) use utils::test_utils;
+
+#[cfg(feature = "unstable-testing")]
+pub mod internal_testing {
+    use scylla_cql::serialize::row::SerializedValues;
+
+    use crate::routing::partitioner::PartitionerName;
+    use crate::routing::Token;
+    use crate::statement::prepared::TokenCalculationError;
+
+    pub fn calculate_token_for_partition_key(
+        serialized_partition_key_values: &SerializedValues,
+        partitioner: &PartitionerName,
+    ) -> Result<Token, TokenCalculationError> {
+        crate::routing::partitioner::calculate_token_for_partition_key(
+            serialized_partition_key_values,
+            partitioner,
+        )
+    }
+}

From 3af140ce5b793039c68651d8864fd342a19be288 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karol=20Bary=C5=82a?=
Date: Wed, 19 Feb 2025 18:26:35 +0100
Subject: [PATCH 11/13] partitioner.rs: un-pub
 calculate_token_for_partition_key

This function operates on SerializedValues, and so is not type-safe.
Given that we already provide token-calculation APIs on
PreparedStatement and ClusterState, exposing this method seems
redundant.

If it turns out that there are users who need this method and can't use
the existing APIs, then we can think about providing an appropriate
safe API, or, as a last resort, making this pub once again.
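For anyone calling the free function directly, the remaining supported
ways to compute a token look roughly like this (sketch; names and values
are made up, and `calculate_token` wraps the token in an `Option`):

    // Via a prepared statement...
    let t1 = prepared.calculate_token(&(42_i32,))?;
    // ...or via locally held cluster metadata.
    let t2 = session
        .get_cluster_state()
        .compute_token("my_ks", "my_table", &(42_i32,))?;
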
---
 scylla/src/routing/partitioner.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scylla/src/routing/partitioner.rs b/scylla/src/routing/partitioner.rs
index 0778c83065..a4deb9096f 100644
--- a/scylla/src/routing/partitioner.rs
+++ b/scylla/src/routing/partitioner.rs
@@ -349,7 +349,7 @@ impl PartitionerHasher for CDCPartitionerHasher {
 ///
 /// NOTE: the provided values must completely constitute partition key
 /// and be in the order defined in CREATE TABLE statement.
-pub fn calculate_token_for_partition_key(
+pub(crate) fn calculate_token_for_partition_key(
     serialized_partition_key_values: &SerializedValues,
     partitioner: &PartitionerName,
 ) -> Result<Token, TokenCalculationError> {

From d972aa93689716008c2354e32f95168d139e417a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karol=20Bary=C5=82a?=
Date: Wed, 19 Feb 2025 18:28:19 +0100
Subject: [PATCH 12/13] lib.rs: Remove SerializedValues and
 SerializedValuesIterator re-exports

Those are no longer used in any public API.
---
 scylla/src/lib.rs | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/scylla/src/lib.rs b/scylla/src/lib.rs
index f096dee24b..5f4e9207ab 100644
--- a/scylla/src/lib.rs
+++ b/scylla/src/lib.rs
@@ -163,10 +163,6 @@ pub mod serialize {
         BuiltinSerializationError, BuiltinSerializationErrorKind, BuiltinTypeCheckError,
         BuiltinTypeCheckErrorKind,
     };
-
-    // Not part of the old framework, but something that we should
-    // still aim to remove from public API.
-    pub use scylla_cql::serialize::row::{SerializedValues, SerializedValuesIterator};
 }
 
 /// Contains the [SerializeValue][value::SerializeValue] trait and its implementations.

From 473cc846753eb4c7d64e7b92a2de40a08c692780 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karol=20Bary=C5=82a?=
Date: Sat, 1 Mar 2025 01:23:24 +0100
Subject: [PATCH 13/13] ClusterState: Document token calculation methods

---
 scylla/src/cluster/state.rs | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/scylla/src/cluster/state.rs b/scylla/src/cluster/state.rs
index 0cfe448d4b..a754a8a9d9 100644
--- a/scylla/src/cluster/state.rs
+++ b/scylla/src/cluster/state.rs
@@ -198,6 +198,12 @@ impl ClusterState {
     }
 
     /// Compute token of a table partition key
+    ///
+    /// The `partition_key` argument contains the values of all partition key
+    /// columns. You can use either unnamed values like a tuple (e.g. `(1, 5, 5)`)
+    /// or named values (e.g. a struct that derives `SerializeRow`), as you would
+    /// when executing a request. No additional values are allowed besides values
+    /// for primary key columns.
     pub fn compute_token(
         &self,
         keyspace: &str,
@@ -257,6 +263,12 @@ impl ClusterState {
     }
 
     /// Access to replicas owning a given partition key (similar to `nodetool getendpoints`)
+    ///
+    /// The `partition_key` argument contains the values of all partition key
+    /// columns. You can use either unnamed values like a tuple (e.g. `(1, 5, 5)`)
+    /// or named values (e.g. a struct that derives `SerializeRow`), as you would
+    /// when executing a request. No additional values are allowed besides values
+    /// for primary key columns.
     pub fn get_endpoints(
         &self,
         keyspace: &str,
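A quick illustration of the documented contract (sketch; assumes a table
whose partition key is two int columns `a` and `b`, with `state` being a
ClusterState obtained from a session):

    use scylla::SerializeRow;

    // Named values: field names must match the partition key columns.
    #[derive(SerializeRow)]
    struct Pk {
        a: i32,
        b: i32,
    }

    // Both forms compute the same token for the assumed schema.
    let t1 = state.compute_token("ks", "tbl", &(1_i32, 2_i32))?;
    let t2 = state.compute_token("ks", "tbl", &Pk { a: 1, b: 2 })?;
    assert_eq!(t1, t2);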