dekaf: update log_appender to buffer incoming logs while an append is in progress and send them as a batch next time it's able to
jshearer committed Feb 12, 2025
1 parent 5dad8cf commit 8e1ef10
Showing 1 changed file with 133 additions and 61 deletions.
194 changes: 133 additions & 61 deletions crates/dekaf/src/log_appender.rs
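
Note on the core pattern in this commit: previously the event loop awaited `append_logs` inline for every single log message, so a slow append stalled the loop one record at a time. After this change, incoming logs are pushed onto a `pending_logs` buffer, and a `biased` branch of the `tokio::select!` loop drains the whole buffer into one batched append the next time no append is in progress. Below is a minimal, self-contained sketch of that pattern, using illustrative names (`forward`, `append_batch`) rather than the actual dekaf types:

use std::time::Duration;
use tokio::sync::mpsc;

// Stand-in for a slow journal append (the real code writes to Gazette).
async fn append_batch(batch: &[String]) -> std::io::Result<()> {
    tokio::time::sleep(Duration::from_millis(50)).await;
    println!("appended {} records", batch.len());
    Ok(())
}

async fn forward(mut rx: mpsc::Receiver<String>) {
    let mut pending: Vec<String> = Vec::new();
    loop {
        tokio::select! {
            // Poll the flush branch first so `pending` can't grow without
            // bound when messages arrive faster than appends complete.
            biased;

            res = append_batch(&pending), if !pending.is_empty() => {
                match res {
                    Ok(()) => pending.clear(), // batch is written; drop it
                    Err(err) => eprintln!("append failed, will retry: {err}"),
                }
            }

            msg = rx.recv() => match msg {
                Some(m) => pending.push(m), // buffer; rides along with the next flush
                None => break,              // input closed
            },
        }
    }
    // Flush anything still buffered once the input ends.
    if !pending.is_empty() {
        let _ = append_batch(&pending).await;
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(16);
    let forwarder = tokio::spawn(forward(rx));
    for i in 0..100 {
        tx.send(format!("log line {i}")).await.unwrap();
    }
    drop(tx); // close the channel so `forward` drains and exits
    forwarder.await.unwrap();
}

`biased;` makes `select!` poll branches top-to-bottom instead of at random, which is what the comment in the diff about avoiding "an infinitely growing buffer of `pending_logs`" relies on. The real diff factors the flush into `Self::append_logs_to_writer`, which `mem::take`s the buffer and serializes each `ops::Log` only once an append actually begins.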
@@ -3,7 +3,7 @@ use anyhow::Context;
 use async_trait::async_trait;
 use bytes::Bytes;
 use flow_client::fetch_task_authorization;
-use futures::{StreamExt, TryStreamExt};
+use futures::{Stream, StreamExt, TryStreamExt};
 use gazette::{
     journal,
     uuid::{self, Producer},
@@ -14,6 +14,7 @@ use rand::Rng;
 use std::{
     collections::{BTreeMap, VecDeque},
     marker::PhantomData,
+    mem,
     sync::Arc,
     time::{Duration, SystemTime},
 };
@@ -68,8 +69,18 @@ impl StatsAggregator {
 // This abstraction exists mostly in order to make testing easier.
 #[async_trait]
 pub trait TaskWriter: Send + Sync {
-    async fn append_logs(&mut self, log_data: Bytes) -> anyhow::Result<()>;
-    async fn append_stats(&mut self, log_data: Bytes) -> anyhow::Result<()>;
+    async fn append_logs<S>(
+        &mut self,
+        log_data: impl Fn() -> S + Send + Sync,
+    ) -> anyhow::Result<()>
+    where
+        S: Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static;
+    async fn append_stats<S>(
+        &mut self,
+        stat_data: impl Fn() -> S + Send + Sync,
+    ) -> anyhow::Result<()>
+    where
+        S: Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static;
 
     async fn set_task_name(&mut self, name: String) -> anyhow::Result<()>;
 }
@@ -93,19 +104,28 @@ impl TaskWriter for GazetteWriter {
         Ok(())
     }
 
-    async fn append_logs(&mut self, data: Bytes) -> anyhow::Result<()> {
+    async fn append_logs<S>(&mut self, log_data: impl Fn() -> S + Send + Sync) -> anyhow::Result<()>
+    where
+        S: Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static,
+    {
         self.logs_appender
             .as_mut()
             .context("not initialized")?
-            .append(data)
+            .append(log_data)
             .await
     }
 
-    async fn append_stats(&mut self, data: Bytes) -> anyhow::Result<()> {
+    async fn append_stats<S>(
+        &mut self,
+        stat_data: impl Fn() -> S + Send + Sync,
+    ) -> anyhow::Result<()>
+    where
+        S: Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static,
+    {
         self.stats_appender
             .as_mut()
             .context("not initialized")?
-            .append(data)
+            .append(stat_data)
             .await
     }
 }
@@ -164,7 +184,10 @@ impl GazetteAppender {
         })
     }
 
-    async fn append(&mut self, data: Bytes) -> anyhow::Result<()> {
+    async fn append<S>(&mut self, data: impl Fn() -> S + Send + Sync) -> anyhow::Result<()>
+    where
+        S: Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static,
+    {
         if (self.exp - SystemTime::now()).whole_seconds() < 60 {
             self.refresh().await?;
         }
@@ -174,52 +197,21 @@
                 journal: self.journal_name.clone(),
                 ..Default::default()
             },
-            || {
-                futures::stream::once({
-                    let value = data.clone();
-                    async move { Ok(value) }
-                })
-            },
+            data,
         );
 
         tokio::pin!(resp);
         loop {
             match resp.try_next().await {
                 Ok(_) => return Ok(()),
-                Err(RetryError { inner: err, .. })
-                    if matches!(
-                        &err,
-                        gazette::Error::Grpc(status) if status.code() == tonic::Code::DeadlineExceeded
-                    ) =>
-                {
-                    tracing::warn!(
-                        ?err,
-                        "DeadlineExceeded error likely means that the data-plane access token has expired, but tokens get refreshed so this should never happen"
-                    );
-
-                    return Err(err.into());
-                }
-                Err(RetryError { attempt, ref inner }) if inner.is_transient() && attempt < 3 => {
-                    let wait_ms = rand::thread_rng().gen_range(400..5_000);
-
+                Err(RetryError { attempt, ref inner }) if inner.is_transient() => {
                     tracing::warn!(
                         ?attempt,
                         ?inner,
-                        ?wait_ms,
                         "Got recoverable error trying to write logs, retrying"
                     );
-
-                    tokio::time::sleep(Duration::from_millis(wait_ms)).await;
-                    continue;
-                }
-                Err(err) if err.inner.is_transient() => {
-                    tracing::warn!(
-                        attempt=err.attempt,
-                        inner=?err.inner,
-                        "Got recoverable error multiple times while trying to write logs"
-                    );
-                    return Err(err.inner.into());
-                }
                 Err(err) => {
                     tracing::warn!(?err, "Got fatal error while trying to write logs");
                     return Err(err.inner.into());
@@ -349,9 +341,40 @@ impl<W: TaskWriter + 'static> TaskForwarder<W> {
 
         // TODO(jshearer): Do we want to make this configurable?
         let mut stats_interval = tokio::time::interval(std::time::Duration::from_secs(30));
+        let mut pending_logs = Vec::new();
 
         loop {
             tokio::select! {
+                // We always want to start a new append before accumulating more log messages because in
+                // the extreme case where we're getting messages faster than we can store them, we don't want
+                // to end up with an infinitely growing buffer of `pending_logs`.
+                biased;
+
+                Err(append_error) = Self::append_logs_to_writer(
+                    &mut writer,
+                    &mut pending_logs,
+                    task_name.clone(),
+                    uuid_producer.clone(),
+                ), if pending_logs.len() > 0 => {
+                    tracing::error!(?append_error, "Error appending logs");
+                }
+
+                _ = stats_interval.tick() => {
+                    // Take current stats and write if non-zero
+                    if let Some(current_stats) = stats.take(){
+                        if let Err(append_error) = writer.append_stats(||{
+                            let serialized = Self::serialize_stats(
+                                uuid_producer,
+                                current_stats.clone(),
+                                task_name.to_owned(),
+                            );
+                            futures::stream::once(async move { Ok(serialized.clone().into()) })
+                        }).await {
+                            tracing::error!(?append_error, "Error appending stats")
+                        }
+                    }
+                }
+
                 msg = event_stream.next() => {
                     match msg {
                         Some(TaskWriterMessage::SetTaskName(new_name)) => {
@@ -369,9 +392,7 @@ impl<W: TaskWriter + 'static> TaskForwarder<W> {
                                 }
                             }
 
-                            writer
-                                .append_logs(Self::serialize_log(uuid_producer, log, task_name.to_owned()).into())
-                                .await?;
+                            pending_logs.push(log);
                         }
                         Some(TaskWriterMessage::Stats((collection_name, new_stats))) => {
                             stats.add(collection_name, new_stats);
@@ -380,20 +401,24 @@ impl<W: TaskWriter + 'static> TaskForwarder<W> {
                         None => break,
                     }
                 },
-                _ = stats_interval.tick() => {
-                    // Take current stats and write if non-zero
-                    if let Some(current_stats) = stats.take(){
-                        let data = Self::serialize_stats(uuid_producer, current_stats, task_name.to_owned());
-                        writer.append_stats(data.into()).await?;
-                    }
-                }
             }
         }
 
         // Flush any remaining stats after stream ends
         if let Some(remaining_stats) = stats.take() {
-            let data = Self::serialize_stats(uuid_producer, remaining_stats, task_name);
-            writer.append_stats(data.into()).await?;
+            if let Err(append_error) = writer
+                .append_stats(|| {
+                    let serialized = Self::serialize_stats(
+                        uuid_producer,
+                        remaining_stats.clone(),
+                        task_name.to_owned(),
+                    );
+                    futures::stream::once(async move { Ok(serialized.clone().into()) })
+                })
+                .await
+            {
+                tracing::error!(?append_error, "Error appending stats")
+            };
         }
 
         Ok(())
@@ -403,7 +428,7 @@ impl<W: TaskWriter + 'static> TaskForwarder<W> {
         producer: Producer,
         stats: BTreeMap<String, ops::stats::Binding>,
         task_name: String,
-    ) -> Vec<u8> {
+    ) -> bytes::Bytes {
         let uuid = gazette::uuid::build(
             producer,
             gazette::uuid::Clock::from_time(std::time::SystemTime::now()),
@@ -427,10 +452,10 @@ impl<W: TaskWriter + 'static> TaskForwarder<W> {
         let mut buf = serde_json::to_vec(&stats_output).expect("Value always serializes");
         buf.push(b'\n');
 
-        buf
+        bytes::Bytes::from(buf)
     }
 
-    fn serialize_log(producer: Producer, mut log: ops::Log, task_name: String) -> Vec<u8> {
+    fn serialize_log(producer: Producer, mut log: ops::Log, task_name: String) -> bytes::Bytes {
         let uuid = gazette::uuid::build(
             producer,
             gazette::uuid::Clock::from_time(std::time::SystemTime::now()),
@@ -445,7 +470,7 @@ impl<W: TaskWriter + 'static> TaskForwarder<W> {
         let mut buf = serde_json::to_vec(&log).expect("Value always serializes");
         buf.push(b'\n');
 
-        buf
+        bytes::Bytes::from(buf)
     }
 
     pub fn set_task_name(&self, name: String) {
@@ -499,6 +524,31 @@ impl<W: TaskWriter + 'static> TaskForwarder<W> {
             }
         }
     }
+
+    async fn append_logs_to_writer(
+        writer: &mut W,
+        pending_logs: &mut Vec<ops::Log>,
+        task_name: String,
+        uuid_producer: Producer,
+    ) -> anyhow::Result<()> {
+        let logs_to_append = mem::take(pending_logs);
+
+        writer
+            .append_logs(move || {
+                futures::stream::iter(logs_to_append.clone().into_iter().map({
+                    let value = task_name.clone();
+                    move |log| {
+                        let serialized = TaskForwarder::<W>::serialize_log(
+                            uuid_producer.clone(),
+                            log,
+                            value.to_owned(),
+                        );
+                        Ok(serialized)
+                    }
+                }))
+            })
+            .await
+    }
 }
 
 fn dekaf_shard_ref(task_name: String) -> ops::ShardRef {
@@ -536,12 +586,34 @@ mod tests {
             Ok(())
         }
 
-        async fn append_logs(&mut self, log_data: Bytes) -> anyhow::Result<()> {
-            self.logs.lock().await.push_back(log_data);
+        async fn append_logs<S>(
+            &mut self,
+            log_data: impl Fn() -> S + Send + Sync,
+        ) -> anyhow::Result<()>
+        where
+            S: Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static,
+        {
+            let mut logs = self.logs.lock().await;
+            let mut stream = Box::pin(log_data());
+
+            while let Some(Ok(data)) = stream.next().await {
+                logs.push_back(data);
+            }
             Ok(())
         }
-        async fn append_stats(&mut self, log_data: Bytes) -> anyhow::Result<()> {
-            self.stats.lock().await.push_back(log_data);
+        async fn append_stats<S>(
+            &mut self,
+            stat_data: impl Fn() -> S + Send + Sync,
+        ) -> anyhow::Result<()>
+        where
+            S: Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static,
+        {
+            let mut stats = self.stats.lock().await;
+            let mut stream = Box::pin(stat_data());
+
+            while let Some(Ok(data)) = stream.next().await {
+                stats.push_back(data);
+            }
             Ok(())
        }
    }
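A note on the signature change that runs through the whole diff: `TaskWriter::append_logs` and `append_stats` now take `impl Fn() -> S`, a closure that manufactures a fresh `Stream` of serialized bytes, instead of an already-built `Bytes` payload. The gazette client's `append` retries transient failures, and every attempt must read the payload from the beginning, so the caller has to hand over a stream factory rather than a one-shot stream. A rough sketch of the reasoning, with a hypothetical `append_with_retry` standing in for the gazette client:

use bytes::Bytes;
use futures::{stream, Stream, StreamExt};

// Hypothetical retrying sink: each attempt consumes a *fresh* stream built by
// `make_stream`. A plain `Stream` argument would already be exhausted (or
// half-consumed) by the time a second attempt runs.
async fn append_with_retry<F, S>(make_stream: F, max_attempts: u32) -> std::io::Result<()>
where
    F: Fn() -> S,
    S: Stream<Item = std::io::Result<Bytes>> + Send + 'static,
{
    let mut last_err = None;
    for attempt in 0..max_attempts {
        let mut payload = Box::pin(make_stream()); // fresh stream per attempt
        let mut failed = false;
        while let Some(chunk) = payload.next().await {
            match chunk {
                // A real implementation would write `bytes` to the journal here.
                Ok(bytes) => drop(bytes),
                Err(err) => {
                    eprintln!("attempt {attempt} failed: {err}");
                    last_err = Some(err);
                    failed = true;
                    break;
                }
            }
        }
        if !failed {
            return Ok(());
        }
    }
    Err(last_err.unwrap_or_else(|| std::io::Error::new(std::io::ErrorKind::Other, "gave up")))
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // The factory clones the captured batch into a brand-new stream per call,
    // mirroring the `|| futures::stream::once(...)` closures in the diff.
    let batch = vec![Bytes::from_static(b"{\"message\":\"hi\"}\n")];
    append_with_retry(
        || stream::iter(batch.clone().into_iter().map(Ok::<_, std::io::Error>)),
        3,
    )
    .await
}

This is likely also why `serialize_log` and `serialize_stats` switch their return type from `Vec<u8>` to `bytes::Bytes`: the closures have to clone the payload for every attempt, and cloning a `Bytes` is a reference-count bump rather than a buffer copy.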
