Skip to content

Commit

Permalink
Merge branch 'main' of github.com:singularity-data/risingwave into yu…
Browse files Browse the repository at this point in the history
…hao/current_watermark
  • Loading branch information
yuhao-su committed Jun 16, 2023
2 parents 77c8ac4 + 3dd1393 commit e48560d
Show file tree
Hide file tree
Showing 23 changed files with 466 additions and 127 deletions.
10 changes: 5 additions & 5 deletions ci/scripts/gen-flamegraph.sh
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ install_all() {
promql --version

echo ">>> Installing Kafka"
wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar -zxvf kafka_2.13-3.4.0.tgz
wget https://downloads.apache.org/kafka/3.4.1/kafka_2.13-3.4.1.tgz
tar -zxvf kafka_2.13-3.4.1.tgz

echo ">>> Installing nexmark bench"
buildkite-agent artifact download nexmark-server /usr/local/bin
Expand Down Expand Up @@ -182,8 +182,8 @@ start_nperf() {
}

start_kafka() {
./kafka_2.13-3.4.0/bin/zookeeper-server-start.sh ./kafka_2.13-3.4.0/config/zookeeper.properties > zookeeper.log 2>&1 &
./kafka_2.13-3.4.0/bin/kafka-server-start.sh ./kafka_2.13-3.4.0/config/server.properties --override num.partitions=8 > kafka.log 2>&1 &
./kafka_2.13-3.4.1/bin/zookeeper-server-start.sh ./kafka_2.13-3.4.1/config/zookeeper.properties > zookeeper.log 2>&1 &
./kafka_2.13-3.4.1/bin/kafka-server-start.sh ./kafka_2.13-3.4.1/config/server.properties --override num.partitions=8 > kafka.log 2>&1 &
sleep 10
# TODO(kwannoel): `trap ERR` and upload these logs.
# buildkite-agent artifact upload ./zookeeper.log
Expand All @@ -207,7 +207,7 @@ gen_events() {
}

show_kafka_topics() {
./kafka_2.13-3.4.0/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic nexmark --bootstrap-server localhost:9092
./kafka_2.13-3.4.1/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic nexmark --bootstrap-server localhost:9092
}

gen_cpu_flamegraph() {
Expand Down
4 changes: 2 additions & 2 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ FROM ubuntu:22.04 as base

ENV LANG en_US.utf8

RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install make build-essential cmake protobuf-compiler curl bash lld maven unzip
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install make build-essential cmake protobuf-compiler curl bash lld maven unzip libsasl2-dev

FROM base as builder

Expand Down Expand Up @@ -43,7 +43,7 @@ RUN cd /risingwave/java && mvn -B package -Dmaven.test.skip=true -Djava.binding.
tar -zxvf /risingwave/java/connector-node/assembly/target/risingwave-connector-1.0.0.tar.gz -C /risingwave/bin/connector-node

FROM ubuntu:22.04 as image-base
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install ca-certificates openjdk-11-jdk && rm -rf /var/lib/{apt,dpkg,cache,log}/
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install ca-certificates openjdk-11-jdk libsasl2-dev && rm -rf /var/lib/{apt,dpkg,cache,log}/

FROM image-base as risingwave
LABEL org.opencontainers.image.source https://github.com/risingwavelabs/risingwave
Expand Down
4 changes: 2 additions & 2 deletions docker/Dockerfile.hdfs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ FROM ubuntu:22.04 as base

ENV LANG en_US.utf8

RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install make build-essential cmake protobuf-compiler curl pkg-config bash lld maven unzip wget openjdk-11-jdk
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install make build-essential cmake protobuf-compiler curl pkg-config bash lld maven unzip wget openjdk-11-jdk libsasl2-dev

FROM base as builder

Expand Down Expand Up @@ -48,7 +48,7 @@ RUN cd /risingwave/java && mvn -B package -Dmaven.test.skip=true -Djava.binding.
tar -zxvf /risingwave/java/connector-node/assembly/target/risingwave-connector-1.0.0.tar.gz -C /risingwave/bin/connector-node

FROM ubuntu:22.04 as image-base
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install ca-certificates openjdk-11-jdk wget && rm -rf /var/lib/{apt,dpkg,cache,log}/
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install ca-certificates openjdk-11-jdk wget libsasl2-dev && rm -rf /var/lib/{apt,dpkg,cache,log}/

FROM image-base as risingwave
LABEL org.opencontainers.image.source https://github.com/risingwavelabs/risingwave
Expand Down
11 changes: 11 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -411,3 +411,14 @@ service SystemParamsService {
rpc GetSystemParams(GetSystemParamsRequest) returns (GetSystemParamsResponse);
rpc SetSystemParam(SetSystemParamRequest) returns (SetSystemParamResponse);
}

// Request for `GetServingVnodeMappings`; carries no parameters.
message GetServingVnodeMappingsRequest {}

message GetServingVnodeMappingsResponse {
  // Vnode mappings for serving, one entry per fragment
  // (fragment -> parallel unit mapping).
  repeated FragmentParallelUnitMapping mappings = 1;
  // Presumably maps fragment id -> id of the table the fragment belongs to
  // — confirm against the meta-service implementation.
  map<uint32, uint32> fragment_to_table = 2;
}

// Service that exposes the vnode mappings used for serving (batch) queries.
service ServingService {
  rpc GetServingVnodeMappings(GetServingVnodeMappingsRequest) returns (GetServingVnodeMappingsResponse);
}
197 changes: 134 additions & 63 deletions src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ impl<K: HashKey + Send + Sync> HashAggExecutor<K> {
self.mem_context.add(memory_usage_diff);
}

// generate output data chunks
let mut result = groups.into_iter();
// Don't use `into_iter` here, it may cause memory leak.
let mut result = groups.iter_mut();
let cardinality = self.chunk_size;
loop {
let mut group_builders: Vec<_> = self
Expand All @@ -259,9 +259,9 @@ impl<K: HashKey + Send + Sync> HashAggExecutor<K> {
array_len += 1;
key.deserialize_to_builders(&mut group_builders[..], &self.group_key_types)?;
states
.into_iter()
.iter_mut()
.zip_eq_fast(&mut agg_builders)
.try_for_each(|(mut aggregator, builder)| aggregator.output(builder))?;
.try_for_each(|(aggregator, builder)| aggregator.output(builder))?;
}
if !has_next {
break; // exit loop
Expand All @@ -281,6 +281,11 @@ impl<K: HashKey + Send + Sync> HashAggExecutor<K> {

#[cfg(test)]
mod tests {
use std::alloc::{AllocError, Allocator, Global, Layout};
use std::ptr::NonNull;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use prometheus::IntGauge;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::test_prelude::DataChunkTestExt;
Expand All @@ -296,9 +301,11 @@ mod tests {

#[tokio::test]
async fn execute_int32_grouped() {
let src_exec = Box::new(MockExecutor::with_chunk(
DataChunk::from_pretty(
"i i i
let parent_mem = MemoryContext::root(IntGauge::new("root_memory_usage", " ").unwrap());
{
let src_exec = Box::new(MockExecutor::with_chunk(
DataChunk::from_pretty(
"i i i
0 1 1
1 1 1
0 0 1
Expand All @@ -307,68 +314,75 @@ mod tests {
0 0 2
1 1 3
0 1 2",
),
Schema::new(vec![
Field::unnamed(DataType::Int32),
Field::unnamed(DataType::Int32),
Field::unnamed(DataType::Int64),
]),
));

let agg_call = AggCall {
r#type: Type::Sum as i32,
args: vec![InputRef {
index: 2,
r#type: Some(PbDataType {
type_name: TypeName::Int32 as i32,
),
Schema::new(vec![
Field::unnamed(DataType::Int32),
Field::unnamed(DataType::Int32),
Field::unnamed(DataType::Int64),
]),
));

let agg_call = AggCall {
r#type: Type::Sum as i32,
args: vec![InputRef {
index: 2,
r#type: Some(PbDataType {
type_name: TypeName::Int32 as i32,
..Default::default()
}),
}],
return_type: Some(PbDataType {
type_name: TypeName::Int64 as i32,
..Default::default()
}),
}],
return_type: Some(PbDataType {
type_name: TypeName::Int64 as i32,
..Default::default()
}),
distinct: false,
order_by: vec![],
filter: None,
direct_args: vec![],
};

let agg_prost = HashAggNode {
group_key: vec![0, 1],
agg_calls: vec![agg_call],
};

let mem_context = MemoryContext::root(IntGauge::new("memory_usage", " ").unwrap());
let actual_exec = HashAggExecutorBuilder::deserialize(
&agg_prost,
src_exec,
TaskId::default(),
"HashAggExecutor".to_string(),
CHUNK_SIZE,
mem_context.clone(),
)
.unwrap();

// TODO: currently the order is fixed unless the hasher is changed
let expect_exec = Box::new(MockExecutor::with_chunk(
DataChunk::from_pretty(
"i i I
distinct: false,
order_by: vec![],
filter: None,
direct_args: vec![],
};

let agg_prost = HashAggNode {
group_key: vec![0, 1],
agg_calls: vec![agg_call],
};

let mem_context = MemoryContext::new(
Some(parent_mem.clone()),
IntGauge::new("memory_usage", " ").unwrap(),
);
let actual_exec = HashAggExecutorBuilder::deserialize(
&agg_prost,
src_exec,
TaskId::default(),
"HashAggExecutor".to_string(),
CHUNK_SIZE,
mem_context.clone(),
)
.unwrap();

// TODO: currently the order is fixed unless the hasher is changed
let expect_exec = Box::new(MockExecutor::with_chunk(
DataChunk::from_pretty(
"i i I
1 0 1
0 0 3
0 1 3
1 1 6",
),
Schema::new(vec![
Field::unnamed(DataType::Int32),
Field::unnamed(DataType::Int32),
Field::unnamed(DataType::Int64),
]),
));
diff_executor_output(actual_exec, expect_exec).await;

// check estimated memory usage = 4 groups x state size
assert_eq!(mem_context.get_bytes_used() as usize, 4 * 72);
),
Schema::new(vec![
Field::unnamed(DataType::Int32),
Field::unnamed(DataType::Int32),
Field::unnamed(DataType::Int64),
]),
));
diff_executor_output(actual_exec, expect_exec).await;

// check estimated memory usage = 4 groups x state size
assert_eq!(mem_context.get_bytes_used() as usize, 4 * 72);
}

// Ensure that agg memory counter has been dropped.
assert_eq!(0, parent_mem.get_bytes_used());
}

#[tokio::test]
Expand Down Expand Up @@ -425,4 +439,61 @@ mod tests {
);
diff_executor_output(actual_exec, Box::new(expect_exec)).await;
}

/// A test to verify that `hashbrown::HashMap` may leak its allocator when
/// consumed via `into_iter`: after the `for (k, v) in map` loop below, the
/// map's custom allocator is never dropped, so any bookkeeping tied to the
/// allocator's lifetime (e.g. a memory-usage counter) leaks. This is the
/// reason the aggregation code in this file iterates with `iter_mut`
/// instead of `into_iter` (see the "Don't use `into_iter` here" comment).
#[test]
fn test_hashmap_into_iter_bug() {
    // Flipped to `true` by `MyAllocInner::drop`; inspected after the map's
    // scope ends to see whether the allocator was ever released.
    let dropped: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));

    {
        // Shared state behind the (cloneable) allocator; its `Drop` impl
        // records that the last allocator handle was released.
        struct MyAllocInner {
            drop_flag: Arc<AtomicBool>,
        }

        #[derive(Clone)]
        struct MyAlloc {
            inner: Arc<MyAllocInner>,
        }

        impl Drop for MyAllocInner {
            fn drop(&mut self) {
                println!("MyAlloc freed.");
                self.drop_flag.store(true, Ordering::SeqCst);
            }
        }

        // Thin allocator that just delegates to `Global`; it exists only so
        // we can observe when (or whether) it gets dropped.
        unsafe impl Allocator for MyAlloc {
            fn allocate(
                &self,
                layout: Layout,
            ) -> std::result::Result<NonNull<[u8]>, AllocError> {
                let g = Global;
                g.allocate(layout)
            }

            unsafe fn deallocate(&self, ptr: NonNull<u8>, layout: Layout) {
                let g = Global;
                g.deallocate(ptr, layout)
            }
        }

        let mut map = hashbrown::HashMap::with_capacity_in(
            10,
            MyAlloc {
                inner: Arc::new(MyAllocInner {
                    drop_flag: dropped.clone(),
                }),
            },
        );
        for i in 0..10 {
            // Note: inserts the literal string "i" (not `i.to_string()`);
            // the values are irrelevant to what this test demonstrates.
            map.entry(i).or_insert_with(|| "i".to_string());
        }

        // Consumes the map through `IntoIterator::into_iter`.
        for (k, v) in map {
            println!("{}, {}", k, v);
        }
    }

    // The map was fully consumed above, yet the flag is still `false`:
    // the allocator was never dropped, demonstrating the leak.
    assert!(!dropped.load(Ordering::SeqCst));
}
}
Loading

0 comments on commit e48560d

Please sign in to comment.