
refactor(meta): deprecate persisted fragment
wenym1 committed Feb 20, 2025
1 parent 48b2492 commit 304b97d
Showing 16 changed files with 125 additions and 280 deletions.
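
This commit stops persisting a fragment's upstream fragment ids as a separate protobuf field and database column; as the diffs below show, the same information is now derived by walking the fragment's stream node and reading the `upstream_fragment_id` carried by each Merge node. The following is a minimal sketch of that derivation, using simplified stand-in types rather than the actual `risingwave_pb` definitions:

use std::collections::HashSet;

// Simplified stand-ins for the protobuf stream-node types; the real
// definitions live in risingwave_pb and are only hinted at by the diff below.
struct MergeNode {
    upstream_fragment_id: u32,
}

enum NodeBody {
    Merge(MergeNode),
    Other,
}

struct StreamNode {
    node_body: Option<NodeBody>,
    input: Vec<StreamNode>,
}

// Derive a fragment's upstream fragment ids by collecting the id carried by
// every Merge node in its stream node, instead of reading a persisted list.
fn derive_upstream_fragment_ids(root: &StreamNode) -> HashSet<u32> {
    let mut ids = HashSet::new();
    let mut stack = vec![root];
    while let Some(node) = stack.pop() {
        if let Some(NodeBody::Merge(merge)) = &node.node_body {
            ids.insert(merge.upstream_fragment_id);
        }
        stack.extend(node.input.iter());
    }
    ids
}

The cleanup, scaling, and replace paths further down apply this idea in place of the dropped column.
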
8 changes: 2 additions & 6 deletions proto/meta.proto
@@ -84,11 +84,8 @@ message TableFragments {
reserved "vnode_mapping";

repeated uint32 state_table_ids = 6;
// Note that this can be derived backwards from the upstream actors of the Actor held by the Fragment,
// but in some scenarios (e.g. Scaling) it will lead to a lot of duplicate code,
// so we pre-generate and store it here, this member will only be initialized when creating the Fragment
// and modified when creating the mv-on-mv
repeated uint32 upstream_fragment_ids = 7;
reserved 7;
reserved "upstream_fragment_ids";

// Total vnode count of the fragment (then all internal tables).
// Duplicated from the length of the vnode bitmap in any actor of the fragment.
@@ -256,7 +253,6 @@ message ListFragmentDistributionResponse {
uint32 table_id = 2;
TableFragments.Fragment.FragmentDistributionType distribution_type = 3;
repeated uint32 state_table_ids = 4;
repeated uint32 upstream_fragment_ids = 5;
uint32 fragment_type_mask = 6;
uint32 parallelism = 7;
uint32 vnode_count = 8;
@@ -34,7 +34,6 @@ use risingwave_frontend_macro::system_catalog;
f.fragment_id,
f.distribution_type,
f.state_table_ids,
f.upstream_fragment_ids,
f.flags,
f.parallelism,
f.max_parallelism
@@ -50,7 +49,6 @@ struct RwFragmentParallelism {
fragment_id: i32,
distribution_type: String,
state_table_ids: Vec<i32>,
upstream_fragment_ids: Vec<i32>,
flags: Vec<String>,
parallelism: i32,
max_parallelism: i32,
@@ -27,7 +27,6 @@ struct RwFragment {
table_id: i32,
distribution_type: String,
state_table_ids: Vec<i32>,
upstream_fragment_ids: Vec<i32>,
flags: Vec<String>,
parallelism: i32,
max_parallelism: i32,
@@ -63,11 +62,6 @@ async fn read_rw_fragment(reader: &SysCatalogReaderImpl) -> Result<Vec<RwFragmen
.into_iter()
.map(|id| id as i32)
.collect(),
upstream_fragment_ids: distribution
.upstream_fragment_ids
.into_iter()
.map(|id| id as i32)
.collect(),
flags: extract_fragment_type_flag(distribution.fragment_type_mask)
.into_iter()
.flat_map(|t| t.as_str_name().strip_prefix("FRAGMENT_TYPE_FLAG_"))
1 change: 1 addition & 0 deletions src/meta/model/src/fragment.rs
@@ -30,6 +30,7 @@ pub struct Model {
/// Specifically, `Merge` nodes' `upstream_actor_id` will be filled. (See `compose_fragment`)
pub stream_node: StreamNode,
pub state_table_ids: I32Array,
#[deprecated]
pub upstream_fragment_id: I32Array,
pub vnode_count: i32,
}
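
The model keeps the `upstream_fragment_id` column so that existing rows still deserialize, but marks it `#[deprecated]`; the remaining construction sites (visible later in `src/meta/src/controller/fragment.rs`) opt in with `#[expect(deprecated)]` and write an empty placeholder. A small, self-contained illustration of that pattern with hypothetical types:

struct Model {
    fragment_id: i32,
    #[deprecated]
    upstream_fragment_id: Vec<i32>, // kept only so persisted rows still decode
}

// Acknowledge the deprecated field explicitly; without this attribute the
// construction below would emit a `deprecated` warning.
#[expect(deprecated)]
fn placeholder_model(fragment_id: i32) -> Model {
    Model {
        fragment_id,
        upstream_fragment_id: Default::default(), // never read back
    }
}

fn main() {
    let model = placeholder_model(1);
    println!("fragment {}", model.fragment_id);
}
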
1 change: 0 additions & 1 deletion src/meta/service/src/stream_service.rs
@@ -305,7 +305,6 @@ impl StreamManagerService for StreamServiceImpl {
fragment_desc.distribution_type,
) as _,
state_table_ids: fragment_desc.state_table_ids.into_u32_array(),
upstream_fragment_ids: fragment_desc.upstream_fragment_id.into_u32_array(),
fragment_type_mask: fragment_desc.fragment_type_mask as _,
parallelism: fragment_desc.parallelism as _,
vnode_count: fragment_desc.vnode_count as _,
42 changes: 19 additions & 23 deletions src/meta/src/controller/catalog/util.rs
@@ -392,11 +392,10 @@ impl CatalogController {
.exec(txn)
.await?;

let fragments: Vec<(FragmentId, I32Array, StreamNode, i32)> = Fragment::find()
let fragments: Vec<(FragmentId, StreamNode, i32)> = Fragment::find()
.select_only()
.columns(vec![
fragment::Column::FragmentId,
fragment::Column::UpstreamFragmentId,
fragment::Column::StreamNode,
fragment::Column::FragmentTypeMask,
])
@@ -405,33 +404,28 @@
.all(txn)
.await?;

for (fragment_id, upstream_fragment_ids, stream_node, fragment_mask) in fragments {
let mut upstream_fragment_ids = upstream_fragment_ids.into_inner();

let dirty_upstream_fragment_ids = upstream_fragment_ids
.extract_if(|id| !all_fragment_ids.contains(id))
.collect_vec();

if !dirty_upstream_fragment_ids.is_empty() {
for (fragment_id, stream_node, fragment_mask) in fragments {
{
// dirty downstream should be materialize fragment of table
assert!(fragment_mask & FragmentTypeFlag::Mview as i32 > 0);

tracing::info!(
"cleaning dirty table sink fragment {:?} from downstream fragment {}",
dirty_upstream_fragment_ids,
fragment_id
);
let mut dirty_upstream_fragment_ids = HashSet::new();

let mut pb_stream_node = stream_node.to_protobuf();

visit_stream_node_cont_mut(&mut pb_stream_node, |node| {
if let Some(NodeBody::Union(_)) = node.node_body {
node.input.retain_mut(|input| {
if let Some(NodeBody::Merge(merge_node)) = &mut input.node_body
&& all_fragment_ids
if let Some(NodeBody::Merge(merge_node)) = &mut input.node_body {
if all_fragment_ids
.contains(&(merge_node.upstream_fragment_id as i32))
{
true
{
true
} else {
dirty_upstream_fragment_ids
.insert(merge_node.upstream_fragment_id);
false
}
} else {
false
}
@@ -440,11 +434,13 @@
true
});

tracing::info!(
"cleaning dirty table sink fragment {:?} from downstream fragment {}",
dirty_upstream_fragment_ids,
fragment_id
);

Fragment::update_many()
.col_expr(
fragment::Column::UpstreamFragmentId,
I32Array::from(upstream_fragment_ids).into(),
)
.col_expr(
fragment::Column::StreamNode,
StreamNode::from(&pb_stream_node).into(),
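
With no persisted upstream list left to filter, the cleanup above works directly on the stream node: it walks the Union node's inputs, keeps only the Merge inputs whose upstream fragment still exists, and records the pruned ids for the log message. A condensed sketch of that retain step, again with simplified stand-in types rather than the real protobuf structs:

use std::collections::HashSet;

struct MergeNode { upstream_fragment_id: u32 }
enum NodeBody { Merge(MergeNode), Union, Other }
struct StreamNode { node_body: Option<NodeBody>, input: Vec<StreamNode> }

// Drop Merge inputs of a Union node whose upstream fragment is no longer in
// `live_fragments`, returning the ids that were pruned (for logging).
fn prune_dirty_merge_inputs(
    union_node: &mut StreamNode,
    live_fragments: &HashSet<u32>,
) -> HashSet<u32> {
    let mut dirty = HashSet::new();
    if let Some(NodeBody::Union) = &union_node.node_body {
        union_node.input.retain(|input| {
            if let Some(NodeBody::Merge(merge)) = &input.node_body {
                if live_fragments.contains(&merge.upstream_fragment_id) {
                    true
                } else {
                    dirty.insert(merge.upstream_fragment_id);
                    false
                }
            } else {
                // Mirrors the retain_mut in the diff: non-Merge inputs under
                // the Union are dropped as well.
                false
            }
        });
    }
    dirty
}
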
99 changes: 5 additions & 94 deletions src/meta/src/controller/fragment.rs
@@ -214,7 +214,6 @@ impl CatalogController {
distribution_type: pb_distribution_type,
actors: pb_actors,
state_table_ids: pb_state_table_ids,
upstream_fragment_ids: pb_upstream_fragment_ids,
nodes,
..
} = fragment;
@@ -282,22 +281,21 @@
actor_dispatchers.insert(*actor_id as _, pb_dispatcher.clone());
}

let upstream_fragment_id = pb_upstream_fragment_ids.clone().into();

let stream_node = StreamNode::from(&stream_node);

let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
.unwrap()
.into();

#[expect(deprecated)]
let fragment = fragment::Model {
fragment_id: *pb_fragment_id as _,
job_id,
fragment_type_mask: *pb_fragment_type_mask as _,
distribution_type,
stream_node,
state_table_ids,
upstream_fragment_id,
upstream_fragment_id: Default::default(),
vnode_count: vnode_count as _,
};

@@ -364,13 +362,12 @@
)> {
let fragment::Model {
fragment_id,
job_id: _,
fragment_type_mask,
distribution_type,
stream_node,
state_table_ids,
upstream_fragment_id,
vnode_count,
..
} = fragment;

let stream_node = stream_node.to_protobuf();
@@ -437,7 +434,6 @@
})
}

let pb_upstream_fragment_ids = upstream_fragment_id.into_u32_array();
let pb_state_table_ids = state_table_ids.into_u32_array();
let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
let pb_fragment = PbFragment {
@@ -446,7 +442,6 @@
distribution_type: pb_distribution_type,
actors: pb_actors,
state_table_ids: pb_state_table_ids,
upstream_fragment_ids: pb_upstream_fragment_ids,
maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
nodes: Some(stream_node),
};
@@ -520,80 +515,6 @@
Ok(fragment_jobs.into_iter().collect())
}

/// Gets the counts for each upstream relation that each stream job
/// indicated by `table_ids` depends on.
/// For example in the following query:
/// ```sql
/// CREATE MATERIALIZED VIEW m1 AS
/// SELECT * FROM t1 JOIN t2 ON t1.a = t2.a JOIN t3 ON t2.b = t3.b
/// ```
///
/// We have t1 occurring once, and t2 occurring once.
pub async fn get_upstream_job_counts(
&self,
job_ids: Vec<ObjectId>,
) -> MetaResult<HashMap<ObjectId, HashMap<ObjectId, usize>>> {
let inner = self.inner.read().await;
let upstream_fragments: Vec<(ObjectId, i32, I32Array)> = Fragment::find()
.select_only()
.columns([
fragment::Column::JobId,
fragment::Column::FragmentTypeMask,
fragment::Column::UpstreamFragmentId,
])
.filter(fragment::Column::JobId.is_in(job_ids))
.into_tuple()
.all(&inner.db)
.await?;

// filter out stream scan node.
let upstream_fragments = upstream_fragments
.into_iter()
.filter(|(_, mask, _)| (*mask & PbFragmentTypeFlag::StreamScan as i32) != 0)
.map(|(obj, _, upstream_fragments)| (obj, upstream_fragments.into_inner()))
.collect_vec();

// count by fragment id.
let upstream_fragment_counts = upstream_fragments
.iter()
.flat_map(|(_, upstream_fragments)| upstream_fragments.iter().cloned())
.counts();

// get fragment id to job id mapping.
let fragment_job_ids: Vec<(FragmentId, ObjectId)> = Fragment::find()
.select_only()
.columns([fragment::Column::FragmentId, fragment::Column::JobId])
.filter(
fragment::Column::FragmentId
.is_in(upstream_fragment_counts.keys().cloned().collect_vec()),
)
.into_tuple()
.all(&inner.db)
.await?;
let fragment_job_mapping: HashMap<FragmentId, ObjectId> =
fragment_job_ids.into_iter().collect();

// get upstream job counts.
let upstream_job_counts = upstream_fragments
.into_iter()
.map(|(job_id, upstream_fragments)| {
let upstream_job_counts = upstream_fragments
.into_iter()
.map(|upstream_fragment_id| {
let upstream_job_id =
fragment_job_mapping.get(&upstream_fragment_id).unwrap();
(
*upstream_job_id,
*upstream_fragment_counts.get(&upstream_fragment_id).unwrap(),
)
})
.collect();
(job_id, upstream_job_counts)
})
.collect();
Ok(upstream_job_counts)
}
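
The deleted helper's doc comment describes the intent: for each stream job, count how often each upstream job is referenced. Purely as an illustration of that documented intent (not the SeaORM/itertools bookkeeping the deleted code actually performed), the aggregation can be restated over plain std collections:

use std::collections::HashMap;

// scan_fragments: (job id, upstream fragment ids referenced by that job's scan fragments)
// fragment_to_job: which job owns each upstream fragment
fn upstream_job_counts(
    scan_fragments: &[(u32, Vec<u32>)],
    fragment_to_job: &HashMap<u32, u32>,
) -> HashMap<u32, HashMap<u32, usize>> {
    let mut counts: HashMap<u32, HashMap<u32, usize>> = HashMap::new();
    for (job_id, upstream_fragment_ids) in scan_fragments {
        let per_job = counts.entry(*job_id).or_default();
        for fragment_id in upstream_fragment_ids {
            if let Some(upstream_job) = fragment_to_job.get(fragment_id) {
                *per_job.entry(*upstream_job).or_insert(0) += 1;
            }
        }
    }
    counts
}
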

pub async fn get_fragment_job_id(
&self,
fragment_ids: Vec<FragmentId>,
@@ -934,7 +855,6 @@ impl CatalogController {
fragment::Column::FragmentTypeMask,
fragment::Column::DistributionType,
fragment::Column::StateTableIds,
fragment::Column::UpstreamFragmentId,
fragment::Column::VnodeCount,
fragment::Column::StreamNode,
])
@@ -1813,10 +1733,6 @@ mod tests {
distribution_type: PbFragmentDistributionType::Hash as _,
actors: pb_actors.clone(),
state_table_ids: vec![TEST_STATE_TABLE_ID as _],
upstream_fragment_ids: upstream_actor_ids
.values()
.flat_map(|m| m.keys().map(|x| *x as _))
.collect(),
maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
nodes: Some(stream_node.clone()),
};
@@ -1921,14 +1837,15 @@
generate_merger_stream_node(template_upstream_actor_ids)
};

#[expect(deprecated)]
let fragment = fragment::Model {
fragment_id: TEST_FRAGMENT_ID,
job_id: TEST_JOB_ID,
fragment_type_mask: 0,
distribution_type: DistributionType::Hash,
stream_node: StreamNode::from(&stream_node),
state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]),
upstream_fragment_id: I32Array::default(),
upstream_fragment_id: Default::default(),
vnode_count: VirtualNode::COUNT_FOR_TEST as _,
};

@@ -2032,7 +1949,6 @@
distribution_type: pb_distribution_type,
actors: _,
state_table_ids: pb_state_table_ids,
upstream_fragment_ids: pb_upstream_fragment_ids,
maybe_vnode_count: _,
nodes,
} = pb_fragment;
@@ -2044,11 +1960,6 @@
PbFragmentDistributionType::from(fragment.distribution_type) as i32
);

assert_eq!(
pb_upstream_fragment_ids,
fragment.upstream_fragment_id.into_u32_array()
);

assert_eq!(
pb_state_table_ids,
fragment.state_table_ids.into_u32_array()
27 changes: 8 additions & 19 deletions src/meta/src/controller/streaming_job.rs
@@ -1207,19 +1207,14 @@ impl CatalogController {
// 2. update merges.
// update downstream fragment's Merge node, and upstream_fragment_id
for (fragment_id, merge_updates) in merge_updates {
let (fragment_id, mut stream_node, mut upstream_fragment_id) =
Fragment::find_by_id(fragment_id as FragmentId)
.select_only()
.columns([
fragment::Column::FragmentId,
fragment::Column::StreamNode,
fragment::Column::UpstreamFragmentId,
])
.into_tuple::<(FragmentId, StreamNode, I32Array)>()
.one(txn)
.await?
.map(|(id, node, upstream)| (id, node.to_protobuf(), upstream))
.ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
let (fragment_id, mut stream_node) = Fragment::find_by_id(fragment_id as FragmentId)
.select_only()
.columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
.into_tuple::<(FragmentId, StreamNode)>()
.one(txn)
.await?
.map(|(id, node)| (id, node.to_protobuf()))
.ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
let fragment_replace_map: HashMap<_, _> = merge_updates
.iter()
.map(|update| {
@@ -1237,15 +1232,9 @@
m.upstream_fragment_id = *new_fragment_id;
}
});
for fragment_id in &mut upstream_fragment_id.0 {
if let Some(new_fragment_id) = fragment_replace_map.get(&(*fragment_id as _)) {
*fragment_id = *new_fragment_id as _;
}
}
fragment::ActiveModel {
fragment_id: Set(fragment_id),
stream_node: Set(StreamNode::from(&stream_node)),
upstream_fragment_id: Set(upstream_fragment_id),
..Default::default()
}
.update(txn)
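
With the persisted column gone, replacing an upstream table in the merge-update step above only has to rewrite the Merge nodes inside the downstream fragment's stream node; the loop that also patched the `upstream_fragment_id` array is dropped. A sketch of that remap, using the same simplified stand-in types as the earlier sketches:

use std::collections::HashMap;

struct MergeNode { upstream_fragment_id: u32 }
enum NodeBody { Merge(MergeNode), Other }
struct StreamNode { node_body: Option<NodeBody>, input: Vec<StreamNode> }

// Point every Merge node at its replacement upstream fragment, if one exists.
fn apply_merge_updates(node: &mut StreamNode, replace_map: &HashMap<u32, u32>) {
    if let Some(NodeBody::Merge(merge)) = &mut node.node_body {
        if let Some(new_id) = replace_map.get(&merge.upstream_fragment_id) {
            merge.upstream_fragment_id = *new_id;
        }
    }
    for input in &mut node.input {
        apply_merge_updates(input, replace_map);
    }
}
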
