Skip to content

Commit

Permalink
feat: add no_sync write_lp param for fast writes (#25902)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgattozzi authored Jan 24, 2025
1 parent 061b62a commit 43e186d
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 51 deletions.
51 changes: 51 additions & 0 deletions influxdb3/tests/server/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,3 +342,54 @@ async fn writes_with_different_schema_should_fail() {
"the request should hae failed with an API Error"
);
}
#[tokio::test]
/// Ensures the `no_sync` query parameter is accepted by every write endpoint (v1, v2 and v3).
/// Acceptance here only means that serde parses the parameter without complaint: the option
/// is only usable on the v3 endpoint, and the other endpoints default to requiring the WAL to
/// sync before returning.
async fn api_no_sync_param() {
    let server = TestServer::spawn().await;
    let client = reqwest::Client::new();
    let write_body = "cpu,host=a usage=0.5";

    // One row per endpoint: (target URL, name of the database query parameter, message used
    // if sending the request fails). All URLs are built up front, before any request is sent.
    let endpoints = [
        (
            format!("{base}/write", base = server.client_addr()),
            "db",
            "send /write request",
        ),
        (
            format!("{base}/api/v2/write", base = server.client_addr()),
            "bucket",
            "send api/v2/write request",
        ),
        (
            format!("{base}/api/v3/write_lp", base = server.client_addr()),
            "db",
            "send api/v3/write request",
        ),
    ];

    for (write_url, db_param, send_failure) in endpoints {
        let params = vec![(db_param, "foo"), ("no_sync", "true")];
        let resp = client
            .post(&write_url)
            .query(&params)
            .body(write_body)
            .send()
            .await
            .expect(send_failure);

        // Capture the status first: reading the body consumes the response.
        let status = resp.status();
        let body = resp.text().await.expect("response body as text");
        println!("Response [{status}]:\n{body}");
        assert_eq!(status, StatusCode::NO_CONTENT);
    }
}
6 changes: 6 additions & 0 deletions influxdb3_processing_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;

Expand Down Expand Up @@ -960,6 +961,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;

Expand Down Expand Up @@ -1020,6 +1022,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;

Expand Down Expand Up @@ -1088,6 +1091,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;

Expand Down Expand Up @@ -1182,6 +1186,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;

Expand Down Expand Up @@ -1242,6 +1247,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;

Expand Down
2 changes: 2 additions & 0 deletions influxdb3_processing_engine/src/plugins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ mod python_plugin {
Time::from_timestamp_nanos(ingest_time.as_nanos() as i64),
false,
Precision::Nanosecond,
false,
)
.await
{
Expand All @@ -447,6 +448,7 @@ mod python_plugin {
Time::from_timestamp_nanos(ingest_time.as_nanos() as i64),
false,
Precision::Nanosecond,
false,
)
.await
{
Expand Down
4 changes: 4 additions & 0 deletions influxdb3_server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ where
default_time,
params.accept_partial,
params.precision,
params.no_sync,
)
.await?;

Expand Down Expand Up @@ -1615,6 +1616,8 @@ pub(crate) struct WriteParams {
pub(crate) accept_partial: bool,
#[serde(default)]
pub(crate) precision: Precision,
#[serde(default)]
pub(crate) no_sync: bool,
}

impl From<iox_http::write::WriteParams> for WriteParams {
Expand All @@ -1624,6 +1627,7 @@ impl From<iox_http::write::WriteParams> for WriteParams {
// legacy behaviour was to not accept partial:
accept_partial: false,
precision: legacy.precision.into(),
no_sync: false,
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions influxdb3_server/src/query_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,7 @@ mod tests {
Time::from_timestamp_nanos(time),
false,
influxdb3_write::Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1010,6 +1011,7 @@ mod tests {
Time::from_timestamp_nanos(time),
false,
influxdb3_write::Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1075,6 +1077,7 @@ mod tests {
Time::from_timestamp_nanos(time),
false,
influxdb3_write::Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1124,6 +1127,7 @@ mod tests {
Time::from_timestamp_nanos(time),
false,
influxdb3_write::Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1189,6 +1193,7 @@ mod tests {
Time::from_timestamp_nanos(time),
false,
influxdb3_write::Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions influxdb3_wal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;

#[async_trait]
pub trait Wal: Debug + Send + Sync + 'static {
/// Buffer into a single larger operation in memory. Returns before the operation is persisted.
async fn buffer_op_unconfirmed(&self, op: WalOp) -> Result<(), Error>;
/// Writes the ops into the buffer, but returns before the operations are persisted to the WAL.
async fn write_ops_unconfirmed(&self, op: Vec<WalOp>) -> Result<(), Error>;

/// Writes the ops into the buffer and waits until the WAL file is persisted. When this returns
/// the operations are durable in the configured object store and the file notifier has been
Expand Down
80 changes: 40 additions & 40 deletions influxdb3_wal/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,12 @@ impl WalObjectStore {
}

/// Buffer into a single larger operation in memory. Returns before the operation is persisted.
async fn buffer_op_unconfirmed(&self, op: WalOp) -> crate::Result<(), crate::Error> {
async fn write_ops_unconfirmed(&self, op: Vec<WalOp>) -> crate::Result<(), crate::Error> {
self.flush_buffer
.lock()
.await
.wal_buffer
.buffer_op_unconfirmed(op)
.write_ops_unconfirmed(op)
}

/// Writes the op into the buffer and waits until the WAL file is persisted. When this returns
Expand Down Expand Up @@ -513,8 +513,8 @@ async fn load_all_wal_file_paths(

#[async_trait::async_trait]
impl Wal for WalObjectStore {
async fn buffer_op_unconfirmed(&self, op: WalOp) -> crate::Result<(), crate::Error> {
self.buffer_op_unconfirmed(op).await
async fn write_ops_unconfirmed(&self, op: Vec<WalOp>) -> crate::Result<(), crate::Error> {
self.write_ops_unconfirmed(op).await
}

async fn write_ops(&self, ops: Vec<WalOp>) -> crate::Result<(), crate::Error> {
Expand Down Expand Up @@ -719,36 +719,38 @@ pub enum WriteResult {
}

impl WalBuffer {
fn buffer_op_unconfirmed(&mut self, op: WalOp) -> crate::Result<(), crate::Error> {
fn write_ops_unconfirmed(&mut self, ops: Vec<WalOp>) -> crate::Result<(), crate::Error> {
if self.op_count >= self.op_limit {
return Err(crate::Error::BufferFull(self.op_count));
}

match op {
WalOp::Write(new_write_batch) => {
let db_name = Arc::clone(&new_write_batch.database_name);

// insert the database write batch or add to existing
let write_batch =
self.database_to_write_batch
.entry(db_name)
.or_insert_with(|| WriteBatch {
database_id: new_write_batch.database_id,
database_name: new_write_batch.database_name,
table_chunks: Default::default(),
min_time_ns: i64::MAX,
max_time_ns: i64::MIN,
});
write_batch.add_write_batch(
new_write_batch.table_chunks,
new_write_batch.min_time_ns,
new_write_batch.max_time_ns,
);
}
WalOp::Catalog(catalog_batch) => {
self.catalog_batches.push(catalog_batch);
for op in ops {
match op {
WalOp::Write(new_write_batch) => {
let db_name = Arc::clone(&new_write_batch.database_name);

// insert the database write batch or add to existing
let write_batch =
self.database_to_write_batch
.entry(db_name)
.or_insert_with(|| WriteBatch {
database_id: new_write_batch.database_id,
database_name: new_write_batch.database_name,
table_chunks: Default::default(),
min_time_ns: i64::MAX,
max_time_ns: i64::MIN,
});
write_batch.add_write_batch(
new_write_batch.table_chunks,
new_write_batch.min_time_ns,
new_write_batch.max_time_ns,
);
}
WalOp::Catalog(catalog_batch) => {
self.catalog_batches.push(catalog_batch);
}
WalOp::Noop(_) => {}
}
WalOp::Noop(_) => {}
}

Ok(())
Expand All @@ -760,9 +762,7 @@ impl WalBuffer {
response: oneshot::Sender<WriteResult>,
) -> crate::Result<(), crate::Error> {
self.write_op_responses.push(response);
for op in ops {
self.buffer_op_unconfirmed(op)?;
}
self.write_ops_unconfirmed(ops)?;

Ok(())
}
Expand Down Expand Up @@ -979,7 +979,7 @@ mod tests {
min_time_ns: 1,
max_time_ns: 3,
});
wal.buffer_op_unconfirmed(op1.clone()).await.unwrap();
wal.write_ops_unconfirmed(vec![op1.clone()]).await.unwrap();

let op2 = WalOp::Write(WriteBatch {
database_id: DbId::from(0),
Expand Down Expand Up @@ -1013,7 +1013,7 @@ mod tests {
min_time_ns: 62_000000000,
max_time_ns: 62_000000000,
});
wal.buffer_op_unconfirmed(op2.clone()).await.unwrap();
wal.write_ops_unconfirmed(vec![op2.clone()]).await.unwrap();

// create wal file 1
let ret = wal.flush_buffer(false).await;
Expand Down Expand Up @@ -1083,7 +1083,7 @@ mod tests {
);

// create wal file 2
wal.buffer_op_unconfirmed(op2.clone()).await.unwrap();
wal.write_ops_unconfirmed(vec![op2.clone()]).await.unwrap();
assert!(wal.flush_buffer(false).await.is_none());

let file_2_contents = create::wal_contents(
Expand Down Expand Up @@ -1207,7 +1207,7 @@ mod tests {
min_time_ns: 128_000000000,
max_time_ns: 128_000000000,
});
wal.buffer_op_unconfirmed(op3.clone()).await.unwrap();
wal.write_ops_unconfirmed(vec![op3.clone()]).await.unwrap();

let (snapshot_done, snapshot_info, snapshot_permit) =
wal.flush_buffer(false).await.unwrap();
Expand Down Expand Up @@ -1394,7 +1394,7 @@ mod tests {

flush_buffer
.wal_buffer
.buffer_op_unconfirmed(WalOp::Write(WriteBatch {
.write_ops_unconfirmed(vec![WalOp::Write(WriteBatch {
database_id: DbId::from(0),
database_name: "db1".into(),
table_chunks: IndexMap::from([(
Expand Down Expand Up @@ -1425,7 +1425,7 @@ mod tests {
.into(),
min_time_ns: 128_000000000,
max_time_ns: 148_000000000,
}))
})])
.unwrap();

// wal buffer not empty, force snapshot set => snapshot (empty wal buffer
Expand All @@ -1452,7 +1452,7 @@ mod tests {
// not snapshot
flush_buffer
.wal_buffer
.buffer_op_unconfirmed(WalOp::Write(WriteBatch {
.write_ops_unconfirmed(vec![WalOp::Write(WriteBatch {
database_id: DbId::from(0),
database_name: "db1".into(),
table_chunks: IndexMap::from([(
Expand Down Expand Up @@ -1483,7 +1483,7 @@ mod tests {
.into(),
min_time_ns: 128_000000000,
max_time_ns: 148_000000000,
}))
})])
.unwrap();

let (wal_contents, _, maybe_snapshot) = flush_buffer
Expand Down
1 change: 1 addition & 0 deletions influxdb3_write/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pub trait Bufferer: Debug + Send + Sync + 'static {
ingest_time: Time,
accept_partial: bool,
precision: Precision,
no_sync: bool,
) -> write_buffer::Result<BufferedWriteRequest>;

/// Returns the database schema provider
Expand Down
Loading

0 comments on commit 43e186d

Please sign in to comment.