Remove temporary solution for avoiding stuck invocations because of lost SubmitNotifications

With restatedev#1651 being fixed, we no longer need to wait for all partitions to have a
running leader before starting the ingress in order to avoid stuck invocations. That's
because the ingress will now retry until it finds an active leader for a given partition.

This fixes restatedev#2206.
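
For context, the retry behaviour the ingress now relies on can be pictured roughly as follows. This is a minimal illustrative sketch only, assuming tokio as the async runtime; the names (`PartitionId`, `NodeId`, `lookup_leader`, `send_to_leader`, `dispatch_with_retry`) are made up for illustration and are not the routing code in this repository.

```rust
use std::time::Duration;

#[derive(Clone, Copy, Debug)]
struct PartitionId(u64);

#[derive(Clone, Copy, Debug)]
struct NodeId(u64);

/// Hypothetical leader lookup. In the real system this would consult routing /
/// cluster-state information and can return `None` while the partition's
/// processor is still starting up or a leader election is in progress.
fn lookup_leader(_partition: PartitionId) -> Option<NodeId> {
    Some(NodeId(1))
}

/// Hypothetical dispatch of an invocation to the partition's leader node.
async fn send_to_leader(_leader: NodeId) -> Result<(), &'static str> {
    Ok(())
}

/// Rather than blocking ingress start-up until every partition has a running
/// leader, each dispatch retries until a leader for its partition shows up.
async fn dispatch_with_retry(partition: PartitionId) -> Result<(), &'static str> {
    loop {
        if let Some(leader) = lookup_leader(partition) {
            return send_to_leader(leader).await;
        }
        // No leader known yet: back off briefly and try again instead of
        // failing the invocation or waiting on a global
        // "all partitions started" signal.
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}

#[tokio::main]
async fn main() {
    // Example: returns as soon as a leader for partition 0 is found.
    dispatch_with_retry(PartitionId(0)).await.unwrap();
}
```

With a retry of this shape in place, the `signal_all_partitions_started` machinery and the `oneshot` channels threaded through `Node`, `AdminRole`, `WorkerRole` and `Worker` below are no longer needed, which is what this commit removes.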
tillrohrmann committed Nov 4, 2024
1 parent 3a293a4 commit 17f6c79
Showing 5 changed files with 15 additions and 113 deletions.
71 changes: 4 additions & 67 deletions crates/admin/src/cluster_controller/service.rs
@@ -48,7 +48,7 @@ use restate_types::partition_table::PartitionTable;
use restate_types::protobuf::common::AdminStatus;
use restate_types::{GenerationalNodeId, Version};

use super::cluster_state_refresher::{ClusterStateRefresher, ClusterStateWatcher};
use super::cluster_state_refresher::ClusterStateRefresher;
use super::grpc_svc_handler::ClusterCtrlSvcHandler;
use super::protobuf::cluster_ctrl_svc_server::ClusterCtrlSvcServer;
use crate::cluster_controller::logs_controller::{
@@ -250,10 +250,7 @@ impl<T: TransportConnect> Service<T> {
}
}

pub async fn run(
mut self,
all_partitions_started_tx: Option<oneshot::Sender<()>>,
) -> anyhow::Result<()> {
pub async fn run(mut self) -> anyhow::Result<()> {
self.init_partition_table().await?;

let bifrost_admin = BifrostAdmin::new(
@@ -266,21 +263,6 @@
let mut config_watcher = Configuration::watcher();
let mut cluster_state_watcher = self.cluster_state_refresher.cluster_state_watcher();

// todo: This is a temporary band-aid for https://github.com/restatedev/restate/issues/1651
// Remove once it is properly fixed.
if let Some(all_partition_started_tx) = all_partitions_started_tx {
self.task_center.spawn_child(
TaskKind::SystemBoot,
"signal-all-partitions-started",
None,
signal_all_partitions_started(
cluster_state_watcher.clone(),
self.metadata.clone(),
all_partition_started_tx,
),
)?;
}

self.task_center.spawn_child(
TaskKind::SystemService,
"cluster-controller-metadata-sync",
@@ -611,51 +593,6 @@ async fn sync_cluster_controller_metadata(metadata: Metadata) -> anyhow::Result<
Ok(())
}

async fn signal_all_partitions_started(
mut cluster_state_watcher: ClusterStateWatcher,
metadata: Metadata,
all_partitions_started_tx: oneshot::Sender<()>,
) -> anyhow::Result<()> {
loop {
let cluster_state = cluster_state_watcher.next_cluster_state().await?;

if cluster_state.partition_table_version != metadata.partition_table_version()
|| metadata.partition_table_version() == Version::INVALID
{
// syncing of PartitionTable since we obviously don't have up-to-date information
metadata
.sync(
MetadataKind::PartitionTable,
TargetVersion::Version(cluster_state.partition_table_version.max(Version::MIN)),
)
.await?;
} else {
let partition_table = metadata.partition_table_ref();

let mut pending_partitions_wo_leader = partition_table.num_partitions();

for alive_node in cluster_state.alive_nodes() {
alive_node
.partitions
.iter()
.for_each(|(_, partition_state)| {
// currently, there can only be a single leader per partition
if partition_state.is_effective_leader() {
pending_partitions_wo_leader =
pending_partitions_wo_leader.saturating_sub(1);
}
})
}

if pending_partitions_wo_leader == 0 {
// send result can be ignored because rx should only go away in case of a shutdown
let _ = all_partitions_started_tx.send(());
return Ok(());
}
}
}
}

#[derive(Clone)]
struct PartitionProcessorManagerClient<N>
where
@@ -765,7 +702,7 @@ mod tests {
TaskKind::SystemService,
"cluster-controller",
None,
svc.run(None),
svc.run(),
)?;

node_env
Expand Down Expand Up @@ -1089,7 +1026,7 @@ mod tests {
TaskKind::SystemService,
"cluster-controller",
None,
svc.run(None),
svc.run(),
)?;

Ok((node_env, bifrost))
24 changes: 4 additions & 20 deletions crates/node/src/lib.rs
@@ -12,7 +12,6 @@ mod cluster_marker;
mod network_server;
mod roles;

use tokio::sync::oneshot;
use tracing::{debug, error, info, trace};

use codederror::CodedError;
@@ -401,31 +400,16 @@ impl Node {
)?;
}

let all_partitions_started_rx = if let Some(admin_role) = self.admin_role {
// todo: This is a temporary fix for https://github.com/restatedev/restate/issues/1651
let (all_partitions_started_tx, all_partitions_started_rx) = oneshot::channel();
tc.spawn(
TaskKind::SystemBoot,
"admin-init",
None,
admin_role.start(all_partitions_started_tx),
)?;

all_partitions_started_rx
} else {
// We don't wait for all partitions being the leader if we are not co-located with the
// admin role which should not be the normal deployment today.
let (all_partitions_started_tx, all_partitions_started_rx) = oneshot::channel();
let _ = all_partitions_started_tx.send(());
all_partitions_started_rx
};
if let Some(admin_role) = self.admin_role {
tc.spawn(TaskKind::SystemBoot, "admin-init", None, admin_role.start())?;
}

if let Some(worker_role) = self.worker_role {
tc.spawn(
TaskKind::SystemBoot,
"worker-init",
None,
worker_role.start(all_partitions_started_rx),
worker_role.start(),
)?;
}

9 changes: 2 additions & 7 deletions crates/node/src/roles/admin.rs
@@ -10,8 +10,6 @@

use std::time::Duration;

use tokio::sync::oneshot;

use codederror::CodedError;
use restate_admin::cluster_controller;
use restate_admin::service::AdminService;
@@ -132,18 +130,15 @@ impl<T: TransportConnect> AdminRole<T> {
})
}

pub async fn start(
self,
all_partitions_started_tx: oneshot::Sender<()>,
) -> Result<(), anyhow::Error> {
pub async fn start(self) -> Result<(), anyhow::Error> {
let tc = task_center();

if let Some(cluster_controller) = self.controller {
tc.spawn_child(
TaskKind::SystemService,
"cluster-controller-service",
None,
cluster_controller.run(Some(all_partitions_started_tx)),
cluster_controller.run(),
)?;
}

8 changes: 2 additions & 6 deletions crates/node/src/roles/worker.rs
@@ -9,7 +9,6 @@
// by the Apache License, Version 2.0.

use codederror::CodedError;
use tokio::sync::oneshot;

use restate_bifrost::Bifrost;
use restate_core::network::MessageRouterBuilder;
@@ -103,10 +102,7 @@ impl<T: TransportConnect> WorkerRole<T> {
self.worker.storage_query_context()
}

pub async fn start(
self,
all_partitions_started_rx: oneshot::Receiver<()>,
) -> anyhow::Result<()> {
pub async fn start(self) -> anyhow::Result<()> {
let tc = task_center();
// todo: only run subscriptions on node 0 once being distributed
tc.spawn_child(
@@ -117,7 +113,7 @@
)?;

tc.spawn_child(TaskKind::RoleRunner, "worker-service", None, async {
self.worker.run(all_partitions_started_rx).await
self.worker.run().await
})?;

Ok(())
16 changes: 3 additions & 13 deletions crates/worker/src/lib.rs
@@ -22,7 +22,6 @@ mod subscription_integration;

use codederror::CodedError;
use std::time::Duration;
use tokio::sync::oneshot;

use restate_bifrost::Bifrost;
use restate_core::network::partition_processor_rpc_client::PartitionProcessorRpcClient;
@@ -32,7 +31,7 @@ use restate_core::network::Networking;
use restate_core::network::TransportConnect;
use restate_core::routing_info::PartitionRoutingRefresher;
use restate_core::worker_api::ProcessorsManagerHandle;
use restate_core::{cancellation_watcher, task_center, Metadata, TaskKind};
use restate_core::{task_center, Metadata, TaskKind};
use restate_ingress_dispatcher::IngressDispatcher;
use restate_ingress_http::HyperServerIngress;
use restate_ingress_kafka::Service as IngressKafkaService;
@@ -222,24 +221,15 @@ impl<T: TransportConnect> Worker<T> {
self.partition_processor_manager.handle()
}

pub async fn run(self, all_partitions_started_rx: oneshot::Receiver<()>) -> anyhow::Result<()> {
pub async fn run(self) -> anyhow::Result<()> {
let tc = task_center();

// Ingress RPC server
tc.spawn_child(
TaskKind::IngressServer,
"ingress-rpc-server",
None,
async move {
tokio::select! {
Ok(_) = all_partitions_started_rx => {
self.external_client_ingress.run().await
}
_ = cancellation_watcher() => {
Ok(())
}
}
},
self.external_client_ingress.run(),
)?;

// Postgres external server
