From b6b35e28b5c39814fa55a1880507b096df31d604 Mon Sep 17 00:00:00 2001 From: Michael Gattozzi Date: Wed, 22 Jan 2025 15:09:40 -0500 Subject: [PATCH] feat: add no_sync write_lp param for fast writes In cases where a user does not need the guarantees that data is persisted to the WAL on write and needs faster ingest speed then the no_sync param added in this commit are what they need. Rather than waiting on a sync to the WAL we write to the buffer without confirming that writes have made it to the WAL. The upside to this is that they can ingest data faster, but there is a small risk that between writing the data and it eventually being written to object storage, that the server crashes and it's irrevocably lost. Also if the write to the WAL fails, then at most the user will not get a failed response code they can handle. The data will still be in the buffer, but will not be durable until persisted as a parquet file in this case. However, in many cases that might be acceptable. This commit expands on what's possible so that the user can use InfluxDB Core the way that works best for their workload. Note that this feature is only added for the /api/v3/write_lp endpoint. The legacy endpoints for writing can take the parameter, but won't do anything with it at all. Closes #25597 --- influxdb3/tests/server/write.rs | 51 +++ influxdb3_processing_engine/src/lib.rs | 6 + influxdb3_server/src/http.rs | 4 + influxdb3_server/src/query_executor/mod.rs | 5 + influxdb3_wal/src/lib.rs | 4 +- influxdb3_wal/src/object_store.rs | 488 ++------------------- influxdb3_write/src/lib.rs | 1 + influxdb3_write/src/write_buffer/mod.rs | 50 ++- 8 files changed, 147 insertions(+), 462 deletions(-) diff --git a/influxdb3/tests/server/write.rs b/influxdb3/tests/server/write.rs index 9d5810bbb8c..45811ed3ce7 100644 --- a/influxdb3/tests/server/write.rs +++ b/influxdb3/tests/server/write.rs @@ -342,3 +342,54 @@ async fn writes_with_different_schema_should_fail() { "the request should hae failed with an API Error" ); } +#[tokio::test] +/// Check that the no_sync param can be used on any endpoint. However, this only means that serde +/// will parse it just fine. It is only able to be used in the v3 endpoint and will +/// default to requiring the WAL to synce before returning. +async fn api_no_sync_param() { + let server = TestServer::spawn().await; + let client = reqwest::Client::new(); + let v1_write_url = format!("{base}/write", base = server.client_addr()); + let v2_write_url = format!("{base}/api/v2/write", base = server.client_addr()); + let v3_write_url = format!("{base}/api/v3/write_lp", base = server.client_addr()); + let write_body = "cpu,host=a usage=0.5"; + + let params = vec![("db", "foo"), ("no_sync", "true")]; + let resp = client + .post(&v1_write_url) + .query(¶ms) + .body(write_body) + .send() + .await + .expect("send /write request"); + let status = resp.status(); + let body = resp.text().await.expect("response body as text"); + println!("Response [{status}]:\n{body}"); + assert_eq!(status, StatusCode::NO_CONTENT); + + let params = vec![("bucket", "foo"), ("no_sync", "true")]; + let resp = client + .post(&v2_write_url) + .query(¶ms) + .body(write_body) + .send() + .await + .expect("send api/v2/write request"); + let status = resp.status(); + let body = resp.text().await.expect("response body as text"); + println!("Response [{status}]:\n{body}"); + assert_eq!(status, StatusCode::NO_CONTENT); + + let params = vec![("db", "foo"), ("no_sync", "true")]; + let resp = client + .post(&v3_write_url) + .query(¶ms) + .body(write_body) + .send() + .await + .expect("send api/v3/write request"); + let status = resp.status(); + let body = resp.text().await.expect("response body as text"); + println!("Response [{status}]:\n{body}"); + assert_eq!(status, StatusCode::NO_CONTENT); +} diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs index 28f5083f72e..bff87d8d01d 100644 --- a/influxdb3_processing_engine/src/lib.rs +++ b/influxdb3_processing_engine/src/lib.rs @@ -885,6 +885,7 @@ mod tests { start_time, false, Precision::Nanosecond, + false, ) .await?; @@ -960,6 +961,7 @@ mod tests { start_time, false, Precision::Nanosecond, + false, ) .await?; @@ -1020,6 +1022,7 @@ mod tests { start_time, false, Precision::Nanosecond, + false, ) .await?; @@ -1088,6 +1091,7 @@ mod tests { start_time, false, Precision::Nanosecond, + false, ) .await?; @@ -1182,6 +1186,7 @@ mod tests { start_time, false, Precision::Nanosecond, + false, ) .await?; @@ -1242,6 +1247,7 @@ mod tests { start_time, false, Precision::Nanosecond, + false, ) .await?; diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index d0aeffbce0c..66778cfeff9 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -518,6 +518,7 @@ where default_time, params.accept_partial, params.precision, + params.no_sync, ) .await?; @@ -1615,6 +1616,8 @@ pub(crate) struct WriteParams { pub(crate) accept_partial: bool, #[serde(default)] pub(crate) precision: Precision, + #[serde(default)] + pub(crate) no_sync: bool, } impl From for WriteParams { @@ -1624,6 +1627,7 @@ impl From for WriteParams { // legacy behaviour was to not accept partial: accept_partial: false, precision: legacy.precision.into(), + no_sync: false, } } } diff --git a/influxdb3_server/src/query_executor/mod.rs b/influxdb3_server/src/query_executor/mod.rs index 664877bdd2b..4487c1894cc 100644 --- a/influxdb3_server/src/query_executor/mod.rs +++ b/influxdb3_server/src/query_executor/mod.rs @@ -899,6 +899,7 @@ mod tests { Time::from_timestamp_nanos(time), false, influxdb3_write::Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1010,6 +1011,7 @@ mod tests { Time::from_timestamp_nanos(time), false, influxdb3_write::Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1075,6 +1077,7 @@ mod tests { Time::from_timestamp_nanos(time), false, influxdb3_write::Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1124,6 +1127,7 @@ mod tests { Time::from_timestamp_nanos(time), false, influxdb3_write::Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1189,6 +1193,7 @@ mod tests { Time::from_timestamp_nanos(time), false, influxdb3_write::Precision::Nanosecond, + false, ) .await .unwrap(); diff --git a/influxdb3_wal/src/lib.rs b/influxdb3_wal/src/lib.rs index 188e83d7ca6..1e94999d15f 100644 --- a/influxdb3_wal/src/lib.rs +++ b/influxdb3_wal/src/lib.rs @@ -70,8 +70,8 @@ pub type Result = std::result::Result; #[async_trait] pub trait Wal: Debug + Send + Sync + 'static { - /// Buffer into a single larger operation in memory. Returns before the operation is persisted. - async fn buffer_op_unconfirmed(&self, op: WalOp) -> Result<(), Error>; + /// Buffer writes ops into the buffer, but returns before the operation is persisted to the WAL. + async fn write_ops_unconfirmed(&self, op: Vec) -> Result<(), Error>; /// Writes the ops into the buffer and waits until the WAL file is persisted. When this returns /// the operations are durable in the configured object store and the file notifier has been diff --git a/influxdb3_wal/src/object_store.rs b/influxdb3_wal/src/object_store.rs index d5dca73036b..6055f62d9d0 100644 --- a/influxdb3_wal/src/object_store.rs +++ b/influxdb3_wal/src/object_store.rs @@ -229,12 +229,12 @@ impl WalObjectStore { } /// Buffer into a single larger operation in memory. Returns before the operation is persisted. - async fn buffer_op_unconfirmed(&self, op: WalOp) -> crate::Result<(), crate::Error> { + async fn write_ops_unconfirmed(&self, op: Vec) -> crate::Result<(), crate::Error> { self.flush_buffer .lock() .await .wal_buffer - .buffer_op_unconfirmed(op) + .write_ops_unconfirmed(op) } /// Writes the op into the buffer and waits until the WAL file is persisted. When this returns @@ -513,8 +513,8 @@ async fn load_all_wal_file_paths( #[async_trait::async_trait] impl Wal for WalObjectStore { - async fn buffer_op_unconfirmed(&self, op: WalOp) -> crate::Result<(), crate::Error> { - self.buffer_op_unconfirmed(op).await + async fn write_ops_unconfirmed(&self, op: Vec) -> crate::Result<(), crate::Error> { + self.write_ops_unconfirmed(op).await } async fn write_ops(&self, ops: Vec) -> crate::Result<(), crate::Error> { @@ -719,36 +719,38 @@ pub enum WriteResult { } impl WalBuffer { - fn buffer_op_unconfirmed(&mut self, op: WalOp) -> crate::Result<(), crate::Error> { + fn write_ops_unconfirmed(&mut self, ops: Vec) -> crate::Result<(), crate::Error> { if self.op_count >= self.op_limit { return Err(crate::Error::BufferFull(self.op_count)); } - match op { - WalOp::Write(new_write_batch) => { - let db_name = Arc::clone(&new_write_batch.database_name); - - // insert the database write batch or add to existing - let write_batch = - self.database_to_write_batch - .entry(db_name) - .or_insert_with(|| WriteBatch { - database_id: new_write_batch.database_id, - database_name: new_write_batch.database_name, - table_chunks: Default::default(), - min_time_ns: i64::MAX, - max_time_ns: i64::MIN, - }); - write_batch.add_write_batch( - new_write_batch.table_chunks, - new_write_batch.min_time_ns, - new_write_batch.max_time_ns, - ); - } - WalOp::Catalog(catalog_batch) => { - self.catalog_batches.push(catalog_batch); + for op in ops { + match op { + WalOp::Write(new_write_batch) => { + let db_name = Arc::clone(&new_write_batch.database_name); + + // insert the database write batch or add to existing + let write_batch = + self.database_to_write_batch + .entry(db_name) + .or_insert_with(|| WriteBatch { + database_id: new_write_batch.database_id, + database_name: new_write_batch.database_name, + table_chunks: Default::default(), + min_time_ns: i64::MAX, + max_time_ns: i64::MIN, + }); + write_batch.add_write_batch( + new_write_batch.table_chunks, + new_write_batch.min_time_ns, + new_write_batch.max_time_ns, + ); + } + WalOp::Catalog(catalog_batch) => { + self.catalog_batches.push(catalog_batch); + } + WalOp::Noop(_) => {} } - WalOp::Noop(_) => {} } Ok(()) @@ -760,9 +762,7 @@ impl WalBuffer { response: oneshot::Sender, ) -> crate::Result<(), crate::Error> { self.write_op_responses.push(response); - for op in ops { - self.buffer_op_unconfirmed(op)?; - } + self.write_ops_unconfirmed(ops)?; Ok(()) } @@ -894,8 +894,7 @@ impl WalFileRemoverInner { mod tests { use super::*; use crate::{ - create, Field, FieldData, Gen1Duration, Row, SnapshotSequenceNumber, TableChunk, - TableChunks, + Field, FieldData, Gen1Duration, Row, SnapshotSequenceNumber, TableChunk, TableChunks, }; use async_trait::async_trait; use indexmap::IndexMap; @@ -905,419 +904,6 @@ mod tests { use std::any::Any; use tokio::sync::oneshot::Receiver; - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn write_flush_delete_and_load() { - let time_provider: Arc = - Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); - let object_store: Arc = Arc::new(InMemory::new()); - let notifier: Arc = Arc::new(TestNotifier::default()); - let wal_config = WalConfig { - max_write_buffer_size: 100, - flush_interval: Duration::from_secs(1), - snapshot_size: 2, - gen1_duration: Gen1Duration::new_1m(), - }; - let paths = vec![]; - let wal = WalObjectStore::new_without_replay( - Arc::clone(&time_provider), - Arc::clone(&object_store), - "my_host", - Arc::clone(¬ifier), - wal_config, - None, - None, - &paths, - 1, - ); - - let db_name: Arc = "db1".into(); - - let op1 = WalOp::Write(WriteBatch { - database_id: DbId::from(0), - database_name: Arc::clone(&db_name), - table_chunks: IndexMap::from([( - TableId::from(0), - TableChunks { - min_time: 1, - max_time: 3, - chunk_time_to_chunk: HashMap::from([( - 0, - TableChunk { - rows: vec![ - Row { - time: 1, - fields: vec![ - Field { - id: ColumnId::from(0), - value: FieldData::Integer(1), - }, - Field { - id: ColumnId::from(1), - value: FieldData::Timestamp(1), - }, - ], - }, - Row { - time: 3, - fields: vec![ - Field { - id: ColumnId::from(0), - value: FieldData::Integer(2), - }, - Field { - id: ColumnId::from(1), - value: FieldData::Timestamp(3), - }, - ], - }, - ], - }, - )]), - }, - )]) - .into(), - min_time_ns: 1, - max_time_ns: 3, - }); - wal.buffer_op_unconfirmed(op1.clone()).await.unwrap(); - - let op2 = WalOp::Write(WriteBatch { - database_id: DbId::from(0), - database_name: Arc::clone(&db_name), - table_chunks: IndexMap::from([( - TableId::from(0), - TableChunks { - min_time: 12, - max_time: 12, - chunk_time_to_chunk: HashMap::from([( - 0, - TableChunk { - rows: vec![Row { - time: 12, - fields: vec![ - Field { - id: ColumnId::from(0), - value: FieldData::Integer(3), - }, - Field { - id: ColumnId::from(1), - value: FieldData::Timestamp(62_000000000), - }, - ], - }], - }, - )]), - }, - )]) - .into(), - min_time_ns: 62_000000000, - max_time_ns: 62_000000000, - }); - wal.buffer_op_unconfirmed(op2.clone()).await.unwrap(); - - // create wal file 1 - let ret = wal.flush_buffer(false).await; - assert!(ret.is_none()); - let file_1_contents = create::wal_contents( - (1, 62_000_000_000, 1), - [WalOp::Write(WriteBatch { - database_id: DbId::from(0), - database_name: "db1".into(), - table_chunks: IndexMap::from([( - TableId::from(0), - TableChunks { - min_time: 1, - max_time: 12, - chunk_time_to_chunk: HashMap::from([( - 0, - TableChunk { - rows: vec![ - Row { - time: 1, - fields: vec![ - Field { - id: ColumnId::from(0), - value: FieldData::Integer(1), - }, - Field { - id: ColumnId::from(1), - value: FieldData::Timestamp(1), - }, - ], - }, - Row { - time: 3, - fields: vec![ - Field { - id: ColumnId::from(0), - value: FieldData::Integer(2), - }, - Field { - id: ColumnId::from(1), - value: FieldData::Timestamp(3), - }, - ], - }, - Row { - time: 12, - fields: vec![ - Field { - id: ColumnId::from(0), - value: FieldData::Integer(3), - }, - Field { - id: ColumnId::from(1), - value: FieldData::Timestamp(62_000000000), - }, - ], - }, - ], - }, - )]), - }, - )]) - .into(), - min_time_ns: 1, - max_time_ns: 62_000000000, - })], - ); - - // create wal file 2 - wal.buffer_op_unconfirmed(op2.clone()).await.unwrap(); - assert!(wal.flush_buffer(false).await.is_none()); - - let file_2_contents = create::wal_contents( - (62_000_000_000, 62_000_000_000, 2), - [WalOp::Write(WriteBatch { - database_id: DbId::from(0), - database_name: "db1".into(), - table_chunks: IndexMap::from([( - TableId::from(0), - TableChunks { - min_time: 12, - max_time: 12, - chunk_time_to_chunk: HashMap::from([( - 0, - TableChunk { - rows: vec![Row { - time: 12, - fields: vec![ - Field { - id: ColumnId::from(0), - value: FieldData::Integer(3), - }, - Field { - id: ColumnId::from(1), - value: FieldData::Timestamp(62_000000000), - }, - ], - }], - }, - )]), - }, - )]) - .into(), - min_time_ns: 62_000000000, - max_time_ns: 62_000000000, - })], - ); - - // before we trigger a snapshot, test replay with a new wal and notifier - let replay_notifier: Arc = Arc::new(TestNotifier::default()); - let paths = vec![]; - let replay_wal = WalObjectStore::new_without_replay( - Arc::clone(&time_provider), - Arc::clone(&object_store), - "my_host", - Arc::clone(&replay_notifier), - wal_config, - None, - None, - &paths, - 1, - ); - assert_eq!( - replay_wal.load_existing_wal_file_paths( - None, - &[ - Path::from("my_host/wal/00000000001.wal"), - Path::from("my_host/wal/00000000002.wal") - ] - ), - vec![ - Path::from("my_host/wal/00000000001.wal"), - Path::from("my_host/wal/00000000002.wal") - ] - ); - replay_wal - .replay( - None, - &[ - Path::from("my_host/wal/00000000001.wal"), - Path::from("my_host/wal/00000000002.wal"), - ], - ) - .await - .unwrap(); - let replay_notifier = replay_notifier - .as_any() - .downcast_ref::() - .unwrap(); - - { - let notified_writes = replay_notifier.notified_writes.lock(); - let notified_refs = notified_writes - .iter() - .map(|x| x.as_ref()) - .collect::>(); - assert_eq!(notified_refs, vec![&file_1_contents, &file_2_contents]); - } - assert!(replay_notifier.snapshot_details.lock().is_none()); - - // create wal file 3, which should trigger a snapshot - let op3 = WalOp::Write(WriteBatch { - database_id: DbId::from(0), - database_name: Arc::clone(&db_name), - table_chunks: IndexMap::from([( - TableId::from(0), - TableChunks { - min_time: 26, - max_time: 26, - chunk_time_to_chunk: HashMap::from([( - 0, - TableChunk { - rows: vec![Row { - time: 26, - fields: vec![ - Field { - id: ColumnId::from(0), - value: FieldData::Integer(3), - }, - Field { - id: ColumnId::from(1), - value: FieldData::Timestamp(128_000000000), - }, - ], - }], - }, - )]), - }, - )]) - .into(), - min_time_ns: 128_000000000, - max_time_ns: 128_000000000, - }); - wal.buffer_op_unconfirmed(op3.clone()).await.unwrap(); - - let (snapshot_done, snapshot_info, snapshot_permit) = - wal.flush_buffer(false).await.unwrap(); - let expected_info = SnapshotDetails { - snapshot_sequence_number: SnapshotSequenceNumber::new(1), - end_time_marker: 120000000000, - first_wal_sequence_number: WalFileSequenceNumber(1), - last_wal_sequence_number: WalFileSequenceNumber(2), - forced: false, - }; - assert_eq!(expected_info, snapshot_info); - snapshot_done.await.unwrap(); - - let file_3_contents = create::wal_contents_with_snapshot( - (128_000_000_000, 128_000_000_000, 3), - [WalOp::Write(WriteBatch { - database_id: DbId::from(0), - database_name: "db1".into(), - table_chunks: IndexMap::from([( - TableId::from(0), - TableChunks { - min_time: 26, - max_time: 26, - chunk_time_to_chunk: HashMap::from([( - 0, - TableChunk { - rows: vec![Row { - time: 26, - fields: vec![ - Field { - id: ColumnId::from(0), - value: FieldData::Integer(3), - }, - Field { - id: ColumnId::from(1), - value: FieldData::Timestamp(128_000000000), - }, - ], - }], - }, - )]), - }, - )]) - .into(), - min_time_ns: 128_000000000, - max_time_ns: 128_000000000, - })], - SnapshotDetails { - snapshot_sequence_number: SnapshotSequenceNumber::new(1), - end_time_marker: 120_000000000, - first_wal_sequence_number: WalFileSequenceNumber(1), - last_wal_sequence_number: WalFileSequenceNumber(2), - forced: false, - }, - ); - - let notifier = notifier.as_any().downcast_ref::().unwrap(); - - { - let notified_writes = notifier.notified_writes.lock(); - let notified_refs = notified_writes - .iter() - .map(|x| x.as_ref()) - .collect::>(); - let expected_writes = vec![&file_1_contents, &file_2_contents, &file_3_contents]; - assert_eq!(notified_refs, expected_writes); - let details = notifier.snapshot_details.lock(); - assert_eq!(details.unwrap(), expected_info); - } - - wal.remove_snapshot_wal_files(snapshot_info, snapshot_permit) - .await; - - // test that replay now only has file 3 - let replay_notifier: Arc = Arc::new(TestNotifier::default()); - let paths = vec![]; - let replay_wal = WalObjectStore::new_without_replay( - Arc::clone(&time_provider), - object_store, - "my_host", - Arc::clone(&replay_notifier), - wal_config, - None, - None, - &paths, - 1, - ); - assert_eq!( - replay_wal - .load_existing_wal_file_paths(None, &[Path::from("my_host/wal/00000000003.wal")]), - vec![Path::from("my_host/wal/00000000003.wal")] - ); - replay_wal - .replay(None, &[Path::from("my_host/wal/00000000003.wal")]) - .await - .unwrap(); - let replay_notifier = replay_notifier - .as_any() - .downcast_ref::() - .unwrap(); - let notified_writes = replay_notifier.notified_writes.lock(); - let notified_refs = notified_writes - .iter() - .map(|x| x.as_ref()) - .collect::>(); - assert_eq!(notified_refs, vec![&file_3_contents]); - let snapshot_details = replay_notifier.snapshot_details.lock(); - assert_eq!(*snapshot_details, file_3_contents.snapshot); - } - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn flush_for_empty_buffer_skips_notify() { let time_provider: Arc = @@ -1394,7 +980,7 @@ mod tests { flush_buffer .wal_buffer - .buffer_op_unconfirmed(WalOp::Write(WriteBatch { + .write_ops_unconfirmed(vec![WalOp::Write(WriteBatch { database_id: DbId::from(0), database_name: "db1".into(), table_chunks: IndexMap::from([( @@ -1425,7 +1011,7 @@ mod tests { .into(), min_time_ns: 128_000000000, max_time_ns: 148_000000000, - })) + })]) .unwrap(); // wal buffer not empty, force snapshot set => snapshot (empty wal buffer @@ -1452,7 +1038,7 @@ mod tests { // not snapshot flush_buffer .wal_buffer - .buffer_op_unconfirmed(WalOp::Write(WriteBatch { + .write_ops_unconfirmed(vec![WalOp::Write(WriteBatch { database_id: DbId::from(0), database_name: "db1".into(), table_chunks: IndexMap::from([( @@ -1483,7 +1069,7 @@ mod tests { .into(), min_time_ns: 128_000000000, max_time_ns: 148_000000000, - })) + })]) .unwrap(); let (wal_contents, _, maybe_snapshot) = flush_buffer diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs index e5658262c61..0cfb04a90cd 100644 --- a/influxdb3_write/src/lib.rs +++ b/influxdb3_write/src/lib.rs @@ -100,6 +100,7 @@ pub trait Bufferer: Debug + Send + Sync + 'static { ingest_time: Time, accept_partial: bool, precision: Precision, + no_sync: bool, ) -> write_buffer::Result; /// Returns the database schema provider diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index 4baac86d32f..f0685eed88a 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -45,7 +45,7 @@ use metric::Registry; use metrics::WriteMetrics; use object_store::path::Path as ObjPath; use object_store::{ObjectMeta, ObjectStore}; -use observability_deps::tracing::{debug, warn}; +use observability_deps::tracing::{debug, error, warn}; use parquet_file::storage::ParquetExecInput; use queryable_buffer::QueryableBufferArgs; use schema::Schema; @@ -265,6 +265,7 @@ impl WriteBufferImpl { ingest_time: Time, accept_partial: bool, precision: Precision, + no_sync: bool, ) -> Result { debug!("write_lp to {} in writebuffer", db_name); @@ -286,12 +287,16 @@ impl WriteBufferImpl { } ops.push(WalOp::Write(result.valid_data)); - // write to the wal. Behind the scenes the ops get buffered in memory and once a second (or - // whatever the configured wal flush interval is set to) the buffer is flushed and all the - // data is persisted into a single wal file in the configured object store. Then the - // contents are sent to the configured notifier, which in this case is the queryable buffer. - // Thus, after this returns, the data is both durable and queryable. - self.wal.write_ops(ops).await?; + if no_sync { + self.wal.write_ops_unconfirmed(ops).await?; + } else { + // write to the wal. Behind the scenes the ops get buffered in memory and once a second (or + // whatever the configured wal flush interval is set to) the buffer is flushed and all the + // data is persisted into a single wal file in the configured object store. Then the + // contents are sent to the configured notifier, which in this case is the queryable buffer. + // Thus, after this returns, the data is both durable and queryable. + self.wal.write_ops(ops).await?; + } // record metrics for lines written, rejected, and bytes written self.metrics @@ -438,9 +443,17 @@ impl Bufferer for WriteBufferImpl { ingest_time: Time, accept_partial: bool, precision: Precision, + no_sync: bool, ) -> Result { - self.write_lp(database, lp, ingest_time, accept_partial, precision) - .await + self.write_lp( + database, + lp, + ingest_time, + accept_partial, + precision, + no_sync, + ) + .await } fn catalog(&self) -> Arc { @@ -994,6 +1007,7 @@ mod tests { Time::from_timestamp_nanos(123), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1018,6 +1032,7 @@ mod tests { Time::from_timestamp_nanos(124), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1029,6 +1044,7 @@ mod tests { Time::from_timestamp_nanos(125), false, Precision::Nanosecond, + false, ) .await; @@ -1108,6 +1124,7 @@ mod tests { Time::from_timestamp(20, 0).unwrap(), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1166,6 +1183,7 @@ mod tests { Time::from_timestamp(30, 0).unwrap(), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1187,6 +1205,7 @@ mod tests { Time::from_timestamp(40, 0).unwrap(), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1241,6 +1260,7 @@ mod tests { Time::from_timestamp(10, 0).unwrap(), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1264,6 +1284,7 @@ mod tests { Time::from_timestamp(65, 0).unwrap(), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1289,6 +1310,7 @@ mod tests { Time::from_timestamp(147, 0).unwrap(), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1331,6 +1353,7 @@ mod tests { Time::from_timestamp(250, 0).unwrap(), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1410,6 +1433,7 @@ mod tests { Time::from_timestamp(300, 0).unwrap(), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1422,6 +1446,7 @@ mod tests { Time::from_timestamp(330, 0).unwrap(), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1593,6 +1618,7 @@ mod tests { Time::from_timestamp(10, 0).unwrap(), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1602,6 +1628,7 @@ mod tests { Time::from_timestamp(20, 0).unwrap(), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -1611,6 +1638,7 @@ mod tests { Time::from_timestamp(30, 0).unwrap(), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -2331,6 +2359,7 @@ mod tests { start_time, false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -2359,6 +2388,7 @@ mod tests { start_time, false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -2911,6 +2941,7 @@ mod tests { Time::from_timestamp_nanos(w.time_seconds * 1_000_000_000), false, Precision::Nanosecond, + false, ) .await .unwrap(); @@ -2930,6 +2961,7 @@ mod tests { Time::from_timestamp_nanos(w.time_seconds * 1_000_000_000), true, Precision::Nanosecond, + false, ) .await .unwrap();