Skip to content

Commit

Permalink
Merge branch 'main' into new-version
Browse files Browse the repository at this point in the history
  • Loading branch information
Little-Wallace authored Jan 16, 2023
2 parents aef556f + b5c154d commit 510776c
Show file tree
Hide file tree
Showing 71 changed files with 1,013 additions and 253 deletions.
25 changes: 22 additions & 3 deletions dashboard/proto/gen/meta.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 35 additions & 1 deletion dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions e2e_test/batch/functions/session_timezone.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ set timezone = "us/pacific";
query T
select '2022-01-01'::date::timestamp with time zone;
----
2022-01-01 08:00:00+00:00
2022-01-01 00:00:00-08:00

# Cast timestamp to timestamptz
query T
select '2022-01-01 00:00:00'::timestamp::timestamp with time zone;
----
2022-01-01 08:00:00+00:00
2022-01-01 00:00:00-08:00

# Cast timestamptz to timestamp
query T
Expand Down Expand Up @@ -47,7 +47,7 @@ t
query T
select '2022-01-01 00:00:00'::timestamp with time zone;
----
2022-01-01 08:00:00+00:00
2022-01-01 00:00:00-08:00

# Cast timestamptz to varchar, should display with timezone information
query T
Expand Down
2 changes: 2 additions & 0 deletions e2e_test/ddl/show.slt
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ matviewname Varchar
matviewowner Int32
definition Varchar
matviewid Int32
matviewtimezone Varchar
matviewgraph Varchar

query TT
Expand All @@ -137,4 +138,5 @@ matviewname Varchar
matviewowner Int32
definition Varchar
matviewid Int32
matviewtimezone Varchar
matviewgraph Varchar
3 changes: 3 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ message TableFragments {
map<uint32, Fragment> fragments = 3;
map<uint32, ActorStatus> actor_status = 4;
map<uint32, source.ConnectorSplits> actor_splits = 5;

stream_plan.StreamEnvironment env = 6;
}

// TODO: remove this when dashboard refactored.
Expand Down Expand Up @@ -117,6 +119,7 @@ message ListTableFragmentsResponse {
}
message TableFragmentInfo {
repeated FragmentInfo fragments = 1;
stream_plan.StreamEnvironment env = 2;
}
map<uint32, TableFragmentInfo> table_fragments = 1;
}
Expand Down
7 changes: 7 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,12 @@ enum FragmentTypeFlag {
CHAIN_NODE = 16;
}

// The environment associated with a stream plan
message StreamEnvironment {
// The timezone associated with the streaming plan. Only applies to MV for now.
string timezone = 1;
}

message StreamFragmentGraph {
message StreamFragment {
// 0-based on frontend, and will be rewritten to global id on meta.
Expand Down Expand Up @@ -610,4 +616,5 @@ message StreamFragmentGraph {

repeated uint32 dependent_table_ids = 3;
uint32 table_ids_cnt = 4;
StreamEnvironment env = 5;
}
1 change: 0 additions & 1 deletion src/batch/src/executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,6 @@ mod tests {
None,
ReadOptions {
prefix_hint: None,
check_bloom_filter: false,
ignore_range_tombstone: false,
table_id: Default::default(),
retention_seconds: None,
Expand Down
19 changes: 17 additions & 2 deletions src/batch/src/executor/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
use std::collections::HashMap;
use std::sync::Arc;

use anyhow::anyhow;
use futures::StreamExt;
use futures_async_stream::try_stream;
use risingwave_common::array::DataChunk;
use risingwave_common::array::{DataChunk, Op, StreamChunk};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
use risingwave_common::error::ErrorCode::{ConnectorError, ProtocolError};
use risingwave_common::error::{Result, RwError};
Expand All @@ -30,6 +31,7 @@ use risingwave_source::monitor::SourceMetrics;
use risingwave_source::{SourceColumnDesc, SourceFormat, SourceParserImpl};

use super::Executor;
use crate::error::BatchError;
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
use crate::task::BatchTaskContext;

Expand Down Expand Up @@ -175,7 +177,7 @@ impl SourceExecutor {
for chunk in stream {
match chunk {
Ok(chunk) => {
yield chunk.chunk.data_chunk().to_owned();
yield covert_stream_chunk_to_batch_chunk(chunk.chunk)?;
}
Err(e) => {
return Err(e);
Expand All @@ -184,3 +186,16 @@ impl SourceExecutor {
}
}
}

fn covert_stream_chunk_to_batch_chunk(chunk: StreamChunk) -> Result<DataChunk> {
// chunk read from source must be compact
assert!(chunk.data_chunk().visibility().is_none());

if chunk.ops().iter().any(|op| *op != Op::Insert) {
return Err(RwError::from(BatchError::Internal(anyhow!(
"Only support insert op in batch source executor"
))));
}

Ok(chunk.data_chunk().clone())
}
2 changes: 1 addition & 1 deletion src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ mod default {
}

pub fn bloom_false_positive() -> f64 {
0.01
0.001
}

pub fn share_buffers_sync_parallelism() -> u32 {
Expand Down
4 changes: 2 additions & 2 deletions src/compute/src/memory_management/memory_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ impl GlobalMemoryManager {
watermark_epoch.store(epoch, Ordering::Relaxed);
}

/// Jemalloc is not supported on non-Linux OSs, because tikv-jemalloc is not available.
/// See the comments for the macro enable_jemalloc_on_linux!();
// FIXME: remove such limitation after #7180
/// Jemalloc is not supported on Windows, because of tikv-jemalloc's own reasons.
/// See the comments for the macro `enable_jemalloc_on_linux!()`
#[cfg(not(target_os = "linux"))]
pub async fn run(self: Arc<Self>, _: Arc<BatchManager>, _: Arc<LocalStreamManager>) {}

Expand Down
2 changes: 1 addition & 1 deletion src/config/ci-compaction-test-meta.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ checkpoint_frequency = 99999999
shared_buffer_capacity_mb = 4096
sstable_size_mb = 256
block_size_kb = 1024
bloom_false_positive = 0.01
bloom_false_positive = 0.001
data_directory = "hummock_001"
block_cache_capacity_mb = 4096
meta_cache_capacity_mb = 1024
Expand Down
2 changes: 1 addition & 1 deletion src/config/ci-compaction-test.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ checkpoint_frequency = 10
shared_buffer_capacity_mb = 4096
sstable_size_mb = 256
block_size_kb = 1024
bloom_false_positive = 0.01
bloom_false_positive = 0.001
data_directory = "hummock_001"
block_cache_capacity_mb = 4096
meta_cache_capacity_mb = 1024
Expand Down
2 changes: 1 addition & 1 deletion src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ checkpoint_frequency = 10
shared_buffer_capacity_mb = 4096
sstable_size_mb = 256
block_size_kb = 1024
bloom_false_positive = 0.01
bloom_false_positive = 0.001
data_directory = "hummock_001"
block_cache_capacity_mb = 4096
meta_cache_capacity_mb = 1024
Expand Down
Loading

0 comments on commit 510776c

Please sign in to comment.