fix(meta): correctly resolve update of vnode mapping after scaling #8652

Merged · 2 commits · Mar 21, 2023

Changes from all commits
8 changes: 4 additions & 4 deletions src/common/src/hash/consistent_hash/mapping.rs
```diff
@@ -268,10 +268,10 @@ pub type ExpandedParallelUnitMapping = ExpandedMapping<marker::ParallelUnit>;

 impl ActorMapping {
     /// Transform this actor mapping to a parallel unit mapping, essentially `transform`.
-    pub fn to_parallel_unit(
-        &self,
-        to_map: &HashMap<ActorId, ParallelUnitId>,
-    ) -> ParallelUnitMapping {
+    pub fn to_parallel_unit<M>(&self, to_map: &M) -> ParallelUnitMapping
+    where
+        M: for<'a> Index<&'a ActorId, Output = ParallelUnitId>,
+    {
        self.transform(to_map)
     }
```
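As an aside, a minimal self-contained sketch of the pattern this signature change adopts (the `remap` function and the integer type aliases below are hypothetical stand-ins, not code from this repository): a `for<'a> Index<&'a K>` bound still accepts a `HashMap`, but no longer forces callers to materialize one.

```rust
use std::collections::HashMap;
use std::ops::Index;

type ActorId = u32;
type ParallelUnitId = u32;

/// Hypothetical stand-in for `transform`: look up each actor through any
/// indexable mapping, mirroring the relaxed bound in this change.
fn remap<M>(actors: &[ActorId], to_map: &M) -> Vec<ParallelUnitId>
where
    M: for<'a> Index<&'a ActorId, Output = ParallelUnitId>,
{
    actors.iter().map(|a| to_map[a]).collect()
}

fn main() {
    // `HashMap<K, V>` implements `Index<&K, Output = V>`, so existing call
    // sites keep working; indexing panics if an actor id is missing.
    let map: HashMap<ActorId, ParallelUnitId> = [(1, 10), (2, 20)].into_iter().collect();
    assert_eq!(remap(&[1, 2, 1], &map), vec![10, 20, 10]);
}
```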
3 changes: 3 additions & 0 deletions src/meta/src/barrier/command.rs
```diff
@@ -58,6 +58,9 @@ pub struct Reschedule {
     /// The upstream fragments of this fragment, and the dispatchers that should be updated.
     pub upstream_fragment_dispatcher_ids: Vec<(FragmentId, DispatcherId)>,
     /// New hash mapping of the upstream dispatcher to be updated.
+    ///
+    /// This field exists only when there's an upstream fragment and the current fragment is
+    /// hash-sharded.
     pub upstream_dispatcher_mapping: Option<ActorMapping>,

     /// The downstream fragments of this fragment.
```
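A tiny sketch of the invariant the new doc comment states (the helper and its parameters are hypothetical, for illustration only): the field is `Some` exactly when the fragment both has an upstream dispatcher to update and is hash-sharded.

```rust
/// Hypothetical helper illustrating the documented invariant: the mapping is
/// `None` for singleton fragments or fragments without an upstream.
fn upstream_dispatcher_mapping<M>(
    has_upstream_fragment: bool,
    is_hash_sharded: bool,
    new_mapping: M,
) -> Option<M> {
    (has_upstream_fragment && is_hash_sharded).then_some(new_mapping)
}
```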
67 changes: 39 additions & 28 deletions src/meta/src/manager/catalog/fragment.rs
```diff
@@ -18,8 +18,9 @@ use std::sync::Arc;

 use anyhow::{anyhow, Context};
 use itertools::Itertools;
+use risingwave_common::buffer::Bitmap;
 use risingwave_common::catalog::TableId;
-use risingwave_common::hash::ParallelUnitId;
+use risingwave_common::hash::{ActorMapping, ParallelUnitMapping};
 use risingwave_common::{bail, try_match_expand};
 use risingwave_connector::source::SplitImpl;
 use risingwave_pb::common::{ParallelUnit, WorkerNode};
@@ -736,44 +737,54 @@ where
             let actor_status = table_fragment.actor_status.clone();
             let fragment = table_fragment.fragments.get_mut(&fragment_id).unwrap();

-            fragment
-                .actors
-                .retain(|a| !removed_actor_ids.contains(&a.actor_id));
-
             // update vnode mapping for actors.
             for actor in &mut fragment.actors {
                 if let Some(bitmap) = vnode_bitmap_updates.get(&actor.actor_id) {
                     actor.vnode_bitmap = Some(bitmap.to_protobuf());
                 }
             }

+            fragment
+                .actors
+                .retain(|a| !removed_actor_ids.contains(&a.actor_id));
+
             // update fragment's vnode mapping
-            if let Some(vnode_mapping) = fragment.vnode_mapping.as_mut() {
-                let mut actor_to_parallel_unit = HashMap::with_capacity(fragment.actors.len());
-                for actor in &fragment.actors {
-                    if let Some(actor_status) = actor_status.get(&actor.actor_id) {
-                        if let Some(parallel_unit) = actor_status.parallel_unit.as_ref() {
-                            actor_to_parallel_unit.insert(
-                                actor.actor_id as ActorId,
-                                parallel_unit.id as ParallelUnitId,
-                            );
-                        }
-                    }
-                }
+            let mut actor_to_parallel_unit = HashMap::with_capacity(fragment.actors.len());
+            let mut actor_to_vnode_bitmap = HashMap::with_capacity(fragment.actors.len());
+            for actor in &fragment.actors {
+                let actor_status = &actor_status[&actor.actor_id];
+                let parallel_unit_id = actor_status.parallel_unit.as_ref().unwrap().id;
+                actor_to_parallel_unit.insert(actor.actor_id, parallel_unit_id);

-                if let Some(actor_mapping) = upstream_dispatcher_mapping.as_ref() {
-                    *vnode_mapping = actor_mapping
-                        .to_parallel_unit(&actor_to_parallel_unit)
-                        .to_protobuf();
+                if let Some(vnode_bitmap) = &actor.vnode_bitmap {
+                    let bitmap = Bitmap::from(vnode_bitmap);
+                    actor_to_vnode_bitmap.insert(actor.actor_id, bitmap);
                 }
+            }

-                if !fragment.state_table_ids.is_empty() {
-                    let fragment_mapping = FragmentParallelUnitMapping {
-                        fragment_id: fragment_id as FragmentId,
-                        mapping: Some(vnode_mapping.clone()),
-                    };
-                    fragment_mapping_to_notify.push(fragment_mapping);
-                }
+            let vnode_mapping = if actor_to_vnode_bitmap.is_empty() {
+                // If there's no `vnode_bitmap`, then the fragment must be a singleton fragment.
+                // We directly use the single parallel unit to construct the mapping.
+                // TODO: also fill `vnode_bitmap` for the actor of singleton fragment so that we
+                // don't need this branch.
+                let parallel_unit = *actor_to_parallel_unit.values().exactly_one().unwrap();
+                ParallelUnitMapping::new_single(parallel_unit)
+            } else {
+                // Generate the parallel unit mapping from the fragment's actor bitmaps.
+                assert_eq!(actor_to_vnode_bitmap.len(), actor_to_parallel_unit.len());
+                ActorMapping::from_bitmaps(&actor_to_vnode_bitmap)
+                    .to_parallel_unit(&actor_to_parallel_unit)
+            }
+            .to_protobuf();
+
+            *fragment.vnode_mapping.as_mut().unwrap() = vnode_mapping.clone();
+
+            if !fragment.state_table_ids.is_empty() {
+                let fragment_mapping = FragmentParallelUnitMapping {
+                    fragment_id: fragment_id as FragmentId,
+                    mapping: Some(vnode_mapping),
+                };
+                fragment_mapping_to_notify.push(fragment_mapping);
             }

             // Second step, update upstream fragments
```
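To make the new control flow concrete, here is a minimal sketch under simplified assumptions (plain `Vec<bool>` bitmaps and integer ids stand in for `Bitmap`, `ActorMapping`, and `ParallelUnitMapping`; `VNODE_COUNT` is reduced for brevity; the function name is hypothetical): the fragment's vnode mapping is now rebuilt from the actors' own bitmaps, with a single-unit fallback for singleton fragments, rather than being updated only when an upstream dispatcher mapping happens to exist.

```rust
use std::collections::HashMap;

type ActorId = u32;
type ParallelUnitId = u32;
const VNODE_COUNT: usize = 8; // the real system uses a larger fixed vnode count

/// Hypothetical sketch of the fixed logic: derive the fragment's
/// vnode -> parallel unit mapping from each actor's own bitmap.
fn rebuild_vnode_mapping(
    actor_to_bitmap: &HashMap<ActorId, Vec<bool>>,
    actor_to_parallel_unit: &HashMap<ActorId, ParallelUnitId>,
) -> Vec<ParallelUnitId> {
    if actor_to_bitmap.is_empty() {
        // Singleton fragment: its only parallel unit owns every vnode.
        assert_eq!(actor_to_parallel_unit.len(), 1);
        let &unit = actor_to_parallel_unit.values().next().unwrap();
        return vec![unit; VNODE_COUNT];
    }
    // Hash-sharded fragment: each vnode belongs to the parallel unit of
    // whichever actor's bitmap has that vnode set.
    let mut mapping = vec![0; VNODE_COUNT];
    for (actor, bitmap) in actor_to_bitmap {
        for (vnode, owned) in bitmap.iter().enumerate() {
            if *owned {
                mapping[vnode] = actor_to_parallel_unit[actor];
            }
        }
    }
    mapping
}

fn main() {
    // Singleton fragment: no bitmaps at all.
    let units: HashMap<_, _> = [(7, 42)].into_iter().collect();
    assert_eq!(rebuild_vnode_mapping(&HashMap::new(), &units), vec![42; VNODE_COUNT]);
}
```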
2 changes: 1 addition & 1 deletion src/meta/src/stream/test_scale.rs
```diff
@@ -143,7 +143,7 @@ mod tests {
         for parallel_unit_num in simulated_parallel_unit_nums(None, None) {
             let (actor_mapping, _) = generate_actor_mapping(parallel_unit_num);

-            let actor_to_parallel_unit_map = (0..parallel_unit_num)
+            let actor_to_parallel_unit_map: HashMap<_, _> = (0..parallel_unit_num)
                 .map(|i| (i as ActorId, i as ParallelUnitId))
                 .collect();
             let parallel_unit_mapping = actor_mapping.to_parallel_unit(&actor_to_parallel_unit_map);
```
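The annotation becomes necessary because the callee no longer names a concrete map type: with only the `Index` bound to go on, `collect()` cannot infer what collection to build. A minimal reproduction of the same situation (hypothetical names, not code from this PR):

```rust
use std::collections::HashMap;
use std::ops::Index;

// Accepts any indexable mapping, like the new `to_parallel_unit` bound.
fn takes_indexable<M>(_map: &M)
where
    M: for<'a> Index<&'a u32, Output = u32>,
{
}

fn main() {
    // Without the explicit `HashMap<_, _>`, this fails to compile with
    // "type annotations needed": the generic bound no longer pins down
    // the concrete collection type for `collect()`.
    let map: HashMap<_, _> = (0..4u32).map(|i| (i, i)).collect();
    takes_indexable(&map);
}
```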