diff --git a/src/meta/model/src/actor.rs b/src/meta/model/src/actor.rs
index 2164ea43c4031..54ea9286a3612 100644
--- a/src/meta/model/src/actor.rs
+++ b/src/meta/model/src/actor.rs
@@ -65,8 +65,6 @@ pub struct Model {
 
 #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
 pub enum Relation {
-    #[sea_orm(has_many = "super::actor_dispatcher::Entity")]
-    ActorDispatcher,
     #[sea_orm(
         belongs_to = "super::fragment::Entity",
         from = "Column::FragmentId",
@@ -77,12 +75,6 @@ pub enum Relation {
     Fragment,
 }
 
-impl Related<super::actor_dispatcher::Entity> for Entity {
-    fn to() -> RelationDef {
-        Relation::ActorDispatcher.def()
-    }
-}
-
 impl Related<super::fragment::Entity> for Entity {
     fn to() -> RelationDef {
         Relation::Fragment.def()
diff --git a/src/meta/model/src/actor_dispatcher.rs b/src/meta/model/src/actor_dispatcher.rs
index 2358348c176e9..d4a010863ba82 100644
--- a/src/meta/model/src/actor_dispatcher.rs
+++ b/src/meta/model/src/actor_dispatcher.rs
@@ -14,12 +14,10 @@
 
 use std::hash::Hash;
 
-use risingwave_pb::stream_plan::{PbDispatcher, PbDispatcherType};
+use risingwave_pb::stream_plan::PbDispatcherType;
 use sea_orm::entity::prelude::*;
 use serde::{Deserialize, Serialize};
 
-use crate::{ActorId, ActorMapping, FragmentId, I32Array};
-
 #[derive(
     Hash, Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
 )]
@@ -57,79 +55,3 @@ impl From<DispatcherType> for PbDispatcherType {
         }
     }
 }
-
-impl From<(u32, PbDispatcher)> for Model {
-    fn from((actor_id, dispatcher): (u32, PbDispatcher)) -> Self {
-        Self {
-            id: 0,
-            actor_id: actor_id as _,
-            dispatcher_type: dispatcher.r#type().into(),
-            dist_key_indices: dispatcher.dist_key_indices.into(),
-            output_indices: dispatcher.output_indices.into(),
-            hash_mapping: dispatcher.hash_mapping.as_ref().map(ActorMapping::from),
-            dispatcher_id: dispatcher.dispatcher_id as _,
-            downstream_actor_ids: dispatcher.downstream_actor_id.into(),
-        }
-    }
-}
-
-impl From<Model> for PbDispatcher {
-    fn from(model: Model) -> Self {
-        Self {
-            r#type: PbDispatcherType::from(model.dispatcher_type) as _,
-            dist_key_indices: model.dist_key_indices.into_u32_array(),
-            output_indices: model.output_indices.into_u32_array(),
-            hash_mapping: model.hash_mapping.map(|mapping| mapping.to_protobuf()),
-            dispatcher_id: model.dispatcher_id as _,
-            downstream_actor_id: model.downstream_actor_ids.into_u32_array(),
-        }
-    }
-}
-
-#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Deserialize, Serialize)]
-#[sea_orm(table_name = "actor_dispatcher")]
-pub struct Model {
-    #[sea_orm(primary_key)]
-    pub id: i32,
-    pub actor_id: ActorId,
-    pub dispatcher_type: DispatcherType,
-    pub dist_key_indices: I32Array,
-    pub output_indices: I32Array,
-    pub hash_mapping: Option<ActorMapping>,
-    pub dispatcher_id: FragmentId,
-    pub downstream_actor_ids: I32Array,
-}
-
-#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
-pub enum Relation {
-    #[sea_orm(
-        belongs_to = "super::actor::Entity",
-        from = "Column::ActorId",
-        to = "super::actor::Column::ActorId",
-        on_update = "NoAction",
-        on_delete = "Cascade"
-    )]
-    Actor,
-    #[sea_orm(
-        belongs_to = "super::fragment::Entity",
-        from = "Column::DispatcherId",
-        to = "super::fragment::Column::FragmentId",
-        on_update = "NoAction",
-        on_delete = "Cascade"
-    )]
-    Fragment,
-}
-
-impl Related<super::actor::Entity> for Entity {
-    fn to() -> RelationDef {
-        Relation::Actor.def()
-    }
-}
-
-impl Related<super::fragment::Entity> for Entity {
-    fn to() -> RelationDef {
-        Relation::Fragment.def()
-    }
-}
-
-impl ActiveModelBehavior for ActiveModel {}
diff --git a/src/meta/model/src/fragment.rs b/src/meta/model/src/fragment.rs
index cb659cdb07a85..f060646cc4661 100644
--- a/src/meta/model/src/fragment.rs
+++ b/src/meta/model/src/fragment.rs
@@ -66,8 +66,6 @@ impl From<PbFragmentDistributionType> for DistributionType {
 pub enum Relation {
     #[sea_orm(has_many = "super::actor::Entity")]
     Actor,
-    #[sea_orm(has_many = "super::actor_dispatcher::Entity")]
-    ActorDispatcher,
     #[sea_orm(
         belongs_to = "super::object::Entity",
         from = "Column::JobId",
@@ -84,12 +82,6 @@ impl Related<super::actor::Entity> for Entity {
     }
 }
 
-impl Related<super::actor_dispatcher::Entity> for Entity {
-    fn to() -> RelationDef {
-        Relation::ActorDispatcher.def()
-    }
-}
-
 impl Related<super::object::Entity> for Entity {
     fn to() -> RelationDef {
         Relation::Object.def()
diff --git a/src/meta/model/src/prelude.rs b/src/meta/model/src/prelude.rs
index 079567839e5ae..3ac1cf8482a24 100644
--- a/src/meta/model/src/prelude.rs
+++ b/src/meta/model/src/prelude.rs
@@ -15,7 +15,6 @@
 pub use {Source as SourceModel, Table as TableModel};
 
 pub use super::actor::Entity as Actor;
-pub use super::actor_dispatcher::Entity as ActorDispatcher;
 pub use super::catalog_version::Entity as CatalogVersion;
 pub use super::cluster::Entity as Cluster;
 pub use super::compaction_config::Entity as CompactionConfig;
diff --git a/src/meta/src/backup_restore/restore_impl/v2.rs b/src/meta/src/backup_restore/restore_impl/v2.rs
index 6ac82e1db00b8..51821ab710e90 100644
--- a/src/meta/src/backup_restore/restore_impl/v2.rs
+++ b/src/meta/src/backup_restore/restore_impl/v2.rs
@@ -143,7 +143,7 @@ impl Writer for WriterModelV2ToMetaStoreV2 {
         insert_models(metadata.streaming_jobs.clone(), db).await?;
         insert_models(metadata.fragments.clone(), db).await?;
         insert_models(metadata.actors.clone(), db).await?;
-        insert_models(metadata.actor_dispatchers.clone(), db).await?;
+        insert_models(metadata.fragment_relation.clone(), db).await?;
         insert_models(metadata.connections.clone(), db).await?;
         insert_models(metadata.sources.clone(), db).await?;
         insert_models(metadata.tables.clone(), db).await?;
@@ -175,7 +175,6 @@ macro_rules!
for_all_auto_increment { {"user", users, user_id}, {"user_privilege", user_privileges, id}, {"actor", actors, actor_id}, - {"actor_dispatcher", actor_dispatchers, id}, {"fragment", fragments, fragment_id}, {"object_dependency", object_dependencies, id} ) diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index b664f5f7973bf..52fb029790c2e 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -15,20 +15,22 @@ use std::collections::hash_map::Entry; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; -use anyhow::Context; +use anyhow::{anyhow, Context}; use futures::stream::BoxStream; -use futures::TryStreamExt; +use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use risingwave_common::bail; +use risingwave_common::bitmap::Bitmap; use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId}; use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_mut}; use risingwave_connector::source::SplitImpl; use risingwave_meta_model::actor::ActorStatus; +use risingwave_meta_model::actor_dispatcher::DispatcherType; use risingwave_meta_model::fragment::DistributionType; use risingwave_meta_model::object::ObjectType; -use risingwave_meta_model::prelude::{Actor, Fragment, Sink, StreamingJob}; +use risingwave_meta_model::prelude::{Actor, Fragment, FragmentRelation, Sink, StreamingJob}; use risingwave_meta_model::{ - actor, actor_dispatcher, database, fragment, object, sink, source, streaming_job, table, + actor, database, fragment, fragment_relation, object, sink, source, streaming_job, table, ActorId, ConnectorSplits, DatabaseId, ExprContext, FragmentId, I32Array, JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId, VnodeBitmap, WorkerId, }; @@ -46,8 +48,8 @@ use risingwave_pb::meta::{FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits}; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ - DispatchStrategy, PbFragmentTypeFlag, PbStreamActor, PbStreamContext, PbStreamNode, - PbStreamScanType, StreamActor, StreamScanType, + DispatchStrategy, PbDispatcher, PbFragmentTypeFlag, PbStreamActor, PbStreamContext, + PbStreamNode, PbStreamScanType, StreamActor, StreamScanType, }; use sea_orm::sea_query::Expr; use sea_orm::ActiveValue::Set; @@ -61,8 +63,9 @@ use tracing::debug; use crate::barrier::SnapshotBackfillInfo; use crate::controller::catalog::{CatalogController, CatalogControllerInner}; use crate::controller::utils::{ - get_actor_dispatchers, get_fragment_mappings, rebuild_fragment_mapping_from_actors, - FragmentDesc, PartialActorLocation, PartialFragmentStateTables, + get_fragment_actor_dispatchers, get_fragment_mappings, rebuild_fragment_mapping_from_actors, + resolve_no_shuffle_actor_dispatcher, FragmentDesc, PartialActorLocation, + PartialFragmentStateTables, }; use crate::manager::LocalNotification; use crate::model::{StreamContext, StreamJobFragments, TableParallelism}; @@ -167,14 +170,14 @@ impl CatalogController { } } - #[allow(clippy::type_complexity)] + #[expect(clippy::type_complexity)] pub fn extract_fragment_and_actors_from_fragments( stream_job_fragments: &StreamJobFragments, ) -> MetaResult< Vec<( fragment::Model, Vec, - HashMap>, + HashMap>, )>, > { let mut result = vec![]; @@ -202,7 +205,7 @@ impl CatalogController { ) -> MetaResult<( fragment::Model, Vec, - HashMap>, + HashMap>, )> { let vnode_count = 
fragment.vnode_count(); let PbFragment { @@ -276,13 +279,7 @@ impl CatalogController { vnode_bitmap: pb_vnode_bitmap.as_ref().map(VnodeBitmap::from), expr_context: ExprContext::from(pb_expr_context), }); - actor_dispatchers.insert( - *actor_id as _, - pb_dispatcher - .iter() - .map(|dispatcher| (*actor_id, dispatcher.clone()).into()) - .collect(), - ); + actor_dispatchers.insert(*actor_id as _, pb_dispatcher.clone()); } let upstream_fragment_id = pb_upstream_fragment_ids.clone().into(); @@ -307,7 +304,7 @@ impl CatalogController { Ok((fragment, actors, actor_dispatchers)) } - #[allow(clippy::type_complexity)] + #[expect(clippy::type_complexity)] pub fn compose_table_fragments( table_id: u32, state: PbState, @@ -315,7 +312,7 @@ impl CatalogController { fragments: Vec<( fragment::Model, Vec, - HashMap>, + HashMap>, )>, parallelism: StreamingParallelism, max_parallelism: usize, @@ -359,7 +356,7 @@ impl CatalogController { pub(crate) fn compose_fragment( fragment: fragment::Model, actors: Vec, - mut actor_dispatcher: HashMap>, + mut actor_dispatcher: HashMap>, ) -> MetaResult<( PbFragment, HashMap, @@ -416,12 +413,7 @@ impl CatalogController { let pb_vnode_bitmap = vnode_bitmap.map(|vnode_bitmap| vnode_bitmap.to_protobuf()); let pb_expr_context = Some(expr_context.to_protobuf()); - let pb_dispatcher = actor_dispatcher - .remove(&actor_id) - .unwrap_or_default() - .into_iter() - .map(Into::into) - .collect(); + let pb_dispatcher = actor_dispatcher.remove(&actor_id).unwrap_or_default(); pb_actor_status.insert( actor_id as _, @@ -647,11 +639,11 @@ impl CatalogController { .filter(fragment::Column::JobId.eq(job_id)) .all(&inner.db) .await?; - let mut actor_dispatchers = get_actor_dispatchers( + let mut fragment_actor_dispatchers = get_fragment_actor_dispatchers( &inner.db, fragment_actors .iter() - .flat_map(|(_, actors)| actors.iter().map(|actor| actor.actor_id)) + .map(|(fragment, _)| fragment.fragment_id) .collect(), ) .await?; @@ -663,9 +655,13 @@ impl CatalogController { let mut fragment_info = vec![]; for (fragment, actors) in fragment_actors { let mut dispatcher_info = HashMap::new(); - for actor in &actors { - if let Some(dispatchers) = actor_dispatchers.remove(&actor.actor_id) { - dispatcher_info.insert(actor.actor_id, dispatchers); + if let Some(mut fragment_actor_dispatchers) = + fragment_actor_dispatchers.remove(&fragment.fragment_id) + { + for actor in &actors { + if let Some(dispatchers) = fragment_actor_dispatchers.remove(&actor.actor_id) { + dispatcher_info.insert(actor.actor_id, dispatchers); + } } } fragment_info.push((fragment, actors, dispatcher_info)); @@ -847,22 +843,29 @@ impl CatalogController { .filter(fragment::Column::JobId.eq(job.job_id)) .all(&inner.db) .await?; - let mut actor_dispatchers = get_actor_dispatchers( + let mut fragment_actor_dispatchers = get_fragment_actor_dispatchers( &inner.db, fragment_actors .iter() - .flat_map(|(_, actors)| actors.iter().map(|actor| actor.actor_id)) + .map(|(fragment, _)| fragment.fragment_id) .collect(), ) .await?; let mut fragment_info = vec![]; for (fragment, actors) in fragment_actors { let mut dispatcher_info = HashMap::new(); - for actor in &actors { - if let Some(dispatchers) = actor_dispatchers.remove(&actor.actor_id) { - dispatcher_info.insert(actor.actor_id, dispatchers); + if let Some(mut fragment_actor_dispatchers) = + fragment_actor_dispatchers.remove(&fragment.fragment_id) + { + for actor in &actors { + if let Some(dispatchers) = + fragment_actor_dispatchers.remove(&actor.actor_id) + { + 
dispatcher_info.insert(actor.actor_id, dispatchers); + } } } + fragment_info.push((fragment, actors, dispatcher_info)); } table_fragments.insert( @@ -1239,26 +1242,22 @@ impl CatalogController { .await? }; - let mut actor_dispatchers = get_actor_dispatchers( + let mut fragment_actor_dispatcher = get_fragment_actor_dispatchers( &inner.db, fragment_actors .iter() - .flat_map(|(_, actors)| actors.iter().map(|actor| actor.actor_id)) + .map(|(fragment, _)| fragment.fragment_id) .collect(), ) .await?; let mut node_actors = HashMap::new(); for (fragment, actors) in fragment_actors { - let mut dispatcher_info = HashMap::new(); - for actor in &actors { - if let Some(dispatchers) = actor_dispatchers.remove(&actor.actor_id) { - dispatcher_info.insert(actor.actor_id, dispatchers); - } - } - + let fragment_dispatcher = fragment_actor_dispatcher + .remove(&fragment.fragment_id) + .unwrap_or_default(); let (table_fragments, actor_status, _) = - Self::compose_fragment(fragment, actors, dispatcher_info)?; + Self::compose_fragment(fragment, actors, fragment_dispatcher)?; for actor in table_fragments.actors { let node_id = actor_status[&actor.actor_id].worker_id() as WorkerId; node_actors @@ -1376,6 +1375,7 @@ impl CatalogController { } /// Get the actor ids, and each actor's upstream source actor ids of the fragment with `fragment_id` with `Running` status. + /// (`backfill_actor_id`, `upstream_source_actor_id`) pub async fn get_running_actors_for_source_backfill( &self, source_backfill_fragment_id: FragmentId, @@ -1384,50 +1384,79 @@ impl CatalogController { let inner = self.inner.read().await; let txn = inner.db.begin().await?; - let source_backfill_actors: Vec = Actor::find() + let fragment_relation: DispatcherType = FragmentRelation::find() .select_only() - .column(actor::Column::ActorId) - .filter(actor::Column::FragmentId.eq(source_backfill_fragment_id)) - .filter(actor::Column::Status.eq(ActorStatus::Running)) + .column(fragment_relation::Column::DispatcherType) + .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id)) + .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id)) .into_tuple() - .all(&txn) - .await?; - - let source_actor_downstreams: Vec<(ActorId, I32Array)> = Actor::find() - .select_only() - .column(actor::Column::ActorId) - .column(actor_dispatcher::Column::DownstreamActorIds) - .filter(actor::Column::FragmentId.eq(source_fragment_id)) - .join(JoinType::InnerJoin, actor::Relation::ActorDispatcher.def()) - .into_tuple() - .all(&txn) - .await?; + .one(&txn) + .await? 
+ .ok_or_else(|| { + anyhow!( + "no fragment connection from source fragment {} to source backfill fragment {}", + source_fragment_id, + source_backfill_fragment_id + ) + })?; - let mut source_backfill_actor_upstreams: HashMap> = HashMap::new(); - for (source_actor_id, downstream_actors) in source_actor_downstreams { - for downstream_source_backfill_actor_id in downstream_actors.into_inner() { - source_backfill_actor_upstreams - .entry(downstream_source_backfill_actor_id) - .or_default() - .push(source_actor_id); - } + if fragment_relation != DispatcherType::NoShuffle { + return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into()); } - Ok(source_backfill_actors - .into_iter() - .map(|actor_id| { - let upstream_source_actors = &source_backfill_actor_upstreams[&actor_id]; - assert_eq!( - upstream_source_actors.len(), - 1, - "expect only one upstream source actor, but got {:?}, actor_id: {}, fragment_id: {}", - upstream_source_actors, - actor_id, - source_backfill_fragment_id, - ); - (actor_id, upstream_source_actors[0]) - }) - .collect()) + let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move { + let result: MetaResult = try { + Fragment::find_by_id(fragment_id) + .select_only() + .column(fragment::Column::DistributionType) + .into_tuple() + .one(txn) + .await? + .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))? + }; + result + }; + + let source_backfill_distribution_type = + load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?; + let source_distribution_type = + load_fragment_distribution_type(&txn, source_fragment_id).await?; + + let load_fragment_actor_distribution = |txn, fragment_id: FragmentId| async move { + Actor::find() + .select_only() + .column(actor::Column::ActorId) + .column(actor::Column::VnodeBitmap) + .filter(actor::Column::FragmentId.eq(fragment_id)) + .into_tuple() + .stream(txn) + .await? 
+ .map(|result| { + result.map(|(actor_id, vnode): (ActorId, Option)| { + ( + actor_id, + vnode.map(|bitmap| Bitmap::from(bitmap.to_protobuf())), + ) + }) + }) + .try_collect() + .await + }; + + let source_backfill_actors: HashMap> = + load_fragment_actor_distribution(&txn, source_backfill_fragment_id).await?; + + let source_actors = load_fragment_actor_distribution(&txn, source_fragment_id).await?; + + Ok(resolve_no_shuffle_actor_dispatcher( + source_distribution_type, + &source_actors, + source_backfill_distribution_type, + &source_backfill_actors, + ) + .into_iter() + .map(|(source_actor, source_backfill_actor)| (source_backfill_actor, source_actor)) + .collect()) } pub async fn get_actors_by_job_ids(&self, job_ids: Vec) -> MetaResult> { @@ -1479,18 +1508,25 @@ impl CatalogController { } } + let mut fragment_actor_dispatcher = get_fragment_actor_dispatchers( + &inner.db, + root_fragments + .values() + .map(|fragment| fragment.fragment_id) + .collect(), + ) + .await?; + let mut root_fragments_pb = HashMap::new(); for (_, fragment) in root_fragments { let actors = fragment.find_related(Actor).all(&inner.db).await?; - let actor_dispatchers = get_actor_dispatchers( - &inner.db, - actors.iter().map(|actor| actor.actor_id).collect(), - ) - .await?; + let fragment_dispatcher = fragment_actor_dispatcher + .remove(&fragment.fragment_id) + .unwrap_or_default(); root_fragments_pb.insert( fragment.job_id, - Self::compose_fragment(fragment, actors, actor_dispatchers)?.0, + Self::compose_fragment(fragment, actors, fragment_dispatcher)?.0, ); } @@ -1527,6 +1563,9 @@ impl CatalogController { let dispatches = root_fragment.dispatches(); let inner = self.inner.read().await; + let mut fragment_actor_dispatchers = + get_fragment_actor_dispatchers(&inner.db, dispatches.keys().cloned().collect()).await?; + let mut downstream_fragments = vec![]; for (fragment_id, dispatch_strategy) in dispatches { let mut fragment_actors = Fragment::find_by_id(fragment_id) @@ -1538,11 +1577,9 @@ impl CatalogController { } assert_eq!(fragment_actors.len(), 1); let (fragment, actors) = fragment_actors.pop().unwrap(); - let actor_dispatchers = get_actor_dispatchers( - &inner.db, - actors.iter().map(|actor| actor.actor_id).collect(), - ) - .await?; + let actor_dispatchers = fragment_actor_dispatchers + .remove(&fragment_id) + .unwrap_or_default(); let fragment = Self::compose_fragment(fragment, actors, actor_dispatchers)?.0; downstream_fragments.push((dispatch_strategy, fragment)); } @@ -1665,8 +1702,8 @@ mod tests { use risingwave_meta_model::actor::ActorStatus; use risingwave_meta_model::fragment::DistributionType; use risingwave_meta_model::{ - actor, actor_dispatcher, fragment, ActorId, ConnectorSplits, ExprContext, FragmentId, - I32Array, ObjectId, StreamNode, TableId, VnodeBitmap, + actor, fragment, ActorId, ConnectorSplits, ExprContext, FragmentId, I32Array, ObjectId, + StreamNode, TableId, VnodeBitmap, }; use risingwave_pb::common::PbActorLocation; use risingwave_pb::meta::table_fragments::actor_status::PbActorState; @@ -1870,13 +1907,7 @@ mod tests { let actor_dispatchers: HashMap<_, _> = (0..actor_count) .map(|actor_id| { let dispatchers = generate_dispatchers_for_actor(actor_id); - ( - actor_id as ActorId, - dispatchers - .into_iter() - .map(|dispatcher| (actor_id, dispatcher).into()) - .collect_vec(), - ) + (actor_id as ActorId, dispatchers) }) .collect(); @@ -1929,7 +1960,7 @@ mod tests { fn check_actors( actors: Vec, actor_upstreams: &FragmentActorUpstreams, - mut actor_dispatchers: HashMap>, + mut 
actor_dispatchers: HashMap>, pb_actors: Vec, pb_actor_splits: HashMap, stream_node: &PbStreamNode, diff --git a/src/meta/src/controller/scale.rs b/src/meta/src/controller/scale.rs index 612a0507ee705..b370777b85f74 100644 --- a/src/meta/src/controller/scale.rs +++ b/src/meta/src/controller/scale.rs @@ -17,29 +17,29 @@ use std::ops::{BitAnd, BitOrAssign}; use itertools::Itertools; use risingwave_common::bitmap::Bitmap; -use risingwave_common::hash; use risingwave_connector::source::{SplitImpl, SplitMetaData}; use risingwave_meta_model::actor::ActorStatus; use risingwave_meta_model::actor_dispatcher::DispatcherType; use risingwave_meta_model::fragment::DistributionType; use risingwave_meta_model::prelude::{ - Actor, ActorDispatcher, Fragment, Sink, Source, StreamingJob, Table, + Actor, Fragment, FragmentRelation, Sink, Source, StreamingJob, Table, }; use risingwave_meta_model::{ - actor, actor_dispatcher, fragment, sink, source, streaming_job, table, ActorId, ActorMapping, - ConnectorSplits, FragmentId, I32Array, ObjectId, VnodeBitmap, + actor, fragment, fragment_relation, sink, source, streaming_job, table, ActorId, + ConnectorSplits, FragmentId, ObjectId, VnodeBitmap, }; use risingwave_meta_model_migration::{ Alias, CommonTableExpression, Expr, IntoColumnRef, QueryStatementBuilder, SelectStatement, UnionType, WithClause, WithQuery, }; +use risingwave_pb::stream_plan::PbDispatcher; use sea_orm::{ ColumnTrait, ConnectionTrait, DbErr, DerivePartialModel, EntityTrait, FromQueryResult, - JoinType, QueryFilter, QuerySelect, RelationTrait, Statement, TransactionTrait, + QueryFilter, QuerySelect, Statement, TransactionTrait, }; use crate::controller::catalog::CatalogController; -use crate::controller::utils::get_existing_job_resource_group; +use crate::controller::utils::{get_existing_job_resource_group, get_fragment_actor_dispatchers}; use crate::{MetaError, MetaResult}; /// This function will construct a query using recursive cte to find `no_shuffle` upstream relation graph for target fragments. 
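The doctest below pins down the SQL this builder now emits against all three backends. Conceptually, the query computes a fixpoint: starting from the target fragments, it keeps pulling in `fragment_relation` rows of type `NO_SHUFFLE` whose target fragment is already in the set. A minimal in-memory sketch of that traversal, assuming simplified stand-in types rather than the actual sea-query builder:

```rust
use std::collections::{HashSet, VecDeque};

type FragmentId = i32;

/// `edges` holds the (source_fragment_id, target_fragment_id) pairs of
/// NO_SHUFFLE rows in `fragment_relation`; `targets` is the starting set.
fn no_shuffle_upstream_closure(
    edges: &[(FragmentId, FragmentId)],
    targets: &[FragmentId],
) -> HashSet<(FragmentId, FragmentId)> {
    let mut discovered = HashSet::new();
    let mut queue: VecDeque<FragmentId> = targets.iter().copied().collect();
    let mut visited: HashSet<FragmentId> = targets.iter().copied().collect();
    while let Some(fragment) = queue.pop_front() {
        // One round of the recursive block: pick up edges whose target is known.
        for &(source, target) in edges {
            if target == fragment && discovered.insert((source, target)) && visited.insert(source)
            {
                queue.push_back(source);
            }
        }
    }
    discovered
}

fn main() {
    // 1 --NO_SHUFFLE--> 2 --NO_SHUFFLE--> 3: starting from {3}, both edges are found.
    let edges = [(1, 2), (2, 3)];
    assert_eq!(no_shuffle_upstream_closure(&edges, &[3]).len(), 2);
}
```

The recursive CTE asserted below is exactly this loop pushed down into the database, so the whole closure is computed in a single round trip.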
@@ -53,9 +53,9 @@ use crate::{MetaError, MetaResult}; /// /// let query = construct_no_shuffle_upstream_traverse_query(vec![2, 3]); /// -/// assert_eq!(query.to_string(MysqlQueryBuilder), r#"WITH RECURSIVE `shuffle_deps` (`fragment_id`, `dispatcher_type`, `dispatcher_id`) AS (SELECT DISTINCT `actor`.`fragment_id`, `actor_dispatcher`.`dispatcher_type`, `actor_dispatcher`.`dispatcher_id` FROM `actor` INNER JOIN `actor_dispatcher` ON `actor`.`actor_id` = `actor_dispatcher`.`actor_id` WHERE `actor_dispatcher`.`dispatcher_type` = 'NO_SHUFFLE' AND `actor_dispatcher`.`dispatcher_id` IN (2, 3) UNION ALL (SELECT `actor`.`fragment_id`, `actor_dispatcher`.`dispatcher_type`, `actor_dispatcher`.`dispatcher_id` FROM `actor` INNER JOIN `actor_dispatcher` ON `actor`.`actor_id` = `actor_dispatcher`.`actor_id` INNER JOIN `shuffle_deps` ON `shuffle_deps`.`fragment_id` = `actor_dispatcher`.`dispatcher_id` WHERE `actor_dispatcher`.`dispatcher_type` = 'NO_SHUFFLE')) SELECT DISTINCT `fragment_id`, `dispatcher_type`, `dispatcher_id` FROM `shuffle_deps`"#); -/// assert_eq!(query.to_string(PostgresQueryBuilder), r#"WITH RECURSIVE "shuffle_deps" ("fragment_id", "dispatcher_type", "dispatcher_id") AS (SELECT DISTINCT "actor"."fragment_id", "actor_dispatcher"."dispatcher_type", "actor_dispatcher"."dispatcher_id" FROM "actor" INNER JOIN "actor_dispatcher" ON "actor"."actor_id" = "actor_dispatcher"."actor_id" WHERE "actor_dispatcher"."dispatcher_type" = 'NO_SHUFFLE' AND "actor_dispatcher"."dispatcher_id" IN (2, 3) UNION ALL (SELECT "actor"."fragment_id", "actor_dispatcher"."dispatcher_type", "actor_dispatcher"."dispatcher_id" FROM "actor" INNER JOIN "actor_dispatcher" ON "actor"."actor_id" = "actor_dispatcher"."actor_id" INNER JOIN "shuffle_deps" ON "shuffle_deps"."fragment_id" = "actor_dispatcher"."dispatcher_id" WHERE "actor_dispatcher"."dispatcher_type" = 'NO_SHUFFLE')) SELECT DISTINCT "fragment_id", "dispatcher_type", "dispatcher_id" FROM "shuffle_deps""#); -/// assert_eq!(query.to_string(SqliteQueryBuilder), r#"WITH RECURSIVE "shuffle_deps" ("fragment_id", "dispatcher_type", "dispatcher_id") AS (SELECT DISTINCT "actor"."fragment_id", "actor_dispatcher"."dispatcher_type", "actor_dispatcher"."dispatcher_id" FROM "actor" INNER JOIN "actor_dispatcher" ON "actor"."actor_id" = "actor_dispatcher"."actor_id" WHERE "actor_dispatcher"."dispatcher_type" = 'NO_SHUFFLE' AND "actor_dispatcher"."dispatcher_id" IN (2, 3) UNION ALL SELECT "actor"."fragment_id", "actor_dispatcher"."dispatcher_type", "actor_dispatcher"."dispatcher_id" FROM "actor" INNER JOIN "actor_dispatcher" ON "actor"."actor_id" = "actor_dispatcher"."actor_id" INNER JOIN "shuffle_deps" ON "shuffle_deps"."fragment_id" = "actor_dispatcher"."dispatcher_id" WHERE "actor_dispatcher"."dispatcher_type" = 'NO_SHUFFLE') SELECT DISTINCT "fragment_id", "dispatcher_type", "dispatcher_id" FROM "shuffle_deps""#); +/// assert_eq!(query.to_string(MysqlQueryBuilder), r#"WITH RECURSIVE `shuffle_deps` (`source_fragment_id`, `dispatcher_type`, `target_fragment_id`) AS (SELECT DISTINCT `fragment_relation`.`source_fragment_id`, `fragment_relation`.`dispatcher_type`, `fragment_relation`.`target_fragment_id` FROM `fragment_relation` WHERE `fragment_relation`.`dispatcher_type` = 'NO_SHUFFLE' AND `fragment_relation`.`target_fragment_id` IN (2, 3) UNION ALL (SELECT `fragment_relation`.`source_fragment_id`, `fragment_relation`.`dispatcher_type`, `fragment_relation`.`target_fragment_id` FROM `fragment_relation` INNER JOIN `shuffle_deps` ON `shuffle_deps`.`source_fragment_id` = 
`fragment_relation`.`target_fragment_id` WHERE `fragment_relation`.`dispatcher_type` = 'NO_SHUFFLE')) SELECT DISTINCT `source_fragment_id`, `dispatcher_type`, `target_fragment_id` FROM `shuffle_deps`"#); +/// assert_eq!(query.to_string(PostgresQueryBuilder), r#"WITH RECURSIVE "shuffle_deps" ("source_fragment_id", "dispatcher_type", "target_fragment_id") AS (SELECT DISTINCT "fragment_relation"."source_fragment_id", "fragment_relation"."dispatcher_type", "fragment_relation"."target_fragment_id" FROM "fragment_relation" WHERE "fragment_relation"."dispatcher_type" = 'NO_SHUFFLE' AND "fragment_relation"."target_fragment_id" IN (2, 3) UNION ALL (SELECT "fragment_relation"."source_fragment_id", "fragment_relation"."dispatcher_type", "fragment_relation"."target_fragment_id" FROM "fragment_relation" INNER JOIN "shuffle_deps" ON "shuffle_deps"."source_fragment_id" = "fragment_relation"."target_fragment_id" WHERE "fragment_relation"."dispatcher_type" = 'NO_SHUFFLE')) SELECT DISTINCT "source_fragment_id", "dispatcher_type", "target_fragment_id" FROM "shuffle_deps""#); +/// assert_eq!(query.to_string(SqliteQueryBuilder), r#"WITH RECURSIVE "shuffle_deps" ("source_fragment_id", "dispatcher_type", "target_fragment_id") AS (SELECT DISTINCT "fragment_relation"."source_fragment_id", "fragment_relation"."dispatcher_type", "fragment_relation"."target_fragment_id" FROM "fragment_relation" WHERE "fragment_relation"."dispatcher_type" = 'NO_SHUFFLE' AND "fragment_relation"."target_fragment_id" IN (2, 3) UNION ALL SELECT "fragment_relation"."source_fragment_id", "fragment_relation"."dispatcher_type", "fragment_relation"."target_fragment_id" FROM "fragment_relation" INNER JOIN "shuffle_deps" ON "shuffle_deps"."source_fragment_id" = "fragment_relation"."target_fragment_id" WHERE "fragment_relation"."dispatcher_type" = 'NO_SHUFFLE') SELECT DISTINCT "source_fragment_id", "dispatcher_type", "target_fragment_id" FROM "shuffle_deps""#); /// ``` pub fn construct_no_shuffle_upstream_traverse_query(fragment_ids: Vec) -> WithQuery { construct_no_shuffle_traverse_query_helper(fragment_ids, NoShuffleResolveDirection::Upstream) @@ -77,76 +77,90 @@ fn construct_no_shuffle_traverse_query_helper( let cte_alias = Alias::new("shuffle_deps"); // If we need to look upwards - // resolve by fragment_id -> dispatcher_id + // resolve by upstream fragment_id -> downstream fragment_id // and if downwards - // resolve by dispatcher_id -> fragment_id + // resolve by downstream fragment_id -> upstream fragment_id let (cte_ref_column, compared_column) = match direction { NoShuffleResolveDirection::Upstream => ( - (cte_alias.clone(), actor::Column::FragmentId).into_column_ref(), - (ActorDispatcher, actor_dispatcher::Column::DispatcherId).into_column_ref(), + ( + cte_alias.clone(), + fragment_relation::Column::SourceFragmentId, + ) + .into_column_ref(), + ( + FragmentRelation, + fragment_relation::Column::TargetFragmentId, + ) + .into_column_ref(), ), NoShuffleResolveDirection::Downstream => ( - (cte_alias.clone(), actor_dispatcher::Column::DispatcherId).into_column_ref(), - (Actor, actor::Column::FragmentId).into_column_ref(), + ( + cte_alias.clone(), + fragment_relation::Column::TargetFragmentId, + ) + .into_column_ref(), + ( + FragmentRelation, + fragment_relation::Column::SourceFragmentId, + ) + .into_column_ref(), ), }; let mut base_query = SelectStatement::new() - .column((Actor, actor::Column::FragmentId)) - .column((ActorDispatcher, actor_dispatcher::Column::DispatcherType)) - .column((ActorDispatcher, 
actor_dispatcher::Column::DispatcherId)) + .column(( + FragmentRelation, + fragment_relation::Column::SourceFragmentId, + )) + .column((FragmentRelation, fragment_relation::Column::DispatcherType)) + .column(( + FragmentRelation, + fragment_relation::Column::TargetFragmentId, + )) .distinct() - .from(Actor) - .inner_join( - ActorDispatcher, - Expr::col((Actor, actor::Column::ActorId)).eq(Expr::col(( - ActorDispatcher, - actor_dispatcher::Column::ActorId, - ))), - ) + .from(FragmentRelation) .and_where( - Expr::col((ActorDispatcher, actor_dispatcher::Column::DispatcherType)) + Expr::col((FragmentRelation, fragment_relation::Column::DispatcherType)) .eq(DispatcherType::NoShuffle), ) .and_where(Expr::col(compared_column.clone()).is_in(fragment_ids.clone())) .to_owned(); let cte_referencing = SelectStatement::new() - .column((Actor, actor::Column::FragmentId)) - .column((ActorDispatcher, actor_dispatcher::Column::DispatcherType)) - .column((ActorDispatcher, actor_dispatcher::Column::DispatcherId)) + .column(( + FragmentRelation, + fragment_relation::Column::SourceFragmentId, + )) + .column((FragmentRelation, fragment_relation::Column::DispatcherType)) + .column(( + FragmentRelation, + fragment_relation::Column::TargetFragmentId, + )) // NOTE: Uncomment me once MySQL supports DISTINCT in the recursive block of CTE. //.distinct() - .from(Actor) - .inner_join( - ActorDispatcher, - Expr::col((Actor, actor::Column::ActorId)).eq(Expr::col(( - ActorDispatcher, - actor_dispatcher::Column::ActorId, - ))), - ) + .from(FragmentRelation) .inner_join( cte_alias.clone(), Expr::col(cte_ref_column).eq(Expr::col(compared_column)), ) .and_where( - Expr::col((ActorDispatcher, actor_dispatcher::Column::DispatcherType)) + Expr::col((FragmentRelation, fragment_relation::Column::DispatcherType)) .eq(DispatcherType::NoShuffle), ) .to_owned(); let common_table_expr = CommonTableExpression::new() .query(base_query.union(UnionType::All, cte_referencing).to_owned()) - .column(actor::Column::FragmentId) - .column(actor_dispatcher::Column::DispatcherType) - .column(actor_dispatcher::Column::DispatcherId) + .column(fragment_relation::Column::SourceFragmentId) + .column(fragment_relation::Column::DispatcherType) + .column(fragment_relation::Column::TargetFragmentId) .table_name(cte_alias.clone()) .to_owned(); SelectStatement::new() - .column(actor::Column::FragmentId) - .column(actor_dispatcher::Column::DispatcherType) - .column(actor_dispatcher::Column::DispatcherId) + .column(fragment_relation::Column::SourceFragmentId) + .column(fragment_relation::Column::DispatcherType) + .column(fragment_relation::Column::TargetFragmentId) .distinct() .from(cte_alias.clone()) .to_owned() @@ -163,10 +177,10 @@ fn construct_no_shuffle_traverse_query_helper( pub struct RescheduleWorkingSet { pub fragments: HashMap, pub actors: HashMap, - pub actor_dispatchers: HashMap>, + pub actor_dispatchers: HashMap>, - pub fragment_downstreams: HashMap>, - pub fragment_upstreams: HashMap>, + pub fragment_downstreams: HashMap>, + pub fragment_upstreams: HashMap>, pub job_resource_groups: HashMap, pub related_jobs: HashMap, @@ -323,18 +337,19 @@ impl CatalogController { .chain(fragment_ids.iter().cloned()) .collect(); - let query = Actor::find() + let query = FragmentRelation::find() .select_only() - .column(actor::Column::FragmentId) - .column(actor_dispatcher::Column::DispatcherType) - .column(actor_dispatcher::Column::DispatcherId) - .distinct() - .join(JoinType::InnerJoin, actor::Relation::ActorDispatcher.def()); + 
.column(fragment_relation::Column::SourceFragmentId) + .column(fragment_relation::Column::DispatcherType) + .column(fragment_relation::Column::TargetFragmentId) + .distinct(); // single-layer upstream fragment ids let upstream_fragments: Vec<(FragmentId, DispatcherType, FragmentId)> = query .clone() - .filter(actor_dispatcher::Column::DispatcherId.is_in(extended_fragment_ids.clone())) + .filter( + fragment_relation::Column::TargetFragmentId.is_in(extended_fragment_ids.clone()), + ) .into_tuple() .all(txn) .await?; @@ -342,7 +357,9 @@ impl CatalogController { // single-layer downstream fragment ids let downstream_fragments: Vec<(FragmentId, DispatcherType, FragmentId)> = query .clone() - .filter(actor::Column::FragmentId.is_in(extended_fragment_ids.clone())) + .filter( + fragment_relation::Column::SourceFragmentId.is_in(extended_fragment_ids.clone()), + ) .into_tuple() .all(txn) .await?; @@ -354,20 +371,20 @@ impl CatalogController { .chain(downstream_fragments.into_iter()) .collect(); - let mut fragment_upstreams: HashMap> = + let mut fragment_upstreams: HashMap> = HashMap::new(); - let mut fragment_downstreams: HashMap> = + let mut fragment_downstreams: HashMap> = HashMap::new(); for (src, dispatcher_type, dst) in &all_fragment_relations { fragment_upstreams .entry(*dst) .or_default() - .push((*src, *dispatcher_type)); + .insert(*src, *dispatcher_type); fragment_downstreams .entry(*src) .or_default() - .push((*dst, *dispatcher_type)); + .insert(*dst, *dispatcher_type); } let all_fragment_ids: HashSet<_> = all_fragment_relations @@ -381,20 +398,15 @@ impl CatalogController { .all(txn) .await?; - let actor_and_dispatchers: Vec<(_, _)> = Actor::find() + let actors: Vec<_> = Actor::find() .filter(actor::Column::FragmentId.is_in(all_fragment_ids.clone())) - .find_with_related(ActorDispatcher) .all(txn) .await?; - let mut actors = HashMap::with_capacity(actor_and_dispatchers.len()); - let mut actor_dispatchers = HashMap::with_capacity(actor_and_dispatchers.len()); - - for (actor, dispatchers) in actor_and_dispatchers { - let actor_id = actor.actor_id; - actors.insert(actor_id, actor); - actor_dispatchers.insert(actor_id, dispatchers); - } + let actors: HashMap<_, _> = actors + .into_iter() + .map(|actor| (actor.actor_id, actor)) + .collect(); let fragments: HashMap = fragments .into_iter() @@ -435,10 +447,19 @@ impl CatalogController { }) .collect(); + let fragment_actor_dispatchers = get_fragment_actor_dispatchers( + txn, + fragments + .keys() + .map(|fragment_id| *fragment_id as _) + .collect(), + ) + .await?; + Ok(RescheduleWorkingSet { fragments, actors, - actor_dispatchers, + actor_dispatchers: fragment_actor_dispatchers.into_values().flatten().collect(), fragment_downstreams, fragment_upstreams, job_resource_groups, @@ -469,23 +490,11 @@ impl CatalogController { where C: ConnectionTrait, { - #[derive(Clone, DerivePartialModel, FromQueryResult)] - #[sea_orm(entity = "ActorDispatcher")] - pub struct PartialActorDispatcher { - pub id: i32, - pub actor_id: ActorId, - pub dispatcher_type: DispatcherType, - pub hash_mapping: Option, - pub dispatcher_id: FragmentId, - pub downstream_actor_ids: I32Array, - } - #[derive(Clone, DerivePartialModel, FromQueryResult)] #[sea_orm(entity = "Fragment")] pub struct PartialFragment { pub fragment_id: FragmentId, pub distribution_type: DistributionType, - pub upstream_fragment_id: I32Array, pub vnode_count: i32, } @@ -524,13 +533,6 @@ impl CatalogController { .map(|actor| (actor.actor_id, actor)) .collect(); - let actor_dispatchers: Vec = 
ActorDispatcher::find() - .into_partial_model() - .all(txn) - .await?; - - let mut discovered_upstream_fragments = HashMap::new(); - for (fragment_id, actor_ids) in &fragment_actors { crit_check_in_loop!( flag, @@ -642,232 +644,6 @@ impl CatalogController { } } - for PartialActorDispatcher { - id, - actor_id, - dispatcher_type, - hash_mapping, - dispatcher_id, - downstream_actor_ids, - } in &actor_dispatchers - { - crit_check_in_loop!( - flag, - actor_map.contains_key(actor_id), - format!("ActorDispatcher {id} has actor_id {actor_id} which does not exist",) - ); - - let actor = &actor_map[actor_id]; - - crit_check_in_loop!( - flag, - fragment_map.contains_key(dispatcher_id), - format!( - "ActorDispatcher {id} has dispatcher_id {dispatcher_id} which does not exist", - ) - ); - - discovered_upstream_fragments - .entry(*dispatcher_id) - .or_insert(HashSet::new()) - .insert(actor.fragment_id); - - let downstream_fragment = &fragment_map[dispatcher_id]; - - crit_check_in_loop!( - flag, - downstream_fragment.upstream_fragment_id.inner_ref().contains(&actor.fragment_id), - format!( - "ActorDispatcher {} has downstream fragment {} which does not have upstream fragment {}", - id, dispatcher_id, actor.fragment_id - ) - ); - - crit_check_in_loop!( - flag, - fragment_actors.contains_key(dispatcher_id), - format!( - "ActorDispatcher {id} has downstream fragment {dispatcher_id} which has no actors", - ) - ); - - let dispatcher_downstream_actor_ids: HashSet<_> = - downstream_actor_ids.inner_ref().iter().cloned().collect(); - - let target_fragment_actor_ids = &fragment_actors[dispatcher_id]; - - for dispatcher_downstream_actor_id in &dispatcher_downstream_actor_ids { - crit_check_in_loop!( - flag, - actor_map.contains_key(dispatcher_downstream_actor_id), - format!( - "ActorDispatcher {id} has downstream_actor_id {dispatcher_downstream_actor_id} which does not exist", - ) - ); - } - - match dispatcher_type { - DispatcherType::NoShuffle => {} - _ => { - crit_check_in_loop!( - flag, - &dispatcher_downstream_actor_ids == target_fragment_actor_ids, - format!( - "ActorDispatcher {id} has downstream fragment {dispatcher_id}, but dispatcher downstream actor ids: {dispatcher_downstream_actor_ids:?} != target fragment actor ids: {target_fragment_actor_ids:?}", - ) - ); - } - } - - match dispatcher_type { - DispatcherType::Hash => { - crit_check_in_loop!( - flag, - hash_mapping.is_some(), - format!( - "ActorDispatcher {id} has no hash_mapping set for {dispatcher_type:?}", - ) - ); - } - _ => { - crit_check_in_loop!( - flag, - hash_mapping.is_none(), - format!( - "ActorDispatcher {id} has hash_mapping set for {dispatcher_type:?}" - ) - ); - } - } - - match dispatcher_type { - DispatcherType::Simple | DispatcherType::NoShuffle => { - crit_check_in_loop!( - flag, - dispatcher_downstream_actor_ids.len() == 1, - format!( - "ActorDispatcher {id} has more than one downstream_actor_ids for {dispatcher_type:?}", - ) - ); - } - _ => {} - } - - match dispatcher_type { - DispatcherType::Hash => { - let mapping = hash::ActorMapping::from_protobuf( - &hash_mapping.as_ref().unwrap().to_protobuf(), - ); - - let mapping_actors: HashSet<_> = - mapping.iter().map(|actor_id| actor_id as ActorId).collect(); - - crit_check_in_loop!( - flag, - &mapping_actors == target_fragment_actor_ids, - format!( - "ActorDispatcher {id} has downstream fragment {dispatcher_id}, but dispatcher mapping actor ids {mapping_actors:?} != target fragment actor ids: {target_fragment_actor_ids:?}", - ) - ); - - // actors only from hash distribution fragment can 
have hash mapping - match downstream_fragment.distribution_type { - DistributionType::Hash => { - let mut downstream_bitmaps = HashMap::new(); - - for downstream_actor in target_fragment_actor_ids { - let bitmap = Bitmap::from( - &actor_map[downstream_actor] - .vnode_bitmap - .as_ref() - .unwrap() - .to_protobuf(), - ); - - downstream_bitmaps - .insert(*downstream_actor as hash::ActorId, bitmap); - } - - crit_check_in_loop!( - flag, - mapping.to_bitmaps() == downstream_bitmaps, - format!( - "ActorDispatcher {id} has hash downstream fragment {dispatcher_id}, but dispatcher mapping {mapping:?} != discovered downstream actor bitmaps: {downstream_bitmaps:?}" - ) - ); - } - DistributionType::Single => { - tracing::warn!( - "ActorDispatcher {id} has hash downstream fragment {dispatcher_id} which has single distribution type" - ); - } - } - } - - DispatcherType::Simple => { - crit_check_in_loop!( - flag, - target_fragment_actor_ids.len() == 1, - format!( - "ActorDispatcher {id} has more than one actors in downstream fragment {dispatcher_id} for {dispatcher_type:?}", - ) - ); - - crit_check_in_loop!( - flag, - downstream_fragment.distribution_type != DistributionType::Hash, - format!( - "ActorDispatcher {id} has downstream fragment {dispatcher_id} which has hash distribution type for {dispatcher_type:?}", - ) - ); - } - - DispatcherType::NoShuffle => { - let downstream_actor_id = - dispatcher_downstream_actor_ids.iter().next().unwrap(); - let downstream_actor = &actor_map[downstream_actor_id]; - - crit_check_in_loop!( - flag, - actor.vnode_bitmap == downstream_actor.vnode_bitmap, - format!( - "ActorDispatcher {id} has different vnode_bitmap with downstream_actor_id {downstream_actor_id} for {dispatcher_type:?}", - ) - ); - } - - DispatcherType::Broadcast => { - if let DistributionType::Single = downstream_fragment.distribution_type { - tracing::warn!( - "ActorDispatcher {id} has broadcast downstream fragment {dispatcher_id} which has single distribution type" - ); - } - } - } - } - - for (fragment_id, fragment) in &fragment_map { - let discovered_upstream_fragment_ids = discovered_upstream_fragments - .get(&fragment.fragment_id) - .cloned() - .unwrap_or_default(); - - let upstream_fragment_ids: HashSet<_> = fragment - .upstream_fragment_id - .inner_ref() - .iter() - .copied() - .collect(); - - crit_check_in_loop!( - flag, - discovered_upstream_fragment_ids == upstream_fragment_ids, - format!( - "Fragment {fragment_id} has different upstream_fragment_ids from discovered: {discovered_upstream_fragment_ids:?} != fragment upstream fragment ids: {upstream_fragment_ids:?}", - ) - ); - } - for PartialActor { actor_id, status, .. 
} in actor_map.values() diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index e8154a81ade22..6ed61063ee1e1 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -24,12 +24,8 @@ use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_str use risingwave_common::{bail, current_cluster_version}; use risingwave_connector::WithPropertiesExt; use risingwave_meta_model::actor::ActorStatus; -use risingwave_meta_model::actor_dispatcher::DispatcherType; use risingwave_meta_model::object::ObjectType; -use risingwave_meta_model::prelude::{ - Actor, ActorDispatcher, Fragment, FragmentRelation, Index, Object, ObjectDependency, Sink, - Source, StreamingJob as StreamingJobModel, Table, -}; +use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *}; use risingwave_meta_model::table::TableType; use risingwave_meta_model::*; use risingwave_pb::catalog::source::PbOptionalAssociatedTableId; @@ -45,9 +41,7 @@ use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits}; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::update_mutation::MergeUpdate; -use risingwave_pb::stream_plan::{ - PbDispatcher, PbDispatcherType, PbFragmentTypeFlag, PbStreamActor, PbStreamNode, -}; +use risingwave_pb::stream_plan::{PbDispatcher, PbFragmentTypeFlag, PbStreamActor, PbStreamNode}; use risingwave_pb::user::PbUserInfo; use sea_orm::sea_query::{BinOper, Expr, Query, SimpleExpr}; use sea_orm::ActiveValue::Set; @@ -420,16 +414,26 @@ impl CatalogController { continue; } - let target_fragment_id = dispatcher.dispatcher_id; + let target_fragment_id = dispatcher.dispatcher_id as FragmentId; fragment_relations.insert( key, fragment_relation::Model { source_fragment_id: fragment.fragment_id, target_fragment_id, - dispatcher_type: dispatcher.dispatcher_type, - dist_key_indices: dispatcher.dist_key_indices.clone(), - output_indices: dispatcher.output_indices.clone(), + dispatcher_type: dispatcher.get_type().unwrap().into(), + dist_key_indices: dispatcher + .dist_key_indices + .iter() + .map(|idx| *idx as i32) + .collect_vec() + .into(), + output_indices: dispatcher + .output_indices + .iter() + .map(|idx| *idx as i32) + .collect_vec() + .into(), }, ); } @@ -438,9 +442,9 @@ impl CatalogController { } // Add fragments. - let (fragments, actor_with_dispatchers): (Vec<_>, Vec<_>) = fragment_actors + let (fragments, actors): (Vec<_>, Vec<_>) = fragment_actors .into_iter() - .map(|(fragment, actors, actor_dispatchers)| (fragment, (actors, actor_dispatchers))) + .map(|(fragment, actors, _)| (fragment, actors)) .unzip(); for fragment in fragments { let fragment_id = fragment.fragment_id; @@ -489,18 +493,11 @@ impl CatalogController { } // Add actors and actor dispatchers. 
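The `fragment_relations` map built above collapses each actor-level `PbDispatcher` into one row per (source fragment, target fragment) pair; per-actor fields such as the hash mapping and the `downstream_actor_id` list are dropped and recomputed on read. A minimal sketch of that collapse, assuming hypothetical stand-ins for `PbDispatcher` and `fragment_relation::Model` (the real code converts indices through the meta-model's `I32Array` wrapper):

```rust
use std::collections::HashMap;

type FragmentId = i32;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum DispatcherType {
    Hash,
    Broadcast,
    Simple,
    NoShuffle,
}

// Stand-in for the relevant fields of `PbDispatcher`.
struct Dispatcher {
    r#type: DispatcherType,
    dispatcher_id: u64, // carries the downstream fragment id
    dist_key_indices: Vec<u32>,
    output_indices: Vec<u32>,
}

// Stand-in for `fragment_relation::Model`.
#[derive(Debug)]
struct FragmentRelation {
    source_fragment_id: FragmentId,
    target_fragment_id: FragmentId,
    dispatcher_type: DispatcherType,
    dist_key_indices: Vec<i32>,
    output_indices: Vec<i32>,
}

/// Deduplicate the dispatchers of all actors in a fragment into fragment-level edges.
fn collapse(
    source_fragment_id: FragmentId,
    dispatchers: &[Dispatcher],
) -> HashMap<(FragmentId, FragmentId), FragmentRelation> {
    let mut edges = HashMap::new();
    for d in dispatchers {
        let target_fragment_id = d.dispatcher_id as FragmentId;
        // Every actor of the fragment carries an identical dispatcher modulo
        // per-actor fields, so the first occurrence wins.
        edges
            .entry((source_fragment_id, target_fragment_id))
            .or_insert_with(|| FragmentRelation {
                source_fragment_id,
                target_fragment_id,
                dispatcher_type: d.r#type,
                dist_key_indices: d.dist_key_indices.iter().map(|i| *i as i32).collect(),
                output_indices: d.output_indices.iter().map(|i| *i as i32).collect(),
            });
    }
    edges
}

fn main() {
    let d = || Dispatcher {
        r#type: DispatcherType::Hash,
        dispatcher_id: 7,
        dist_key_indices: vec![0],
        output_indices: vec![0, 1],
    };
    // Two actors with identical dispatchers yield one fragment-level edge.
    assert_eq!(collapse(3, &[d(), d()]).len(), 1);
}
```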
- for (actors, actor_dispatchers) in actor_with_dispatchers { + for actors in actors { for actor in actors { let actor = actor.into_active_model(); Actor::insert(actor).exec(&txn).await?; } - for (_, actor_dispatchers) in actor_dispatchers { - for actor_dispatcher in actor_dispatchers { - let mut actor_dispatcher = actor_dispatcher.into_active_model(); - actor_dispatcher.id = NotSet; - ActorDispatcher::insert(actor_dispatcher).exec(&txn).await?; - } - } } if !for_replace { @@ -737,23 +734,6 @@ impl CatalogController { .await?; } - let mut actor_dispatchers = vec![]; - for (actor_id, dispatchers) in new_actor_dispatchers.values().flatten() { - for dispatcher in dispatchers { - let mut actor_dispatcher = - actor_dispatcher::Model::from((*actor_id, dispatcher.clone())) - .into_active_model(); - actor_dispatcher.id = NotSet; - actor_dispatchers.push(actor_dispatcher); - } - } - - if !actor_dispatchers.is_empty() { - ActorDispatcher::insert_many(actor_dispatchers) - .exec(&txn) - .await?; - } - // Mark job as CREATING. streaming_job::ActiveModel { job_id: Set(job_id), @@ -1641,23 +1621,6 @@ impl CatalogController { reschedules: HashMap, post_updates: &JobReschedulePostUpdates, ) -> MetaResult<()> { - fn update_actors( - actors: &mut Vec, - to_remove: &HashSet, - to_create: &Vec, - ) { - let actor_id_set: HashSet<_> = actors.iter().copied().collect(); - for actor_id in to_create { - debug_assert!(!actor_id_set.contains(actor_id)); - } - for actor_id in to_remove { - debug_assert!(actor_id_set.contains(actor_id)); - } - - actors.retain(|actor_id| !to_remove.contains(actor_id)); - actors.extend_from_slice(to_create); - } - let new_created_actors: HashSet<_> = reschedules .values() .flat_map(|reschedule| { @@ -1675,19 +1638,13 @@ impl CatalogController { let mut fragment_mapping_to_notify = vec![]; - // for assert only - let mut assert_dispatcher_update_checker = HashSet::new(); - for ( fragment_id, Reschedule { - added_actors, removed_actors, vnode_bitmap_updates, actor_splits, newly_created_actors, - upstream_fragment_dispatcher_ids, - upstream_dispatcher_mapping, .. }, ) in reschedules @@ -1706,7 +1663,6 @@ impl CatalogController { PbStreamActor { actor_id, fragment_id, - dispatcher, vnode_bitmap, expr_context, .. 
@@ -1714,8 +1670,6 @@ impl CatalogController { actor_status, ) in newly_created_actors { - let mut new_actor_dispatchers = vec![]; - let splits = actor_splits .get(&actor_id) .map(|splits| splits.iter().map(PbConnectorSplit::from).collect_vec()); @@ -1732,34 +1686,6 @@ impl CatalogController { }) .exec(&txn) .await?; - - for PbDispatcher { - r#type: dispatcher_type, - dist_key_indices, - output_indices, - hash_mapping, - dispatcher_id, - downstream_actor_id, - } in dispatcher - { - new_actor_dispatchers.push(actor_dispatcher::ActiveModel { - id: Default::default(), - actor_id: Set(actor_id as _), - dispatcher_type: Set(PbDispatcherType::try_from(dispatcher_type) - .unwrap() - .into()), - dist_key_indices: Set(dist_key_indices.into()), - output_indices: Set(output_indices.into()), - hash_mapping: Set(hash_mapping.as_ref().map(|mapping| mapping.into())), - dispatcher_id: Set(dispatcher_id as _), - downstream_actor_ids: Set(downstream_actor_id.into()), - }) - } - if !new_actor_dispatchers.is_empty() { - ActorDispatcher::insert_many(new_actor_dispatchers) - .exec(&txn) - .await?; - } } // actor update @@ -1815,61 +1741,6 @@ impl CatalogController { .collect_vec(); fragment_mapping_to_notify.extend(rebuild_fragment_mapping_from_actors(job_actors)); - - // for downstream and upstream - let removed_actor_ids: HashSet<_> = removed_actors - .iter() - .map(|actor_id| *actor_id as ActorId) - .collect(); - - let added_actor_ids = added_actors - .values() - .flatten() - .map(|actor_id| *actor_id as ActorId) - .collect_vec(); - - // first step, upstream fragment - for (upstream_fragment_id, dispatcher_id) in upstream_fragment_dispatcher_ids { - let upstream_fragment = Fragment::find_by_id(upstream_fragment_id as FragmentId) - .one(&txn) - .await? - .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?; - - let all_dispatchers = actor_dispatcher::Entity::find() - .join(JoinType::InnerJoin, actor_dispatcher::Relation::Actor.def()) - .filter(actor::Column::FragmentId.eq(upstream_fragment.fragment_id)) - .filter(actor_dispatcher::Column::DispatcherId.eq(dispatcher_id as i32)) - .all(&txn) - .await?; - - for dispatcher in all_dispatchers { - debug_assert!(assert_dispatcher_update_checker.insert(dispatcher.id)); - if new_created_actors.contains(&dispatcher.actor_id) { - continue; - } - - let mut dispatcher = dispatcher.into_active_model(); - - // Only hash dispatcher needs mapping - if dispatcher.dispatcher_type.as_ref() == &DispatcherType::Hash { - dispatcher.hash_mapping = Set(upstream_dispatcher_mapping - .as_ref() - .map(|m| risingwave_meta_model::ActorMapping::from(&m.to_protobuf()))); - } - - let mut new_downstream_actor_ids = - dispatcher.downstream_actor_ids.as_ref().inner_ref().clone(); - - update_actors( - new_downstream_actor_ids.as_mut(), - &removed_actor_ids, - &added_actor_ids, - ); - - dispatcher.downstream_actor_ids = Set(new_downstream_actor_ids.into()); - dispatcher.update(&txn).await?; - } - } } let JobReschedulePostUpdates { diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index bac3a11b5bb74..a938ea0e238a5 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -12,21 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. 
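The reschedule-time dispatcher rewriting removed above becomes unnecessary because the `utils.rs` hunks that follow derive per-actor dispatchers on demand from a `fragment_relation` edge plus the live actor sets of the two fragments. The fan-out rule is simple; a sketch with simplified types, where `no_shuffle_pairs` stands in for the vnode-bitmap pairing performed by `resolve_no_shuffle_actor_dispatcher`:

```rust
use std::collections::HashMap;

type ActorId = i32;
type FragmentId = i32;

#[derive(Clone, Copy, PartialEq, Eq)]
enum DispatcherType {
    Hash,
    Broadcast,
    Simple,
    NoShuffle,
}

#[derive(Clone, Debug)]
struct Dispatcher {
    target_fragment_id: FragmentId,
    downstream_actor_ids: Vec<ActorId>,
}

/// Derive each source actor's dispatcher for one fragment_relation edge.
fn fan_out(
    dispatcher_type: DispatcherType,
    source_actors: &[ActorId],
    target_fragment_id: FragmentId,
    target_actors: &[ActorId],
    no_shuffle_pairs: &[(ActorId, ActorId)],
) -> HashMap<ActorId, Dispatcher> {
    match dispatcher_type {
        // Hash/Broadcast/Simple: every source actor addresses the whole
        // downstream actor set (the Hash case additionally carries a mapping
        // rebuilt from the downstream vnode bitmaps).
        DispatcherType::Hash | DispatcherType::Broadcast | DispatcherType::Simple => {
            let d = Dispatcher {
                target_fragment_id,
                downstream_actor_ids: target_actors.to_vec(),
            };
            source_actors.iter().map(|a| (*a, d.clone())).collect()
        }
        // NoShuffle: exactly one downstream actor per source actor.
        DispatcherType::NoShuffle => no_shuffle_pairs
            .iter()
            .map(|(src, dst)| {
                (
                    *src,
                    Dispatcher {
                        target_fragment_id,
                        downstream_actor_ids: vec![*dst],
                    },
                )
            })
            .collect(),
    }
}

fn main() {
    let out = fan_out(DispatcherType::NoShuffle, &[1, 2], 9, &[10, 11], &[(1, 10), (2, 11)]);
    assert_eq!(out[&1].downstream_actor_ids, vec![10]);
}
```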
+use std::collections::hash_map::Entry; use std::collections::{BTreeSet, HashMap, HashSet}; +use std::sync::Arc; use anyhow::{anyhow, Context}; use itertools::Itertools; use risingwave_common::bitmap::Bitmap; -use risingwave_common::hash::{ActorMapping, WorkerSlotId, WorkerSlotMapping}; +use risingwave_common::hash::{ActorMapping, VnodeBitmapExt, WorkerSlotId, WorkerSlotMapping}; use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP; use risingwave_common::{bail, hash}; use risingwave_meta_model::actor::ActorStatus; +use risingwave_meta_model::actor_dispatcher::DispatcherType; use risingwave_meta_model::fragment::DistributionType; use risingwave_meta_model::object::ObjectType; use risingwave_meta_model::prelude::*; use risingwave_meta_model::table::TableType; use risingwave_meta_model::{ - actor, actor_dispatcher, connection, database, fragment, function, index, object, + actor, connection, database, fragment, fragment_relation, function, index, object, object_dependency, schema, secret, sink, source, streaming_job, subscription, table, user, user_privilege, view, ActorId, ConnectorSplits, DataTypeArray, DatabaseId, FragmentId, I32Array, ObjectId, PrivilegeId, SchemaId, SourceId, StreamNode, TableId, UserId, VnodeBitmap, @@ -43,7 +46,7 @@ use risingwave_pb::meta::subscribe_response::Info as NotificationInfo; use risingwave_pb::meta::{ FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup, }; -use risingwave_pb::stream_plan::PbFragmentTypeFlag; +use risingwave_pb::stream_plan::{PbDispatcher, PbDispatcherType, PbFragmentTypeFlag}; use risingwave_pb::user::grant_privilege::{ PbAction, PbActionWithGrantOption, PbObject as PbGrantObject, }; @@ -908,28 +911,277 @@ pub fn extract_grant_obj_id(object: &PbGrantObject) -> ObjectId { } } -pub async fn get_actor_dispatchers( +pub async fn get_fragment_actor_dispatchers( db: &C, - actor_ids: Vec, -) -> MetaResult>> + fragment_ids: Vec, +) -> MetaResult>>> where C: ConnectionTrait, { - let actor_dispatchers = ActorDispatcher::find() - .filter(actor_dispatcher::Column::ActorId.is_in(actor_ids)) + type FragmentActorInfo = (DistributionType, Arc>>); + let mut fragment_actor_cache: HashMap = HashMap::new(); + let get_fragment_actors = |fragment_id: FragmentId| async move { + let result: MetaResult = try { + let mut fragment_actors = Fragment::find_by_id(fragment_id) + .find_with_related(Actor) + .filter(actor::Column::Status.eq(ActorStatus::Running)) + .all(db) + .await?; + if fragment_actors.is_empty() { + return Err(anyhow!("failed to find fragment: {}", fragment_id).into()); + } + assert_eq!( + fragment_actors.len(), + 1, + "find multiple fragment {:?}", + fragment_actors + ); + let (fragment, actors) = fragment_actors.pop().unwrap(); + ( + fragment.distribution_type, + Arc::new( + actors + .into_iter() + .map(|actor| { + ( + actor.actor_id, + actor + .vnode_bitmap + .map(|bitmap| Bitmap::from(bitmap.to_protobuf())), + ) + }) + .collect(), + ), + ) + }; + result + }; + let fragment_relations = FragmentRelation::find() + .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids)) .all(db) .await?; - let mut actor_dispatchers_map = HashMap::new(); - for actor_dispatcher in actor_dispatchers { - actor_dispatchers_map - .entry(actor_dispatcher.actor_id) - .or_insert_with(Vec::new) - .push(actor_dispatcher); + let mut actor_dispatchers_map: HashMap<_, HashMap<_, Vec<_>>> = HashMap::new(); + for fragment_relation::Model { + source_fragment_id, + target_fragment_id, + dispatcher_type, + dist_key_indices, + 
output_indices, + } in fragment_relations + { + let (source_fragment_distribution, source_fragment_actors) = { + let (distribution, actors) = { + match fragment_actor_cache.entry(source_fragment_id) { + Entry::Occupied(entry) => entry.into_mut(), + Entry::Vacant(entry) => { + entry.insert(get_fragment_actors(source_fragment_id).await?) + } + } + }; + (*distribution, actors.clone()) + }; + let (target_fragment_distribution, target_fragment_actors) = { + let (distribution, actors) = { + match fragment_actor_cache.entry(target_fragment_id) { + Entry::Occupied(entry) => entry.into_mut(), + Entry::Vacant(entry) => { + entry.insert(get_fragment_actors(target_fragment_id).await?) + } + } + }; + (*distribution, actors.clone()) + }; + let dispatchers = compose_dispatchers( + source_fragment_distribution, + &source_fragment_actors, + target_fragment_id, + target_fragment_distribution, + &target_fragment_actors, + dispatcher_type, + dist_key_indices.into_u32_array(), + output_indices.into_u32_array(), + ); + let actor_dispatchers_map = actor_dispatchers_map.entry(source_fragment_id).or_default(); + for (actor_id, dispatchers) in dispatchers { + actor_dispatchers_map + .entry(actor_id) + .or_default() + .push(dispatchers); + } } Ok(actor_dispatchers_map) } +fn compose_dispatchers( + source_fragment_distribution: DistributionType, + source_fragment_actors: &HashMap>, + target_fragment_id: FragmentId, + target_fragment_distribution: DistributionType, + target_fragment_actors: &HashMap>, + dispatcher_type: DispatcherType, + dist_key_indices: Vec, + output_indices: Vec, +) -> HashMap { + match dispatcher_type { + DispatcherType::Hash => { + let dispatcher = PbDispatcher { + r#type: PbDispatcherType::from(dispatcher_type) as _, + dist_key_indices: dist_key_indices.clone(), + output_indices: output_indices.clone(), + hash_mapping: Some( + ActorMapping::from_bitmaps( + &target_fragment_actors + .iter() + .map(|(actor_id, bitmap)| { + ( + *actor_id as _, + bitmap + .clone() + .expect("downstream hash dispatch must have distribution"), + ) + }) + .collect(), + ) + .to_protobuf(), + ), + dispatcher_id: target_fragment_id as _, + downstream_actor_id: target_fragment_actors + .keys() + .map(|actor_id| *actor_id as _) + .collect(), + }; + source_fragment_actors + .keys() + .map(|source_actor_id| (*source_actor_id, dispatcher.clone())) + .collect() + } + DispatcherType::Broadcast | DispatcherType::Simple => { + let dispatcher = PbDispatcher { + r#type: PbDispatcherType::from(dispatcher_type) as _, + dist_key_indices: dist_key_indices.clone(), + output_indices: output_indices.clone(), + hash_mapping: None, + dispatcher_id: target_fragment_id as _, + downstream_actor_id: target_fragment_actors + .keys() + .map(|actor_id| *actor_id as _) + .collect(), + }; + source_fragment_actors + .keys() + .map(|source_actor_id| (*source_actor_id, dispatcher.clone())) + .collect() + } + DispatcherType::NoShuffle => resolve_no_shuffle_actor_dispatcher( + source_fragment_distribution, + source_fragment_actors, + target_fragment_distribution, + target_fragment_actors, + ) + .into_iter() + .map(|(upstream_actor_id, downstream_actor_id)| { + ( + upstream_actor_id, + PbDispatcher { + r#type: PbDispatcherType::NoShuffle as _, + dist_key_indices: dist_key_indices.clone(), + output_indices: output_indices.clone(), + hash_mapping: None, + dispatcher_id: target_fragment_id as _, + downstream_actor_id: vec![downstream_actor_id as _], + }, + ) + }) + .collect(), + } +} + +/// return (`upstream_actor_id` -> `downstream_actor_id`) +pub fn 
+/// return (`upstream_actor_id` -> `downstream_actor_id`)
+pub fn resolve_no_shuffle_actor_dispatcher(
+    source_fragment_distribution: DistributionType,
+    source_fragment_actors: &HashMap<ActorId, Option<Bitmap>>,
+    target_fragment_distribution: DistributionType,
+    target_fragment_actors: &HashMap<ActorId, Option<Bitmap>>,
+) -> Vec<(ActorId, ActorId)> {
+    assert_eq!(source_fragment_distribution, target_fragment_distribution);
+    assert_eq!(
+        source_fragment_actors.len(),
+        target_fragment_actors.len(),
+        "no-shuffle should have equal upstream downstream actor count: {:?} {:?}",
+        source_fragment_actors,
+        target_fragment_actors
+    );
+    match source_fragment_distribution {
+        DistributionType::Single => {
+            let assert_singleton = |bitmap: &Option<Bitmap>| {
+                assert!(
+                    bitmap.as_ref().map(|bitmap| bitmap.all()).unwrap_or(true),
+                    "not singleton: {:?}",
+                    bitmap
+                );
+            };
+            assert_eq!(
+                source_fragment_actors.len(),
+                1,
+                "singleton distribution actor count not 1: {:?}",
+                source_fragment_distribution
+            );
+            assert_eq!(
+                target_fragment_actors.len(),
+                1,
+                "singleton distribution actor count not 1: {:?}",
+                target_fragment_distribution
+            );
+            let (source_actor_id, bitmap) = source_fragment_actors.iter().next().unwrap();
+            assert_singleton(bitmap);
+            let (target_actor_id, bitmap) = target_fragment_actors.iter().next().unwrap();
+            assert_singleton(bitmap);
+            vec![(*source_actor_id, *target_actor_id)]
+        }
+        DistributionType::Hash => {
+            let target_fragment_actors: HashMap<_, _> = target_fragment_actors
+                .iter()
+                .map(|(actor_id, bitmap)| {
+                    let bitmap = bitmap
+                        .as_ref()
+                        .expect("hash distribution should have bitmap");
+                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
+                    (first_vnode, (*actor_id, bitmap))
+                })
+                .collect();
+            source_fragment_actors
+                .iter()
+                .map(|(source_actor_id, bitmap)| {
+                    let bitmap = bitmap
+                        .as_ref()
+                        .expect("hash distribution should have bitmap");
+                    let first_vnode = bitmap.iter_vnodes().next().expect("non-empty bitmap");
+                    let (target_actor_id, target_bitmap) =
+                        target_fragment_actors.get(&first_vnode).unwrap_or_else(|| {
+                            panic!(
+                                "cannot find matched target actor: {} {:?} {:?} {:?}",
+                                source_actor_id,
+                                first_vnode,
+                                source_fragment_actors,
+                                target_fragment_actors
+                            );
+                        });
+                    assert_eq!(
+                        bitmap,
+                        *target_bitmap,
+                        "cannot find matched target actor due to bitmap mismatch: {} {:?} {:?} {:?}",
+                        source_actor_id,
+                        first_vnode,
+                        source_fragment_actors,
+                        target_fragment_actors
+                    );
+                    (*source_actor_id, *target_actor_id)
+                }).collect()
+        }
+    }
+}
+
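The `Hash` arm above pairs upstream and downstream actors by the first vnode each actor owns, which is sound because vnode sets are disjoint within a hash-distributed fragment and, per the preceding asserts, identical across the two fragments of a no-shuffle edge. A self-contained sketch of the same pairing rule, with a sorted `Vec<usize>` standing in for the vnode `Bitmap`:

```rust
use std::collections::HashMap;

type ActorId = u32;
type Vnode = usize;

/// Pair each upstream actor with the downstream actor owning the same vnode
/// set, matching on the first owned vnode (panics on empty sets, mirroring
/// the "non-empty bitmap" expectation in the real code).
fn pair_no_shuffle(
    upstream: &HashMap<ActorId, Vec<Vnode>>,
    downstream: &HashMap<ActorId, Vec<Vnode>>,
) -> Vec<(ActorId, ActorId)> {
    // Index downstream actors by the first vnode of their disjoint vnode sets.
    let by_first_vnode: HashMap<Vnode, (ActorId, &Vec<Vnode>)> = downstream
        .iter()
        .map(|(actor, vnodes)| (vnodes[0], (*actor, vnodes)))
        .collect();
    upstream
        .iter()
        .map(|(actor, vnodes)| {
            let (target, target_vnodes) = by_first_vnode[&vnodes[0]];
            // Paired actors must own identical vnode sets.
            assert_eq!(vnodes, target_vnodes);
            (*actor, target)
        })
        .collect()
}

fn main() {
    let upstream = HashMap::from([(1, vec![0, 1]), (2, vec![2, 3])]);
    let downstream = HashMap::from([(10, vec![0, 1]), (11, vec![2, 3])]);
    let mut pairs = pair_no_shuffle(&upstream, &downstream);
    pairs.sort();
    assert_eq!(pairs, vec![(1, 10), (2, 11)]);
}
```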
 /// `get_fragment_mappings` returns the fragment vnode mappings of the given job.
 pub async fn get_fragment_mappings<C>(
     db: &C,
diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs
index 4ea2f2b6b8571..8ae35f0c83c4b 100644
--- a/src/meta/src/manager/metadata.rs
+++ b/src/meta/src/manager/metadata.rs
@@ -566,6 +566,7 @@ impl MetadataManager {
         Ok(actor_ids.into_iter().map(|id| id as ActorId).collect())
     }
 
+    // (backfill_actor_id, upstream_source_actor_id)
     pub async fn get_running_actors_for_source_backfill(
         &self,
         source_backfill_fragment_id: FragmentId,
diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs
index 7993a73be755c..8c88fb59ee44d 100644
--- a/src/meta/src/stream/scale.rs
+++ b/src/meta/src/stream/scale.rs
@@ -15,12 +15,12 @@
 use std::cmp::{min, Ordering};
 use std::collections::hash_map::DefaultHasher;
 use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
+use std::fmt::Debug;
 use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 use std::time::Duration;
 
 use anyhow::{anyhow, Context};
-use futures::future::try_join_all;
 use itertools::Itertools;
 use num_integer::Integer;
 use num_traits::abs;
@@ -39,8 +39,7 @@ use risingwave_pb::meta::table_fragments::fragment::{
 use risingwave_pb::meta::table_fragments::{self, ActorStatus, PbFragment, State};
 use risingwave_pb::meta::FragmentWorkerSlotMappings;
 use risingwave_pb::stream_plan::{
-    Dispatcher, DispatcherType, FragmentTypeFlag, PbDispatcher, PbStreamActor, StreamActor,
-    StreamNode,
+    Dispatcher, DispatcherType, FragmentTypeFlag, PbStreamActor, StreamActor, StreamNode,
 };
 use thiserror_ext::AsReport;
 use tokio::sync::oneshot::Receiver;
@@ -137,6 +136,7 @@ impl CustomFragmentInfo {
     }
 
 use educe::Educe;
+use futures::future::try_join_all;
 use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
 use risingwave_pb::stream_plan::stream_node::NodeBody;
 
@@ -535,12 +535,7 @@ impl ScaleController {
             },
         ) in actors
         {
-            let dispatchers = actor_dispatchers
-                .remove(&actor_id)
-                .unwrap_or_default()
-                .into_iter()
-                .map(PbDispatcher::from)
-                .collect();
+            let dispatchers = actor_dispatchers.remove(&actor_id).unwrap_or_default();
 
             let actor_info = CustomActorInfo {
                 actor_id: actor_id as _,
diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs
index 38d54154be005..d9f407d87f503 100644
--- a/src/meta/src/stream/source_manager.rs
+++ b/src/meta/src/stream/source_manager.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use anyhow::Context;
-use risingwave_common::catalog::{DatabaseId, TableId};
+use risingwave_common::catalog::DatabaseId;
 use risingwave_common::metrics::LabelGuardedIntGauge;
 use risingwave_common::panic_if_debug;
 use risingwave_connector::error::ConnectorResult;
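The `scale.rs` hunk above shrinks because `get_fragment_actor_dispatchers` now returns `PbDispatcher` values directly, so the former model-to-proto conversion pass is gone. A toy illustration of the resulting call shape, with a hypothetical stand-in type:

```rust
use std::collections::HashMap;

// Stand-in for risingwave_pb::stream_plan::PbDispatcher.
#[derive(Clone, Debug, Default, PartialEq)]
struct PbDispatcher;

fn main() {
    let mut actor_dispatchers: HashMap<u32, Vec<PbDispatcher>> =
        HashMap::from([(1, vec![PbDispatcher])]);
    // After the change: the removed entry is used as-is, no `.map(PbDispatcher::from)` pass.
    let dispatchers = actor_dispatchers.remove(&1).unwrap_or_default();
    assert_eq!(dispatchers, vec![PbDispatcher]);
}
```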
diff --git a/src/meta/src/stream/source_manager/split_assignment.rs b/src/meta/src/stream/source_manager/split_assignment.rs
index 8653a0ba12e57..c4f7e72d31697 100644
--- a/src/meta/src/stream/source_manager/split_assignment.rs
+++ b/src/meta/src/stream/source_manager/split_assignment.rs
@@ -92,12 +92,11 @@ impl SourceManager {
     }
 
     /// Allocates splits to actors for a newly created source executor.
-    pub async fn allocate_splits(&self, job_id: &TableId) -> MetaResult<SplitAssignment> {
+    pub async fn allocate_splits(
+        &self,
+        table_fragments: &StreamJobFragments,
+    ) -> MetaResult<SplitAssignment> {
         let core = self.core.lock().await;
-        let table_fragments = core
-            .metadata_manager
-            .get_job_fragments_by_id(job_id)
-            .await?;
 
         let source_fragments = table_fragments.stream_source_fragments();
 
@@ -146,20 +145,16 @@ impl SourceManager {
     /// Allocates splits to actors for replace source job.
     pub async fn allocate_splits_for_replace_source(
         &self,
-        job_id: &TableId,
+        table_fragments: &StreamJobFragments,
         merge_updates: &HashMap<FragmentId, Vec<MergeUpdate>>,
     ) -> MetaResult<SplitAssignment> {
         tracing::debug!(?merge_updates, "allocate_splits_for_replace_source");
         if merge_updates.is_empty() {
             // no existing downstream. We can just re-allocate splits arbitrarily.
-            return self.allocate_splits(job_id).await;
+            return self.allocate_splits(table_fragments).await;
         }
 
         let core = self.core.lock().await;
-        let table_fragments = core
-            .metadata_manager
-            .get_job_fragments_by_id(job_id)
-            .await?;
 
         let source_fragments = table_fragments.stream_source_fragments();
 
         assert_eq!(
@@ -234,15 +229,11 @@ impl SourceManager {
     /// this method aligns the splits for backfill fragments with its upstream source fragment ([`align_splits`]).
     pub async fn allocate_splits_for_backfill(
         &self,
-        table_id: &TableId,
+        table_fragments: &StreamJobFragments,
         // dispatchers from SourceExecutor to SourceBackfillExecutor
         dispatchers: &HashMap<FragmentId, HashMap<ActorId, Vec<PbDispatcher>>>,
     ) -> MetaResult<SplitAssignment> {
         let core = self.core.lock().await;
-        let table_fragments = core
-            .metadata_manager
-            .get_job_fragments_by_id(table_id)
-            .await?;
 
         let source_backfill_fragments = table_fragments.source_backfill_fragments()?;
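All three `allocate_splits*` signatures switch from a job id to `&StreamJobFragments`: the caller resolves the fragments once and each call borrows them, instead of every method re-fetching via `get_job_fragments_by_id` while holding the core lock. A simplified sketch of that calling pattern (hypothetical stand-in types, not the meta-crate API):

```rust
// "Resolve once, pass by reference": both allocation calls borrow the same
// fragments value instead of looking it up again by job id.
struct StreamJobFragments {
    job_id: u32,
}

struct SourceManager;

impl SourceManager {
    fn allocate_splits(&self, fragments: &StreamJobFragments) -> Vec<(u32, &'static str)> {
        vec![(fragments.job_id, "split-0")]
    }

    fn allocate_splits_for_backfill(
        &self,
        fragments: &StreamJobFragments,
    ) -> Vec<(u32, &'static str)> {
        vec![(fragments.job_id, "split-backfill-0")]
    }
}

fn main() {
    let fragments = StreamJobFragments { job_id: 42 }; // fetched once by the caller
    let manager = SourceManager;
    let mut assignment = manager.allocate_splits(&fragments);
    assignment.extend(manager.allocate_splits_for_backfill(&fragments));
    assert_eq!(assignment.len(), 2);
}
```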
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index 966051f4e7ede..1e2f9e3996a73 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -366,7 +366,10 @@ impl GlobalStreamManager {
                 .await?;
 
             let tmp_table_id = stream_job_fragments.stream_job_id();
-            let init_split_assignment = self.source_manager.allocate_splits(&tmp_table_id).await?;
+            let init_split_assignment = self
+                .source_manager
+                .allocate_splits(&stream_job_fragments)
+                .await?;
 
             replace_table_command = Some(ReplaceStreamJobPlan {
                 old_fragments: context.old_fragments,
@@ -380,16 +383,17 @@ impl GlobalStreamManager {
             });
         }
 
-        let table_id = stream_job_fragments.stream_job_id();
-
         // Here we need to consider:
         // - Shared source
         // - Table with connector
         // - MV on shared source
-        let mut init_split_assignment = self.source_manager.allocate_splits(&table_id).await?;
+        let mut init_split_assignment = self
+            .source_manager
+            .allocate_splits(&stream_job_fragments)
+            .await?;
 
         init_split_assignment.extend(
             self.source_manager
-                .allocate_splits_for_backfill(&table_id, &dispatchers)
+                .allocate_splits_for_backfill(&stream_job_fragments, &dispatchers)
                 .await?,
         );
 
@@ -464,13 +468,12 @@ impl GlobalStreamManager {
             ..
         }: ReplaceStreamJobContext,
     ) -> MetaResult<()> {
-        let tmp_table_id = new_fragments.stream_job_id();
         let init_split_assignment = if streaming_job.is_source() {
             self.source_manager
-                .allocate_splits_for_replace_source(&tmp_table_id, &merge_updates)
+                .allocate_splits_for_replace_source(&new_fragments, &merge_updates)
                 .await?
         } else {
-            self.source_manager.allocate_splits(&tmp_table_id).await?
+            self.source_manager.allocate_splits(&new_fragments).await?
         };
 
         tracing::info!(
             "replace_stream_job - allocate split: {:?}",
diff --git a/src/storage/backup/src/meta_snapshot_v2.rs b/src/storage/backup/src/meta_snapshot_v2.rs
index c77a20b24d5e6..3de577412e463 100644
--- a/src/storage/backup/src/meta_snapshot_v2.rs
+++ b/src/storage/backup/src/meta_snapshot_v2.rs
@@ -38,7 +38,7 @@ macro_rules! for_all_metadata_models_v2 {
     {compaction_configs, risingwave_meta_model::compaction_config},
     {actors, risingwave_meta_model::actor},
     {clusters, risingwave_meta_model::cluster},
-    {actor_dispatchers, risingwave_meta_model::actor_dispatcher},
+    {fragment_relation, risingwave_meta_model::fragment_relation},
     {catalog_versions, risingwave_meta_model::catalog_version},
     {connections, risingwave_meta_model::connection},
     {databases, risingwave_meta_model::database},