Check if all nodes have reported a persisted lsn before trimming the log
Until Restate nodes can share partition processor snapshots (e.g. by fetching
them from S3), we can only trim the log if all known nodes have reached the
trim point. Otherwise, a node that is currently unavailable might later need
log entries that have already been trimmed. One crucial assumption is that no
new nodes join the cluster once the first log trimming has happened; lifting
this restriction also requires the sharing of partition processor snapshots.

This fixes #1781.
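
To illustrate the guard this commit adds, here is a minimal, self-contained sketch of the
trim decision. It uses hypothetical stand-in types (plain u64 for LSNs and node ids, a
BTreeMap for the reported persisted LSNs) rather than the real Lsn/GenerationalNodeId
types, so it is an approximation of the logic in the diff below, not the implementation
itself:

    use std::collections::BTreeMap;

    /// Stand-in types for illustration only; the real code uses
    /// restate_types::logs::Lsn and GenerationalNodeId.
    type Lsn = u64;
    type NodeId = u64;

    /// Returns the new trim point for a partition's log, or None if trimming
    /// must be skipped because not every known node has reported a persisted LSN.
    fn safe_trim_point(
        persisted_lsns: &BTreeMap<NodeId, Lsn>,
        known_cluster_nodes: usize,
        current_trim_point: Lsn,
        log_trim_threshold: Lsn,
    ) -> Option<Lsn> {
        // A missing report could mean the node is down; trimming now might
        // strand that node behind the trim point once it comes back.
        if persisted_lsns.len() < known_cluster_nodes {
            return None;
        }
        // Trim up to the slowest node, and only if it is far enough past the
        // current trim point to make trimming worthwhile.
        let min_persisted_lsn = persisted_lsns.values().copied().min().unwrap_or(0);
        (min_persisted_lsn >= current_trim_point + log_trim_threshold)
            .then_some(min_persisted_lsn)
    }

In the actual service this check runs once per partition, using cluster_state.nodes.len()
as the node count and Lsn::INVALID as the fallback when no persisted LSNs are known.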
tillrohrmann committed Aug 7, 2024
1 parent 800c46c commit 51075c1
Showing 1 changed file with 155 additions and 94 deletions.
249 changes: 155 additions & 94 deletions crates/admin/src/cluster_controller/service.rs
@@ -291,17 +291,25 @@ where
}

for (partition_id, persisted_lsns) in persisted_lsns_per_partition.into_iter() {
// todo: Check that we haven't forgotten a replica which is not part of the cluster state, yet
let min_persisted_lsn = persisted_lsns.into_values().min().unwrap_or(Lsn::INVALID);
let log_id = LogId::from(partition_id);
// trim point is before the oldest record
let current_trim_point = bifrost_admin.get_trim_point(log_id).await?;

if min_persisted_lsn >= current_trim_point + self.log_trim_threshold {
debug!(
// todo: Remove once Restate nodes can share partition processor snapshots
// only try to trim if we know about the persisted lsns of all known nodes; otherwise we
// risk that a node cannot fully replay the log; this assumes that no new nodes join the
// cluster after the first trimming has happened
if persisted_lsns.len() >= cluster_state.nodes.len() {
let min_persisted_lsn = persisted_lsns.into_values().min().unwrap_or(Lsn::INVALID);
// trim point is before the oldest record
let current_trim_point = bifrost_admin.get_trim_point(log_id).await?;

if min_persisted_lsn >= current_trim_point + self.log_trim_threshold {
debug!(
"Automatic trim log '{log_id}' for all records before='{min_persisted_lsn}'"
);
bifrost_admin.trim(log_id, min_persisted_lsn).await?
bifrost_admin.trim(log_id, min_persisted_lsn).await?
}
} else {
warn!("Stop automatically trimming log '{log_id}' because not all nodes are running a partition processor applying this log.");
}
}

@@ -426,7 +434,7 @@ mod tests {
use googletest::{assert_that, pat};
use restate_bifrost::{Bifrost, Record, TrimGap};
use restate_core::network::{MessageHandler, NetworkSender};
use restate_core::{MockNetworkSender, TaskKind, TestCoreEnvBuilder};
use restate_core::{MockNetworkSender, TaskKind, TestCoreEnv, TestCoreEnvBuilder};
use restate_types::cluster::cluster_state::PartitionProcessorStatus;
use restate_types::config::{AdminOptions, Configuration};
use restate_types::identifiers::PartitionId;
@@ -438,6 +446,7 @@
use restate_types::net::{AdvertisedAddress, MessageEnvelope};
use restate_types::nodes_config::{NodeConfig, NodesConfiguration, Role};
use restate_types::{GenerationalNodeId, Version};
use std::collections::BTreeSet;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
@@ -501,13 +510,20 @@
struct PartitionProcessorStatusHandler {
network_sender: MockNetworkSender,
persisted_lsn: Arc<AtomicU64>,
// Set of node ids for which the handler won't send a response to the caller; this allows
// simulating dead nodes.
block_list: BTreeSet<GenerationalNodeId>,
}

impl MessageHandler for PartitionProcessorStatusHandler {
type MessageType = GetProcessorsState;

async fn on_message(&self, msg: MessageEnvelope<Self::MessageType>) {
let (sender, msg) = msg.split();
let (target, msg) = msg.split();

if self.block_list.contains(&target) {
return;
}

let partition_processor_status = PartitionProcessorStatus {
last_persisted_log_lsn: Some(Lsn::from(self.persisted_lsn.load(Ordering::Relaxed))),
@@ -521,17 +537,16 @@
};

self.network_sender
.send(sender.into(), &response)
// We are not really sending anything back to the target; we just need to provide a known
// node_id. The response will be sent to a handler running on the very same node.
.send(target.into(), &response)
.await
.expect("send should succeed");
}
}

#[test(tokio::test(start_paused = true))]
async fn auto_log_trim() -> anyhow::Result<()> {
let mut builder = TestCoreEnvBuilder::new_with_mock_network();

let metadata = builder.metadata.clone();
let mut admin_options = AdminOptions::default();
admin_options.log_trim_threshold = 5;
let interval_duration = Duration::from_secs(10);
@@ -541,47 +556,17 @@
..Default::default()
};

let svc = Service::new(
Live::from_value(config),
builder.metadata_writer.clone(),
builder.metadata_store_client.clone(),
builder.tc.clone(),
builder.metadata.clone(),
builder.network_sender.clone(),
&mut builder.router_builder,
);

let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned());
nodes_config.upsert_node(NodeConfig::new(
"node".to_owned(),
GenerationalNodeId::new(1, 1),
AdvertisedAddress::Uds("foobar".into()),
Role::Worker.into(),
));
let persisted_lsn = Arc::new(AtomicU64::new(0));
let (node_env, bifrost) = create_test_env(config, |builder| {
let get_processor_state_handler = PartitionProcessorStatusHandler {
network_sender: builder.network_sender.clone(),
persisted_lsn: Arc::clone(&persisted_lsn),
block_list: BTreeSet::new(),
};

let get_processor_state_handler = PartitionProcessorStatusHandler {
network_sender: builder.network_sender.clone(),
persisted_lsn: Arc::clone(&persisted_lsn),
};

let node_env = builder
.add_message_handler(get_processor_state_handler)
.with_nodes_config(nodes_config)
.build()
.await;

let bifrost = node_env
.tc
.run_in_scope("init", None, Bifrost::init_in_memory(metadata))
.await;

node_env.tc.spawn(
TaskKind::SystemService,
"cluster-controller",
None,
svc.run(bifrost.clone(), None),
)?;
builder.add_message_handler(get_processor_state_handler)
})
.await?;

let log_id = LogId::from(0);

@@ -625,9 +610,6 @@

#[test(tokio::test(start_paused = true))]
async fn auto_log_trim_zero_threshold() -> anyhow::Result<()> {
let mut builder = TestCoreEnvBuilder::new_with_mock_network();

let metadata = builder.metadata.clone();
let mut admin_options = AdminOptions::default();
admin_options.log_trim_threshold = 0;
let interval_duration = Duration::from_secs(10);
@@ -637,47 +619,17 @@
..Default::default()
};

let svc = Service::new(
Live::from_value(config),
builder.metadata_writer.clone(),
builder.metadata_store_client.clone(),
builder.tc.clone(),
builder.metadata.clone(),
builder.network_sender.clone(),
&mut builder.router_builder,
);

let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned());
nodes_config.upsert_node(NodeConfig::new(
"node".to_owned(),
GenerationalNodeId::new(1, 1),
AdvertisedAddress::Uds("foobar".into()),
Role::Worker.into(),
));
let persisted_lsn = Arc::new(AtomicU64::new(0));
let (node_env, bifrost) = create_test_env(config, |builder| {
let get_processor_state_handler = PartitionProcessorStatusHandler {
network_sender: builder.network_sender.clone(),
persisted_lsn: Arc::clone(&persisted_lsn),
block_list: BTreeSet::new(),
};

let get_processor_state_handler = PartitionProcessorStatusHandler {
network_sender: builder.network_sender.clone(),
persisted_lsn: Arc::clone(&persisted_lsn),
};

let node_env = builder
.add_message_handler(get_processor_state_handler)
.with_nodes_config(nodes_config)
.build()
.await;

let bifrost = node_env
.tc
.run_in_scope("init", None, Bifrost::init_in_memory(metadata))
.await;

node_env.tc.spawn(
TaskKind::SystemService,
"cluster-controller",
None,
svc.run(bifrost.clone(), None),
)?;
builder.add_message_handler(get_processor_state_handler)
})
.await?;

let log_id = LogId::from(0);

@@ -719,4 +671,113 @@

Ok(())
}

#[test(tokio::test(start_paused = true))]
async fn do_not_trim_if_not_all_nodes_report_persisted_lsn() -> anyhow::Result<()> {
let mut admin_options = AdminOptions::default();
admin_options.log_trim_threshold = 0;
let interval_duration = Duration::from_secs(10);
admin_options.log_trim_interval = Some(interval_duration.into());
let config = Configuration {
admin: admin_options,
..Default::default()
};

let persisted_lsn = Arc::new(AtomicU64::new(0));

let (node_env, bifrost) = create_test_env(config, |builder| {
let black_list = builder
.nodes_config
.iter()
.next()
.map(|(_, node_config)| node_config.current_generation)
.into_iter()
.collect();

let get_processor_state_handler = PartitionProcessorStatusHandler {
network_sender: builder.network_sender.clone(),
persisted_lsn: Arc::clone(&persisted_lsn),
block_list: black_list,
};

builder.add_message_handler(get_processor_state_handler)
})
.await?;

let log_id = LogId::from(0);

node_env
.tc
.run_in_scope("test", None, async move {
for i in 1..=5 {
let lsn = bifrost
.append(log_id, Payload::new(format!("record{}", i)))
.await?;
assert_eq!(Lsn::from(i), lsn);
}

// report persisted lsn back to cluster controller for a subset of the nodes
persisted_lsn.store(5, Ordering::Relaxed);

tokio::time::sleep(interval_duration * 10).await;
// no trimming should have happened because one node did not report the persisted lsn
assert_eq!(Lsn::INVALID, bifrost.get_trim_point(log_id).await?);

Ok::<(), anyhow::Error>(())
})
.await?;

Ok(())
}

async fn create_test_env<F>(
config: Configuration,
mut modify_builder: F,
) -> anyhow::Result<(TestCoreEnv<MockNetworkSender>, Bifrost)>
where
F: FnMut(TestCoreEnvBuilder<MockNetworkSender>) -> TestCoreEnvBuilder<MockNetworkSender>,
{
let mut builder = TestCoreEnvBuilder::new_with_mock_network();
let metadata = builder.metadata.clone();

let svc = Service::new(
Live::from_value(config),
builder.metadata_writer.clone(),
builder.metadata_store_client.clone(),
builder.tc.clone(),
builder.metadata.clone(),
builder.network_sender.clone(),
&mut builder.router_builder,
);

let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned());
nodes_config.upsert_node(NodeConfig::new(
"node-1".to_owned(),
GenerationalNodeId::new(1, 1),
AdvertisedAddress::Uds("foobar".into()),
Role::Worker.into(),
));
nodes_config.upsert_node(NodeConfig::new(
"node-2".to_owned(),
GenerationalNodeId::new(2, 2),
AdvertisedAddress::Uds("bar".into()),
Role::Worker.into(),
));
let builder = modify_builder(builder.with_nodes_config(nodes_config));

let node_env = builder.build().await;

let bifrost = node_env
.tc
.run_in_scope("init", None, Bifrost::init_in_memory(metadata))
.await;

node_env.tc.spawn(
TaskKind::SystemService,
"cluster-controller",
None,
svc.run(bifrost.clone(), None),
)?;
Ok((node_env, bifrost))
}
}
