feat: persist snapshots in parallel #25901

Merged · 1 commit · Jan 27, 2025

104 changes: 60 additions & 44 deletions influxdb3_write/src/write_buffer/queryable_buffer.rs
```diff
@@ -26,6 +26,7 @@ use iox_query::frontend::reorg::ReorgPlanner;
 use iox_query::QueryChunk;
 use object_store::path::Path;
 use observability_deps::tracing::{error, info};
+use parking_lot::Mutex;
 use parking_lot::RwLock;
 use parquet::format::FileMetaData;
 use schema::sort::SortKey;
@@ -34,6 +35,7 @@ use std::any::Any;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::oneshot::{self, Receiver};
+use tokio::task::JoinSet;
 
 #[derive(Debug)]
 pub struct QueryableBuffer {
```
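The two new imports anchor the change: `parking_lot::Mutex` will wrap the shared `PersistedSnapshot` so concurrent tasks can append to it, and `tokio::task::JoinSet` collects the per-file persist tasks so they can be awaited as a group.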
```diff
@@ -267,58 +269,69 @@ impl QueryableBuffer {
             wal_file_number.as_u64(),
         );
         // persist the individual files, building the snapshot as we go
-        let mut persisted_snapshot = PersistedSnapshot::new(
+        let persisted_snapshot = Arc::new(Mutex::new(PersistedSnapshot::new(
             persister.node_identifier_prefix().to_string(),
             snapshot_details.snapshot_sequence_number,
             snapshot_details.last_wal_sequence_number,
             catalog.sequence_number(),
-        );
+        )));
 
         let persist_jobs_empty = persist_jobs.is_empty();
+        let mut set = JoinSet::new();
         for persist_job in persist_jobs {
-            let path = persist_job.path.to_string();
-            let database_id = persist_job.database_id;
-            let table_id = persist_job.table_id;
-            let chunk_time = persist_job.chunk_time;
-            let min_time = persist_job.timestamp_min_max.min;
-            let max_time = persist_job.timestamp_min_max.max;
-
-            let SortDedupePersistSummary {
-                file_size_bytes,
-                file_meta_data,
-            } = sort_dedupe_persist(
-                persist_job,
-                Arc::clone(&persister),
-                Arc::clone(&executor),
-                parquet_cache.clone(),
-            )
-            .await
-            .inspect_err(|error| {
-                error!(
-                    %error,
-                    debug = ?error,
-                    "error during sort, deduplicate, and persist of buffer data as parquet"
-                );
-            })
-            // for now, we are still panicking in this case, see:
-            // https://github.com/influxdata/influxdb/issues/25676
-            // https://github.com/influxdata/influxdb/issues/25677
-            .expect("sort, deduplicate, and persist buffer data as parquet");
-
-            persisted_snapshot.add_parquet_file(
-                database_id,
-                table_id,
-                ParquetFile {
-                    id: ParquetFileId::new(),
-                    path,
-                    size_bytes: file_size_bytes,
-                    row_count: file_meta_data.num_rows as u64,
-                    chunk_time,
-                    min_time,
-                    max_time,
-                },
-            )
+            let persister = Arc::clone(&persister);
+            let executor = Arc::clone(&executor);
+            let persisted_snapshot = Arc::clone(&persisted_snapshot);
+            let parquet_cache = parquet_cache.clone();
+
+            set.spawn(async move {
+                let path = persist_job.path.to_string();
+                let database_id = persist_job.database_id;
+                let table_id = persist_job.table_id;
+                let chunk_time = persist_job.chunk_time;
+                let min_time = persist_job.timestamp_min_max.min;
+                let max_time = persist_job.timestamp_min_max.max;
+
+                let SortDedupePersistSummary {
+                    file_size_bytes,
+                    file_meta_data,
+                } = sort_dedupe_persist(
+                    persist_job,
+                    persister,
+                    executor,
+                    parquet_cache
+                )
+                .await
+                .inspect_err(|error| {
+                    error!(
+                        %error,
+                        debug = ?error,
+                        "error during sort, deduplicate, and persist of buffer data as parquet"
+                    );
+                })
+                // for now, we are still panicking in this case, see:
+                // https://github.com/influxdata/influxdb/issues/25676
+                // https://github.com/influxdata/influxdb/issues/25677
+                .expect("sort, deduplicate, and persist buffer data as parquet");
+
+                persisted_snapshot.lock().add_parquet_file(
+                    database_id,
+                    table_id,
+                    ParquetFile {
+                        id: ParquetFileId::new(),
+                        path,
+                        size_bytes: file_size_bytes,
+                        row_count: file_meta_data.num_rows as u64,
+                        chunk_time,
+                        min_time,
+                        max_time,
+                    },
+                )
+            });
         }
 
+        set.join_all().await;
+
         // persist the snapshot file - only if persist jobs are present
         // if persist_jobs is empty, then parquet file wouldn't have been
         // written out, so it's desirable to not write empty snapshot file.
```
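This hunk is the core of the change: instead of awaiting each `sort_dedupe_persist` call in sequence, every persist job is spawned onto a `JoinSet`, and the tasks funnel their results into a single snapshot behind an `Arc<Mutex<_>>`. Stripped of the InfluxDB-specific types, the pattern looks roughly like the sketch below, with a hypothetical `Snapshot` struct standing in for `PersistedSnapshot` (assumes tokio 1.40+ for `JoinSet::join_all`, plus the `parking_lot` crate):

```rust
use std::sync::Arc;

use parking_lot::Mutex;
use tokio::task::JoinSet;

/// Hypothetical stand-in for `PersistedSnapshot`.
#[derive(Debug, Default)]
struct Snapshot {
    files: Vec<String>,
}

#[tokio::main]
async fn main() {
    let snapshot = Arc::new(Mutex::new(Snapshot::default()));
    let mut set = JoinSet::new();

    for job in 0..4u32 {
        // Each task holds its own strong reference to the shared snapshot.
        let snapshot = Arc::clone(&snapshot);
        set.spawn(async move {
            // Stand-in for `sort_dedupe_persist`: pretend we wrote a file.
            let path = format!("db/table/{job}.parquet");
            // `parking_lot::Mutex::lock` returns the guard directly (no
            // poisoning `Result`), and the guard drops at the semicolon.
            snapshot.lock().files.push(path);
        });
    }

    // Wait for every spawned persist task to complete.
    set.join_all().await;

    // All task-held clones are gone now, so exactly one strong reference
    // remains and the snapshot can be reclaimed by value.
    let snapshot = Arc::into_inner(snapshot)
        .expect("all persist tasks have completed")
        .into_inner();
    assert_eq!(snapshot.files.len(), 4);
}
```

Note that in the actual diff the expensive work (`sort_dedupe_persist`) is awaited before the lock is taken, so each task's critical section is just the `add_parquet_file` call and contention on the mutex stays minimal.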
Expand Down Expand Up @@ -352,6 +365,9 @@ impl QueryableBuffer {
// force_snapshot) snapshot runs, snapshot_tracker will check if
// wal_periods are empty so it won't trigger a snapshot in the first
// place.
let persisted_snapshot = Arc::into_inner(persisted_snapshot)
.expect("Should only have one strong reference")
.into_inner();
if !persist_jobs_empty {
loop {
match persister.persist_snapshot(&persisted_snapshot).await {
Expand Down
Loading
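Once `join_all` has drained the `JoinSet`, every task's clone of the `Arc` has been dropped, so `Arc::into_inner` here can only return `None` if a reference leaked, which the `expect` documents as an invariant. The inner `Mutex::into_inner` then hands the `PersistedSnapshot` back by value, letting the pre-existing `persister.persist_snapshot(&persisted_snapshot)` retry loop run unchanged.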