Merge remote-tracking branch 'origin/main' into perf/optimize-to-char-with-const-template
TennyZhuang committed Sep 6, 2022
2 parents fc8b258 + d0a377d commit dc5e0c4
Showing 24 changed files with 316 additions and 53 deletions.
25 changes: 20 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions ci/scripts/build.sh
@@ -56,3 +56,7 @@ buildkite-agent artifact upload risingwave-"$profile"
buildkite-agent artifact upload risedev-dev-"$profile"
buildkite-agent artifact upload risingwave_regress_test-"$profile"
buildkite-agent artifact upload ./sqlsmith-"$profile"

echo "--- upload misc"
cp src/source/src/test_data/simple-schema.avsc ./avro-simple-schema.avsc
buildkite-agent artifact upload ./avro-simple-schema.avsc
8 changes: 4 additions & 4 deletions ci/scripts/deterministic-e2e-test.sh
@@ -30,12 +30,12 @@ echo "--- deterministic simulation e2e, ci-3cn-2fe, parallel, batch"
seq 10 | parallel MADSIM_TEST_SEED={} $RUNNER -j 16 './e2e_test/batch/\*\*/\*.slt'

# bugs here! Tracking issue https://github.com/risingwavelabs/risingwave/issues/4527
echo "--- deterministic simulation e2e, ci-3cn-1fe, recovery, streaming"
seq 1 | parallel MADSIM_TEST_SEED={} $RUNNER --kill-compute './e2e_test/streaming/\*\*/\*.slt' || true
# echo "--- deterministic simulation e2e, ci-3cn-1fe, recovery, streaming"
# seq 1 | parallel MADSIM_TEST_SEED={} $RUNNER --kill-compute './e2e_test/streaming/\*\*/\*.slt' || true

# bugs here! Tracking issue https://github.com/risingwavelabs/risingwave/issues/4527
echo "--- deterministic simulation e2e, ci-3cn-1fe, recovery, batch"
seq 1 | parallel MADSIM_TEST_SEED={} $RUNNER --kill-compute './e2e_test/batch/\*\*/\*.slt' || true
# echo "--- deterministic simulation e2e, ci-3cn-1fe, recovery, batch"
# seq 1 | parallel MADSIM_TEST_SEED={} $RUNNER --kill-compute './e2e_test/batch/\*\*/\*.slt' || true

echo "--- deterministic simulation e2e, ci-3cn-1fe, fuzzing"
seq 1 | parallel MADSIM_TEST_SEED={} $RUNNER --sqlsmith 100 ./src/tests/sqlsmith/tests/testdata
3 changes: 3 additions & 0 deletions ci/scripts/e2e-source-test.sh
@@ -28,6 +28,9 @@ buildkite-agent artifact download risedev-dev-"$profile" target/debug/
mv target/debug/risingwave-"$profile" target/debug/risingwave
mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev

echo "--- Download mise"
buildkite-agent artifact download avro-simple-schema.avsc ./

echo "--- Adjust permission"
chmod +x ./target/debug/risingwave
chmod +x ./target/debug/risedev-dev
13 changes: 13 additions & 0 deletions e2e_test/source/basic_test.slt
@@ -106,6 +106,16 @@ create materialized source s8 (
statement ok
select * from s8

statement ok
create materialized source s9 with (
connector = 'kafka', topic = 'avro_bin',
properties.bootstrap.server = '127.0.0.1:29092',
scan.startup.mode = 'earliest'
) row format avro message 'test_student' row schema location 'file:///risingwave/avro-simple-schema.avsc'

statement ok
select * from s9

statement ok
flush;

@@ -242,3 +252,6 @@ drop source s6

statement ok
drop source s8

statement ok
drop source s9
Binary file added scripts/source/test_data/avro_bin.1
Binary file not shown.
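
The binary fixture pairs with the new s9 source above: avro_bin.1 holds Avro-encoded test_student messages for the avro_bin Kafka topic, encoded against the simple-schema.avsc that ci/scripts/build.sh now uploads. Neither file's contents appear in this diff; the following is a hedged sketch of how such a fixture could be generated, assuming the apache-avro crate (formerly avro-rs), object-container-file framing, and hypothetical field names.

// Hypothetical generator for a fixture like scripts/source/test_data/avro_bin.1.
// Only the record name `test_student` is known from the DDL above; the field
// list is invented, and the exact framing RisingWave's parser expects may differ.
use apache_avro::{types::Record, Schema, Writer};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Stand-in for src/source/src/test_data/simple-schema.avsc.
    let raw_schema = r#"{
        "type": "record",
        "name": "test_student",
        "fields": [
            {"name": "id", "type": "int"},
            {"name": "name", "type": "string"}
        ]
    }"#;
    let schema = Schema::parse_str(raw_schema)?;
    let mut writer = Writer::new(&schema, Vec::new());

    let mut record = Record::new(writer.schema()).expect("schema is a record");
    record.put("id", 1);
    record.put("name", "alice");
    writer.append(record)?;

    // Write the container-file bytes out, ready to feed into a Kafka topic.
    std::fs::write("avro_bin.1", writer.into_inner()?)?;
    Ok(())
}
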
38 changes: 36 additions & 2 deletions src/frontend/src/handler/create_source.rs
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::DEFAULT_SCHEMA_NAME;
@@ -22,8 +24,10 @@ use risingwave_pb::catalog::{
};
use risingwave_pb::plan_common::{ColumnCatalog as ProstColumnCatalog, RowFormatType};
use risingwave_pb::user::grant_privilege::{Action, Object};
-use risingwave_source::ProtobufParser;
-use risingwave_sqlparser::ast::{CreateSourceStatement, ObjectName, ProtobufSchema, SourceSchema};
+use risingwave_source::{AvroParser, ProtobufParser};
+use risingwave_sqlparser::ast::{
+    AvroSchema, CreateSourceStatement, ObjectName, ProtobufSchema, SourceSchema,
+};

use super::create_table::{
bind_sql_columns, bind_sql_table_constraints, gen_materialized_source_plan,
@@ -72,6 +76,22 @@ pub(crate) fn make_prost_source(
})
}

/// Map an Avro schema to a relational schema.
async fn extract_avro_table_schema(
schema: &AvroSchema,
with_properties: HashMap<String, String>,
) -> Result<Vec<ProstColumnCatalog>> {
let parser = AvroParser::new(schema.row_schema_location.0.as_str(), with_properties).await?;
let vec_column_desc = parser.map_to_columns()?;
Ok(vec_column_desc
.into_iter()
.map(|c| ProstColumnCatalog {
column_desc: Some(c),
is_hidden: false,
})
.collect_vec())
}

/// Map a protobuf schema to a relational schema.
fn extract_protobuf_table_schema(schema: &ProtobufSchema) -> Result<Vec<ProstColumnCatalog>> {
let parser = ProtobufParser::new(&schema.row_schema_location.0, &schema.message_name.0)?;
@@ -112,6 +132,20 @@ pub async fn handle_create_source(
pk_column_ids: pk_column_ids.into_iter().map(Into::into).collect(),
}
}
SourceSchema::Avro(avro_schema) => {
assert_eq!(columns.len(), 1);
assert_eq!(pk_column_ids, vec![0.into()]);
assert_eq!(row_id_index, Some(0));
columns.extend(extract_avro_table_schema(avro_schema, with_properties.clone()).await?);
StreamSourceInfo {
properties: with_properties.clone(),
row_format: RowFormatType::Avro as i32,
row_schema_location: avro_schema.row_schema_location.0.clone(),
row_id_index: row_id_index.map(|index| ProstColumnIndex { index: index as _ }),
columns,
pk_column_ids: pk_column_ids.into_iter().map(Into::into).collect(),
}
}
SourceSchema::Json => StreamSourceInfo {
properties: with_properties.clone(),
row_format: RowFormatType::Json as i32,
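
End to end, the new path is: handle_create_source matches SourceSchema::Avro, asserts that binding has so far produced only the hidden row-id column (which is also the primary key), then extends the catalog with the columns extract_avro_table_schema derives via AvroParser::map_to_columns. As a self-contained illustration of that field-to-column traversal, here is a sketch using serde_json as a stand-in for the real parser; the actual implementation in risingwave_source also maps Avro types to RisingWave types and assigns column ids.

// Stand-in for AvroParser::map_to_columns: walk a record schema's fields and
// emit (column name, Avro type) pairs. serde_json is an assumption made for
// illustration only; none of this code is part of the commit.
use serde_json::Value;

fn columns_of(avsc: &str) -> Vec<(String, String)> {
    let schema: Value = serde_json::from_str(avsc).expect("malformed schema");
    schema["fields"]
        .as_array()
        .into_iter()
        .flatten()
        .map(|field| {
            (
                field["name"].as_str().unwrap_or_default().to_owned(),
                field["type"].to_string(),
            )
        })
        .collect()
}

fn main() {
    let avsc = r#"{"type":"record","name":"test_student","fields":[
        {"name":"id","type":"int"},{"name":"name","type":"string"}]}"#;
    for (name, ty) in columns_of(avsc) {
        println!("column `{name}`: {ty}");
    }
}
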
2 changes: 1 addition & 1 deletion src/meta/Cargo.toml
@@ -15,7 +15,7 @@ clap = { version = "3", features = ["derive", "env"] }
crc32fast = "1"
derivative = "2"
either = "1"
etcd-client = "0.10"
etcd-client = { version = "0.2", package = "madsim-etcd-client" }
fail = "0.5"
function_name = "0.3.0"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
11 changes: 10 additions & 1 deletion src/meta/src/hummock/compaction_group/manager.rs
@@ -72,6 +72,16 @@ impl<S: MetaStore> CompactionGroupManager<S> {
.collect_vec()
}

pub async fn compaction_group_ids(&self) -> Vec<CompactionGroupId> {
self.inner
.read()
.await
.compaction_groups
.values()
.map(|cg| cg.group_id)
.collect_vec()
}

pub async fn compaction_group(&self, id: CompactionGroupId) -> Option<CompactionGroup> {
self.inner.read().await.compaction_groups.get(&id).cloned()
}
@@ -359,7 +369,6 @@ impl CompactionGroupManagerInner

#[cfg(test)]
mod tests {

use std::collections::{BTreeMap, HashMap};
use std::ops::Deref;

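
The new compaction_group_ids getter above is what the compaction scheduler below polls on its periodic tick. It follows the usual tokio RwLock read path: take a read guard, project owned ids out, and let the guard drop. A stripped-down sketch, with toy types in place of the meta-store generics:

// Minimal model of CompactionGroupManager::compaction_group_ids. All types
// are stand-ins; only the lock-project-collect shape mirrors the code above.
use std::collections::BTreeMap;
use tokio::sync::RwLock;

struct CompactionGroup {
    group_id: u64,
}

struct Inner {
    compaction_groups: BTreeMap<u64, CompactionGroup>,
}

struct Manager {
    inner: RwLock<Inner>,
}

impl Manager {
    async fn compaction_group_ids(&self) -> Vec<u64> {
        // The read guard lives only for the projection; an owned Vec is
        // returned so no borrow escapes the function.
        self.inner
            .read()
            .await
            .compaction_groups
            .values()
            .map(|cg| cg.group_id)
            .collect()
    }
}

#[tokio::main]
async fn main() {
    let mgr = Manager {
        inner: RwLock::new(Inner {
            compaction_groups: BTreeMap::from([(2, CompactionGroup { group_id: 2 })]),
        }),
    };
    println!("{:?}", mgr.compaction_group_ids().await);
}
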
20 changes: 17 additions & 3 deletions src/meta/src/hummock/compaction_scheduler.rs
@@ -33,6 +33,7 @@ use crate::storage::MetaStore;
pub type CompactionSchedulerRef<S> = Arc<CompactionScheduler<S>>;

pub type CompactionRequestChannelRef = Arc<CompactionRequestChannel>;

/// [`CompactionRequestChannel`] wraps an mpsc channel and deduplicates requests from the same
/// compaction group.
pub struct CompactionRequestChannel {
@@ -72,9 +73,9 @@ pub struct CompactionScheduler<S>
where
S: MetaStore,
{
+env: MetaSrvEnv<S>,
hummock_manager: HummockManagerRef<S>,
compactor_manager: CompactorManagerRef,
-compactor_selection_retry_interval_sec: u64,
}

impl<S> CompactionScheduler<S>
@@ -87,9 +88,9 @@ where
compactor_manager: CompactorManagerRef,
) -> Self {
Self {
+env,
hummock_manager,
compactor_manager,
-compactor_selection_retry_interval_sec: env.opts.compactor_selection_retry_interval_sec,
}
}

@@ -100,6 +101,10 @@
self.hummock_manager
.set_compaction_scheduler(request_channel.clone());
tracing::info!("Start compaction scheduler.");
let mut min_trigger_interval = tokio::time::interval(Duration::from_secs(
self.env.opts.periodic_compaction_interval_sec,
));
min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
let compaction_group: CompactionGroupId = tokio::select! {
compaction_group = request_rx.recv() => {
@@ -112,6 +117,15 @@
}
}
},
_ = min_trigger_interval.tick() => {
// Periodically trigger compaction for all compaction groups.
for cg_id in self.hummock_manager.compaction_group_manager().compaction_group_ids().await {
if let Err(e) = request_channel.try_send(cg_id) {
tracing::warn!("Failed to schedule compaction for compaction group {}. {}", cg_id, e);
}
}
continue;
},
// Shutdown compactor scheduler
_ = &mut shutdown_rx => {
break;
@@ -166,7 +180,7 @@ where
self.hummock_manager.list_assigned_tasks_number().await;
tracing::warn!("No idle compactor available. The assigned task number for every compactor is (context_id, count):\n {:?}", current_compactor_tasks);
tokio::time::sleep(Duration::from_secs(
-self.compactor_selection_retry_interval_sec,
+self.env.opts.compactor_selection_retry_interval_sec,
))
.await;
match self
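
The scheduler change has two parts: the retry interval now comes from env.opts at the point of use, and the select loop gains a periodic tick that re-enqueues every compaction group, with MissedTickBehavior::Delay so a slow iteration fires one late tick instead of a catch-up burst. A self-contained sketch of that select-loop pattern, with toy channels and ids standing in for the real managers and for the deduplicating CompactionRequestChannel:

// Toy model of the scheduler loop above: on-demand requests and a periodic
// re-enqueue share one select loop; a oneshot channel provides shutdown.
// Everything here is a stand-in, not RisingWave's code.
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
use tokio::time::{interval, MissedTickBehavior};

#[tokio::main]
async fn main() {
    let (request_tx, mut request_rx) = mpsc::unbounded_channel::<u64>();
    let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();

    // Simulate an external shutdown signal after a short run.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(2)).await;
        let _ = shutdown_tx.send(());
    });

    let mut min_trigger_interval = interval(Duration::from_millis(500));
    // Delay: a missed tick fires once, later, instead of bursting to catch up.
    min_trigger_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

    let compaction_group_ids = [2u64, 3, 4]; // stand-ins for real group ids

    loop {
        tokio::select! {
            Some(group) = request_rx.recv() => {
                println!("picking compactor for group {group}");
            }
            _ = min_trigger_interval.tick() => {
                // Periodically trigger compaction for all compaction groups.
                for id in compaction_group_ids {
                    if let Err(e) = request_tx.send(id) {
                        eprintln!("failed to schedule group {id}: {e}");
                    }
                }
            }
            _ = &mut shutdown_rx => break,
        }
    }
}
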
2 changes: 1 addition & 1 deletion src/meta/src/hummock/compactor_manager.rs
@@ -350,7 +350,7 @@ mod tests {
{
let original_tables = generate_test_tables(epoch, get_sst_ids(hummock_manager, 2).await);
register_sstable_infos_to_compaction_group(
-hummock_manager.compaction_group_manager_ref_for_test(),
+hummock_manager.compaction_group_manager(),
&original_tables,
StaticCompactionGroupId::StateDefault.into(),
)
2 changes: 1 addition & 1 deletion src/meta/src/hummock/manager/mod.rs
@@ -1540,7 +1540,7 @@
assignment_ref.get(&task_id).cloned()
}

-pub fn compaction_group_manager_ref_for_test(&self) -> CompactionGroupManagerRef<S> {
+pub fn compaction_group_manager(&self) -> CompactionGroupManagerRef<S> {
self.compaction_group_manager.clone()
}

(12 more changed files not shown)