refactor: clean up test helpers in influxdb3_write
hiltontj committed Jan 22, 2025
1 parent 57d5ab9 commit 51385c5
Showing 2 changed files with 107 additions and 143 deletions.
215 changes: 76 additions & 139 deletions influxdb3_write/src/lib.rs
@@ -426,145 +426,6 @@ pub(crate) fn guess_precision(timestamp: i64) -> Precision {
}
}

#[cfg(test)]
mod test_helpers {
use crate::{write_buffer::validator::WriteValidator, WriteBuffer};
use crate::{ChunkFilter, Precision};
use arrow::array::RecordBatch;
use data_types::NamespaceName;
use datafusion::prelude::Expr;
use influxdb3_catalog::catalog::Catalog;
use influxdb3_wal::{Gen1Duration, WriteBatch};
use iox_query::exec::IOxSessionContext;
use iox_time::Time;
use std::sync::Arc;

/// Helper trait for getting [`RecordBatch`]es from a [`WriteBuffer`] implementation in tests
#[async_trait::async_trait]
pub trait WriteBufferTester {
/// Get record batches for the given database and table, using the provided filter `Expr`s
async fn get_record_batches_filtered_unchecked(
&self,
database_name: &str,
table_name: &str,
filters: &[Expr],
ctx: &IOxSessionContext,
) -> Vec<RecordBatch>;

/// Get record batches for the given database and table
async fn get_record_batches_unchecked(
&self,
database_name: &str,
table_name: &str,
ctx: &IOxSessionContext,
) -> Vec<RecordBatch>;
}

#[async_trait::async_trait]
impl<T> WriteBufferTester for T
where
T: WriteBuffer,
{
async fn get_record_batches_filtered_unchecked(
&self,
database_name: &str,
table_name: &str,
filters: &[Expr],
ctx: &IOxSessionContext,
) -> Vec<RecordBatch> {
let db_schema = self
.catalog()
.db_schema(database_name)
.expect("database should exist");
let table_def = db_schema
.table_definition(table_name)
.expect("table should exist");
let filter =
ChunkFilter::new(&table_def, filters).expect("filter expressions should be valid");
let chunks = self
.get_table_chunks(db_schema, table_def, &filter, None, &ctx.inner().state())
.expect("should get query chunks");
let mut batches = vec![];
for chunk in chunks {
batches.extend(
chunk
.data()
.read_to_batches(chunk.schema(), ctx.inner())
.await,
);
}
batches
}

async fn get_record_batches_unchecked(
&self,
database_name: &str,
table_name: &str,
ctx: &IOxSessionContext,
) -> Vec<RecordBatch> {
self.get_record_batches_filtered_unchecked(database_name, table_name, &[], ctx)
.await
}
}

#[allow(dead_code)]
pub(crate) fn lp_to_write_batch(
catalog: Arc<Catalog>,
db_name: &'static str,
lp: &str,
) -> WriteBatch {
let db_name = NamespaceName::new(db_name).unwrap();
let result = WriteValidator::initialize(db_name.clone(), catalog, 0)
.unwrap()
.v1_parse_lines_and_update_schema(
lp,
false,
Time::from_timestamp_nanos(0),
Precision::Nanosecond,
)
.unwrap()
.convert_lines_to_buffer(Gen1Duration::new_5m());

result.valid_data
}
}

#[cfg(test)]
pub(crate) mod test_help {
use iox_query::exec::DedicatedExecutor;
use iox_query::exec::Executor;
use iox_query::exec::ExecutorConfig;
use object_store::memory::InMemory;
use object_store::ObjectStore;
use parquet_file::storage::ParquetStorage;
use parquet_file::storage::StorageId;
use std::num::NonZeroUsize;
use std::sync::Arc;

pub(crate) fn make_exec() -> Arc<Executor> {
let metrics = Arc::new(metric::Registry::default());
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

let parquet_store = ParquetStorage::new(
Arc::clone(&object_store),
StorageId::from("test_exec_storage"),
);
Arc::new(Executor::new_with_config_and_executor(
ExecutorConfig {
target_query_partitions: NonZeroUsize::new(1).unwrap(),
object_stores: [&parquet_store]
.into_iter()
.map(|store| (store.id(), Arc::clone(store.object_store())))
.collect(),
metric_registry: Arc::clone(&metrics),
// Default to 1 GiB
mem_pool_size: 1024 * 1024 * 1024, // 1024 (B/KiB) * 1024 (KiB/MiB) * 1024 (MiB/GiB)
},
DedicatedExecutor::new_testing(),
))
}
}

/// A derived set of filters that are used to prune data in the buffer when serving queries
#[derive(Debug, Default)]
pub struct ChunkFilter {
@@ -752,6 +613,82 @@ impl ChunkFilter {
}
}
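
For reference, a `ChunkFilter` can be built directly from DataFusion expressions. The following is a minimal sketch, not part of this commit: the database, table, and column names are hypothetical, and the catalog lookups mirror the pattern used in the test helper below.

use datafusion::prelude::{col, lit, Expr};
use influxdb3_catalog::catalog::Catalog;

// Build a filter that prunes chunks to rows where `host = 'a'`.
// Panics if the database or table is missing or the exprs are invalid,
// matching the `expect`-style usage in the helpers in this file.
fn example_chunk_filter(catalog: &Catalog) -> ChunkFilter {
    let db_schema = catalog
        .db_schema("test_db")
        .expect("database should exist");
    let table_def = db_schema
        .table_definition("cpu")
        .expect("table should exist");
    let exprs: Vec<Expr> = vec![col("host").eq(lit("a"))];
    ChunkFilter::new(&table_def, &exprs).expect("filter expressions should be valid")
}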

pub mod test_helpers {
use crate::ChunkFilter;
use crate::WriteBuffer;
use arrow::array::RecordBatch;
use datafusion::prelude::Expr;
use iox_query::exec::IOxSessionContext;

/// Helper trait for getting [`RecordBatch`]es from a [`WriteBuffer`] implementation in tests
#[async_trait::async_trait]
pub trait WriteBufferTester {
/// Get record batches for the given database and table, using the provided filter `Expr`s
async fn get_record_batches_filtered_unchecked(
&self,
database_name: &str,
table_name: &str,
filters: &[Expr],
ctx: &IOxSessionContext,
) -> Vec<RecordBatch>;

/// Get record batches for the given database and table
async fn get_record_batches_unchecked(
&self,
database_name: &str,
table_name: &str,
ctx: &IOxSessionContext,
) -> Vec<RecordBatch>;
}

#[async_trait::async_trait]
impl<T> WriteBufferTester for T
where
T: WriteBuffer,
{
async fn get_record_batches_filtered_unchecked(
&self,
database_name: &str,
table_name: &str,
filters: &[Expr],
ctx: &IOxSessionContext,
) -> Vec<RecordBatch> {
let db_schema = self
.catalog()
.db_schema(database_name)
.expect("database should exist");
let table_def = db_schema
.table_definition(table_name)
.expect("table should exist");
let filter =
ChunkFilter::new(&table_def, filters).expect("filter expressions should be valid");
let chunks = self
.get_table_chunks(db_schema, table_def, &filter, None, &ctx.inner().state())
.expect("should get query chunks");
let mut batches = vec![];
for chunk in chunks {
batches.extend(
chunk
.data()
.read_to_batches(chunk.schema(), ctx.inner())
.await,
);
}
batches
}

async fn get_record_batches_unchecked(
&self,
database_name: &str,
table_name: &str,
ctx: &IOxSessionContext,
) -> Vec<RecordBatch> {
self.get_record_batches_filtered_unchecked(database_name, table_name, &[], ctx)
.await
}
}
}
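
Since this module is now `pub` rather than gated behind `#[cfg(test)]`, the trait can also be imported by tests in dependent crates. Below is a minimal usage sketch; `setup_buffer_with_lp` is a hypothetical fixture standing in for whatever setup a given test uses to obtain a `WriteBuffer` implementation pre-loaded with line protocol, along with an `IOxSessionContext`.

use influxdb3_write::test_helpers::WriteBufferTester;

#[tokio::test]
async fn reads_back_written_data() {
    // Hypothetical fixture: a WriteBuffer impl already holding one line of
    // line protocol for table `cpu` in database `test_db`, plus a session
    // context for reading.
    let (buffer, ctx) = setup_buffer_with_lp("test_db", "cpu,host=a usage=0.5 10").await;

    // The blanket impl gives any `T: WriteBuffer` these methods for free.
    let batches = buffer
        .get_record_batches_unchecked("test_db", "cpu", &ctx)
        .await;
    assert!(!batches.is_empty());

    // With no filter exprs, the filtered variant returns the same data.
    let filtered = buffer
        .get_record_batches_filtered_unchecked("test_db", "cpu", &[], &ctx)
        .await;
    assert_eq!(batches.len(), filtered.len());
}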

#[cfg(test)]
mod tests {
use influxdb3_catalog::catalog::CatalogSequenceNumber;
35 changes: 31 additions & 4 deletions influxdb3_write/src/write_buffer/mod.rs
@@ -870,6 +870,8 @@ async fn check_mem_and_force_snapshot(
#[cfg(test)]
#[allow(clippy::await_holding_lock)]
mod tests {
use std::num::NonZeroUsize;

use super::*;
use crate::paths::{CatalogFilePath, SnapshotInfoFilePath};
use crate::persister::Persister;
@@ -879,13 +881,14 @@ mod tests {
use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
use bytes::Bytes;
use datafusion_util::config::register_iox_object_store;
use executor::DedicatedExecutor;
use futures_util::StreamExt;
use influxdb3_cache::parquet_cache::test_cached_obj_store_and_oracle;
use influxdb3_catalog::catalog::CatalogSequenceNumber;
use influxdb3_id::{DbId, ParquetFileId};
use influxdb3_test_helpers::object_store::RequestCountedObjectStore;
use influxdb3_wal::{Gen1Duration, SnapshotSequenceNumber, WalFileSequenceNumber};
use iox_query::exec::IOxSessionContext;
use iox_query::exec::{Executor, ExecutorConfig, IOxSessionContext};
use iox_time::{MockProvider, Time};
use metric::{Attributes, Metric, U64Counter};
use metrics::{
@@ -895,6 +898,7 @@
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::{ObjectStore, PutPayload};
use parquet_file::storage::{ParquetStorage, StorageId};
use pretty_assertions::assert_eq;

#[test]
@@ -948,7 +952,7 @@ mod tests {
last_cache,
distinct_cache,
time_provider: Arc::clone(&time_provider),
executor: crate::test_help::make_exec(),
executor: make_exec(),
wal_config: WalConfig::test_config(),
parquet_cache: Some(Arc::clone(&parquet_cache)),
metric_registry: Default::default(),
@@ -1034,7 +1038,7 @@
last_cache,
distinct_cache,
time_provider,
executor: crate::test_help::make_exec(),
executor: make_exec(),
wal_config: WalConfig {
gen1_duration: Gen1Duration::new_1m(),
max_write_buffer_size: 100,
@@ -3034,7 +3038,7 @@
last_cache,
distinct_cache,
time_provider: Arc::clone(&time_provider),
executor: crate::test_help::make_exec(),
executor: make_exec(),
wal_config,
parquet_cache,
metric_registry: Arc::clone(&metric_registry),
@@ -3124,4 +3128,27 @@
}
paths
}

fn make_exec() -> Arc<Executor> {
let metrics = Arc::new(metric::Registry::default());
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

let parquet_store = ParquetStorage::new(
Arc::clone(&object_store),
StorageId::from("test_exec_storage"),
);
Arc::new(Executor::new_with_config_and_executor(
ExecutorConfig {
target_query_partitions: NonZeroUsize::new(1).unwrap(),
object_stores: [&parquet_store]
.into_iter()
.map(|store| (store.id(), Arc::clone(store.object_store())))
.collect(),
metric_registry: Arc::clone(&metrics),
// Default to 1 GiB
mem_pool_size: 1024 * 1024 * 1024, // 1024 (B/KiB) * 1024 (KiB/MiB) * 1024 (MiB/GiB)
},
DedicatedExecutor::new_testing(),
))
}
}
