From 895eae439def97f0d466af3b190e2adfe491d5fe Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Thu, 28 Mar 2024 11:45:26 +0800 Subject: [PATCH 01/10] fix(jemalloc): support large page size for aarch64 (#15956) Signed-off-by: Richard Chien --- ci/scripts/release.sh | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/ci/scripts/release.sh b/ci/scripts/release.sh index d310daf389774..7fde7eb9b00e9 100755 --- a/ci/scripts/release.sh +++ b/ci/scripts/release.sh @@ -66,6 +66,11 @@ fi echo "--- Build risingwave release binary" export ENABLE_BUILD_DASHBOARD=1 +if [ ${ARCH} == "aarch64" ]; then + # enable large page size support for jemalloc + # see https://github.com/tikv/jemallocator/blob/802969384ae0c581255f3375ee2ba774c8d2a754/jemalloc-sys/build.rs#L218 + export JEMALLOC_SYS_WITH_LG_PAGE=16 +fi cargo build -p risingwave_cmd_all --features "rw-static-link" --profile release cargo build -p risingwave_cmd --bin risectl --features "rw-static-link" --profile release cd target/release && chmod +x risingwave risectl @@ -75,8 +80,8 @@ if [ "${BUILDKITE_SOURCE}" == "schedule" ]; then tar -czvf risingwave-"$(date '+%Y%m%d')"-${ARCH}-unknown-linux.tar.gz risingwave aws s3 cp risingwave-"$(date '+%Y%m%d')"-${ARCH}-unknown-linux.tar.gz s3://rw-nightly-pre-built-binary elif [[ -n "${BINARY_NAME+x}" ]]; then - tar -czvf risingwave-${BINARY_NAME}-${ARCH}-unknown-linux.tar.gz risingwave - aws s3 cp risingwave-${BINARY_NAME}-${ARCH}-unknown-linux.tar.gz s3://rw-nightly-pre-built-binary + tar -czvf risingwave-${BINARY_NAME}-${ARCH}-unknown-linux.tar.gz risingwave + aws s3 cp risingwave-${BINARY_NAME}-${ARCH}-unknown-linux.tar.gz s3://rw-nightly-pre-built-binary fi echo "--- Build connector node" From 192de5254f071589fd2266279b808aeb4c2fa72f Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 28 Mar 2024 11:45:26 +0800 Subject: [PATCH 02/10] chore(deps): Bump bytes from 1.5.0 to 1.6.0 (#15961) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bb55833d50181..c2a51ba341754 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1793,9 +1793,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.5.0" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" dependencies = [ "serde", ] From 2f565a80def7165924a2d68cce986b8540af6802 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 28 Mar 2024 11:45:26 +0800 Subject: [PATCH 03/10] chore(deps): Bump org.elasticsearch:elasticsearch from 7.17.14 to 7.17.19 in /java (#15964) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- java/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/pom.xml b/java/pom.xml index 6bd4e91f43125..89df5b870077b 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -80,7 +80,7 @@ 2.13.5 3.3.1 3.3.3 - 7.17.14 + 7.17.19 4.15.0 1.18.0 1.17.6 From cc556326131bbdcf718c06bd18bf977a93a36ea7 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 28 Mar 2024 11:45:26 +0800 Subject: [PATCH 04/10] chore(deps): Bump jsonwebtoken from 9.2.0 to 9.3.0 (#15962) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c2a51ba341754..141e9ab0edadf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5538,9 +5538,9 @@ dependencies = [ [[package]] name = "jsonwebtoken" -version = "9.2.0" +version = "9.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c7ea04a7c5c055c175f189b6dc6ba036fd62306b58c66c9f6389036c503a3f4" +checksum = "b9ae10193d25051e74945f1ea2d0b42e03cc3b890f7e4cc5faa44997d808193f" dependencies = [ "base64 0.21.7", "js-sys", @@ -7424,7 +7424,7 @@ dependencies = [ "bytes", "futures", "itertools 0.12.0", - "jsonwebtoken 9.2.0", + "jsonwebtoken 9.3.0", "madsim-tokio", "openssl", "panic-message", @@ -8597,7 +8597,7 @@ dependencies = [ "hmac", "home", "http 0.2.9", - "jsonwebtoken 9.2.0", + "jsonwebtoken 9.3.0", "log", "once_cell", "percent-encoding", From 542a8b657e3b9cfb8b10a0c6cc3ce90170d037ed Mon Sep 17 00:00:00 2001 From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com> Date: Thu, 28 Mar 2024 12:36:49 +0800 Subject: [PATCH 05/10] feat(Sink): support ck Replicated*tree (#15904) --- src/connector/src/sink/clickhouse.rs | 89 +++++++++++++++++++++++++--- 1 file changed, 82 insertions(+), 7 deletions(-) diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index c0d4caaf7257a..16407a8f68c6c 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -16,6 +16,7 @@ use std::collections::{HashMap, HashSet}; use std::time::Duration; use anyhow::anyhow; +use clickhouse::insert::Insert; use clickhouse::{Client as ClickHouseClient, Row as ClickHouseRow}; use itertools::Itertools; use risingwave_common::array::{Op, StreamChunk}; @@ -42,7 +43,7 @@ use crate::sink::{ }; const QUERY_ENGINE: &str = - "select distinct ?fields from system.tables where database = ? and table = ?"; + "select distinct ?fields from system.tables where database = ? and name = ?"; const QUERY_COLUMN: &str = "select distinct ?fields from system.columns where database = ? and table = ? order by ?"; pub const CLICKHOUSE_SINK: &str = "clickhouse"; @@ -72,6 +73,13 @@ enum ClickHouseEngine { CollapsingMergeTree(String), VersionedCollapsingMergeTree(String), GraphiteMergeTree, + ReplicatedMergeTree, + ReplicatedReplacingMergeTree, + ReplicatedSummingMergeTree, + ReplicatedAggregatingMergeTree, + ReplicatedCollapsingMergeTree(String), + ReplicatedVersionedCollapsingMergeTree(String), + ReplicatedGraphiteMergeTree, } impl ClickHouseEngine { pub fn is_collapsing_engine(&self) -> bool { @@ -79,6 +87,8 @@ impl ClickHouseEngine { self, ClickHouseEngine::CollapsingMergeTree(_) | ClickHouseEngine::VersionedCollapsingMergeTree(_) + | ClickHouseEngine::ReplicatedCollapsingMergeTree(_) + | ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(_) ) } @@ -88,6 +98,12 @@ impl ClickHouseEngine { ClickHouseEngine::VersionedCollapsingMergeTree(sign_name) => { Some(sign_name.to_string()) } + ClickHouseEngine::ReplicatedCollapsingMergeTree(sign_name) => { + Some(sign_name.to_string()) + } + ClickHouseEngine::ReplicatedVersionedCollapsingMergeTree(sign_name) => { + Some(sign_name.to_string()) + } _ => None, } } @@ -98,6 +114,7 @@ impl ClickHouseEngine { "ReplacingMergeTree" => Ok(ClickHouseEngine::ReplacingMergeTree), "SummingMergeTree" => Ok(ClickHouseEngine::SummingMergeTree), "AggregatingMergeTree" => Ok(ClickHouseEngine::AggregatingMergeTree), + // VersionedCollapsingMergeTree(sign_name,"a") "VersionedCollapsingMergeTree" => { let sign_name = engine_name .create_table_query @@ -107,9 +124,11 @@ impl ClickHouseEngine { .split(',') .next() .ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))? + .trim() .to_string(); Ok(ClickHouseEngine::VersionedCollapsingMergeTree(sign_name)) } + // CollapsingMergeTree(sign_name) "CollapsingMergeTree" => { let sign_name = engine_name .create_table_query @@ -119,10 +138,50 @@ impl ClickHouseEngine { .split(')') .next() .ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))? + .trim() .to_string(); Ok(ClickHouseEngine::CollapsingMergeTree(sign_name)) } "GraphiteMergeTree" => Ok(ClickHouseEngine::GraphiteMergeTree), + "ReplicatedMergeTree" => Ok(ClickHouseEngine::ReplicatedMergeTree), + "ReplicatedReplacingMergeTree" => Ok(ClickHouseEngine::ReplicatedReplacingMergeTree), + "ReplicatedSummingMergeTree" => Ok(ClickHouseEngine::ReplicatedSummingMergeTree), + "ReplicatedAggregatingMergeTree" => { + Ok(ClickHouseEngine::ReplicatedAggregatingMergeTree) + } + // ReplicatedVersionedCollapsingMergeTree("a","b",sign_name,"c") + "ReplicatedVersionedCollapsingMergeTree" => { + let sign_name = engine_name + .create_table_query + .split("ReplicatedVersionedCollapsingMergeTree(") + .last() + .ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))? + .split(',') + .rev() + .nth(1) + .ok_or_else(|| SinkError::ClickHouse("must have index 1".to_string()))? + .trim() + .to_string(); + Ok(ClickHouseEngine::VersionedCollapsingMergeTree(sign_name)) + } + // ReplicatedCollapsingMergeTree("a","b",sign_name) + "ReplicatedCollapsingMergeTree" => { + let sign_name = engine_name + .create_table_query + .split("ReplicatedCollapsingMergeTree(") + .last() + .ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))? + .split(')') + .next() + .ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))? + .split(',') + .last() + .ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))? + .trim() + .to_string(); + Ok(ClickHouseEngine::CollapsingMergeTree(sign_name)) + } + "ReplicatedGraphiteMergeTree" => Ok(ClickHouseEngine::ReplicatedGraphiteMergeTree), _ => Err(SinkError::ClickHouse(format!( "Cannot find clickhouse engine {:?}", engine_name.engine @@ -381,6 +440,7 @@ pub struct ClickHouseSinkWriter { column_correct_vec: Vec, rw_fields_name_after_calibration: Vec, clickhouse_engine: ClickHouseEngine, + inserter: Option>, } #[derive(Debug)] struct ClickHouseSchemaFeature { @@ -424,6 +484,7 @@ impl ClickHouseSinkWriter { column_correct_vec: column_correct_vec?, rw_fields_name_after_calibration, clickhouse_engine, + inserter: None, }) } @@ -489,10 +550,12 @@ impl ClickHouseSinkWriter { } async fn write(&mut self, chunk: StreamChunk) -> Result<()> { - let mut insert = self.client.insert_with_fields_name( - &self.config.common.table, - self.rw_fields_name_after_calibration.clone(), - )?; + if self.inserter.is_none() { + self.inserter = Some(self.client.insert_with_fields_name( + &self.config.common.table, + self.rw_fields_name_after_calibration.clone(), + )?); + } for (op, row) in chunk.rows() { let mut clickhouse_filed_vec = vec![]; for (index, data) in row.iter().enumerate() { @@ -524,9 +587,12 @@ impl ClickHouseSinkWriter { let clickhouse_column = ClickHouseColumn { row: clickhouse_filed_vec, }; - insert.write(&clickhouse_column).await?; + self.inserter + .as_mut() + .unwrap() + .write(&clickhouse_column) + .await?; } - insert.end().await?; Ok(()) } } @@ -539,6 +605,15 @@ impl AsyncTruncateSinkWriter for ClickHouseSinkWriter { ) -> Result<()> { self.write(chunk).await } + + async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> { + if let Some(inserter) = self.inserter.take() + && is_checkpoint + { + inserter.end().await?; + } + Ok(()) + } } #[derive(ClickHouseRow, Deserialize, Clone)] From db9d3035d2d17f3eebb354eca4a5e1236450fd38 Mon Sep 17 00:00:00 2001 From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com> Date: Thu, 28 Mar 2024 12:51:14 +0800 Subject: [PATCH 06/10] feat(Sink): support es dynamic index (#15835) --- .../sink/elasticsearch/EsSinkTest.java | 8 ++- .../java/com/risingwave/connector/EsSink.java | 28 +++++++--- .../risingwave/connector/EsSinkConfig.java | 24 ++++++-- .../risingwave/connector/EsSinkFactory.java | 27 +++++++++ src/connector/src/sink/elasticsearch.rs | 56 +++++++++++++++---- src/connector/src/sink/remote.rs | 3 +- 6 files changed, 119 insertions(+), 27 deletions(-) diff --git a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java index e7e74de2af6f0..509f71ec1e569 100644 --- a/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java +++ b/java/connector-node/risingwave-connector-test/src/test/java/com/risingwave/connector/sink/elasticsearch/EsSinkTest.java @@ -54,15 +54,17 @@ public void testEsSink(ElasticsearchContainer container, String username, String throws IOException { EsSink sink = new EsSink( - new EsSinkConfig(container.getHttpHostAddress(), "test") + new EsSinkConfig(container.getHttpHostAddress()) + .withIndex("test") .withDelimiter("$") .withUsername(username) .withPassword(password), getTestTableSchema()); sink.write( Iterators.forArray( - new ArraySinkRow(Op.INSERT, "1$Alice", "{\"id\":1,\"name\":\"Alice\"}"), - new ArraySinkRow(Op.INSERT, "2$Bob", "{\"id\":2,\"name\":\"Bob\"}"))); + new ArraySinkRow( + Op.INSERT, null, "1$Alice", "{\"id\":1,\"name\":\"Alice\"}"), + new ArraySinkRow(Op.INSERT, null, "2$Bob", "{\"id\":2,\"name\":\"Bob\"}"))); sink.sync(); // container is slow here, but our default flush time is 5s, // so 3s is enough for sync test diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java index e40332a327112..cc5977a9c208c 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java @@ -278,20 +278,32 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) } private void processUpsert(SinkRow row) throws JsonMappingException, JsonProcessingException { - final String key = (String) row.get(0); - String doc = (String) row.get(1); - - UpdateRequest updateRequest = - new UpdateRequest(config.getIndex(), "_doc", key).doc(doc, XContentType.JSON); + final String index = (String) row.get(0); + final String key = (String) row.get(1); + String doc = (String) row.get(2); + + UpdateRequest updateRequest; + if (config.getIndex() != null) { + updateRequest = + new UpdateRequest(config.getIndex(), "_doc", key).doc(doc, XContentType.JSON); + } else { + updateRequest = new UpdateRequest(index, "_doc", key).doc(doc, XContentType.JSON); + } updateRequest.docAsUpsert(true); this.requestTracker.addWriteTask(); bulkProcessor.add(updateRequest); } private void processDelete(SinkRow row) throws JsonMappingException, JsonProcessingException { - final String key = (String) row.get(0); - - DeleteRequest deleteRequest = new DeleteRequest(config.getIndex(), "_doc", key); + final String index = (String) row.get(0); + final String key = (String) row.get(1); + + DeleteRequest deleteRequest; + if (config.getIndex() != null) { + deleteRequest = new DeleteRequest(config.getIndex(), "_doc", key); + } else { + deleteRequest = new DeleteRequest(index, "_doc", key); + } this.requestTracker.addWriteTask(); bulkProcessor.add(deleteRequest); } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkConfig.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkConfig.java index e02c205304a3b..4ee49efca10ad 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkConfig.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkConfig.java @@ -24,7 +24,8 @@ public class EsSinkConfig extends CommonSinkConfig { /** Required */ private String url; - /** Required */ + /** Optional */ + @JsonProperty(value = "index") private String index; /** Optional, delimiter for generating id */ @@ -37,11 +38,12 @@ public class EsSinkConfig extends CommonSinkConfig { @JsonProperty(value = "password") private String password; + @JsonProperty(value = "index_column") + private String indexColumn; + @JsonCreator - public EsSinkConfig( - @JsonProperty(value = "url") String url, @JsonProperty(value = "index") String index) { + public EsSinkConfig(@JsonProperty(value = "url") String url) { this.url = url; - this.index = index; } public String getUrl() { @@ -52,6 +54,11 @@ public String getIndex() { return index; } + public EsSinkConfig withIndex(String index) { + this.index = index; + return this; + } + public String getDelimiter() { return delimiter; } @@ -78,4 +85,13 @@ public EsSinkConfig withPassword(String password) { this.password = password; return this; } + + public String getIndexColumn() { + return indexColumn; + } + + public EsSinkConfig withIndexColumn(String indexColumn) { + this.indexColumn = indexColumn; + return this; + } } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java index 7495b0a1d5ad0..f3fa3bfa16c3b 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSinkFactory.java @@ -21,6 +21,7 @@ import com.risingwave.connector.api.sink.SinkWriter; import com.risingwave.connector.api.sink.SinkWriterV1; import com.risingwave.proto.Catalog; +import com.risingwave.proto.Data; import io.grpc.Status; import java.io.IOException; import java.util.Map; @@ -61,6 +62,32 @@ public void validate( } catch (IllegalArgumentException e) { throw Status.INVALID_ARGUMENT.withDescription(e.getMessage()).asRuntimeException(); } + if (config.getIndexColumn() != null) { + Data.DataType.TypeName typeName = tableSchema.getColumnType(config.getIndexColumn()); + if (typeName == null) { + throw Status.INVALID_ARGUMENT + .withDescription( + "Index column " + config.getIndexColumn() + " not found in schema") + .asRuntimeException(); + } + if (!typeName.equals(Data.DataType.TypeName.VARCHAR)) { + throw Status.INVALID_ARGUMENT + .withDescription( + "Index column must be of type String, but found " + typeName) + .asRuntimeException(); + } + if (config.getIndex() != null) { + throw Status.INVALID_ARGUMENT + .withDescription("index and index_column cannot be set at the same time") + .asRuntimeException(); + } + } else { + if (config.getIndex() == null) { + throw Status.INVALID_ARGUMENT + .withDescription("index or index_column must be set") + .asRuntimeException(); + } + } // 2. check connection RestClientBuilder builder = RestClient.builder(host); diff --git a/src/connector/src/sink/elasticsearch.rs b/src/connector/src/sink/elasticsearch.rs index f6ddbeb7e35d2..578c768f1ce30 100644 --- a/src/connector/src/sink/elasticsearch.rs +++ b/src/connector/src/sink/elasticsearch.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use anyhow::anyhow; use risingwave_common::array::{ - ArrayImpl, JsonbArrayBuilder, RowRef, StreamChunk, Utf8ArrayBuilder, + ArrayBuilder, ArrayImpl, JsonbArrayBuilder, RowRef, StreamChunk, Utf8ArrayBuilder, }; use risingwave_common::catalog::Schema; use risingwave_common::row::Row; @@ -27,6 +27,7 @@ use super::encoder::{JsonEncoder, RowEncoder}; use super::remote::ElasticSearchSink; use crate::sink::{Result, Sink}; pub const ES_OPTION_DELIMITER: &str = "delimiter"; +pub const ES_OPTION_INDEX_COLUMN: &str = "index_column"; pub enum StreamChunkConverter { Es(EsStreamChunkConverter), @@ -40,10 +41,22 @@ impl StreamChunkConverter { properties: &HashMap, ) -> Result { if sink_name == ElasticSearchSink::SINK_NAME { + let index_column = properties + .get(ES_OPTION_INDEX_COLUMN) + .cloned() + .map(|n| { + schema + .fields() + .iter() + .position(|s| s.name == n) + .ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_INDEX_COLUMN)) + }) + .transpose()?; Ok(StreamChunkConverter::Es(EsStreamChunkConverter::new( schema, pk_indices.clone(), properties.get(ES_OPTION_DELIMITER).cloned(), + index_column, )?)) } else { Ok(StreamChunkConverter::Other) @@ -60,9 +73,15 @@ impl StreamChunkConverter { pub struct EsStreamChunkConverter { json_encoder: JsonEncoder, fn_build_id: Box) -> Result + Send>, + index_column: Option, } impl EsStreamChunkConverter { - fn new(schema: Schema, pk_indices: Vec, delimiter: Option) -> Result { + fn new( + schema: Schema, + pk_indices: Vec, + delimiter: Option, + index_column: Option, + ) -> Result { let fn_build_id: Box) -> Result + Send> = if pk_indices.is_empty() { Box::new(|row: RowRef<'_>| { @@ -96,10 +115,18 @@ impl EsStreamChunkConverter { Ok(keys.join(&delimiter)) }) }; - let json_encoder = JsonEncoder::new_with_es(schema, None); + let col_indices = if let Some(index) = index_column { + let mut col_indices: Vec = (0..schema.len()).collect(); + col_indices.remove(index); + Some(col_indices) + } else { + None + }; + let json_encoder = JsonEncoder::new_with_es(schema, col_indices); Ok(Self { json_encoder, fn_build_id, + index_column, }) } @@ -109,23 +136,30 @@ impl EsStreamChunkConverter { ::new(chunk.capacity()); let mut json_builder = ::new(chunk.capacity()); + let mut index_builder = + ::new(chunk.capacity()); for (op, row) in chunk.rows() { ops.push(op); + id_string_builder.append(Some(&self.build_id(row)?)); + if let Some(index) = self.index_column { + index_builder.append(Some( + row.datum_at(index) + .ok_or_else(|| anyhow!("No value find in row, index is {}", index))? + .into_utf8(), + )); + } else { + index_builder.append_null(); + } let json = JsonbVal::from(Value::Object(self.json_encoder.encode(row)?)); - risingwave_common::array::ArrayBuilder::append( - &mut id_string_builder, - Some(&self.build_id(row)?), - ); - risingwave_common::array::ArrayBuilder::append( - &mut json_builder, - Some(json.as_scalar_ref()), - ); + json_builder.append(Some(json.as_scalar_ref())); } let json_array = risingwave_common::array::ArrayBuilder::finish(json_builder); let id_string_array = risingwave_common::array::ArrayBuilder::finish(id_string_builder); + let index_string_array = risingwave_common::array::ArrayBuilder::finish(index_builder); Ok(StreamChunk::new( ops, vec![ + std::sync::Arc::new(ArrayImpl::Utf8(index_string_array)), std::sync::Arc::new(ArrayImpl::Utf8(id_string_array)), std::sync::Arc::new(ArrayImpl::Jsonb(json_array)), ], diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 1b989cfb53bae..4d3c48b6ec5c7 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -267,7 +267,8 @@ impl RemoteLogSinker { let payload_schema = if sink_name == ElasticSearchSink::SINK_NAME { let columns = vec![ ColumnDesc::unnamed(ColumnId::from(0), DataType::Varchar).to_protobuf(), - ColumnDesc::unnamed(ColumnId::from(1), DataType::Jsonb).to_protobuf(), + ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar).to_protobuf(), + ColumnDesc::unnamed(ColumnId::from(2), DataType::Jsonb).to_protobuf(), ]; Some(TableSchema { columns, From cf94fb9d94a43063e7693e68e5b062b06399edef Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 28 Mar 2024 13:47:19 +0800 Subject: [PATCH 07/10] chore(deps): Bump uuid from 1.7.0 to 1.8.0 (#15960) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 141e9ab0edadf..2fa104c560f04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13209,9 +13209,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "uuid" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a" +checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" dependencies = [ "getrandom", "rand", From 390465cc9b0abc7eb7806ee5647a62055f58b70c Mon Sep 17 00:00:00 2001 From: Wallace Date: Thu, 28 Mar 2024 13:50:48 +0800 Subject: [PATCH 08/10] feat(compaction): optimize compaction overlapping level (#15736) Signed-off-by: Little-Wallace --- .../src/hummock/compactor/compactor_runner.rs | 173 ++++++++++++++---- 1 file changed, 135 insertions(+), 38 deletions(-) diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index c22d8eb5a30a9..a229ef6e15f93 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::{BinaryHeap, HashMap, HashSet}; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -27,9 +27,9 @@ use risingwave_hummock_sdk::compact::{ use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker}; use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon}; use risingwave_hummock_sdk::table_stats::{add_table_stats_map, TableStats, TableStatsMap}; -use risingwave_hummock_sdk::{can_concat, HummockSstableObjectId}; +use risingwave_hummock_sdk::{can_concat, HummockSstableObjectId, KeyComparator}; use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType}; -use risingwave_pb::hummock::{BloomFilterType, CompactTask, LevelType}; +use risingwave_pb::hummock::{BloomFilterType, CompactTask, LevelType, SstableInfo}; use thiserror_ext::AsReport; use tokio::sync::oneshot::Receiver; @@ -64,6 +64,8 @@ pub struct CompactorRunner { split_index: usize, } +const MAX_OVERLAPPING_SST: usize = 64; + impl CompactorRunner { pub fn new( split_index: usize, @@ -153,63 +155,69 @@ impl CompactorRunner { &self, task_progress: Arc, ) -> HummockResult> { + let compactor_iter_max_io_retry_times = self + .compactor + .context + .storage_opts + .compactor_iter_max_io_retry_times; let mut table_iters = Vec::new(); for level in &self.compact_task.input_ssts { if level.table_infos.is_empty() { continue; } + let tables = level + .table_infos + .iter() + .filter(|table_info| { + let key_range = KeyRange::from(table_info.key_range.as_ref().unwrap()); + let table_ids = &table_info.table_ids; + let exist_table = table_ids + .iter() + .any(|table_id| self.compact_task.existing_table_ids.contains(table_id)); + + self.key_range.full_key_overlap(&key_range) && exist_table + }) + .cloned() + .collect_vec(); // Do not need to filter the table because manager has done it. if level.level_type == LevelType::Nonoverlapping as i32 { debug_assert!(can_concat(&level.table_infos)); - let tables = level - .table_infos - .iter() - .filter(|table_info| { - let key_range = KeyRange::from(table_info.key_range.as_ref().unwrap()); - let table_ids = &table_info.table_ids; - let exist_table = table_ids.iter().any(|table_id| { - self.compact_task.existing_table_ids.contains(table_id) - }); - - self.key_range.full_key_overlap(&key_range) && exist_table - }) - .cloned() - .collect_vec(); - table_iters.push(ConcatSstableIterator::new( self.compact_task.existing_table_ids.clone(), tables, self.compactor.task_config.key_range.clone(), self.sstable_store.clone(), task_progress.clone(), - self.compactor - .context - .storage_opts - .compactor_iter_max_io_retry_times, + compactor_iter_max_io_retry_times, )); + } else if tables.len() > MAX_OVERLAPPING_SST { + let sst_groups = partition_overlapping_sstable_infos(tables); + tracing::warn!( + "COMPACT A LARGE OVERLAPPING LEVEL: try to partition {} ssts with {} groups", + level.table_infos.len(), + sst_groups.len() + ); + for table_infos in sst_groups { + assert!(can_concat(&table_infos)); + table_iters.push(ConcatSstableIterator::new( + self.compact_task.existing_table_ids.clone(), + table_infos, + self.compactor.task_config.key_range.clone(), + self.sstable_store.clone(), + task_progress.clone(), + compactor_iter_max_io_retry_times, + )); + } } else { - for table_info in &level.table_infos { - let key_range = KeyRange::from(table_info.key_range.as_ref().unwrap()); - let table_ids = &table_info.table_ids; - let exist_table = table_ids - .iter() - .any(|table_id| self.compact_task.existing_table_ids.contains(table_id)); - - if !self.key_range.full_key_overlap(&key_range) || !exist_table { - continue; - } - + for table_info in tables { table_iters.push(ConcatSstableIterator::new( self.compact_task.existing_table_ids.clone(), - vec![table_info.clone()], + vec![table_info], self.compactor.task_config.key_range.clone(), self.sstable_store.clone(), task_progress.clone(), - self.compactor - .context - .storage_opts - .compactor_iter_max_io_retry_times, + compactor_iter_max_io_retry_times, )); } } @@ -227,6 +235,58 @@ impl CompactorRunner { } } +pub fn partition_overlapping_sstable_infos( + mut origin_infos: Vec, +) -> Vec> { + pub struct SstableGroup { + ssts: Vec, + max_right_bound: Vec, + } + + impl PartialEq for SstableGroup { + fn eq(&self, other: &SstableGroup) -> bool { + self.max_right_bound == other.max_right_bound + } + } + impl PartialOrd for SstableGroup { + fn partial_cmp(&self, other: &SstableGroup) -> Option { + Some(self.cmp(other)) + } + } + impl Eq for SstableGroup {} + impl Ord for SstableGroup { + fn cmp(&self, other: &SstableGroup) -> std::cmp::Ordering { + // Pick group with the smallest right bound for every new sstable. + KeyComparator::compare_encoded_full_key(&other.max_right_bound, &self.max_right_bound) + } + } + let mut groups: BinaryHeap = BinaryHeap::default(); + origin_infos.sort_by(|a, b| { + let x = a.key_range.as_ref().unwrap(); + let y = b.key_range.as_ref().unwrap(); + KeyComparator::compare_encoded_full_key(&x.left, &y.left) + }); + for sst in origin_infos { + // Pick group with the smallest right bound for every new sstable. So do not check the larger one if the smallest one does not meet condition. + if let Some(mut prev_group) = groups.peek_mut() { + if KeyComparator::encoded_full_key_less_than( + &prev_group.max_right_bound, + &sst.key_range.as_ref().unwrap().left, + ) { + prev_group.max_right_bound = sst.key_range.as_ref().unwrap().right.clone(); + prev_group.ssts.push(sst); + continue; + } + } + groups.push(SstableGroup { + max_right_bound: sst.key_range.as_ref().unwrap().right.clone(), + ssts: vec![sst], + }); + } + assert!(!groups.is_empty()); + groups.into_iter().map(|group| group.ssts).collect_vec() +} + /// Handles a compaction task and reports its status to hummock manager. /// Always return `Ok` and let hummock manager handle errors. pub async fn compact( @@ -864,3 +924,40 @@ where Ok(compaction_statistics) } + +#[cfg(test)] +pub mod tests { + use risingwave_hummock_sdk::can_concat; + + use crate::hummock::compactor::compactor_runner::partition_overlapping_sstable_infos; + use crate::hummock::iterator::test_utils::mock_sstable_store; + use crate::hummock::test_utils::{ + default_builder_opt_for_test, gen_test_sstable_info, test_key_of, test_value_of, + }; + use crate::hummock::value::HummockValue; + + #[tokio::test] + async fn test_partition_overlapping_level() { + const TEST_KEYS_COUNT: usize = 10; + let sstable_store = mock_sstable_store(); + let mut table_infos = vec![]; + for object_id in 0..10 { + let start_index = object_id * TEST_KEYS_COUNT; + let end_index = start_index + 2 * TEST_KEYS_COUNT; + let table_info = gen_test_sstable_info( + default_builder_opt_for_test(), + object_id as u64, + (start_index..end_index) + .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))), + sstable_store.clone(), + ) + .await; + table_infos.push(table_info); + } + let table_infos = partition_overlapping_sstable_infos(table_infos); + assert_eq!(table_infos.len(), 2); + for ssts in table_infos { + assert!(can_concat(&ssts)); + } + } +} From 0b56ac3c56b9992bff17b17dfe8badb807a9011d Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 28 Mar 2024 05:51:02 +0000 Subject: [PATCH 09/10] chore(deps): Bump rust_decimal from 1.34.2 to 1.35.0 (#15958) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 20 +++----------------- src/tests/e2e_extended_mode/Cargo.toml | 2 +- 2 files changed, 4 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2fa104c560f04..c8fa0f3011732 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7624,20 +7624,6 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "31114a898e107c51bb1609ffaf55a0e011cf6a4d7f1170d0015a165082c0338b" -[[package]] -name = "postgres" -version = "0.19.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bed5017bc2ff49649c0075d0d7a9d676933c1292480c1d137776fb205b5cd18" -dependencies = [ - "bytes", - "fallible-iterator 0.2.0", - "futures-util", - "log", - "tokio", - "tokio-postgres", -] - [[package]] name = "postgres-derive" version = "0.4.5" @@ -10590,15 +10576,15 @@ dependencies = [ [[package]] name = "rust_decimal" -version = "1.34.2" +version = "1.35.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "755392e1a2f77afd95580d3f0d0e94ac83eeeb7167552c9b5bca549e61a94d83" +checksum = "1790d1c4c0ca81211399e0e0af16333276f375209e71a37b67698a373db5b47a" dependencies = [ "arrayvec", "borsh", "bytes", "num-traits", - "postgres", + "postgres-types", "rand", "rkyv", "serde", diff --git a/src/tests/e2e_extended_mode/Cargo.toml b/src/tests/e2e_extended_mode/Cargo.toml index 81644ee3430cf..4871d14108e8a 100644 --- a/src/tests/e2e_extended_mode/Cargo.toml +++ b/src/tests/e2e_extended_mode/Cargo.toml @@ -18,7 +18,7 @@ anyhow = { version = "1", features = ["backtrace"] } chrono = { version = "0.4", features = ['serde'] } clap = { version = "4", features = ["derive"] } pg_interval = "0.4" -rust_decimal ={ version = "1.34", features = ["db-postgres"] } +rust_decimal ={ version = "1.35", features = ["db-postgres"] } tokio = { version = "0.2.24", package = "madsim-tokio", features = ["rt", "macros","rt-multi-thread"] } tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] } tracing = "0.1" From e0b71bad5803d0359db5196dc6ab2137efcaf9a9 Mon Sep 17 00:00:00 2001 From: "Zhanxiang (Patrick) Huang" Date: Thu, 28 Mar 2024 14:38:42 +0800 Subject: [PATCH 10/10] chore: expose a config to switch to opendal for s3 backend (#15969) --- src/common/src/config.rs | 14 ++++++++++++++ src/config/example.toml | 1 + src/object_store/src/object/mod.rs | 22 +++++++++------------- 3 files changed, 24 insertions(+), 13 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 92db04cc722b5..9948f1c31b3a6 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -1030,6 +1030,9 @@ pub struct S3ObjectStoreDeveloperConfig { default = "default::object_store_config::s3::developer::object_store_retryable_service_error_codes" )] pub object_store_retryable_service_error_codes: Vec, + + #[serde(default = "default::object_store_config::s3::developer::use_opendal")] + pub use_opendal: bool, } impl SystemConfig { @@ -1718,6 +1721,9 @@ pub mod default { } pub mod developer { + use crate::util::env_var::env_var_is_false_or; + const RW_USE_OPENDAL_FOR_S3: &str = "RW_USE_OPENDAL_FOR_S3"; + pub fn object_store_retry_unknown_service_error() -> bool { false } @@ -1725,6 +1731,14 @@ pub mod default { pub fn object_store_retryable_service_error_codes() -> Vec { vec!["SlowDown".into(), "TooManyRequests".into()] } + + pub fn use_opendal() -> bool { + // TODO: deprecate this config when we are completely switch from aws sdk to opendal. + // The reason why we use !env_var_is_false_or(RW_USE_OPENDAL_FOR_S3, false) here is + // 1. Maintain compatibility so that there is no behavior change in cluster with RW_USE_OPENDAL_FOR_S3 set. + // 2. Change the default behavior to use opendal for s3 if RW_USE_OPENDAL_FOR_S3 is not set. + !env_var_is_false_or(RW_USE_OPENDAL_FOR_S3, false) + } } } } diff --git a/src/config/example.toml b/src/config/example.toml index 25c2f4b200f8a..b6605b5305cc9 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -203,6 +203,7 @@ identity_resolution_timeout_s = 5 [storage.object_store.s3.developer] object_store_retry_unknown_service_error = false object_store_retryable_service_error_codes = ["SlowDown", "TooManyRequests"] +use_opendal = true [system] barrier_interval_ms = 1000 diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs index 89622e66895c9..c75447323c30e 100644 --- a/src/object_store/src/object/mod.rs +++ b/src/object_store/src/object/mod.rs @@ -32,7 +32,6 @@ use await_tree::InstrumentAwait; use futures::stream::BoxStream; use futures::StreamExt; pub use risingwave_common::config::ObjectStoreConfig; -use risingwave_common::util::env_var::env_var_is_false_or; pub use s3::*; pub mod error; @@ -50,8 +49,6 @@ use self::sim::SimObjectStore; pub type ObjectStoreRef = Arc; pub type ObjectStreamingUploader = MonitoredStreamingUploader; -const RW_USE_OPENDAL_FOR_S3: &str = "RW_USE_OPENDAL_FOR_S3"; - type BoxedStreamingUploader = Box; pub trait ObjectRangeBounds = RangeBounds + Clone + Send + Sync + std::fmt::Debug + 'static; @@ -797,8 +794,15 @@ pub async fn build_remote_object_store( ) -> ObjectStoreImpl { match url { s3 if s3.starts_with("s3://") => { - // only switch to s3 sdk when `RW_USE_OPENDAL_FOR_S3` is set to false. - if env_var_is_false_or(RW_USE_OPENDAL_FOR_S3, false) { + if config.s3.developer.use_opendal { + let bucket = s3.strip_prefix("s3://").unwrap(); + tracing::info!("Using OpenDAL to access s3, bucket is {}", bucket); + ObjectStoreImpl::Opendal( + OpendalObjectStore::new_s3_engine(bucket.to_string(), config.clone()) + .unwrap() + .monitored(metrics, config), + ) + } else { ObjectStoreImpl::S3( S3ObjectStore::new_with_config( s3.strip_prefix("s3://").unwrap().to_string(), @@ -808,14 +812,6 @@ pub async fn build_remote_object_store( .await .monitored(metrics, config), ) - } else { - let bucket = s3.strip_prefix("s3://").unwrap(); - tracing::info!("Using OpenDAL to access s3, bucket is {}", bucket); - ObjectStoreImpl::Opendal( - OpendalObjectStore::new_s3_engine(bucket.to_string(), config.clone()) - .unwrap() - .monitored(metrics, config), - ) } } #[cfg(feature = "hdfs-backend")]