Skip to content

Commit

Permalink
Merge branch 'main' into lwz/join-output-indices-executor
Browse files Browse the repository at this point in the history
  • Loading branch information
Enter-tainer committed Jun 20, 2022
2 parents f76ff29 + ea386d3 commit 994309d
Show file tree
Hide file tree
Showing 157 changed files with 3,402 additions and 953 deletions.
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,25 @@ set -ex
RUST_BACKTRACE=1 RW_NODE=playground cargo run --bin risingwave --profile "${RISINGWAVE_BUILD_PROFILE}"
'''

[tasks.ctl]
category = "RiseDev - Start"
description = "Start RiseCtl"
script = '''
#!@shell
RC_ENV_FILE="${PREFIX_CONFIG}/risectl-env"
if [ ! -f "${RC_ENV_FILE}" ]; then
echo "risectl-env file not found. Did you start cluster using $(tput setaf 4)\`./risedev d\`$(tput sgr0)?"
exit 1
fi
source "${RC_ENV_FILE}"
cargo run --bin risectl --profile "${RISINGWAVE_BUILD_PROFILE}" -- "$@"
test $? -eq 0 || exit 1
'''

[tasks.d]
alias = "dev"

Expand Down
1 change: 1 addition & 0 deletions ci/scripts/common.env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export CARGO_TERM_COLOR=always
export RUSTFLAGS="-D warnings"
export PROTOC_NO_VENDOR=true
export CARGO_HOME=/risingwave/.cargo
export RISINGWAVE_CI=true
28 changes: 17 additions & 11 deletions docs/developer-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ To report bugs, create a [GitHub issue](https://github.com/singularity-data/risi
- [Dashboard v1](#dashboard-v1)
- [Dashboard v2](#dashboard-v2)
- [Observability components](#observability-components)
- [Cluster Control](#cluster-control)
- [Monitoring](#monitoring)
- [Tracing](#tracing)
- [Dashboard](#dashboard)
Expand Down Expand Up @@ -199,6 +200,22 @@ The development instructions for dashboard v2 are available [here](../dashboard/

RiseDev supports several observability components.

### Cluster Control

`risectl` is the tool for providing internal access to the RisingWave cluster. See

```
cargo run --bin risectl -- --help
```
... or
```
./risingwave risectl --help
```
for more information.
### Monitoring
Uncomment `grafana` and `prometheus` lines in `risedev.yml` to enable Grafana and Prometheus services.
Expand Down Expand Up @@ -339,17 +356,6 @@ workspace.

And use [cargo-sort](https://crates.io/crates/cargo-sort) to ensure all deps are get sorted.

## Check in PRs from forks

```shell
gh pr checkout <PR id>
git checkout -b forks/<PR id>
git push origin HEAD -u
```

After that, CI checks will begin on branches of RisingWave's main repo,
and the status will be automatically updated to PRs from forks.

## Submit PRs

Instructions about submitting PRs are included in the [contribution guidelines](../CONTRIBUTING.md).
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 2 additions & 0 deletions e2e_test/batch/distribution_mode.slt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@ SET QUERY_MODE TO distributed;


include ./basic/*.slt.part
include ./aggregate/*.slt.part
include ./types/*.slt.part
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 2 additions & 0 deletions e2e_test/batch/local_mode.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ statement ok
SET QUERY_MODE TO local;

include ./basic/*.slt.part
include ./aggregate/*.slt.part
include ./types/*.slt.part

statement ok
SET QUERY_MODE TO distributed;
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,24 @@ SELECT interval '5 minute', interval '2 m';
query TTTTT
SELECT interval '6 second';
----
00:00:06
00:00:06

query T
SELECT interval '1' month = interval '30' day;
----
t

query T
SELECT interval '1' day = interval '24' hour;
----
t

query T
SELECT interval '1' day = interval '86400' second;
----
t

query T
SELECT interval '1' day - interval '12' hour = interval '12' hour;
----
t
File renamed without changes.
4 changes: 0 additions & 4 deletions e2e_test/batch/types/time.slt

This file was deleted.

9 changes: 9 additions & 0 deletions e2e_test/batch/types/time.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
query T
values(extract(hour from timestamp '2001-02-16 20:38:40'));
----
20

query TTTTT
select timestamp '2001-03-16 23:38:45' - timestamp '2001-02-16 20:38:40';
----
28 days 03:00:05
2 changes: 2 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";

package catalog;

import "common.proto";
import "plan_common.proto";

option optimize_for = SPEED;
Expand Down Expand Up @@ -58,6 +59,7 @@ message Table {
repeated int32 pk = 13;
bool appendonly = 14;
string owner = 15;
common.ParallelUnitMapping mapping = 16;
}

message Schema {
Expand Down
11 changes: 10 additions & 1 deletion proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ message Cluster {
map<string, string> config = 3;
}

message Buffer {
enum CompressionType {
INVALID = 0;
NONE = 1;
}
CompressionType compression = 1;
bytes body = 2;
}

// Vnode mapping for stream fragments / relational state tables. Stores mapping from virtual node to parallel unit id.
message ParallelUnitMapping {
uint32 table_id = 1;
Expand All @@ -71,5 +80,5 @@ message ParallelUnitMapping {
// Bitmap that records whether vnodes are present.
message VNodeBitmap {
uint32 table_id = 1;
bytes bitmap = 2;
Buffer bitmap = 2;
}
13 changes: 2 additions & 11 deletions proto/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,6 @@ message IntervalUnit {
int64 ms = 3;
}

message Buffer {
enum CompressionType {
INVALID = 0;
NONE = 1;
}
CompressionType compression = 1;
bytes body = 2;
}

message DataType {
enum IntervalType {
INVALID = 0;
Expand Down Expand Up @@ -102,8 +93,8 @@ enum ArrayType {

message Array {
ArrayType array_type = 1;
Buffer null_bitmap = 2;
repeated Buffer values = 3;
common.Buffer null_bitmap = 2;
repeated common.Buffer values = 3;
StructArrayData struct_array_data = 4;
ListArrayData list_array_data = 5;
}
Expand Down
9 changes: 9 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ message DropMaterializedSourceResponse {
uint64 version = 2;
}

// Used by risectl (and in the future, dashboard)
message ListMaterializedViewRequest {}

// Used by risectl (and in the future, dashboard)
message ListMaterializedViewResponse {
repeated catalog.Table tables = 1;
}

service DdlService {
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse);
rpc DropDatabase(DropDatabaseRequest) returns (DropDatabaseResponse);
Expand All @@ -119,4 +127,5 @@ service DdlService {
rpc DropMaterializedView(DropMaterializedViewRequest) returns (DropMaterializedViewResponse);
rpc CreateMaterializedSource(CreateMaterializedSourceRequest) returns (CreateMaterializedSourceResponse);
rpc DropMaterializedSource(DropMaterializedSourceRequest) returns (DropMaterializedSourceResponse);
rpc ListMaterializedView(ListMaterializedViewRequest) returns (ListMaterializedViewResponse);
}
11 changes: 9 additions & 2 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ message SstableInfo {
uint64 id = 1;
KeyRange key_range = 2;
uint64 file_size = 3;
repeated common.VNodeBitmap vnode_bitmaps = 4;
repeated uint32 table_ids = 4;
uint64 unit_id = 5;
}

Expand All @@ -42,8 +42,12 @@ message UncommittedEpoch {
}

message HummockVersion {
message Levels {
repeated Level levels = 1;
}
uint64 id = 1;
repeated Level levels = 2;
// Levels of each compaction group
map<uint64, Levels> levels = 2;
uint64 max_committed_epoch = 4;
// Snapshots with epoch less than the safe epoch have been GCed.
// Reads against such an epoch will fail.
Expand Down Expand Up @@ -227,6 +231,9 @@ message GetCompactionGroupsResponse {

message TriggerManualCompactionRequest {
uint64 compaction_group_id = 1;
KeyRange key_range = 2;
uint32 table_id = 3;
uint32 level = 4;
}

message TriggerManualCompactionResponse {
Expand Down
3 changes: 2 additions & 1 deletion proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";

package stream_plan;

import "common.proto";
import "data.proto";
import "expr.proto";
import "plan_common.proto";
Expand Down Expand Up @@ -301,7 +302,7 @@ message StreamActor {
bool same_worker_node_as_upstream = 7;
// Vnodes that the executors in this actor own. If this actor is the only actor in its fragment, `vnode_bitmap`
// will be empty.
bytes vnode_bitmap = 8;
common.Buffer vnode_bitmap = 8;
}

enum FragmentType {
Expand Down
6 changes: 5 additions & 1 deletion proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@ message BarrierCompleteResponse {
string request_id = 1;
common.Status status = 2;
repeated CreateMviewProgress create_mview_progress = 3;
repeated hummock.SstableInfo sycned_sstables = 4;
message GroupedSstableInfo {
uint64 compaction_group_id = 1;
hummock.SstableInfo sst = 2;
}
repeated GroupedSstableInfo sycned_sstables = 4;
}

// Before starting streaming, the leader node broadcast the actor-host table to needed workers.
Expand Down
9 changes: 1 addition & 8 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use risingwave_common::array::{DataChunk, Row};
use risingwave_common::catalog::{ColumnDesc, OrderedColumnDesc, Schema, TableId};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::Datum;
use risingwave_common::util::ordered::OrderedRowSerializer;
use risingwave_common::util::sort_util::OrderType;
use risingwave_expr::expr::LiteralExpression;
use risingwave_pb::batch_plan::plan_node::NodeBody;
Expand Down Expand Up @@ -150,20 +149,14 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
let pk_descs_proto = &seq_scan_node.table_desc.as_ref().unwrap().order_key;
let pk_descs: Vec<OrderedColumnDesc> = pk_descs_proto.iter().map(|d| d.into()).collect();
let order_types: Vec<OrderType> = pk_descs.iter().map(|desc| desc.order).collect();
let ordered_row_serializer = OrderedRowSerializer::new(order_types);

let scan_range = seq_scan_node.scan_range.as_ref().unwrap();
let (pk_prefix_value, next_col_bounds) = get_scan_bound(scan_range.clone());

dispatch_state_store!(source.context().try_get_state_store()?, state_store, {
let keyspace = Keyspace::table_root(state_store.clone(), &table_id);
let batch_stats = source.context().stats();
let table = CellBasedTable::new(
keyspace.clone(),
column_descs,
Some(ordered_row_serializer),
None,
);
let table = CellBasedTable::new(keyspace.clone(), column_descs, order_types, None);

let scan_type = if pk_prefix_value.size() == 0 && is_full_range(&next_col_bounds) {
let iter = table.batch_dedup_pk_iter(source.epoch, &pk_descs).await?;
Expand Down
Loading

0 comments on commit 994309d

Please sign in to comment.