fix(meta): pin & unpin snapshot for mview creation with correct order #5606

Merged · 8 commits · Sep 29, 2022
4 changes: 1 addition & 3 deletions src/bench/s3_bench/main.rs
@@ -12,15 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(type_ascription)]
use std::collections::hash_map::{Entry, HashMap};
use std::ops::Div;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};

use aws_sdk_s3::model::{CompletedMultipartUpload, CompletedPart};
use aws_sdk_s3::output::UploadPartOutput;
use aws_sdk_s3::Client;
use aws_smithy_http::body::SdkBody;
use bytesize::ByteSize;
@@ -235,7 +233,7 @@ async fn multi_part_upload(
let part_t = Instant::now();
let result = a.send().await.unwrap();
let part_ttl = part_t.elapsed();
Ok((result, part_ttl)): Result<(UploadPartOutput, Duration), RwError>
Ok::<_, RwError>((result, part_ttl))
})
.collect_vec();
let ttfb = t.elapsed();
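The bench change above drops the unstable `type_ascription` feature, so the closure's result type is now supplied with a turbofish on the `Ok` constructor instead of an ascription. A minimal sketch of the stable pattern, with a placeholder `RwError` so the snippet compiles on its own:

```rust
// Placeholder standing in for the crate's `RwError`; only here so the
// example is self-contained.
#[derive(Debug)]
struct RwError;

fn main() -> Result<(), RwError> {
    // Unstable, requires `#![feature(type_ascription)]`:
    //     Ok((result, part_ttl)): Result<(UploadPartOutput, Duration), RwError>
    // Stable equivalent, fixing only the error type with a turbofish:
    let pair = Ok::<_, RwError>((1_u64, 2_u64))?;
    assert_eq!(pair, (1, 2));
    Ok(())
}
```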
33 changes: 31 additions & 2 deletions src/meta/src/barrier/command.rs
@@ -33,6 +33,7 @@ use risingwave_rpc_client::StreamClientPoolRef;
use uuid::Uuid;

use super::info::BarrierActorInfo;
use super::snapshot::SnapshotManagerRef;
use crate::barrier::CommandChanges;
use crate::manager::{FragmentManagerRef, WorkerId};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments};
@@ -158,6 +159,8 @@ impl Command {
pub struct CommandContext<S: MetaStore> {
fragment_manager: FragmentManagerRef<S>,

snapshot_manager: SnapshotManagerRef<S>,

client_pool: StreamClientPoolRef,

/// Resolved info in this barrier loop.
@@ -173,8 +176,10 @@ pub struct CommandContext<S: MetaStore> {
}

impl<S: MetaStore> CommandContext<S> {
pub fn new(
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
fragment_manager: FragmentManagerRef<S>,
snapshot_manager: SnapshotManagerRef<S>,
client_pool: StreamClientPoolRef,
info: BarrierActorInfo,
prev_epoch: Epoch,
@@ -184,6 +189,7 @@ impl<S: MetaStore> CommandContext<S> {
) -> Self {
Self {
fragment_manager,
snapshot_manager,
client_pool,
info: Arc::new(info),
prev_epoch,
@@ -371,7 +377,8 @@
}
}

/// Do some stuffs after barriers are collected, for the given command.
/// Do some stuff after barriers are collected and the new storage version is committed, for
/// the given command.
pub async fn post_collect(&self) -> MetaResult<()> {
match &self.command {
#[allow(clippy::single_match)]
@@ -438,6 +445,11 @@
dependent_table_actors,
)
.await?;

// For mview creation, the snapshot ingestion may last for several epochs. By
// pinning a snapshot in `post_collect`, which is called sequentially, we can ensure
// that the pinned snapshot is exactly the one just committed.
self.snapshot_manager.pin(self.prev_epoch).await?;
}

Command::RescheduleFragment(reschedules) => {
@@ -489,4 +501,21 @@

Ok(())
}

/// Do some stuff before the barrier is `finish`ed. Only used for `CreateMaterializedView`.
pub async fn pre_finish(&self) -> MetaResult<()> {
#[allow(clippy::single_match)]
match &self.command {
Command::CreateMaterializedView { .. } => {
// Since the compute node reports that the chain actors have caught up with the
// upstream and finished the creation, we can unpin the snapshot.
// TODO: we can unpin the snapshot earlier, when the snapshot ingestion is done.
self.snapshot_manager.unpin(self.prev_epoch).await?;
}

_ => {}
}

Ok(())
}
}
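The `pin`/`unpin` calls in `post_collect` and `pre_finish` above come from the new `barrier::snapshot` module, whose diff is not shown in this excerpt. The following is only a rough, self-contained sketch of what a manager of this shape could look like: the reference-counting scheme, field names, and `Epoch` alias are assumptions for illustration, and the actual hummock integration is omitted.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Simplified stand-in for the epoch type used by the barrier manager.
type Epoch = u64;

/// Illustrative snapshot manager keeping a pin count per epoch. The real
/// `SnapshotManager` wraps the hummock manager; only the pin/unpin
/// bookkeeping is modeled here.
#[derive(Default)]
pub struct SnapshotManager {
    pinned: Mutex<HashMap<Epoch, usize>>,
}

impl SnapshotManager {
    /// Pin the snapshot of `epoch`, so it cannot be vacuumed while a
    /// creating mview is still ingesting from it.
    pub fn pin(&self, epoch: Epoch) {
        *self.pinned.lock().unwrap().entry(epoch).or_insert(0) += 1;
    }

    /// Unpin the snapshot of `epoch`; the entry is dropped once the count
    /// reaches zero, allowing the snapshot to be cleaned up.
    pub fn unpin(&self, epoch: Epoch) {
        let mut pinned = self.pinned.lock().unwrap();
        let count = pinned
            .get_mut(&epoch)
            .expect("unpinning an epoch that was never pinned");
        *count -= 1;
        if *count == 0 {
            pinned.remove(&epoch);
        }
    }
}

fn main() {
    let manager = SnapshotManager::default();
    // `post_collect` pins the just-committed epoch once the mview-creating
    // barrier has been collected...
    manager.pin(100);
    // ...and `pre_finish` releases it after the chain actors report that the
    // creation has finished.
    manager.unpin(100);
}
```

Because `post_collect` runs sequentially after the new storage version is committed, the pinned epoch is exactly the one the chain actors will read from, which is the ordering this PR fixes.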
106 changes: 64 additions & 42 deletions src/meta/src/barrier/mod.rs
@@ -42,7 +42,10 @@ use uuid::Uuid;
use self::command::CommandContext;
use self::info::BarrierActorInfo;
use self::notifier::Notifier;
use self::progress::TrackingCommand;
use self::snapshot::SnapshotManagerRef;
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::snapshot::SnapshotManager;
use crate::barrier::BarrierEpochState::{Completed, InFlight};
use crate::hummock::HummockManagerRef;
use crate::manager::{
@@ -60,6 +63,7 @@ mod notifier;
mod progress;
mod recovery;
mod schedule;
mod snapshot;

pub use self::command::{Command, Reschedule};
pub use self::schedule::BarrierScheduler;
@@ -123,6 +127,8 @@ pub struct GlobalBarrierManager<S: MetaStore> {

hummock_manager: HummockManagerRef<S>,

snapshot_manager: SnapshotManagerRef<S>,

source_manager: SourceManagerRef<S>,

metrics: Arc<MetaMetrics>,
@@ -151,7 +157,7 @@ struct CheckpointControl<S: MetaStore> {
metrics: Arc<MetaMetrics>,

/// Get notified when we finished Create MV and collect a barrier(checkpoint = true)
finished_notifiers: Vec<Notifier>,
finished_commands: Vec<TrackingCommand<S>>,
}

impl<S> CheckpointControl<S>
@@ -166,20 +172,32 @@
adding_actors: Default::default(),
removing_actors: Default::default(),
metrics,
finished_notifiers: Default::default(),
finished_commands: Default::default(),
}
}

/// To add `finished_notifiers`, we need an barrier(checkpoint = true) to handle it
fn add_finished_notifiers(&mut self, finished_notifiers: Vec<Notifier>) {
self.finished_notifiers.extend(finished_notifiers);
/// Stash a command to finish later.
fn stash_command_to_finish(&mut self, finished_command: TrackingCommand<S>) {
self.finished_commands.push(finished_command);
}

/// Process `finished_notifiers`, send success message
fn post_finished_notifiers(&mut self) {
self.finished_notifiers
.drain(..)
.for_each(Notifier::notify_finished);
/// Finish stashed commands. If the current barrier is not a `checkpoint`, we will not finish
/// the commands that require a checkpoint; otherwise we will finish all of them.
///
/// Returns whether there are still remaining stashed commands to finish.
fn finish_commands(&mut self, checkpoint: bool) -> bool {
if checkpoint {
self.finished_commands
.drain(..)
.flat_map(|c| c.notifiers)
.for_each(Notifier::notify_finished);
} else {
self.finished_commands
.drain_filter(|c| !c.context.checkpoint)
.flat_map(|c| c.notifiers)
.for_each(Notifier::notify_finished);
}
!self.finished_commands.is_empty()
}

/// Before resolving the actors to be sent or collected, we should first record the newly
@@ -440,6 +458,8 @@ where
in_flight_barrier_nums,
);

let snapshot_manager = SnapshotManager::new(hummock_manager.clone()).into();

Self {
interval,
enable_recovery,
@@ -449,6 +469,7 @@
catalog_manager,
fragment_manager,
hummock_manager,
snapshot_manager,
source_manager,
metrics,
env,
@@ -466,7 +487,7 @@

/// Start an infinite loop to take scheduled barriers and send them.
async fn run(&self, mut shutdown_rx: Receiver<()>) {
let mut tracker = CreateMviewProgressTracker::default();
let mut tracker = CreateMviewProgressTracker::new();
let mut state = BarrierManagerState::create(self.env.meta_store()).await;
if self.enable_recovery {
// handle init, here we simply trigger a recovery process to achieve the consistency. We
@@ -475,12 +496,7 @@ where
assert!(new_epoch > state.in_flight_prev_epoch);
state.in_flight_prev_epoch = new_epoch;

let (new_epoch, actors_to_track, create_mview_progress) =
self.recovery(state.in_flight_prev_epoch, true).await;
tracker.add(new_epoch, actors_to_track, vec![]);
for progress in &create_mview_progress {
tracker.update(progress);
}
let new_epoch = self.recovery(state.in_flight_prev_epoch, true).await;
state.in_flight_prev_epoch = new_epoch;
state
.update_inflight_prev_epoch(self.env.meta_store())
@@ -557,6 +573,7 @@

let command_ctx = Arc::new(CommandContext::new(
self.fragment_manager.clone(),
self.snapshot_manager.clone(),
self.env.stream_client_pool_ref(),
info,
prev_epoch,
@@ -697,7 +714,7 @@
prev_epoch: u64,
result: MetaResult<Vec<BarrierCompleteResponse>>,
state: &mut BarrierManagerState,
tracker: &mut CreateMviewProgressTracker,
tracker: &mut CreateMviewProgressTracker<S>,
checkpoint_control: &mut CheckpointControl<S>,
) {
if let Err(err) = result {
@@ -738,7 +755,7 @@
err: MetaError,
fail_nodes: impl IntoIterator<Item = EpochNode<S>>,
state: &mut BarrierManagerState,
tracker: &mut CreateMviewProgressTracker,
tracker: &mut CreateMviewProgressTracker<S>,
checkpoint_control: &mut CheckpointControl<S>,
) {
checkpoint_control.clear_changes();
@@ -755,13 +772,8 @@
}
if self.enable_recovery {
// If failed, enter recovery mode.
let (new_epoch, actors_to_track, create_mview_progress) =
self.recovery(state.in_flight_prev_epoch, false).await;
*tracker = CreateMviewProgressTracker::default();
tracker.add(new_epoch, actors_to_track, vec![]);
for progress in &create_mview_progress {
tracker.update(progress);
}
*tracker = CreateMviewProgressTracker::new();
let new_epoch = self.recovery(state.in_flight_prev_epoch, false).await;
state.in_flight_prev_epoch = new_epoch;
state
.update_inflight_prev_epoch(self.env.meta_store())
Expand All @@ -776,7 +788,7 @@ where
async fn complete_barrier(
&self,
node: &mut EpochNode<S>,
tracker: &mut CreateMviewProgressTracker,
tracker: &mut CreateMviewProgressTracker<S>,
checkpoint_control: &mut CheckpointControl<S>,
) -> MetaResult<()> {
let prev_epoch = node.command_ctx.prev_epoch.0;
@@ -829,25 +841,35 @@
notifier.notify_collected();
});

// Save `finished_notifier` for Create MVs.
let actors_to_finish = node.command_ctx.actors_to_track();
let mut finished_notifiers =
tracker.add(node.command_ctx.curr_epoch, actors_to_finish, notifiers);
for progress in resps.iter().flat_map(|r| r.create_mview_progress.clone()) {
if let Some(mut notifier) = tracker.update(&progress) {
finished_notifiers.append(&mut notifier);
// Save `finished_commands` for Create MVs.
let finished_commands = {
let mut commands = vec![];
if let Some(command) = tracker.add(TrackingCommand {
context: node.command_ctx.clone(),
notifiers,
}) {
commands.push(command);
}
}
for progress in resps.iter().flat_map(|r| &r.create_mview_progress) {
if let Some(command) = tracker.update(progress) {
commands.push(command);
}
}
commands
};

// Force checkpoint in next barrier to finish creating mv.
if !finished_notifiers.is_empty() && !checkpoint {
self.scheduled_barriers.force_checkpoint_in_next_barrier();
for command in finished_commands {
// The command is ready to finish. We can now call `pre_finish`.
command.context.pre_finish().await?;
checkpoint_control.stash_command_to_finish(command);
}
checkpoint_control.add_finished_notifiers(finished_notifiers);

// Notify about collected with a barrier(checkpoint = true).
if checkpoint {
checkpoint_control.post_finished_notifiers();
let remaining = checkpoint_control.finish_commands(checkpoint);
// If there are remaining commands (that require a checkpoint to finish), we force
// the next barrier to be a checkpoint.
if remaining {
assert!(!checkpoint);
self.scheduled_barriers.force_checkpoint_in_next_barrier();
}

node.timer.take().unwrap().observe_duration();
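The interaction in `complete_barrier` between stashed commands and checkpoint barriers is easier to see outside diff form. Below is a small, self-contained model of the `stash_command_to_finish` / `finish_commands` pair: the `StashedCommand` type and its `needs_checkpoint` flag are simplified stand-ins (the real code stashes `TrackingCommand`s with their notifiers and uses the nightly `drain_filter`), but the control flow follows the diff: finish what can be finished, and force the next barrier to be a checkpoint if anything is still pending.

```rust
/// Simplified stand-in for a tracked "create materialized view" command.
struct StashedCommand {
    name: &'static str,
    /// Whether this command can only be finished by a checkpoint barrier.
    needs_checkpoint: bool,
}

#[derive(Default)]
struct CheckpointControl {
    finished_commands: Vec<StashedCommand>,
}

impl CheckpointControl {
    /// Stash a command to finish later.
    fn stash_command_to_finish(&mut self, command: StashedCommand) {
        self.finished_commands.push(command);
    }

    /// Finish stashed commands. A checkpoint barrier finishes everything; a
    /// non-checkpoint barrier finishes only the commands that do not need a
    /// checkpoint. Returns whether any command is still pending, in which
    /// case the caller forces the next barrier to be a checkpoint.
    fn finish_commands(&mut self, checkpoint: bool) -> bool {
        if checkpoint {
            for command in self.finished_commands.drain(..) {
                println!("notify finished: {}", command.name);
            }
        } else {
            // Stable replacement for `drain_filter`: keep what still needs a
            // checkpoint, notify and drop the rest.
            self.finished_commands.retain(|command| {
                if command.needs_checkpoint {
                    true
                } else {
                    println!("notify finished: {}", command.name);
                    false
                }
            });
        }
        !self.finished_commands.is_empty()
    }
}

fn main() {
    let mut control = CheckpointControl::default();
    control.stash_command_to_finish(StashedCommand {
        name: "create materialized view t",
        needs_checkpoint: true,
    });

    // A non-checkpoint barrier cannot finish it yet, so the caller forces the
    // next barrier to be a checkpoint (as `complete_barrier` does above).
    assert!(control.finish_commands(false));

    // The forced checkpoint barrier then finishes the stashed command.
    assert!(!control.finish_commands(true));
}
```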