feat: persist snapshots in parallel
This speeds up snapshot persistence by spawning all of the persist jobs
onto a tokio JoinSet and running them concurrently. Rather than waiting
for each file to finish persisting before starting on the next one, all
of the persist work now runs at the same time on the tokio runtime.

Closes #24658
mgattozzi committed Jan 22, 2025
1 parent f3efa35 commit 3cd8914
Showing 1 changed file with 66 additions and 47 deletions.
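This is the fan-out/fan-in pattern the message describes, reduced to a minimal, self-contained sketch. It is not the actual influxdb3 code: persist_one and the u64 job type are hypothetical stand-ins for sort_dedupe_persist and its job/summary types.

use std::{sync::Arc, time::Duration};

use parking_lot::Mutex;
use tokio::task::JoinSet;

// Hypothetical stand-in for one persist job; the real code sorts,
// deduplicates, and writes a parquet file.
async fn persist_one(job: u64) -> u64 {
    tokio::time::sleep(Duration::from_millis(10)).await;
    job * 1024 // pretend this is the persisted file size in bytes
}

#[tokio::main]
async fn main() {
    let jobs = vec![1, 2, 3, 4];
    // Shared accumulator, mirroring the Arc<Mutex<PersistedSnapshot>>
    // and Arc<Mutex<Vec<_>>> this commit introduces.
    let results = Arc::new(Mutex::new(Vec::new()));
    let mut set = JoinSet::new();

    for job in jobs {
        let results = Arc::clone(&results);
        // Fan out: every job runs concurrently on the tokio runtime
        // instead of being awaited one at a time.
        set.spawn(async move {
            let size = persist_one(job).await;
            results.lock().push(size);
        });
    }

    // Fan in: wait for every spawned task to finish.
    set.join_all().await;

    // All tasks are done, so this is the last strong reference and the
    // inner value can be reclaimed, as the diff below does with
    // Arc::into_inner(..).expect(..).into_inner().
    let results = Arc::into_inner(results)
        .expect("all persist tasks have finished")
        .into_inner();
    println!("persisted {} files", results.len());
}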
113 changes: 66 additions & 47 deletions influxdb3_write/src/write_buffer/queryable_buffer.rs
@@ -28,6 +28,7 @@ use iox_query::QueryChunk;
use iox_time::TimeProvider;
use object_store::path::Path;
use observability_deps::tracing::{error, info};
+use parking_lot::Mutex;
use parking_lot::RwLock;
use parquet::format::FileMetaData;
use schema::sort::SortKey;
@@ -36,6 +37,7 @@ use std::any::Any;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::oneshot::{self, Receiver};
+use tokio::task::JoinSet;

#[derive(Debug)]
pub struct QueryableBuffer {
@@ -280,61 +282,72 @@ impl QueryableBuffer {
wal_file_number.as_u64(),
);
// persist the individual files, building the snapshot as we go
-let mut persisted_snapshot = PersistedSnapshot::new(
+let persisted_snapshot = Arc::new(Mutex::new(PersistedSnapshot::new(
persister.writer_identifier_prefix().to_string(),
snapshot_details.snapshot_sequence_number,
snapshot_details.last_wal_sequence_number,
catalog.sequence_number(),
-);
-let mut cache_notifiers = vec![];
+)));
+let cache_notifiers = Arc::new(Mutex::new(Vec::new()));
let persist_jobs_empty = persist_jobs.is_empty();
+let mut set = JoinSet::new();
for persist_job in persist_jobs {
-let path = persist_job.path.to_string();
-let database_id = persist_job.database_id;
-let table_id = persist_job.table_id;
-let chunk_time = persist_job.chunk_time;
-let min_time = persist_job.timestamp_min_max.min;
-let max_time = persist_job.timestamp_min_max.max;
-
-let SortDedupePersistSummary {
-file_size_bytes,
-file_meta_data,
-parquet_cache_rx,
-} = sort_dedupe_persist(
-persist_job,
-Arc::clone(&persister),
-Arc::clone(&executor),
-parquet_cache.clone(),
-)
-.await
-.inspect_err(|error| {
-error!(
-%error,
-debug = ?error,
-"error during sort, deduplicate, and persist of buffer data as parquet"
-);
-})
-// for now, we are still panicking in this case, see:
-// https://github.com/influxdata/influxdb/issues/25676
-// https://github.com/influxdata/influxdb/issues/25677
-.expect("sort, deduplicate, and persist buffer data as parquet");
-
-cache_notifiers.push(parquet_cache_rx);
-persisted_snapshot.add_parquet_file(
-database_id,
-table_id,
-ParquetFile {
-id: ParquetFileId::new(),
-path,
-size_bytes: file_size_bytes,
-row_count: file_meta_data.num_rows as u64,
-chunk_time,
-min_time,
-max_time,
-},
-)
+let persister = Arc::clone(&persister);
+let executor = Arc::clone(&executor);
+let persisted_snapshot = Arc::clone(&persisted_snapshot);
+let cache_notifiers = Arc::clone(&cache_notifiers);
+let parquet_cache = parquet_cache.clone();
+
+set.spawn(async move {
+let path = persist_job.path.to_string();
+let database_id = persist_job.database_id;
+let table_id = persist_job.table_id;
+let chunk_time = persist_job.chunk_time;
+let min_time = persist_job.timestamp_min_max.min;
+let max_time = persist_job.timestamp_min_max.max;
+
+let SortDedupePersistSummary {
+file_size_bytes,
+file_meta_data,
+parquet_cache_rx,
+} = sort_dedupe_persist(
+persist_job,
+persister,
+executor,
+parquet_cache
+)
+.await
+.inspect_err(|error| {
+error!(
+%error,
+debug = ?error,
+"error during sort, deduplicate, and persist of buffer data as parquet"
+);
+})
+// for now, we are still panicking in this case, see:
+// https://github.com/influxdata/influxdb/issues/25676
+// https://github.com/influxdata/influxdb/issues/25677
+.expect("sort, deduplicate, and persist buffer data as parquet");
+
+cache_notifiers.lock().push(parquet_cache_rx);
+persisted_snapshot.lock().add_parquet_file(
+database_id,
+table_id,
+ParquetFile {
+id: ParquetFileId::new(),
+path,
+size_bytes: file_size_bytes,
+row_count: file_meta_data.num_rows as u64,
+chunk_time,
+min_time,
+max_time,
+},
+)
+});
}

+set.join_all().await;
+
// persist the snapshot file - only if persist jobs are present
// if persist_jobs is empty, then parquet file wouldn't have been
// written out, so it's desirable to not write empty snapshot file.
@@ -368,6 +381,9 @@ impl QueryableBuffer {
// force_snapshot) snapshot runs, snapshot_tracker will check if
// wal_periods are empty so it won't trigger a snapshot in the first
// place.
+let persisted_snapshot = Arc::into_inner(persisted_snapshot)
+.expect("Should only have one strong reference")
+.into_inner();
if !persist_jobs_empty {
loop {
match persister.persist_snapshot(&persisted_snapshot).await {
@@ -390,6 +406,9 @@
// on a background task to ensure that the cache has been populated before we clear
// the buffer
tokio::spawn(async move {
+let cache_notifiers = Arc::into_inner(cache_notifiers)
+.expect("Should only have one strong reference")
+.into_inner();
// wait on the cache updates to complete if there is a cache:
for notifier in cache_notifiers.into_iter().flatten() {
let _ = notifier.await;
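The last hunk defers clearing the in-memory buffer until the parquet cache has been populated, by awaiting the collected oneshot receivers on a background task. Below is a hedged sketch of that notifier pattern under stated assumptions: the () payload and the three dummy jobs are placeholders, and the Option wrapping is inferred from the .flatten() call above (None modelling "no parquet cache configured").

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // Each persist job may hand back an optional cache-population
    // receiver, like cache_notifiers in the diff.
    let mut cache_notifiers: Vec<Option<oneshot::Receiver<()>>> = Vec::new();

    for _ in 0..3 {
        let (tx, rx) = oneshot::channel();
        cache_notifiers.push(Some(rx));
        // Placeholder for the cache signalling that it has been populated.
        tokio::spawn(async move {
            let _ = tx.send(());
        });
    }

    // As in the diff: on a background task, await every notifier so the
    // cache is warm before the buffer is cleared. A dropped sender yields
    // an Err, which `let _ =` deliberately ignores.
    let background = tokio::spawn(async move {
        for notifier in cache_notifiers.into_iter().flatten() {
            let _ = notifier.await;
        }
        // ...now it is safe to clear the buffer...
    });
    background.await.unwrap();
}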
