Skip to content

Commit

Permalink
feat: add no_sync write_lp param for fast writes
Browse files Browse the repository at this point in the history
In cases where a user does not need the guarantee that data is
persisted to the WAL on write and needs faster ingest speed, the
no_sync param added in this commit is what they need. Rather
than waiting on a sync to the WAL, we write to the buffer without
confirming that writes have made it to the WAL.

The upside to this is that they can ingest data faster, but there is a
small risk that the server crashes between the data being written and it
eventually being persisted to object storage, in which case that data is
irrevocably lost. Also, if the write to the WAL fails, the worst outcome
is that the user will not get a failed response code they can handle.
The data will still be in the buffer, but will not be durable until
persisted as a parquet file in this case.

However, in many cases that might be acceptable. This commit expands on
what's possible so that the user can use InfluxDB Core the way that
works best for their workload.

Note that this feature is only added for the /api/v3/write_lp endpoint.
The legacy endpoints for writing can take the parameter, but won't do
anything with it at all.

Closes #25597
  • Loading branch information
mgattozzi committed Jan 24, 2025
1 parent 061b62a commit b6b35e2
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 462 deletions.
51 changes: 51 additions & 0 deletions influxdb3/tests/server/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,3 +342,54 @@ async fn writes_with_different_schema_should_fail() {
"the request should hae failed with an API Error"
);
}
/// Verifies that the `no_sync` query parameter is accepted by every write endpoint.
///
/// Acceptance here only means the parameter deserializes without error: the same
/// line-protocol body is POSTed to the v1, v2 and v3 write URLs with `no_sync=true`
/// and each response must be `204 No Content`. Only the v3 endpoint is able to make
/// use of the parameter; the legacy endpoints default to requiring the WAL to sync
/// before returning.
#[tokio::test]
async fn api_no_sync_param() {
    let server = TestServer::spawn().await;
    let client = reqwest::Client::new();
    let write_body = "cpu,host=a usage=0.5";

    // One row per endpoint, in the order they are exercised:
    // (target URL, query key naming the database, panic message if sending fails).
    let endpoints = [
        (
            format!("{base}/write", base = server.client_addr()),
            "db",
            "send /write request",
        ),
        (
            format!("{base}/api/v2/write", base = server.client_addr()),
            "bucket",
            "send api/v2/write request",
        ),
        (
            format!("{base}/api/v3/write_lp", base = server.client_addr()),
            "db",
            "send api/v3/write request",
        ),
    ];

    for (url, db_key, send_failure) in &endpoints {
        let params = vec![(*db_key, "foo"), ("no_sync", "true")];
        let resp = client
            .post(url)
            .query(&params)
            .body(write_body)
            .send()
            .await
            .expect(send_failure);

        // Capture the status first: reading the body consumes the response.
        let status = resp.status();
        let body = resp.text().await.expect("response body as text");
        println!("Response [{status}]:\n{body}");
        assert_eq!(status, StatusCode::NO_CONTENT);
    }
}
6 changes: 6 additions & 0 deletions influxdb3_processing_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;

Expand Down Expand Up @@ -960,6 +961,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;

Expand Down Expand Up @@ -1020,6 +1022,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;

Expand Down Expand Up @@ -1088,6 +1091,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;

Expand Down Expand Up @@ -1182,6 +1186,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;

Expand Down Expand Up @@ -1242,6 +1247,7 @@ mod tests {
start_time,
false,
Precision::Nanosecond,
false,
)
.await?;

Expand Down
4 changes: 4 additions & 0 deletions influxdb3_server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ where
default_time,
params.accept_partial,
params.precision,
params.no_sync,
)
.await?;

Expand Down Expand Up @@ -1615,6 +1616,8 @@ pub(crate) struct WriteParams {
pub(crate) accept_partial: bool,
#[serde(default)]
pub(crate) precision: Precision,
#[serde(default)]
pub(crate) no_sync: bool,
}

impl From<iox_http::write::WriteParams> for WriteParams {
Expand All @@ -1624,6 +1627,7 @@ impl From<iox_http::write::WriteParams> for WriteParams {
// legacy behaviour was to not accept partial:
accept_partial: false,
precision: legacy.precision.into(),
no_sync: false,
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions influxdb3_server/src/query_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,7 @@ mod tests {
Time::from_timestamp_nanos(time),
false,
influxdb3_write::Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1010,6 +1011,7 @@ mod tests {
Time::from_timestamp_nanos(time),
false,
influxdb3_write::Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1075,6 +1077,7 @@ mod tests {
Time::from_timestamp_nanos(time),
false,
influxdb3_write::Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1124,6 +1127,7 @@ mod tests {
Time::from_timestamp_nanos(time),
false,
influxdb3_write::Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -1189,6 +1193,7 @@ mod tests {
Time::from_timestamp_nanos(time),
false,
influxdb3_write::Precision::Nanosecond,
false,
)
.await
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions influxdb3_wal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;

#[async_trait]
pub trait Wal: Debug + Send + Sync + 'static {
/// Buffer into a single larger operation in memory. Returns before the operation is persisted.
async fn buffer_op_unconfirmed(&self, op: WalOp) -> Result<(), Error>;
/// Buffers the write ops in memory, but returns before the operations are persisted to the WAL.
async fn write_ops_unconfirmed(&self, op: Vec<WalOp>) -> Result<(), Error>;

/// Writes the ops into the buffer and waits until the WAL file is persisted. When this returns
/// the operations are durable in the configured object store and the file notifier has been
Expand Down
Loading

0 comments on commit b6b35e2

Please sign in to comment.