Check if all nodes have reported a persisted lsn before trimming the log
Until we can share partition processor snapshots between Restate nodes (e.g.
by fetching them from S3), we can only trim the log if all known nodes have
reached the trim point. Otherwise, we risk that a currently unavailable node
still needs log entries that have already been trimmed. One crucial assumption
is that no new nodes join the cluster once the first log trimming has
happened; lifting this restriction also requires sharing partition processor
snapshots.

This fixes restatedev#1781.
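
For illustration, a minimal standalone sketch of the gating condition described above. This is not the crate's actual API: `safe_trim_point`, the plain `u64` LSNs, and `known_node_count` are hypothetical simplifications of the change shown in the diff below.

```rust
use std::collections::BTreeMap;

/// Returns the LSN up to which the log may safely be trimmed, or `None` if
/// not every known node has reported a persisted LSN or the trim threshold
/// has not been reached yet. LSNs are modeled as plain u64 values here.
fn safe_trim_point(
    persisted_lsns: &BTreeMap<u64, u64>, // node id -> persisted LSN
    known_node_count: usize,
    current_trim_point: u64,
    trim_threshold: u64,
) -> Option<u64> {
    // A missing report may mean the node is currently unavailable and could
    // still need the records we would trim away, so do nothing in that case.
    if persisted_lsns.len() < known_node_count {
        return None;
    }
    // Trim only up to the slowest node's persisted LSN, and only once it is
    // sufficiently far ahead of the current trim point.
    let min_persisted = persisted_lsns.values().copied().min()?;
    (min_persisted >= current_trim_point + trim_threshold).then_some(min_persisted)
}
```

Taking the minimum over all reported LSNs means the trim point never outruns the slowest node, which is what makes the "no new nodes after the first trim" assumption sufficient.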
tillrohrmann committed Aug 2, 2024
1 parent af2ab90 commit 76e3023
Showing 1 changed file with 166 additions and 109 deletions.
275 changes: 166 additions & 109 deletions crates/admin/src/cluster_controller/service.rs
@@ -288,17 +288,25 @@ where
}

for (partition_id, persisted_lsns) in persisted_lsns_per_partition.into_iter() {
// todo: Check that we haven't forgotten a replica which is not part of the cluster state, yet
let min_persisted_lsn = persisted_lsns.into_values().min().unwrap_or(Lsn::INVALID);
let log_id = LogId::from(partition_id);
// trim point is before the oldest record
let current_trim_point = bifrost_admin.get_trim_point(log_id).await?;

if min_persisted_lsn >= current_trim_point + self.log_trim_threshold {
debug!(
// todo: Remove once Restate nodes can share partition processor snapshots
// only try to trim if we know about the persisted lsns of all known nodes; otherwise we
// risk that a node cannot fully replay the log; this assumes that no new nodes join the
// cluster after the first trimming has happened
if persisted_lsns.len() >= cluster_state.nodes.len() {
let min_persisted_lsn = persisted_lsns.into_values().min().unwrap_or(Lsn::INVALID);
// trim point is before the oldest record
let current_trim_point = bifrost_admin.get_trim_point(log_id).await?;

if min_persisted_lsn >= current_trim_point + self.log_trim_threshold {
debug!(
"Automatic trim log '{log_id}' for all records before='{min_persisted_lsn}'"
);
bifrost_admin.trim(log_id, min_persisted_lsn).await?
bifrost_admin.trim(log_id, min_persisted_lsn).await?
}
} else {
debug!("Ignore automatically trimming log '{log_id}' because not all nodes reported their persisted lsn.");
}
}

@@ -423,7 +431,7 @@ mod tests {
use googletest::{assert_that, pat};
use restate_bifrost::{Bifrost, Record, TrimGap};
use restate_core::network::{MessageHandler, NetworkSender};
use restate_core::{MockNetworkSender, TaskKind, TestCoreEnvBuilder};
use restate_core::{MockNetworkSender, TaskKind, TestCoreEnv, TestCoreEnvBuilder};
use restate_types::cluster::cluster_state::PartitionProcessorStatus;
use restate_types::config::{AdminOptions, Configuration};
use restate_types::identifiers::PartitionId;
@@ -435,6 +443,7 @@
use restate_types::net::{AdvertisedAddress, MessageEnvelope};
use restate_types::nodes_config::{NodeConfig, NodesConfiguration, Role};
use restate_types::{GenerationalNodeId, Version};
use std::collections::BTreeSet;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
@@ -498,37 +507,39 @@
struct PartitionProcessorStatusHandler {
network_sender: MockNetworkSender,
persisted_lsn: Arc<AtomicU64>,
black_list: BTreeSet<GenerationalNodeId>,
}

impl MessageHandler for PartitionProcessorStatusHandler {
type MessageType = GetProcessorsState;

async fn on_message(&self, msg: MessageEnvelope<Self::MessageType>) {
let (sender, msg) = msg.split();

let partition_processor_status = PartitionProcessorStatus {
last_persisted_log_lsn: Some(Lsn::from(self.persisted_lsn.load(Ordering::Relaxed))),
..PartitionProcessorStatus::new()
};

let state = [(PartitionId::MIN, partition_processor_status)].into();
let response = ProcessorsStateResponse {
request_id: msg.request_id,
state,
};

self.network_sender
.send(sender.into(), &response)
.await
.expect("send should succeed");
let (target, msg) = msg.split();

if !self.black_list.contains(&target) {
let partition_processor_status = PartitionProcessorStatus {
last_persisted_log_lsn: Some(Lsn::from(
self.persisted_lsn.load(Ordering::Relaxed),
)),
..PartitionProcessorStatus::new()
};

let state = [(PartitionId::MIN, partition_processor_status)].into();
let response = ProcessorsStateResponse {
request_id: msg.request_id,
state,
};

self.network_sender
.send(target.into(), &response)
.await
.expect("send should succeed");
}
}
}

#[test(tokio::test(start_paused = true))]
async fn auto_log_trim() -> anyhow::Result<()> {
let mut builder = TestCoreEnvBuilder::new_with_mock_network();

let metadata = builder.metadata.clone();
let mut admin_options = AdminOptions::default();
admin_options.log_trim_threshold = 5;
let interval_duration = Duration::from_secs(10);
@@ -538,47 +549,17 @@
..Default::default()
};

let svc = Service::new(
Live::from_value(config),
builder.metadata_writer.clone(),
builder.metadata_store_client.clone(),
builder.tc.clone(),
builder.metadata.clone(),
builder.network_sender.clone(),
&mut builder.router_builder,
);

let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned());
nodes_config.upsert_node(NodeConfig::new(
"node".to_owned(),
GenerationalNodeId::new(1, 1),
AdvertisedAddress::Uds("foobar".into()),
Role::Worker.into(),
));
let persisted_lsn = Arc::new(AtomicU64::new(0));
let (node_env, bifrost) = create_test_env(config, |builder| {
let get_processor_state_handler = PartitionProcessorStatusHandler {
network_sender: builder.network_sender.clone(),
persisted_lsn: Arc::clone(&persisted_lsn),
black_list: BTreeSet::new(),
};

let get_processor_state_handler = PartitionProcessorStatusHandler {
network_sender: builder.network_sender.clone(),
persisted_lsn: Arc::clone(&persisted_lsn),
};

let node_env = builder
.add_message_handler(get_processor_state_handler)
.with_nodes_config(nodes_config)
.build()
.await;

let bifrost = node_env
.tc
.run_in_scope("init", None, Bifrost::init_in_memory(metadata))
.await;

node_env.tc.spawn(
TaskKind::SystemService,
"cluster-controller",
None,
svc.run(bifrost.clone(), None),
)?;
builder.add_message_handler(get_processor_state_handler)
})
.await?;

let log_id = LogId::from(0);

@@ -622,9 +603,6 @@ mod tests {

#[test(tokio::test(start_paused = true))]
async fn auto_log_trim_zero_threshold() -> anyhow::Result<()> {
let mut builder = TestCoreEnvBuilder::new_with_mock_network();

let metadata = builder.metadata.clone();
let mut admin_options = AdminOptions::default();
admin_options.log_trim_threshold = 0;
let interval_duration = Duration::from_secs(10);
@@ -634,47 +612,17 @@
..Default::default()
};

let svc = Service::new(
Live::from_value(config),
builder.metadata_writer.clone(),
builder.metadata_store_client.clone(),
builder.tc.clone(),
builder.metadata.clone(),
builder.network_sender.clone(),
&mut builder.router_builder,
);

let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned());
nodes_config.upsert_node(NodeConfig::new(
"node".to_owned(),
GenerationalNodeId::new(1, 1),
AdvertisedAddress::Uds("foobar".into()),
Role::Worker.into(),
));
let persisted_lsn = Arc::new(AtomicU64::new(0));
let (node_env, bifrost) = create_test_env(config, |builder| {
let get_processor_state_handler = PartitionProcessorStatusHandler {
network_sender: builder.network_sender.clone(),
persisted_lsn: Arc::clone(&persisted_lsn),
black_list: BTreeSet::new(),
};

let get_processor_state_handler = PartitionProcessorStatusHandler {
network_sender: builder.network_sender.clone(),
persisted_lsn: Arc::clone(&persisted_lsn),
};

let node_env = builder
.add_message_handler(get_processor_state_handler)
.with_nodes_config(nodes_config)
.build()
.await;

let bifrost = node_env
.tc
.run_in_scope("init", None, Bifrost::init_in_memory(metadata))
.await;

node_env.tc.spawn(
TaskKind::SystemService,
"cluster-controller",
None,
svc.run(bifrost.clone(), None),
)?;
builder.add_message_handler(get_processor_state_handler)
})
.await?;

let log_id = LogId::from(0);

@@ -716,4 +664,113 @@

Ok(())
}

#[test(tokio::test(start_paused = true))]
async fn do_not_trim_if_not_all_nodes_report_persisted_lsn() -> anyhow::Result<()> {
let mut admin_options = AdminOptions::default();
admin_options.log_trim_threshold = 0;
let interval_duration = Duration::from_secs(10);
admin_options.log_trim_interval = Some(interval_duration.into());
let config = Configuration {
admin: admin_options,
..Default::default()
};

let persisted_lsn = Arc::new(AtomicU64::new(0));

let (node_env, bifrost) = create_test_env(config, |builder| {
let black_list = builder
.nodes_config
.iter()
.next()
.map(|(_, node_config)| node_config.current_generation)
.into_iter()
.collect();

let get_processor_state_handler = PartitionProcessorStatusHandler {
network_sender: builder.network_sender.clone(),
persisted_lsn: Arc::clone(&persisted_lsn),
black_list,
};

builder.add_message_handler(get_processor_state_handler)
})
.await?;

let log_id = LogId::from(0);

node_env
.tc
.run_in_scope("test", None, async move {
for i in 1..=5 {
let lsn = bifrost
.append(log_id, Payload::new(format!("record{}", i)))
.await?;
assert_eq!(Lsn::from(i), lsn);
}

// report persisted lsn back to cluster controller for a subset of the nodes
persisted_lsn.store(5, Ordering::Relaxed);

tokio::time::sleep(interval_duration * 10).await;
// no trimming should have happened because one node did not report the persisted lsn
assert_eq!(Lsn::INVALID, bifrost.get_trim_point(log_id).await?);

Ok::<(), anyhow::Error>(())
})
.await?;

Ok(())
}

async fn create_test_env<F>(
config: Configuration,
mut modify_builder: F,
) -> anyhow::Result<(TestCoreEnv<MockNetworkSender>, Bifrost)>
where
F: FnMut(TestCoreEnvBuilder<MockNetworkSender>) -> TestCoreEnvBuilder<MockNetworkSender>,
{
let mut builder = TestCoreEnvBuilder::new_with_mock_network();
let metadata = builder.metadata.clone();

let svc = Service::new(
Live::from_value(config),
builder.metadata_writer.clone(),
builder.metadata_store_client.clone(),
builder.tc.clone(),
builder.metadata.clone(),
builder.network_sender.clone(),
&mut builder.router_builder,
);

let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned());
nodes_config.upsert_node(NodeConfig::new(
"node-1".to_owned(),
GenerationalNodeId::new(1, 1),
AdvertisedAddress::Uds("foobar".into()),
Role::Worker.into(),
));
nodes_config.upsert_node(NodeConfig::new(
"node-2".to_owned(),
GenerationalNodeId::new(2, 2),
AdvertisedAddress::Uds("bar".into()),
Role::Worker.into(),
));
let builder = modify_builder(builder.with_nodes_config(nodes_config));

let node_env = builder.build().await;

let bifrost = node_env
.tc
.run_in_scope("init", None, Bifrost::init_in_memory(metadata))
.await;

node_env.tc.spawn(
TaskKind::SystemService,
"cluster-controller",
None,
svc.run(bifrost.clone(), None),
)?;
Ok((node_env, bifrost))
}
}
