feat: persist snapshots in parallel
This speeds up snapshot persistence by spawning all of the persist jobs
onto a tokio JoinSet and running them concurrently. Instead of waiting
for each file to finish persisting before the next one can start, all of
the files are now persisted at the same time on the tokio runtime.

Closes #24658
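
For illustration, a minimal sketch of the pattern this commit applies — spawn every
job onto a JoinSet, accumulate results behind an Arc<Mutex<..>>, join, then unwrap
the sole remaining reference. The `persist` helper and file names are hypothetical
stand-ins, and it assumes tokio 1.40+ (for `JoinSet::join_all`) with the `rt-multi-thread`
and `macros` features plus the `parking_lot` crate:

    use std::sync::Arc;

    use parking_lot::Mutex;
    use tokio::task::JoinSet;

    #[tokio::main]
    async fn main() {
        // shared accumulator, standing in for PersistedSnapshot behind Arc<Mutex<..>>
        let results = Arc::new(Mutex::new(Vec::new()));
        let mut set = JoinSet::new();

        for path in ["a.parquet", "b.parquet", "c.parquet"] {
            let results = Arc::clone(&results);
            // every job is spawned up front and runs concurrently on the runtime
            set.spawn(async move {
                let size = persist(path).await; // stand-in for sort_dedupe_persist
                results.lock().push((path, size));
            });
        }

        // wait for all spawned jobs to complete
        set.join_all().await;

        // the tasks are done, so their Arc clones are dropped and one reference remains
        let results = Arc::into_inner(results).expect("sole reference").into_inner();
        println!("persisted {} files", results.len());
    }

    // hypothetical stand-in for the real persist work
    async fn persist(_path: &str) -> u64 {
        0
    }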
mgattozzi committed Jan 27, 2025
1 parent 43e186d commit 2601ac6
Showing 1 changed file with 60 additions and 44 deletions.
104 changes: 60 additions & 44 deletions influxdb3_write/src/write_buffer/queryable_buffer.rs
@@ -26,6 +26,7 @@ use iox_query::frontend::reorg::ReorgPlanner;
use iox_query::QueryChunk;
use object_store::path::Path;
use observability_deps::tracing::{error, info};
use parking_lot::Mutex;
use parking_lot::RwLock;
use parquet::format::FileMetaData;
use schema::sort::SortKey;
@@ -34,6 +35,7 @@ use std::any::Any;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::oneshot::{self, Receiver};
use tokio::task::JoinSet;

#[derive(Debug)]
pub struct QueryableBuffer {
@@ -267,58 +269,69 @@ impl QueryableBuffer {
wal_file_number.as_u64(),
);
// persist the individual files, building the snapshot as we go
let mut persisted_snapshot = PersistedSnapshot::new(
let persisted_snapshot = Arc::new(Mutex::new(PersistedSnapshot::new(
persister.node_identifier_prefix().to_string(),
snapshot_details.snapshot_sequence_number,
snapshot_details.last_wal_sequence_number,
catalog.sequence_number(),
);
)));

let persist_jobs_empty = persist_jobs.is_empty();
let mut set = JoinSet::new();
for persist_job in persist_jobs {
let path = persist_job.path.to_string();
let database_id = persist_job.database_id;
let table_id = persist_job.table_id;
let chunk_time = persist_job.chunk_time;
let min_time = persist_job.timestamp_min_max.min;
let max_time = persist_job.timestamp_min_max.max;

let SortDedupePersistSummary {
file_size_bytes,
file_meta_data,
} = sort_dedupe_persist(
persist_job,
Arc::clone(&persister),
Arc::clone(&executor),
parquet_cache.clone(),
)
.await
.inspect_err(|error| {
error!(
%error,
debug = ?error,
"error during sort, deduplicate, and persist of buffer data as parquet"
);
})
// for now, we are still panicking in this case, see:
// https://github.com/influxdata/influxdb/issues/25676
// https://github.com/influxdata/influxdb/issues/25677
.expect("sort, deduplicate, and persist buffer data as parquet");

persisted_snapshot.add_parquet_file(
database_id,
table_id,
ParquetFile {
id: ParquetFileId::new(),
path,
size_bytes: file_size_bytes,
row_count: file_meta_data.num_rows as u64,
chunk_time,
min_time,
max_time,
},
)
let persister = Arc::clone(&persister);
let executor = Arc::clone(&executor);
let persisted_snapshot = Arc::clone(&persisted_snapshot);
let parquet_cache = parquet_cache.clone();

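// spawn the job onto the JoinSet; each task owns its clones of the
// shared state and runs concurrently on the tokio runtime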
set.spawn(async move {
let path = persist_job.path.to_string();
let database_id = persist_job.database_id;
let table_id = persist_job.table_id;
let chunk_time = persist_job.chunk_time;
let min_time = persist_job.timestamp_min_max.min;
let max_time = persist_job.timestamp_min_max.max;

let SortDedupePersistSummary {
file_size_bytes,
file_meta_data,
} = sort_dedupe_persist(
persist_job,
persister,
executor,
parquet_cache
)
.await
.inspect_err(|error| {
error!(
%error,
debug = ?error,
"error during sort, deduplicate, and persist of buffer data as parquet"
);
})
// for now, we are still panicking in this case, see:
// https://github.com/influxdata/influxdb/issues/25676
// https://github.com/influxdata/influxdb/issues/25677
.expect("sort, deduplicate, and persist buffer data as parquet");

persisted_snapshot.lock().add_parquet_file(
database_id,
table_id,
ParquetFile {
id: ParquetFileId::new(),
path,
size_bytes: file_size_bytes,
row_count: file_meta_data.num_rows as u64,
chunk_time,
min_time,
max_time,
},
)
});
}

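// wait for every spawned persist job to finish before writing the snapshot file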
set.join_all().await;

// persist the snapshot file - only if persist jobs are present
// if persist_jobs is empty, then parquet file wouldn't have been
// written out, so it's desirable to not write empty snapshot file.
@@ -352,6 +365,9 @@ impl QueryableBuffer {
// force_snapshot) snapshot runs, snapshot_tracker will check if
// wal_periods are empty so it won't trigger a snapshot in the first
// place.
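// all JoinSet tasks have completed, so the Arc clones they held are
// dropped and this is the sole remaining reference to the snapshot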
let persisted_snapshot = Arc::into_inner(persisted_snapshot)
.expect("Should only have one strong reference")
.into_inner();
if !persist_jobs_empty {
loop {
match persister.persist_snapshot(&persisted_snapshot).await {
