Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statement: unpub StatementConfig #772

Merged
merged 3 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions scylla/src/statement/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ pub use crate::frame::request::batch::BatchType;
pub struct Batch {
pub(crate) config: StatementConfig,

// TODO: Move this after #701 is fixed
retry_policy: Option<Arc<dyn RetryPolicy>>,

pub statements: Vec<BatchStatement>,
batch_type: BatchType,
}
Expand Down Expand Up @@ -116,13 +113,13 @@ impl Batch {
/// Set the retry policy for this batch, overriding the one from execution profile if not None.
#[inline]
pub fn set_retry_policy(&mut self, retry_policy: Option<Arc<dyn RetryPolicy>>) {
self.retry_policy = retry_policy;
self.config.retry_policy = retry_policy;
}

/// Get the retry policy set for the batch.
#[inline]
pub fn get_retry_policy(&self) -> Option<&Arc<dyn RetryPolicy>> {
self.retry_policy.as_ref()
self.config.retry_policy.as_ref()
}

/// Sets the listener capable of listening what happens during query execution.
Expand Down Expand Up @@ -151,7 +148,6 @@ impl Default for Batch {
fn default() -> Self {
Self {
statements: Vec::new(),
retry_policy: None,
batch_type: BatchType::Logged,
config: Default::default(),
}
Expand Down
51 changes: 13 additions & 38 deletions scylla/src/statement/mod.rs
Original file line number Diff line number Diff line change
@@ -1,60 +1,35 @@
use std::{sync::Arc, time::Duration};

use crate::history::HistoryListener;
use crate::transport::execution_profile::ExecutionProfileHandle;
use crate::{history::HistoryListener, retry_policy::RetryPolicy};

pub mod batch;
pub mod prepared_statement;
pub mod query;

pub use crate::frame::types::{Consistency, SerialConsistency};

#[derive(Debug)]
pub struct StatementConfig {
pub consistency: Option<Consistency>,
pub serial_consistency: Option<Option<SerialConsistency>>,
#[derive(Debug, Clone, Default)]
pub(crate) struct StatementConfig {
pub(crate) consistency: Option<Consistency>,
pub(crate) serial_consistency: Option<Option<SerialConsistency>>,

pub is_idempotent: bool,
pub(crate) is_idempotent: bool,

pub tracing: bool,
pub timestamp: Option<i64>,
pub request_timeout: Option<Duration>,
pub(crate) tracing: bool,
pub(crate) timestamp: Option<i64>,
pub(crate) request_timeout: Option<Duration>,

pub history_listener: Option<Arc<dyn HistoryListener>>,
pub(crate) history_listener: Option<Arc<dyn HistoryListener>>,

pub execution_profile_handle: Option<ExecutionProfileHandle>,
}

#[allow(clippy::derivable_impls)]
impl Default for StatementConfig {
fn default() -> Self {
Self {
consistency: Default::default(),
serial_consistency: None,
is_idempotent: false,
tracing: false,
timestamp: None,
request_timeout: None,
history_listener: None,
execution_profile_handle: None,
}
}
}

impl Clone for StatementConfig {
fn clone(&self) -> Self {
Self {
history_listener: self.history_listener.clone(),
execution_profile_handle: self.execution_profile_handle.clone(),
..*self
}
}
pub(crate) execution_profile_handle: Option<ExecutionProfileHandle>,
pub(crate) retry_policy: Option<Arc<dyn RetryPolicy>>,
}

impl StatementConfig {
/// Determines the consistency of a query
#[must_use]
pub fn determine_consistency(&self, default_consistency: Consistency) -> Consistency {
pub(crate) fn determine_consistency(&self, default_consistency: Consistency) -> Consistency {
self.consistency.unwrap_or(default_consistency)
}
}
10 changes: 2 additions & 8 deletions scylla/src/statement/prepared_statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ pub struct PreparedStatement {
pub(crate) config: StatementConfig,
pub prepare_tracing_ids: Vec<Uuid>,

// TODO: Move this after #701 is fixed
retry_policy: Option<Arc<dyn RetryPolicy>>,

id: Bytes,
shared: Arc<PreparedStatementSharedData>,
page_size: Option<i32>,
Expand All @@ -45,7 +42,6 @@ impl Clone for PreparedStatement {
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
retry_policy: self.retry_policy.clone(),
prepare_tracing_ids: Vec::new(),
id: self.id.clone(),
shared: self.shared.clone(),
Expand All @@ -62,7 +58,6 @@ impl PreparedStatement {
is_lwt: bool,
metadata: PreparedMetadata,
statement: String,
retry_policy: Option<Arc<dyn RetryPolicy>>,
page_size: Option<i32>,
config: StatementConfig,
) -> Self {
Expand All @@ -72,7 +67,6 @@ impl PreparedStatement {
metadata,
statement,
}),
retry_policy,
prepare_tracing_ids: Vec::new(),
page_size,
config,
Expand Down Expand Up @@ -303,13 +297,13 @@ impl PreparedStatement {
/// Set the retry policy for this statement, overriding the one from execution profile if not None.
#[inline]
pub fn set_retry_policy(&mut self, retry_policy: Option<Arc<dyn RetryPolicy>>) {
self.retry_policy = retry_policy;
self.config.retry_policy = retry_policy;
}

/// Get the retry policy set for the statement.
#[inline]
pub fn get_retry_policy(&self) -> Option<&Arc<dyn RetryPolicy>> {
self.retry_policy.as_ref()
self.config.retry_policy.as_ref()
}

/// Sets the listener capable of listening what happens during query execution.
Expand Down
7 changes: 2 additions & 5 deletions scylla/src/statement/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ pub struct Query {
pub(crate) config: StatementConfig,

// TODO: Move this after #701 is fixed
retry_policy: Option<Arc<dyn RetryPolicy>>,

pub contents: String,
page_size: Option<i32>,
}
Expand All @@ -25,7 +23,6 @@ impl Query {
pub fn new(query_text: impl Into<String>) -> Self {
Self {
contents: query_text.into(),
retry_policy: None,
page_size: None,
config: Default::default(),
}
Expand Down Expand Up @@ -131,13 +128,13 @@ impl Query {
/// Set the retry policy for this statement, overriding the one from execution profile if not None.
#[inline]
pub fn set_retry_policy(&mut self, retry_policy: Option<Arc<dyn RetryPolicy>>) {
self.retry_policy = retry_policy;
self.config.retry_policy = retry_policy;
}

/// Get the retry policy set for the statement.
#[inline]
pub fn get_retry_policy(&self) -> Option<&Arc<dyn RetryPolicy>> {
self.retry_policy.as_ref()
self.config.retry_policy.as_ref()
}

/// Sets the listener capable of listening what happens during query execution.
Expand Down
2 changes: 0 additions & 2 deletions scylla/src/transport/caching_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,11 @@ where

if let Some(raw) = self.cache.get(&query.contents) {
let page_size = query.get_page_size();
let retry_policy = query.get_retry_policy().cloned();
let mut stmt = PreparedStatement::new(
raw.id.clone(),
raw.is_confirmed_lwt,
raw.metadata.clone(),
query.contents,
retry_policy,
page_size,
query.config,
);
Expand Down
1 change: 0 additions & 1 deletion scylla/src/transport/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,6 @@ impl Connection {
.prepared_flags_contain_lwt_mark(p.prepared_metadata.flags as u32),
p.prepared_metadata,
query.contents.clone(),
query.get_retry_policy().cloned(),
query.get_page_size(),
query.config.clone(),
),
Expand Down
12 changes: 4 additions & 8 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::frame::types::LegacyConsistency;
use crate::history;
use crate::history::HistoryListener;
use crate::prepared_statement::PartitionKeyDecoder;
use crate::retry_policy::RetryPolicy;
use crate::utils::pretty::{CommaSeparatedDisplayer, CqlValueDisplayer};
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
Expand Down Expand Up @@ -675,7 +674,6 @@ impl Session {
.run_query(
statement_info,
&query.config,
query.get_retry_policy().map(|rp| &**rp),
execution_profile,
|node: Arc<Node>| async move { node.random_connection().await },
|connection: Arc<Connection>,
Expand Down Expand Up @@ -1029,7 +1027,6 @@ impl Session {
.run_query(
statement_info,
&prepared.config,
prepared.get_retry_policy().map(|rp| &**rp),
execution_profile,
|node: Arc<Node>| async move {
match token {
Expand Down Expand Up @@ -1246,7 +1243,6 @@ impl Session {
.run_query(
statement_info,
&batch.config,
batch.get_retry_policy().map(|rp| &**rp),
execution_profile,
|node: Arc<Node>| async move {
match first_value_token {
Expand Down Expand Up @@ -1530,12 +1526,10 @@ impl Session {
// On success this query's result is returned
// I tried to make this closures take a reference instead of an Arc but failed
// maybe once async closures get stabilized this can be fixed
#[allow(clippy::too_many_arguments)] // <-- remove this once retry policy is put into StatementConfig
async fn run_query<'a, ConnFut, QueryFut, ResT>(
&'a self,
statement_info: RoutingInfo<'a>,
statement_config: &'a StatementConfig,
statement_retry_policy: Option<&dyn RetryPolicy>,
execution_profile: Arc<ExecutionProfileInner>,
choose_connection: impl Fn(Arc<Node>) -> ConnFut,
do_query: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
Expand Down Expand Up @@ -1580,7 +1574,10 @@ impl Session {
}
}

let retry_policy = statement_retry_policy.unwrap_or(&*execution_profile.retry_policy);
let retry_policy = statement_config
.retry_policy
.as_deref()
.unwrap_or(&*execution_profile.retry_policy);

let speculative_policy = execution_profile.speculative_execution_policy.as_ref();

Expand Down Expand Up @@ -1858,7 +1855,6 @@ impl Session {
.run_query(
info,
&config,
None, // No specific retry policy needed for schema agreement
self.get_default_execution_profile_handle().access(),
|node: Arc<Node>| async move { node.random_connection().await },
do_query,
Expand Down