Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove SerializedValues from public API of scylla crate. #1252

Merged
merged 13 commits into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions scylla-cql/src/serialize/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,6 @@ impl<'a> RowSerializationContext<'a> {
pub fn columns(&self) -> &'a [ColumnSpec] {
self.columns
}

/// Looks up and returns a column/bind marker by name.
///
/// Performs a linear scan over the column specs, returning the first
/// one whose name equals `target`, or `None` if there is no match.
// TODO: change RowSerializationContext to make this faster
#[inline]
pub fn column_by_name(&self, target: &str) -> Option<&ColumnSpec> {
    for column in self.columns {
        if column.name() == target {
            return Some(column);
        }
    }
    None
}
}

/// Represents a set of values that can be sent along a CQL statement.
Expand Down Expand Up @@ -514,7 +507,7 @@ impl SerializedValues {
pub const EMPTY: &'static SerializedValues = &SerializedValues::new();

/// Constructs `SerializedValues` from given [`SerializeRow`] object.
pub fn from_serializable<T: SerializeRow>(
pub fn from_serializable<T: SerializeRow + ?Sized>(
ctx: &RowSerializationContext,
row: &T,
) -> Result<Self, SerializationError> {
Expand Down
2 changes: 2 additions & 0 deletions scylla/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ full-serialization = [
"num-bigint-04",
"bigdecimal-04",
]
unstable-testing = []

[dependencies]
scylla-macros = { version = "0.7.0", path = "../scylla-macros" }
Expand Down Expand Up @@ -96,6 +97,7 @@ time = "0.3"
[[bench]]
name = "benchmark"
harness = false
required-features = ["unstable-testing"]

[lints.rust]
unnameable_types = "warn"
Expand Down
3 changes: 2 additions & 1 deletion scylla/benches/benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use criterion::{criterion_group, criterion_main, Criterion};

use bytes::BytesMut;
use scylla::routing::partitioner::{calculate_token_for_partition_key, PartitionerName};
use scylla::internal_testing::calculate_token_for_partition_key;
use scylla::routing::partitioner::PartitionerName;
use scylla_cql::frame::response::result::{ColumnType, NativeType};
use scylla_cql::frame::types;
use scylla_cql::serialize::row::SerializedValues;
Expand Down
13 changes: 3 additions & 10 deletions scylla/src/client/session_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,6 @@ async fn test_prepared_statement() {
.unwrap();

let values = (17_i32, 16_i32, "I'm prepared!!!");
let serialized_values_complex_pk = prepared_complex_pk_statement
.serialize_values(&values)
.unwrap();

session
.execute_unpaged(&prepared_statement, &values)
Expand All @@ -245,12 +242,9 @@ async fn test_prepared_statement() {
let prepared_token = Murmur3Partitioner
.hash_one(&prepared_statement.compute_partition_key(&values).unwrap());
assert_eq!(token, prepared_token);
let mut pk = SerializedValues::new();
pk.add_value(&17_i32, &ColumnType::Native(NativeType::Int))
.unwrap();
let cluster_state_token = session
.get_cluster_state()
.compute_token(&ks, "t2", &pk)
.compute_token(&ks, "t2", &(values.0,))
.unwrap();
assert_eq!(token, cluster_state_token);
}
Expand All @@ -272,7 +266,7 @@ async fn test_prepared_statement() {
assert_eq!(token, prepared_token);
let cluster_state_token = session
.get_cluster_state()
.compute_token(&ks, "complex_pk", &serialized_values_complex_pk)
.compute_token(&ks, "complex_pk", &values)
.unwrap();
assert_eq!(token, cluster_state_token);
}
Expand Down Expand Up @@ -608,7 +602,6 @@ async fn test_token_calculation() {
s.push('a');
}
let values = (&s,);
let serialized_values = prepared_statement.serialize_values(&values).unwrap();
session
.execute_unpaged(&prepared_statement, &values)
.await
Expand All @@ -631,7 +624,7 @@ async fn test_token_calculation() {
assert_eq!(token, prepared_token);
let cluster_state_token = session
.get_cluster_state()
.compute_token(&ks, "t3", &serialized_values)
.compute_token(&ks, "t3", &values)
.unwrap();
assert_eq!(token, cluster_state_token);
}
Expand Down
146 changes: 119 additions & 27 deletions scylla/src/cluster/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use futures::Stream;
use itertools::Itertools;
use rand::seq::{IndexedRandom, SliceRandom};
use rand::{rng, Rng};
use scylla_cql::frame::response::result::{ColumnSpec, TableSpec};
use scylla_macros::DeserializeRow;
use std::borrow::BorrowMut;
use std::cell::Cell;
Expand Down Expand Up @@ -67,6 +68,22 @@ type PerTable<T> = HashMap<String, T>;
type PerKsTable<T> = HashMap<(String, String), T>;
type PerKsTableResult<T, E> = PerKsTable<Result<T, E>>;

/// Indicates that reading metadata failed, but in a way
/// that we can handle, by throwing out data for a keyspace.
/// It is possible that some of the errors could be handled in even
/// more granular way (e.g. throwing out a single table), but keyspace
/// granularity seems like a good choice given how independent keyspaces
/// are from each other.
#[derive(Clone, Debug, Error)]
pub(crate) enum SingleKeyspaceMetadataError {
/// A user-defined type needed by this keyspace's schema was missing
/// from the fetched UDT metadata; the inner error carries the details.
#[error(transparent)]
MissingUDT(MissingUserDefinedType),
/// The fetched partition key columns have a gap: no column was found
/// for the given (0-based) position within the key.
#[error("Partition key column with position {0} is missing from metadata")]
IncompletePartitionKey(i32),
/// The fetched clustering key columns have a gap: no column was found
/// for the given (0-based) position within the key.
#[error("Clustering key column with position {0} is missing from metadata")]
IncompleteClusteringKey(i32),
}

/// Allows to read current metadata from the cluster
pub(crate) struct MetadataReader {
control_connection_pool_config: PoolConfig,
Expand All @@ -92,7 +109,7 @@ pub(crate) struct MetadataReader {
/// Describes all metadata retrieved from the cluster
pub(crate) struct Metadata {
pub(crate) peers: Vec<Peer>,
pub(crate) keyspaces: HashMap<String, Result<Keyspace, MissingUserDefinedType>>,
pub(crate) keyspaces: HashMap<String, Result<Keyspace, SingleKeyspaceMetadataError>>,
}

#[non_exhaustive] // <- so that we can add more fields in a backwards-compatible way
Expand Down Expand Up @@ -181,9 +198,14 @@ pub struct Keyspace {
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Table {
pub columns: HashMap<String, Column>,
/// Names of the columns of the partition key.
/// All of the names are guaranteed to be present in `columns` field.
pub partition_key: Vec<String>,
/// Names of the columns of the clustering key.
/// All of the names are guaranteed to be present in `columns` field.
pub clustering_key: Vec<String>,
pub partitioner: Option<String>,
pub(crate) pk_column_specs: Vec<ColumnSpec<'static>>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🆗 I believe the taken approach is perfectly acceptable.

💭 What I'm not convinced about is having a HashMap of columns. HashMaps are generally heavyweight structures, whereas tables never contain that many columns that HashMap's performance would be better than Vec's.

🔧 My another concern are metadata-related structs in metadata.rs which are pub and have pub fields. For now, at least Column and MaterializedView are such structs and are not #[non_exhaustive], which might be a potential issue in the future.
I strongly suggest pub(crate)'ing all those fields and exposing only getters for them. This leaves maximum freedom for us in the future.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💭 What I'm not convinced about is having a HashMap of columns. HashMaps are generally heavyweight structures, whereas tables never contain that many columns that HashMap's performance would be better than Vec's.

This would also allow us to implement cass_table_meta_column

🔧 My another concern are metadata-related structs in metadata.rs which are pub and have pub fields. For now, at least Column and MaterializedView are such structs and are not #[non_exhaustive], which might be a potential issue in the future. I strongly suggest pub(crate)'ing all those fields and exposing only getters for them. This leaves maximum freedom for us in the future.

I agree

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would also allow us to implement cass_table_meta_column

It is possible (but problematic) to implement it now.
Afaik, the order of the columns is always:

  • partition key columns (in order defined in the key)
  • clustering key columns (in order defined in the key)
  • the rest of the columns, in alphabetical order

cass_table_meta_column could sort the columns according to this, and retrieve the nth one. This is of course really ineffective.

💭 What I'm not convinced about is having a HashMap of columns. HashMaps are generally heavyweight structures, whereas tables never contain that many columns that HashMap's performance would be better than Vec's.

Are you talking about the performance of retrieving columns by name? Do you know at which size hashmaps start outperforming vectors (key length needs to be taken into consideration)?
Afaik it is not very unusual to have dozens or even hundreds columns in a table - Scylla / Cassandra design promotes using denormalized data.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔧 My another concern are metadata-related structs in metadata.rs which are pub and have pub fields. For now, at least Column and MaterializedView are such structs and are not #[non_exhaustive], which might be a potential issue in the future.
I strongly suggest pub(crate)'ing all those fields and exposing only getters for them. This leaves maximum freedom for us in the future.

Makes sense.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding HashMaps vs Vec, the argument that convinces me more than performance is that we don't expose all the information about the table (we don't expose column order used by Scylla).
Do you have an idea what should the Table struct look like?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not extend Column with name: String and then just keep a Vec<Column> in Table?

}

#[derive(Clone, Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -902,7 +924,7 @@ async fn query_keyspaces(
conn: &Arc<Connection>,
keyspaces_to_fetch: &[String],
fetch_schema: bool,
) -> Result<PerKeyspaceResult<Keyspace, MissingUserDefinedType>, MetadataError> {
) -> Result<PerKeyspaceResult<Keyspace, SingleKeyspaceMetadataError>, MetadataError> {
let rows = query_filter_keyspace_name::<(String, HashMap<String, String>)>(
conn,
"select keyspace_name, replication from system_schema.keyspaces",
Expand Down Expand Up @@ -953,15 +975,20 @@ async fn query_keyspaces(

// As you can notice, in this file we generally operate on two layers of errors:
// - Outer (MetadataError) if something went wrong with querying the cluster.
// - Inner (currently MissingUserDefinedType, possibly other variants in the future) if the fetched metadata
// turned out to not be fully consistent.
// - Inner (SingleKeyspaceMetadataError) if the fetched metadata turned out to not be fully consistent.
// If there is an inner error, we want to drop metadata for the whole keyspace.
// This logic checks if either tables views or UDTs have such inner error, and returns it if so.
// This logic checks if either tables, views, or UDTs have such inner error, and returns it if so.
// Notice that in the error branch, return value is wrapped in `Ok` - but this is the
// outer error, so it just means there was no error while querying the cluster.
let (tables, views, user_defined_types) = match (tables, views, user_defined_types) {
(Ok(t), Ok(v), Ok(u)) => (t, v, u),
(Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)) => return Ok((keyspace_name, Err(e))),
(Err(e), _, _) | (_, Err(e), _) => return Ok((keyspace_name, Err(e))),
(_, _, Err(e)) => {
return Ok((
keyspace_name,
Err(SingleKeyspaceMetadataError::MissingUDT(e)),
))
}
};

let keyspace = Keyspace {
Expand Down Expand Up @@ -1364,8 +1391,8 @@ mod toposort_tests {
async fn query_tables(
conn: &Arc<Connection>,
keyspaces_to_fetch: &[String],
tables: &mut PerKsTableResult<Table, MissingUserDefinedType>,
) -> Result<PerKeyspaceResult<PerTable<Table>, MissingUserDefinedType>, MetadataError> {
tables: &mut PerKsTableResult<Table, SingleKeyspaceMetadataError>,
) -> Result<PerKeyspaceResult<PerTable<Table>, SingleKeyspaceMetadataError>, MetadataError> {
let rows = query_filter_keyspace_name::<(String, String)>(
conn,
"SELECT keyspace_name, table_name FROM system_schema.tables",
Expand All @@ -1385,6 +1412,7 @@ async fn query_tables(
partition_key: vec![],
clustering_key: vec![],
partitioner: None,
pk_column_specs: vec![],
}));

let mut entry = result
Expand All @@ -1409,8 +1437,9 @@ async fn query_tables(
async fn query_views(
conn: &Arc<Connection>,
keyspaces_to_fetch: &[String],
tables: &mut PerKsTableResult<Table, MissingUserDefinedType>,
) -> Result<PerKeyspaceResult<PerTable<MaterializedView>, MissingUserDefinedType>, MetadataError> {
tables: &mut PerKsTableResult<Table, SingleKeyspaceMetadataError>,
) -> Result<PerKeyspaceResult<PerTable<MaterializedView>, SingleKeyspaceMetadataError>, MetadataError>
{
let rows = query_filter_keyspace_name::<(String, String, String)>(
conn,
"SELECT keyspace_name, view_name, base_table_name FROM system_schema.views",
Expand All @@ -1435,6 +1464,7 @@ async fn query_views(
partition_key: vec![],
clustering_key: vec![],
partitioner: None,
pk_column_specs: vec![],
}))
.map(|table| MaterializedView {
view_metadata: table,
Expand Down Expand Up @@ -1465,7 +1495,7 @@ async fn query_tables_schema(
conn: &Arc<Connection>,
keyspaces_to_fetch: &[String],
udts: &PerKeyspaceResult<PerTable<Arc<UserDefinedType<'static>>>, MissingUserDefinedType>,
) -> Result<PerKsTableResult<Table, MissingUserDefinedType>, MetadataError> {
) -> Result<PerKsTableResult<Table, SingleKeyspaceMetadataError>, MetadataError> {
// Upon migration from thrift to CQL, Cassandra internally creates a surrogate column "value" of
// type EmptyType for dense tables. This resolves into this CQL type name.
// This column shouldn't be exposed to the user but is currently exposed in system tables.
Expand All @@ -1484,7 +1514,7 @@ async fn query_tables_schema(

let empty_ok_map = Ok(HashMap::new());

let mut tables_schema: HashMap<_, Result<_, MissingUserDefinedType>> = HashMap::new();
let mut tables_schema: HashMap<_, Result<_, SingleKeyspaceMetadataError>> = HashMap::new();

rows.map(|row_result| {
let (keyspace_name, table_name, column_name, kind, position, type_) = row_result?;
Expand Down Expand Up @@ -1518,7 +1548,10 @@ async fn query_tables_schema(
// is minor enough to ignore. Note that the first issue also applies to
// solution 1: but the keyspace won't be present in the result at all,
// which is arguably worse.
tables_schema.insert((keyspace_name, table_name), Err(e.clone()));
tables_schema.insert(
(keyspace_name, table_name),
Err(SingleKeyspaceMetadataError::MissingUDT(e.clone())),
);
return Ok::<_, MetadataError>(());
}
};
Expand All @@ -1532,7 +1565,10 @@ async fn query_tables_schema(
let cql_type = match pre_cql_type.into_cql_type(&keyspace_name, keyspace_udts) {
Ok(t) => t,
Err(e) => {
tables_schema.insert((keyspace_name, table_name), Err(e));
tables_schema.insert(
(keyspace_name, table_name),
Err(SingleKeyspaceMetadataError::MissingUDT(e)),
);
return Ok::<_, MetadataError>(());
}
};
Expand All @@ -1549,21 +1585,21 @@ async fn query_tables_schema(
.entry((keyspace_name, table_name))
.or_insert(Ok((
HashMap::new(), // columns
HashMap::new(), // partition key
HashMap::new(), // clustering key
Vec::new(), // partition key
Vec::new(), // clustering key
)))
else {
// This table was previously marked as broken, no way to insert anything.
return Ok::<_, MetadataError>(());
};

if kind == ColumnKind::PartitionKey || kind == ColumnKind::Clustering {
let key_map = if kind == ColumnKind::PartitionKey {
let key_list: &mut Vec<(i32, String)> = if kind == ColumnKind::PartitionKey {
entry.1.borrow_mut()
} else {
entry.2.borrow_mut()
};
key_map.insert(position, column_name.clone());
key_list.push((position, column_name.clone()));
}

entry.0.insert(
Expand All @@ -1582,37 +1618,93 @@ async fn query_tables_schema(
let mut all_partitioners = query_table_partitioners(conn).await?;
let mut result = HashMap::new();

for ((keyspace_name, table_name), table_result) in tables_schema {
'tables_loop: for ((keyspace_name, table_name), table_result) in tables_schema {
let keyspace_and_table_name = (keyspace_name, table_name);

let (columns, partition_key_columns, clustering_key_columns) = match table_result {
#[allow(clippy::type_complexity)]
let (columns, partition_key_columns, clustering_key_columns): (
HashMap<String, Column>,
Vec<(i32, String)>,
Vec<(i32, String)>,
) = match table_result {
Ok(table) => table,
Err(e) => {
let _ = result.insert(keyspace_and_table_name, Err(e));
continue;
}
};
let mut partition_key = vec!["".to_string(); partition_key_columns.len()];
for (position, column_name) in partition_key_columns {
partition_key[position as usize] = column_name;
}

let mut clustering_key = vec!["".to_string(); clustering_key_columns.len()];
for (position, column_name) in clustering_key_columns {
clustering_key[position as usize] = column_name;
/// Sorts the `(position, name)` pairs by position and verifies that the
/// positions form a gapless 0-based sequence (0, 1, 2, ...).
///
/// On success, returns the column names in key order. On failure,
/// returns the first index at which a column is missing from metadata.
fn validate_key_columns(mut key_columns: Vec<(i32, String)>) -> Result<Vec<String>, i32> {
    key_columns.sort_unstable_by_key(|(position, _)| *position);

    let mut ordered_names = Vec::with_capacity(key_columns.len());
    for (expected, (position, column_name)) in key_columns.into_iter().enumerate() {
        // unwrap: I don't see the point of handling the scenario of fetching over
        // 2 * 10^9 columns.
        let expected: i32 = expected.try_into().unwrap();
        if expected != position {
            return Err(expected);
        }
        ordered_names.push(column_name);
    }
    Ok(ordered_names)
}

let partition_key = match validate_key_columns(partition_key_columns) {
Ok(partition_key_columns) => partition_key_columns,
Err(position) => {
result.insert(
keyspace_and_table_name,
Err(SingleKeyspaceMetadataError::IncompletePartitionKey(
position,
)),
);
continue 'tables_loop;
}
};

let clustering_key = match validate_key_columns(clustering_key_columns) {
Ok(clustering_key_columns) => clustering_key_columns,
Err(position) => {
result.insert(
keyspace_and_table_name,
Err(SingleKeyspaceMetadataError::IncompleteClusteringKey(
position,
)),
);
continue 'tables_loop;
}
};

let partitioner = all_partitioners
.remove(&keyspace_and_table_name)
.unwrap_or_default();

// unwrap of get() result: all column names in `partition_key` are at this
// point guaranteed to be present in `columns`. See the construction of `partition_key`
let pk_column_specs = partition_key
.iter()
.map(|column_name| (column_name, columns.get(column_name).unwrap().clone().typ))
.map(|(name, typ)| {
let table_spec = TableSpec::owned(
keyspace_and_table_name.0.clone(),
keyspace_and_table_name.1.clone(),
);
ColumnSpec::owned(name.to_owned(), typ, table_spec)
})
.collect();

result.insert(
keyspace_and_table_name,
Ok(Table {
columns,
partition_key,
clustering_key,
partitioner,
pk_column_specs,
}),
);
}
Expand Down
Loading
Loading