Skip to content

Commit

Permalink
dekaf: Update logging to use journal::Client::append_stream
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Feb 11, 2025
1 parent 96a2577 commit 794d951
Showing 1 changed file with 86 additions and 84 deletions.
170 changes: 86 additions & 84 deletions crates/dekaf/src/log_appender.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
use crate::{dekaf_shard_template_id, topology::fetch_dekaf_task_auth, App};
use anyhow::Context;
use async_trait::async_trait;
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use flow_client::fetch_task_authorization;
use futures::{StreamExt, TryStreamExt};
use futures::StreamExt;
use gazette::{
broker::AppendRequest,
journal,
uuid::{self, Producer},
RetryError,
};
use proto_gazette::message_flags;
use rand::Rng;
use std::{
collections::{BTreeMap, VecDeque},
marker::PhantomData,
sync::Arc,
time::{Duration, SystemTime},
};
use tokio::sync::mpsc::error::TrySendError;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
use tokio_util::task::AbortOnDropHandle;

#[derive(Debug)]
enum TaskWriterMessage {
Expand Down Expand Up @@ -98,15 +99,13 @@ impl TaskWriter for GazetteWriter {
.as_mut()
.context("not initialized")?
.append(data)
.await
}

async fn append_stats(&mut self, data: Bytes) -> anyhow::Result<()> {
self.stats_appender
.as_mut()
.context("not initialized")?
.append(data)
.await
}
}

Expand Down Expand Up @@ -140,11 +139,8 @@ impl GazetteWriter {

#[derive(Clone)]
struct GazetteAppender {
client: journal::Client,
journal_name: String,
exp: time::OffsetDateTime,
app: Arc<App>,
task_name: String,
message_tx: tokio::sync::mpsc::UnboundedSender<Box<dyn journal::FramedMessage>>,
_handle: Arc<AbortOnDropHandle<()>>,
}

impl GazetteAppender {
Expand All @@ -153,87 +149,92 @@ impl GazetteAppender {
task_name: String,
app: Arc<App>,
) -> anyhow::Result<Self> {
let (client, exp) = Self::refresh_client(&task_name, &journal_name, app.clone()).await?;
let (message_tx, messages_rx) = tokio::sync::mpsc::unbounded_channel();

Ok(Self {
client,
exp,
task_name,
journal_name,
app,
})
}
let handle = tokio::spawn(async move {
let message_stream = UnboundedReceiverStream::new(messages_rx);
// Outer loop handles refreshing clients
loop {
let (client, exp) =
match Self::refresh_client(&task_name, &journal_name, app.clone()).await {
Ok((client, exp)) => (client, exp),
Err(e) => {
tracing::error!(
?e,
?task_name,
?journal_name,
"Failed to refresh client"
);
break;
}
};

async fn append(&mut self, data: Bytes) -> anyhow::Result<()> {
if (self.exp - SystemTime::now()).whole_seconds() < 60 {
self.refresh().await?;
}
let until_exp = exp - time::OffsetDateTime::now_utc();

let resp = self.client.append(
gazette::broker::AppendRequest {
journal: self.journal_name.clone(),
..Default::default()
},
|| {
futures::stream::once({
let value = data.clone();
async move { Ok(value) }
})
},
);
let exp_fut = tokio::time::sleep(std::time::Duration::from_secs(
until_exp.whole_seconds() as u64,
));

tokio::pin!(resp);
loop {
match resp.try_next().await {
Ok(_) => return Ok(()),
Err(RetryError { inner: err, .. })
if matches!(
&err,
gazette::Error::Grpc(status) if status.code() == tonic::Code::DeadlineExceeded
) =>
{
tracing::warn!(
?err,
"DeadlineExceeded error likely means that the data-plane access token has expired, but tokens get refreshed so this should never happen"
);

return Err(err.into());
}
Err(RetryError { attempt, ref inner }) if inner.is_transient() && attempt < 3 => {
let wait_ms = rand::thread_rng().gen_range(400..5_000);

tracing::warn!(
?attempt,
?inner,
?wait_ms,
"Got recoverable error trying to write logs, retrying"
);

tokio::time::sleep(Duration::from_millis(wait_ms)).await;
continue;
}
Err(err) if err.inner.is_transient() => {
tracing::warn!(
attempt=err.attempt,
inner=?err.inner,
"Got recoverable error multiple times while trying to write logs"
);
return Err(err.inner.into());
}
Err(err) => {
tracing::warn!(?err, "Got fatal error while trying to write logs");
return Err(err.inner.into());
let pending_messages = message_stream.as_ref().len();
tracing::debug!(?task_name, pending_messages, "Starting up new log appender");

// client_messages will signal that it's done 10 minutes before our token is set to expire.
// This way, we should have enough time to finish appending any pending messages.
let client_messages = Box::pin(message_stream.take_until(exp_fut));

let response_stream = client.append_stream(
AppendRequest {
journal: journal_name.to_string(),
..Default::default()
},
client_messages,
);

tokio::pin!(response_stream);

// Inner loop drives appender through its lifecycle. We will retry transient errors indefinitely,
// and fail as loudly as we can if we get unrecoverable errors.
loop {
if let Some(resp) = response_stream.next().await {
match resp {
Ok(_) => continue,
Err(RetryError { attempt, ref inner }) if inner.is_transient() => {
tracing::warn!(
?attempt,
?inner,
"Got recoverable error trying to write logs, retrying"
);

continue;
}
Err(err) => {
tracing::error!(
?err,
?task_name,
"Got fatal error while trying to write logs or stats"
);
}
}
} else {
tracing::debug!("Message stream closed, refreshing client");
}
}
}
}
});

Ok(Self {
message_tx,
_handle: Arc::new(AbortOnDropHandle::new(handle)),
})
}

async fn refresh(&mut self) -> anyhow::Result<()> {
let (client, exp) =
Self::refresh_client(&self.task_name, &self.journal_name, self.app.clone()).await?;
self.client = client;
self.exp = exp;
Ok(())
fn append(&mut self, data: Bytes) -> anyhow::Result<()> {
self.message_tx
.send(Box::new(|mut buf: bytes::BytesMut| async {
buf.extend(data);
Ok(buf)
}))
.map_err(|e| anyhow::Error::new(e))
}

async fn refresh_client(
Expand Down Expand Up @@ -289,6 +290,7 @@ const WELL_KNOWN_LOG_FIELDS: &'static [&'static str] = &[
SESSION_CLIENT_ID_FIELD_MARKER,
];
pub const LOG_MESSAGE_QUEUE_SIZE: usize = 50;
pub const LOG_BUFFER_SIZE: usize = 2 ^ 22; // 2mb

impl<W: TaskWriter + 'static> TaskForwarder<W> {
pub fn new(producer: Producer, writer: W) -> Self {
Expand Down

0 comments on commit 794d951

Please sign in to comment.