From 0c9af564d43bcdf1356992acce7db6160a7643f4 Mon Sep 17 00:00:00 2001 From: zwang28 <70626450+zwang28@users.noreply.github.com> Date: Tue, 6 Sep 2022 13:50:15 +0800 Subject: [PATCH 1/4] fix(compaction): periodically trigger compaction for all compaction groups (#5102) * fix(compaction): periodically trigger compaction for all compaction groups * add config periodic_compaction_interval_sec * fmt * fix test * fix test * minor --- .../src/hummock/compaction_group/manager.rs | 11 +++++++- src/meta/src/hummock/compaction_scheduler.rs | 20 +++++++++++--- src/meta/src/hummock/compactor_manager.rs | 2 +- src/meta/src/hummock/manager/mod.rs | 2 +- src/meta/src/hummock/manager/tests.rs | 26 +++++++++---------- src/meta/src/hummock/test_utils.rs | 6 ++--- src/meta/src/lib.rs | 5 ++++ src/meta/src/manager/env.rs | 3 +++ .../hummock_test/src/compactor_tests.rs | 12 ++++----- src/storage/src/hummock/validator.rs | 6 ++--- 10 files changed, 62 insertions(+), 31 deletions(-) diff --git a/src/meta/src/hummock/compaction_group/manager.rs b/src/meta/src/hummock/compaction_group/manager.rs index 7376abbef71f2..4bfa1cb6bf138 100644 --- a/src/meta/src/hummock/compaction_group/manager.rs +++ b/src/meta/src/hummock/compaction_group/manager.rs @@ -72,6 +72,16 @@ impl CompactionGroupManager { .collect_vec() } + pub async fn compaction_group_ids(&self) -> Vec { + self.inner + .read() + .await + .compaction_groups + .values() + .map(|cg| cg.group_id) + .collect_vec() + } + pub async fn compaction_group(&self, id: CompactionGroupId) -> Option { self.inner.read().await.compaction_groups.get(&id).cloned() } @@ -359,7 +369,6 @@ impl CompactionGroupManagerInner { #[cfg(test)] mod tests { - use std::collections::{BTreeMap, HashMap}; use std::ops::Deref; diff --git a/src/meta/src/hummock/compaction_scheduler.rs b/src/meta/src/hummock/compaction_scheduler.rs index b22ddc2ddc669..04075ac0aef8e 100644 --- a/src/meta/src/hummock/compaction_scheduler.rs +++ b/src/meta/src/hummock/compaction_scheduler.rs @@ -33,6 +33,7 @@ use crate::storage::MetaStore; pub type CompactionSchedulerRef = Arc>; pub type CompactionRequestChannelRef = Arc; + /// [`CompactionRequestChannel`] wrappers a mpsc channel and deduplicate requests from same /// compaction groups. pub struct CompactionRequestChannel { @@ -72,9 +73,9 @@ pub struct CompactionScheduler where S: MetaStore, { + env: MetaSrvEnv, hummock_manager: HummockManagerRef, compactor_manager: CompactorManagerRef, - compactor_selection_retry_interval_sec: u64, } impl CompactionScheduler @@ -87,9 +88,9 @@ where compactor_manager: CompactorManagerRef, ) -> Self { Self { + env, hummock_manager, compactor_manager, - compactor_selection_retry_interval_sec: env.opts.compactor_selection_retry_interval_sec, } } @@ -100,6 +101,10 @@ where self.hummock_manager .set_compaction_scheduler(request_channel.clone()); tracing::info!("Start compaction scheduler."); + let mut min_trigger_interval = tokio::time::interval(Duration::from_secs( + self.env.opts.periodic_compaction_interval_sec, + )); + min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); loop { let compaction_group: CompactionGroupId = tokio::select! { compaction_group = request_rx.recv() => { @@ -112,6 +117,15 @@ where } } }, + _ = min_trigger_interval.tick() => { + // Periodically trigger compaction for all compaction groups. + for cg_id in self.hummock_manager.compaction_group_manager().compaction_group_ids().await { + if let Err(e) = request_channel.try_send(cg_id) { + tracing::warn!("Failed to schedule compaction for compaction group {}. {}", cg_id, e); + } + } + continue; + }, // Shutdown compactor scheduler _ = &mut shutdown_rx => { break; @@ -166,7 +180,7 @@ where self.hummock_manager.list_assigned_tasks_number().await; tracing::warn!("No idle compactor available. The assigned task number for every compactor is (context_id, count):\n {:?}", current_compactor_tasks); tokio::time::sleep(Duration::from_secs( - self.compactor_selection_retry_interval_sec, + self.env.opts.compactor_selection_retry_interval_sec, )) .await; match self diff --git a/src/meta/src/hummock/compactor_manager.rs b/src/meta/src/hummock/compactor_manager.rs index 51f5acf367f2d..65eb6763f46b7 100644 --- a/src/meta/src/hummock/compactor_manager.rs +++ b/src/meta/src/hummock/compactor_manager.rs @@ -350,7 +350,7 @@ mod tests { { let original_tables = generate_test_tables(epoch, get_sst_ids(hummock_manager, 2).await); register_sstable_infos_to_compaction_group( - hummock_manager.compaction_group_manager_ref_for_test(), + hummock_manager.compaction_group_manager(), &original_tables, StaticCompactionGroupId::StateDefault.into(), ) diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 275123e12cb67..bbb5263d12aa5 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -1540,7 +1540,7 @@ where assignment_ref.get(&task_id).cloned() } - pub fn compaction_group_manager_ref_for_test(&self) -> CompactionGroupManagerRef { + pub fn compaction_group_manager(&self) -> CompactionGroupManagerRef { self.compaction_group_manager.clone() } diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index ba8fef15702d7..7b60cba174fd4 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -176,7 +176,7 @@ async fn test_hummock_compaction_task() { let epoch: u64 = 1; let original_tables = generate_test_tables(epoch, get_sst_ids(&hummock_manager, sst_num).await); register_sstable_infos_to_compaction_group( - hummock_manager.compaction_group_manager_ref_for_test(), + hummock_manager.compaction_group_manager(), &original_tables, StaticCompactionGroupId::StateDefault.into(), ) @@ -253,7 +253,7 @@ async fn test_hummock_table() { let epoch: u64 = 1; let original_tables = generate_test_tables(epoch, get_sst_ids(&hummock_manager, 2).await); register_sstable_infos_to_compaction_group( - hummock_manager.compaction_group_manager_ref_for_test(), + hummock_manager.compaction_group_manager(), &original_tables, StaticCompactionGroupId::StateDefault.into(), ) @@ -314,7 +314,7 @@ async fn test_hummock_transaction() { // Add tables in epoch1 let tables_in_epoch1 = generate_test_tables(epoch1, get_sst_ids(&hummock_manager, 2).await); register_sstable_infos_to_compaction_group( - hummock_manager.compaction_group_manager_ref_for_test(), + hummock_manager.compaction_group_manager(), &tables_in_epoch1, StaticCompactionGroupId::StateDefault.into(), ) @@ -373,7 +373,7 @@ async fn test_hummock_transaction() { // Add tables in epoch2 let tables_in_epoch2 = generate_test_tables(epoch2, get_sst_ids(&hummock_manager, 2).await); register_sstable_infos_to_compaction_group( - hummock_manager.compaction_group_manager_ref_for_test(), + hummock_manager.compaction_group_manager(), &tables_in_epoch2, StaticCompactionGroupId::StateDefault.into(), ) @@ -614,7 +614,7 @@ async fn test_hummock_manager_basic() { let commit_one = |epoch: HummockEpoch, hummock_manager: HummockManagerRef| async move { let original_tables = generate_test_tables(epoch, get_sst_ids(&hummock_manager, 2).await); register_sstable_infos_to_compaction_group( - hummock_manager.compaction_group_manager_ref_for_test(), + hummock_manager.compaction_group_manager(), &original_tables, StaticCompactionGroupId::StateDefault.into(), ) @@ -758,7 +758,7 @@ async fn test_pin_snapshot_response_lost() { let mut epoch: u64 = 1; let test_tables = generate_test_tables(epoch, get_sst_ids(&hummock_manager, 2).await); register_sstable_infos_to_compaction_group( - hummock_manager.compaction_group_manager_ref_for_test(), + hummock_manager.compaction_group_manager(), &test_tables, StaticCompactionGroupId::StateDefault.into(), ) @@ -780,7 +780,7 @@ async fn test_pin_snapshot_response_lost() { let test_tables = generate_test_tables(epoch, get_sst_ids(&hummock_manager, 2).await); register_sstable_infos_to_compaction_group( - hummock_manager.compaction_group_manager_ref_for_test(), + hummock_manager.compaction_group_manager(), &test_tables, StaticCompactionGroupId::StateDefault.into(), ) @@ -807,7 +807,7 @@ async fn test_pin_snapshot_response_lost() { let test_tables = generate_test_tables(epoch, get_sst_ids(&hummock_manager, 2).await); register_sstable_infos_to_compaction_group( - hummock_manager.compaction_group_manager_ref_for_test(), + hummock_manager.compaction_group_manager(), &test_tables, StaticCompactionGroupId::StateDefault.into(), ) @@ -829,7 +829,7 @@ async fn test_pin_snapshot_response_lost() { let test_tables = generate_test_tables(epoch, get_sst_ids(&hummock_manager, 2).await); register_sstable_infos_to_compaction_group( - hummock_manager.compaction_group_manager_ref_for_test(), + hummock_manager.compaction_group_manager(), &test_tables, StaticCompactionGroupId::StateDefault.into(), ) @@ -857,7 +857,7 @@ async fn test_print_compact_task() { let epoch: u64 = 1; let original_tables = generate_test_tables(epoch, get_sst_ids(&hummock_manager, 2).await); register_sstable_infos_to_compaction_group( - hummock_manager.compaction_group_manager_ref_for_test(), + hummock_manager.compaction_group_manager(), &original_tables, StaticCompactionGroupId::StateDefault.into(), ) @@ -897,7 +897,7 @@ async fn test_invalid_sst_id() { let epoch = 1; let ssts = generate_test_tables(epoch, vec![1]); register_sstable_infos_to_compaction_group( - hummock_manager.compaction_group_manager_ref_for_test(), + hummock_manager.compaction_group_manager(), &ssts, StaticCompactionGroupId::StateDefault.into(), ) @@ -1032,7 +1032,7 @@ async fn test_hummock_compaction_task_heartbeat() { let epoch: u64 = 1; let original_tables = generate_test_tables(epoch, get_sst_ids(&hummock_manager, sst_num).await); register_sstable_infos_to_compaction_group( - hummock_manager.compaction_group_manager_ref_for_test(), + hummock_manager.compaction_group_manager(), &original_tables, StaticCompactionGroupId::StateDefault.into(), ) @@ -1151,7 +1151,7 @@ async fn test_hummock_compaction_task_heartbeat_removal_on_node_removal() { let epoch: u64 = 1; let original_tables = generate_test_tables(epoch, get_sst_ids(&hummock_manager, sst_num).await); register_sstable_infos_to_compaction_group( - hummock_manager.compaction_group_manager_ref_for_test(), + hummock_manager.compaction_group_manager(), &original_tables, StaticCompactionGroupId::StateDefault.into(), ) diff --git a/src/meta/src/hummock/test_utils.rs b/src/meta/src/hummock/test_utils.rs index 24d59d70cc794..e044eddfd6748 100644 --- a/src/meta/src/hummock/test_utils.rs +++ b/src/meta/src/hummock/test_utils.rs @@ -54,7 +54,7 @@ where let table_ids = get_sst_ids(hummock_manager, 3).await; let test_tables = generate_test_tables(epoch, table_ids); register_sstable_infos_to_compaction_group( - hummock_manager.compaction_group_manager_ref_for_test(), + hummock_manager.compaction_group_manager(), &test_tables, StaticCompactionGroupId::StateDefault.into(), ) @@ -80,7 +80,7 @@ where .unwrap(); let test_tables_2 = generate_test_tables(epoch, get_sst_ids(hummock_manager, 1).await); register_sstable_infos_to_compaction_group( - hummock_manager.compaction_group_manager_ref_for_test(), + hummock_manager.compaction_group_manager(), &test_tables_2, StaticCompactionGroupId::StateDefault.into(), ) @@ -97,7 +97,7 @@ where epoch += 1; let test_tables_3 = generate_test_tables(epoch, get_sst_ids(hummock_manager, 1).await); register_sstable_infos_to_compaction_group( - hummock_manager.compaction_group_manager_ref_for_test(), + hummock_manager.compaction_group_manager(), &test_tables_3, StaticCompactionGroupId::StateDefault.into(), ) diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index 7b4b691331a18..a75a5909014fa 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -147,6 +147,10 @@ pub struct MetaNodeOpts { /// Enable sanity check when SSTs are committed. By default disabled. #[clap(long)] enable_committed_sst_sanity_check: bool, + + /// Schedule compaction for all compaction groups with this interval. + #[clap(long, default_value = "60")] + pub periodic_compaction_interval_sec: u64, } use std::future::Future; @@ -208,6 +212,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { compactor_selection_retry_interval_sec: opts.compactor_selection_retry_interval_sec, collect_gc_watermark_spin_interval_sec: opts.collect_gc_watermark_spin_interval_sec, enable_committed_sst_sanity_check: opts.enable_committed_sst_sanity_check, + periodic_compaction_interval_sec: opts.periodic_compaction_interval_sec, }, ) .await diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 95071bce5453c..802b66df8ea8a 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -81,6 +81,8 @@ pub struct MetaOpts { pub collect_gc_watermark_spin_interval_sec: u64, /// Enable sanity check when SSTs are committed pub enable_committed_sst_sanity_check: bool, + /// Schedule compaction for all compaction groups with this interval. + pub periodic_compaction_interval_sec: u64, } impl Default for MetaOpts { @@ -95,6 +97,7 @@ impl Default for MetaOpts { compactor_selection_retry_interval_sec: 5, collect_gc_watermark_spin_interval_sec: 5, enable_committed_sst_sanity_check: false, + periodic_compaction_interval_sec: 60, } } } diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 7b70eb369f69f..7baa22c218c57 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -429,7 +429,7 @@ mod tests { let keyspace = Keyspace::table_root(storage.clone(), &TableId::new(existing_table_id)); // Only registered table_ids are accepted in commit_epoch register_table_ids_to_compaction_group( - hummock_manager_ref.compaction_group_manager_ref_for_test(), + hummock_manager_ref.compaction_group_manager(), &[existing_table_id], StaticCompactionGroupId::StateDefault.into(), ) @@ -454,7 +454,7 @@ mod tests { // Mimic dropping table unregister_table_ids_from_compaction_group( - hummock_manager_ref.compaction_group_manager_ref_for_test(), + hummock_manager_ref.compaction_group_manager(), &[existing_table_id], ) .await; @@ -559,7 +559,7 @@ mod tests { }; let keyspace = Keyspace::table_root(storage.clone(), &TableId::new(table_id)); register_table_ids_to_compaction_group( - hummock_manager_ref.compaction_group_manager_ref_for_test(), + hummock_manager_ref.compaction_group_manager(), &[table_id], StaticCompactionGroupId::StateDefault.into(), ) @@ -580,7 +580,7 @@ mod tests { // Mimic dropping table unregister_table_ids_from_compaction_group( - hummock_manager_ref.compaction_group_manager_ref_for_test(), + hummock_manager_ref.compaction_group_manager(), &[drop_table_id], ) .await; @@ -718,7 +718,7 @@ mod tests { let millisec_interval_epoch: u64 = (1 << 16) * 100; let keyspace = Keyspace::table_root(storage.clone(), &TableId::new(existing_table_id)); register_table_ids_to_compaction_group( - hummock_manager_ref.compaction_group_manager_ref_for_test(), + hummock_manager_ref.compaction_group_manager(), &[existing_table_id], StaticCompactionGroupId::StateDefault.into(), ) @@ -883,7 +883,7 @@ mod tests { let millisec_interval_epoch: u64 = (1 << 16) * 100; let keyspace = Keyspace::table_root(storage.clone(), &TableId::new(existing_table_id)); register_table_ids_to_compaction_group( - hummock_manager_ref.compaction_group_manager_ref_for_test(), + hummock_manager_ref.compaction_group_manager(), &[keyspace.table_id().table_id], StaticCompactionGroupId::StateDefault.into(), ) diff --git a/src/storage/src/hummock/validator.rs b/src/storage/src/hummock/validator.rs index 02e9f8362a783..1604d2d752f70 100644 --- a/src/storage/src/hummock/validator.rs +++ b/src/storage/src/hummock/validator.rs @@ -48,7 +48,7 @@ pub async fn validate_ssts(task: ValidationTask, sstable_store: SstableStoreRef) Ok(holder) => holder, Err(err) => { // One reasonable cause is the SST has been vacuumed. - tracing::warn!("Skip sanity check for SST {}. {:#?}", sst_id, err); + tracing::warn!("Skip sanity check for SST {}. {}", sst_id, err); continue; } }; @@ -62,7 +62,7 @@ pub async fn validate_ssts(task: ValidationTask, sstable_store: SstableStoreRef) ); let mut previous_key: Option> = None; if let Err(err) = iter.rewind().await { - tracing::warn!("Skip sanity check for SST {}. {:#?}", sst_id, err); + tracing::warn!("Skip sanity check for SST {}. {}", sst_id, err); } while iter.is_valid() { key_counts += 1; @@ -91,7 +91,7 @@ pub async fn validate_ssts(task: ValidationTask, sstable_store: SstableStoreRef) } previous_key = Some(current_key); if let Err(err) = iter.next().await { - tracing::warn!("Skip remaining sanity check for SST {}. {:#?}", sst_id, err); + tracing::warn!("Skip remaining sanity check for SST {}. {}", sst_id, err); break; } } From d574cf586ab5e96696a445e82d69d2323848b79b Mon Sep 17 00:00:00 2001 From: Liang <44948473+soundOfDestiny@users.noreply.github.com> Date: Tue, 6 Sep 2022 14:05:31 +0800 Subject: [PATCH 2/4] fix(notification): fix lock order issue (#5126) Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- src/meta/src/rpc/service/notification_service.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/meta/src/rpc/service/notification_service.rs b/src/meta/src/rpc/service/notification_service.rs index bb03e6610bf19..259ab5ccecfc2 100644 --- a/src/meta/src/rpc/service/notification_service.rs +++ b/src/meta/src/rpc/service/notification_service.rs @@ -84,9 +84,6 @@ where let creating_tables = catalog_guard.database.list_creating_tables(); let users = catalog_guard.user.list_users(); - let cluster_guard = self.cluster_manager.get_cluster_core_guard().await; - let nodes = cluster_guard.list_worker_node(WorkerType::ComputeNode, Some(Running)); - let fragment_ids: HashSet = HashSet::from_iter(tables.iter().map(|t| t.fragment_id)); let fragment_guard = self.fragment_manager.get_fragment_read_guard().await; let parallel_unit_mappings = fragment_guard @@ -97,6 +94,9 @@ where let hummock_manager_guard = self.hummock_manager.get_read_guard().await; + let cluster_guard = self.cluster_manager.get_cluster_core_guard().await; + let nodes = cluster_guard.list_worker_node(WorkerType::ComputeNode, Some(Running)); + // Send the snapshot on subscription. After that we will send only updates. let meta_snapshot = match worker_type { WorkerType::Frontend => MetaSnapshot { From ede7432d5253e83635d92000d390b93023a0682a Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Tue, 6 Sep 2022 14:23:38 +0800 Subject: [PATCH 3/4] feat(test): add etcd simulator (#5084) * add etcd to simulation Signed-off-by: Runji Wang * update madsim and note the etcd timeout issue Signed-off-by: Runji Wang * enable etcd service timeout Signed-off-by: Runji Wang * update madsim Signed-off-by: Runji Wang * ci: disable recovery test since it's too slow Signed-off-by: Runji Wang * add --etcd-timeout-rate argument Signed-off-by: Runji Wang * meta: fix lock order Signed-off-by: Runji Wang * improve error info readability Signed-off-by: Runji Wang Signed-off-by: Runji Wang Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- Cargo.lock | 25 ++++++++++++++++---- ci/scripts/deterministic-e2e-test.sh | 8 +++---- src/meta/Cargo.toml | 2 +- src/tests/simulation/Cargo.toml | 3 ++- src/tests/simulation/src/main.rs | 34 ++++++++++++++++++++++++++-- 5 files changed, 59 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8cfdb9da833b5..f3fd322571fc2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2664,9 +2664,9 @@ dependencies = [ [[package]] name = "madsim" -version = "0.2.4" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fbcfa3e52808d948f10928c17a720efd3324942b1059cef77cc88077ea639f9" +checksum = "69239c86b096ba429e109d5e36c3e639d19d36e0d0a8c3df1e168b72a5008770" dependencies = [ "ahash", "async-channel", @@ -2689,11 +2689,25 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "madsim-etcd-client" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54cc5db0885f71b5e8eb71ec1002b94bf5b8f5b1011c4d709b23a1fd3b5c01d8" +dependencies = [ + "etcd-client", + "http", + "madsim", + "spin 0.9.4", + "tonic", + "tracing", +] + [[package]] name = "madsim-macros" -version = "0.2.0" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45a95505369fabd293bcb3eaab7cefd668f174a7e533dcf247eed97a91ea5d2a" +checksum = "8abbe137adbf2023c066b8acb3fdef0a4bec6494642ef2174d060fc99a9f80ea" dependencies = [ "darling 0.14.1", "proc-macro2", @@ -4649,7 +4663,6 @@ dependencies = [ "crc32fast", "derivative", "either", - "etcd-client", "fail", "function_name", "futures", @@ -4657,6 +4670,7 @@ dependencies = [ "hyper", "itertools", "lazy_static", + "madsim-etcd-client", "madsim-tokio", "madsim-tonic", "memcomparable", @@ -4799,6 +4813,7 @@ dependencies = [ "futures", "glob", "madsim", + "madsim-etcd-client", "madsim-tokio", "rand 0.8.5", "risingwave_compactor", diff --git a/ci/scripts/deterministic-e2e-test.sh b/ci/scripts/deterministic-e2e-test.sh index 36bb1015b5a8e..ad6333c3f6cd9 100755 --- a/ci/scripts/deterministic-e2e-test.sh +++ b/ci/scripts/deterministic-e2e-test.sh @@ -30,12 +30,12 @@ echo "--- deterministic simulation e2e, ci-3cn-2fe, parallel, batch" seq 10 | parallel MADSIM_TEST_SEED={} $RUNNER -j 16 './e2e_test/batch/\*\*/\*.slt' # bugs here! Tracking issue https://github.com/risingwavelabs/risingwave/issues/4527 -echo "--- deterministic simulation e2e, ci-3cn-1fe, recovery, streaming" -seq 1 | parallel MADSIM_TEST_SEED={} $RUNNER --kill-compute './e2e_test/streaming/\*\*/\*.slt' || true +# echo "--- deterministic simulation e2e, ci-3cn-1fe, recovery, streaming" +# seq 1 | parallel MADSIM_TEST_SEED={} $RUNNER --kill-compute './e2e_test/streaming/\*\*/\*.slt' || true # bugs here! Tracking issue https://github.com/risingwavelabs/risingwave/issues/4527 -echo "--- deterministic simulation e2e, ci-3cn-1fe, recovery, batch" -seq 1 | parallel MADSIM_TEST_SEED={} $RUNNER --kill-compute './e2e_test/batch/\*\*/\*.slt' || true +# echo "--- deterministic simulation e2e, ci-3cn-1fe, recovery, batch" +# seq 1 | parallel MADSIM_TEST_SEED={} $RUNNER --kill-compute './e2e_test/batch/\*\*/\*.slt' || true echo "--- deterministic simulation e2e, ci-3cn-1fe, fuzzing" seq 1 | parallel MADSIM_TEST_SEED={} $RUNNER --sqlsmith 100 ./src/tests/sqlsmith/tests/testdata diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index 63fc91ec37756..c5e7dd931b59a 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -15,7 +15,7 @@ clap = { version = "3", features = ["derive", "env"] } crc32fast = "1" derivative = "2" either = "1" -etcd-client = "0.10" +etcd-client = { version = "0.2", package = "madsim-etcd-client" } fail = "0.5" function_name = "0.3.0" futures = { version = "0.3", default-features = false, features = ["alloc"] } diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml index 4c99ef0ae0f02..599a6a31b77da 100644 --- a/src/tests/simulation/Cargo.toml +++ b/src/tests/simulation/Cargo.toml @@ -7,9 +7,10 @@ edition = "2021" [dependencies] async-trait = "0.1" clap = "3" +etcd-client = { version = "0.2.6", package = "madsim-etcd-client" } futures = { version = "0.3", default-features = false, features = ["alloc"] } glob = "0.3" -madsim = "0.2.4" +madsim = "0.2.6" rand = "0.8" risingwave_compactor = { path = "../../storage/compactor" } risingwave_compute = { path = "../../compute" } diff --git a/src/tests/simulation/src/main.rs b/src/tests/simulation/src/main.rs index 0fe9a044a03d0..a3693eb04669e 100644 --- a/src/tests/simulation/src/main.rs +++ b/src/tests/simulation/src/main.rs @@ -66,6 +66,10 @@ pub struct Args { #[clap(short, long)] jobs: Option, + /// The probability of etcd request timeout. + #[clap(long, default_value = "0.0")] + etcd_timeout_rate: f32, + /// Randomly kill the meta node after each query. /// /// Currently only available when `-j` is not set. @@ -109,6 +113,23 @@ async fn main() { println!("seed = {}", handle.seed()); println!("{:?}", args); + // etcd node + handle + .create_node() + .name("etcd") + .ip("192.168.10.1".parse().unwrap()) + .init(|| async { + let addr = "0.0.0.0:2388".parse().unwrap(); + etcd_client::SimServer::builder() + .timeout_rate(args.etcd_timeout_rate) + .serve(addr) + .await + .unwrap(); + }) + .build(); + // wait for the service to be ready + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + // meta node handle .create_node() @@ -121,12 +142,16 @@ async fn main() { // "src/config/risingwave.toml", "--listen-addr", "0.0.0.0:5690", + "--backend", + "etcd", + "--etcd-endpoints", + "192.168.10.1:2388", ]); risingwave_meta::start(opts).await }) .build(); // wait for the service to be ready - tokio::time::sleep(std::time::Duration::from_secs(1)).await; + tokio::time::sleep(std::time::Duration::from_secs(30)).await; // frontend node let mut frontend_ip = vec![]; @@ -309,7 +334,11 @@ async fn run_slt_task(glob: &str, host: &str) { let file = file.unwrap(); let path = file.as_path(); println!("{}", path.display()); - tester.run_file_async(path).await.unwrap(); + tester + .run_file_async(path) + .await + .map_err(|e| panic!("{e}")) + .unwrap() } } @@ -324,6 +353,7 @@ async fn run_parallel_slt_task( tester .run_parallel_async(glob, hosts.to_vec(), Risingwave::connect, jobs) .await + .map_err(|e| panic!("{e}")) } struct Risingwave { From d0a377d2e4266bb1eb2239ab47f80b9fc2f757b6 Mon Sep 17 00:00:00 2001 From: Bohan Zhang Date: Tue, 6 Sep 2022 14:38:54 +0800 Subject: [PATCH 4/4] feat: create source with avro row format (#5059) * stash Signed-off-by: tabVersion * create with avro format Signed-off-by: tabVersion * fix Signed-off-by: tabVersion * fix Signed-off-by: tabVersion * trigger Signed-off-by: tabVersion * resolve comment Signed-off-by: tabVersion * fix Signed-off-by: tabVersion * add e2e Signed-off-by: tabVersion * another try Signed-off-by: tabVersion * another try Signed-off-by: tabVersion Signed-off-by: tabVersion Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- ci/scripts/build.sh | 4 + ci/scripts/e2e-source-test.sh | 3 + e2e_test/source/basic_test.slt | 13 ++++ scripts/source/test_data/avro_bin.1 | Bin 0 -> 614 bytes src/frontend/src/handler/create_source.rs | 38 ++++++++- src/source/src/parser/avro_parser.rs | 90 ++++++++++++++++++++++ src/source/src/parser/mod.rs | 2 +- src/sqlparser/src/ast/statement.rs | 48 +++++++++++- 8 files changed, 192 insertions(+), 6 deletions(-) create mode 100644 scripts/source/test_data/avro_bin.1 diff --git a/ci/scripts/build.sh b/ci/scripts/build.sh index 9aa4ccf166a6a..54623a5ca87d0 100755 --- a/ci/scripts/build.sh +++ b/ci/scripts/build.sh @@ -56,3 +56,7 @@ buildkite-agent artifact upload risingwave-"$profile" buildkite-agent artifact upload risedev-dev-"$profile" buildkite-agent artifact upload risingwave_regress_test-"$profile" buildkite-agent artifact upload ./sqlsmith-"$profile" + +echo "--- upload misc" +cp src/source/src/test_data/simple-schema.avsc ./avro-simple-schema.avsc +buildkite-agent artifact upload ./avro-simple-schema.avsc diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh index 1d5a2798cbc4d..d093cb0b781d8 100755 --- a/ci/scripts/e2e-source-test.sh +++ b/ci/scripts/e2e-source-test.sh @@ -28,6 +28,9 @@ buildkite-agent artifact download risedev-dev-"$profile" target/debug/ mv target/debug/risingwave-"$profile" target/debug/risingwave mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev +echo "--- Download mise" +buildkite-agent artifact download avro-simple-schema.avsc ./ + echo "--- Adjust permission" chmod +x ./target/debug/risingwave chmod +x ./target/debug/risedev-dev diff --git a/e2e_test/source/basic_test.slt b/e2e_test/source/basic_test.slt index 81e99b5bfb122..19e90433a9f3e 100644 --- a/e2e_test/source/basic_test.slt +++ b/e2e_test/source/basic_test.slt @@ -106,6 +106,16 @@ create materialized source s8 ( statement ok select * from s8 +statement ok +create materialized source s9 with ( + connector = 'kafka', topic = 'avro_bin', + properties.bootstrap.server = '127.0.0.1:29092', + scan.startup.mode = 'earliest' +) row format avro message 'test_student' row schema location 'file:///risingwave/avro-simple-schema.avsc' + +statement ok +select * from s9 + statement ok flush; @@ -242,3 +252,6 @@ drop source s6 statement ok drop source s8 + +statement ok +drop source s9 diff --git a/scripts/source/test_data/avro_bin.1 b/scripts/source/test_data/avro_bin.1 new file mode 100644 index 0000000000000000000000000000000000000000..97b87a4884deba45200225197e85ba2e2260c01e GIT binary patch literal 614 zcmZ`%!Ait16l_KCA|6CMdCGZR#A8p3cvsMih?LjnwHtX&YLgbqQqT|ZGdz0n6ZS0r zh2kf8H0`c!DjLWkVP-NjFK5xMzqt=pp{6>XV+LPa3ufvZS-=X6Rl(VWNyyr>L2W{9 zN{-Ul+e9F7&4TO24?wiwHCc2RjtP`xSa8%#dYHz$5@n2`izg*h%l?zlz27TqQ;0E^ zF<0H1#=%EI3R<^=@YMAJRT}CsgR3&4R^cCt&>?{~=!A`wA_VDj5+IF^E!$ZEZDk&HxP<{auiY_GO1b1Y{>W(NpaewYa&p>1hH#7_FA z+3S8ntT@KoT0)-J-t)WvcJ=u2w0!w&mJb)N8>a^}yZ3kP7{UsuM9=e1?6&KC=6l|; J+L`|5d;@mH(aQh; literal 0 HcmV?d00001 diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 2b35dc2e81481..5941d8383b835 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::catalog::DEFAULT_SCHEMA_NAME; @@ -22,8 +24,10 @@ use risingwave_pb::catalog::{ }; use risingwave_pb::plan_common::{ColumnCatalog as ProstColumnCatalog, RowFormatType}; use risingwave_pb::user::grant_privilege::{Action, Object}; -use risingwave_source::ProtobufParser; -use risingwave_sqlparser::ast::{CreateSourceStatement, ObjectName, ProtobufSchema, SourceSchema}; +use risingwave_source::{AvroParser, ProtobufParser}; +use risingwave_sqlparser::ast::{ + AvroSchema, CreateSourceStatement, ObjectName, ProtobufSchema, SourceSchema, +}; use super::create_table::{ bind_sql_columns, bind_sql_table_constraints, gen_materialized_source_plan, @@ -72,6 +76,22 @@ pub(crate) fn make_prost_source( }) } +/// Map an Avro schema to a relational schema. +async fn extract_avro_table_schema( + schema: &AvroSchema, + with_properties: HashMap, +) -> Result> { + let parser = AvroParser::new(schema.row_schema_location.0.as_str(), with_properties).await?; + let vec_column_desc = parser.map_to_columns()?; + Ok(vec_column_desc + .into_iter() + .map(|c| ProstColumnCatalog { + column_desc: Some(c), + is_hidden: false, + }) + .collect_vec()) +} + /// Map a protobuf schema to a relational schema. fn extract_protobuf_table_schema(schema: &ProtobufSchema) -> Result> { let parser = ProtobufParser::new(&schema.row_schema_location.0, &schema.message_name.0)?; @@ -112,6 +132,20 @@ pub async fn handle_create_source( pk_column_ids: pk_column_ids.into_iter().map(Into::into).collect(), } } + SourceSchema::Avro(avro_schema) => { + assert_eq!(columns.len(), 1); + assert_eq!(pk_column_ids, vec![0.into()]); + assert_eq!(row_id_index, Some(0)); + columns.extend(extract_avro_table_schema(avro_schema, with_properties.clone()).await?); + StreamSourceInfo { + properties: with_properties.clone(), + row_format: RowFormatType::Avro as i32, + row_schema_location: avro_schema.row_schema_location.0.clone(), + row_id_index: row_id_index.map(|index| ProstColumnIndex { index: index as _ }), + columns, + pk_column_ids: pk_column_ids.into_iter().map(Into::into).collect(), + } + } SourceSchema::Json => StreamSourceInfo { properties: with_properties.clone(), row_format: RowFormatType::Json as i32, diff --git a/src/source/src/parser/avro_parser.rs b/src/source/src/parser/avro_parser.rs index 82eee08e09a2b..2c0501a88d62c 100644 --- a/src/source/src/parser/avro_parser.rs +++ b/src/source/src/parser/avro_parser.rs @@ -20,6 +20,7 @@ use std::path::Path; use apache_avro::types::Value; use apache_avro::{Reader, Schema}; use chrono::{Datelike, NaiveDate}; +use itertools::Itertools; use num_traits::FromPrimitive; use risingwave_common::array::Op; use risingwave_common::error::ErrorCode::{InternalError, InvalidConfigValue, ProtocolError}; @@ -28,6 +29,7 @@ use risingwave_common::types::{ DataType, Datum, Decimal, NaiveDateTimeWrapper, NaiveDateWrapper, ScalarImpl, }; use risingwave_connector::aws_utils::{default_conn_config, s3_client, AwsConfigV2}; +use risingwave_pb::plan_common::ColumnDesc; use url::Url; use crate::{Event, SourceColumnDesc, SourceParser}; @@ -77,6 +79,86 @@ impl AvroParser { Err(arvo_schema.err().unwrap()) } } + + pub fn map_to_columns(&self) -> Result> { + if let Schema::Record { fields, .. } = &self.schema { + let mut index = 0; + Ok(fields + .iter() + .map(|field| { + Self::avro_field_to_column_desc(&field.name, &field.schema, &mut index) + }) + .collect::>>()?) + } else { + Err(RwError::from(InternalError( + "schema invalid, record required".into(), + ))) + } + } + + fn avro_field_to_column_desc( + name: &str, + schema: &Schema, + index: &mut i32, + ) -> Result { + let data_type = Self::avro_type_mapping(schema)?; + if let Schema::Record { + name: schema_name, + fields, + .. + } = schema + { + let vec_column = fields + .iter() + .map(|f| Self::avro_field_to_column_desc(&f.name, &f.schema, index)) + .collect::>>()?; + *index += 1; + Ok(ColumnDesc { + column_type: Some(data_type.to_protobuf()), + column_id: *index, + name: name.to_owned(), + field_descs: vec_column, + type_name: schema_name.to_string(), + }) + } else { + *index += 1; + Ok(ColumnDesc { + column_type: Some(data_type.to_protobuf()), + column_id: *index, + name: name.to_owned(), + ..Default::default() + }) + } + } + + fn avro_type_mapping(schema: &Schema) -> Result { + let data_type = match schema { + Schema::String => DataType::Varchar, + Schema::Int => DataType::Int32, + Schema::Long => DataType::Int64, + Schema::Boolean => DataType::Boolean, + Schema::Float => DataType::Float32, + Schema::Double => DataType::Float64, + Schema::Date => DataType::Date, + Schema::TimestampMillis => DataType::Timestamp, + Schema::Record { fields, .. } => { + let struct_fields = fields + .iter() + .map(|f| Self::avro_type_mapping(&f.schema)) + .collect::>>()?; + let struct_names = fields.iter().map(|f| f.name.clone()).collect_vec(); + DataType::new_struct(struct_fields, struct_names) + } + _ => { + return Err(RwError::from(InternalError(format!( + "unsupported type in Avro: {:?}", + schema + )))); + } + }; + + Ok(data_type) + } } macro_rules! from_avro_datetime { @@ -579,6 +661,14 @@ mod test { record } + #[tokio::test] + async fn test_map_to_columns() { + let avro_parser_rs = new_avro_parser_from_local("simple-schema.avsc") + .await + .unwrap(); + println!("{:?}", avro_parser_rs.map_to_columns().unwrap()); + } + #[tokio::test] async fn test_new_avro_parser() { let avro_parser_rs = new_avro_parser_from_local("simple-schema.avsc").await; diff --git a/src/source/src/parser/mod.rs b/src/source/src/parser/mod.rs index 15ebd198affb0..9c4cdfc8c76df 100644 --- a/src/source/src/parser/mod.rs +++ b/src/source/src/parser/mod.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; +pub use avro_parser::*; pub use debezium::*; pub use json_parser::*; pub use protobuf_parser::*; @@ -24,7 +25,6 @@ use risingwave_common::error::ErrorCode::ProtocolError; use risingwave_common::error::{Result, RwError}; use risingwave_common::types::Datum; -use crate::parser::avro_parser::AvroParser; use crate::{SourceColumnDesc, SourceFormat}; mod avro_parser; diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index 1b8d7eb02ab70..8bae2892f7625 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -85,8 +85,9 @@ pub struct CreateSourceStatement { pub enum SourceSchema { Protobuf(ProtobufSchema), // Keyword::PROTOBUF ProtobufSchema - Json, // Keyword::JSON - DebeziumJson, // Keyword::DEBEZIUM_JSON + Json, // Keyword::JSON + DebeziumJson, // Keyword::DEBEZIUM_JSON + Avro(AvroSchema), // Keyword::AVRO } impl ParseTo for SourceSchema { @@ -98,9 +99,12 @@ impl ParseTo for SourceSchema { SourceSchema::Protobuf(protobuf_schema) } else if p.parse_keywords(&[Keyword::DEBEZIUM_JSON]) { SourceSchema::DebeziumJson + } else if p.parse_keywords(&[Keyword::AVRO]) { + impl_parse_to!(avro_schema: AvroSchema, p); + SourceSchema::Avro(avro_schema) } else { return Err(ParserError::ParserError( - "expected JSON | PROTOBUF after ROW FORMAT".to_string(), + "expected JSON | PROTOBUF | DEBEZIUMJSON | AVRO after ROW FORMAT".to_string(), )); }; Ok(schema) @@ -113,6 +117,7 @@ impl fmt::Display for SourceSchema { SourceSchema::Protobuf(protobuf_schema) => write!(f, "PROTOBUF {}", protobuf_schema), SourceSchema::Json => write!(f, "JSON"), SourceSchema::DebeziumJson => write!(f, "DEBEZIUM JSON"), + SourceSchema::Avro(avro_schema) => write!(f, "AVRO {}", avro_schema), } } } @@ -154,6 +159,43 @@ impl fmt::Display for ProtobufSchema { } } +// sql_grammar!(AvroSchema { +// [Keyword::MESSAGE], +// message_name: AstString, +// [Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], +// row_schema_location: AstString, +// }); +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct AvroSchema { + pub message_name: AstString, + pub row_schema_location: AstString, +} + +impl ParseTo for AvroSchema { + fn parse_to(p: &mut Parser) -> Result { + impl_parse_to!([Keyword::MESSAGE], p); + impl_parse_to!(message_name: AstString, p); + impl_parse_to!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], p); + impl_parse_to!(row_schema_location: AstString, p); + Ok(Self { + message_name, + row_schema_location, + }) + } +} + +impl fmt::Display for AvroSchema { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut v: Vec = vec![]; + impl_fmt_display!([Keyword::MESSAGE], v); + impl_fmt_display!(message_name, v, self); + impl_fmt_display!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], v); + impl_fmt_display!(row_schema_location, v, self); + v.iter().join(" ").fmt(f) + } +} + impl ParseTo for CreateSourceStatement { fn parse_to(p: &mut Parser) -> Result { impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);