From 2601ac6adb664cc93086cb2877cb08a351b4e11a Mon Sep 17 00:00:00 2001 From: Michael Gattozzi Date: Wed, 22 Jan 2025 14:21:51 -0500 Subject: [PATCH] feat: persist snapshots in parallel This speeds up snapshot persistence by taking all of the persist jobs and running them simultaneously on a JoinSet. With this we can speed things up a bit by not waiting for each file to persist before the next one can be persisted. Instead we now can run all the persisting at the same time using the tokio runtime. Closes #24658 --- .../src/write_buffer/queryable_buffer.rs | 104 ++++++++++-------- 1 file changed, 60 insertions(+), 44 deletions(-) diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index 43dea717b82..1a8d2960320 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -26,6 +26,7 @@ use iox_query::frontend::reorg::ReorgPlanner; use iox_query::QueryChunk; use object_store::path::Path; use observability_deps::tracing::{error, info}; +use parking_lot::Mutex; use parking_lot::RwLock; use parquet::format::FileMetaData; use schema::sort::SortKey; @@ -34,6 +35,7 @@ use std::any::Any; use std::sync::Arc; use std::time::Duration; use tokio::sync::oneshot::{self, Receiver}; +use tokio::task::JoinSet; #[derive(Debug)] pub struct QueryableBuffer { @@ -267,58 +269,69 @@ impl QueryableBuffer { wal_file_number.as_u64(), ); // persist the individual files, building the snapshot as we go - let mut persisted_snapshot = PersistedSnapshot::new( + let persisted_snapshot = Arc::new(Mutex::new(PersistedSnapshot::new( persister.node_identifier_prefix().to_string(), snapshot_details.snapshot_sequence_number, snapshot_details.last_wal_sequence_number, catalog.sequence_number(), - ); + ))); + let persist_jobs_empty = persist_jobs.is_empty(); + let mut set = JoinSet::new(); for persist_job in persist_jobs { - let path = persist_job.path.to_string(); - let database_id = persist_job.database_id; - let table_id = persist_job.table_id; - let chunk_time = persist_job.chunk_time; - let min_time = persist_job.timestamp_min_max.min; - let max_time = persist_job.timestamp_min_max.max; - - let SortDedupePersistSummary { - file_size_bytes, - file_meta_data, - } = sort_dedupe_persist( - persist_job, - Arc::clone(&persister), - Arc::clone(&executor), - parquet_cache.clone(), - ) - .await - .inspect_err(|error| { - error!( - %error, - debug = ?error, - "error during sort, deduplicate, and persist of buffer data as parquet" - ); - }) - // for now, we are still panicking in this case, see: - // https://github.com/influxdata/influxdb/issues/25676 - // https://github.com/influxdata/influxdb/issues/25677 - .expect("sort, deduplicate, and persist buffer data as parquet"); - - persisted_snapshot.add_parquet_file( - database_id, - table_id, - ParquetFile { - id: ParquetFileId::new(), - path, - size_bytes: file_size_bytes, - row_count: file_meta_data.num_rows as u64, - chunk_time, - min_time, - max_time, - }, - ) + let persister = Arc::clone(&persister); + let executor = Arc::clone(&executor); + let persisted_snapshot = Arc::clone(&persisted_snapshot); + let parquet_cache = parquet_cache.clone(); + + set.spawn(async move { + let path = persist_job.path.to_string(); + let database_id = persist_job.database_id; + let table_id = persist_job.table_id; + let chunk_time = persist_job.chunk_time; + let min_time = persist_job.timestamp_min_max.min; + let max_time = persist_job.timestamp_min_max.max; + + let SortDedupePersistSummary { + file_size_bytes, + file_meta_data, + } = sort_dedupe_persist( + persist_job, + persister, + executor, + parquet_cache + ) + .await + .inspect_err(|error| { + error!( + %error, + debug = ?error, + "error during sort, deduplicate, and persist of buffer data as parquet" + ); + }) + // for now, we are still panicking in this case, see: + // https://github.com/influxdata/influxdb/issues/25676 + // https://github.com/influxdata/influxdb/issues/25677 + .expect("sort, deduplicate, and persist buffer data as parquet"); + + persisted_snapshot.lock().add_parquet_file( + database_id, + table_id, + ParquetFile { + id: ParquetFileId::new(), + path, + size_bytes: file_size_bytes, + row_count: file_meta_data.num_rows as u64, + chunk_time, + min_time, + max_time, + }, + ) + }); } + set.join_all().await; + // persist the snapshot file - only if persist jobs are present // if persist_jobs is empty, then parquet file wouldn't have been // written out, so it's desirable to not write empty snapshot file. @@ -352,6 +365,9 @@ impl QueryableBuffer { // force_snapshot) snapshot runs, snapshot_tracker will check if // wal_periods are empty so it won't trigger a snapshot in the first // place. + let persisted_snapshot = Arc::into_inner(persisted_snapshot) + .expect("Should only have one strong reference") + .into_inner(); if !persist_jobs_empty { loop { match persister.persist_snapshot(&persisted_snapshot).await {