Skip to content

Commit

Permalink
Merge branch 'main' into jinser/connector-ssl
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored Mar 28, 2024
2 parents 1118b9f + e0b71ba commit c603d1c
Show file tree
Hide file tree
Showing 15 changed files with 380 additions and 114 deletions.
36 changes: 11 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions ci/scripts/release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ fi

echo "--- Build risingwave release binary"
export ENABLE_BUILD_DASHBOARD=1
if [ ${ARCH} == "aarch64" ]; then
# enable large page size support for jemalloc
# see https://github.com/tikv/jemallocator/blob/802969384ae0c581255f3375ee2ba774c8d2a754/jemalloc-sys/build.rs#L218
export JEMALLOC_SYS_WITH_LG_PAGE=16
fi
cargo build -p risingwave_cmd_all --features "rw-static-link" --profile release
cargo build -p risingwave_cmd --bin risectl --features "rw-static-link" --profile release
cd target/release && chmod +x risingwave risectl
Expand All @@ -75,8 +80,8 @@ if [ "${BUILDKITE_SOURCE}" == "schedule" ]; then
tar -czvf risingwave-"$(date '+%Y%m%d')"-${ARCH}-unknown-linux.tar.gz risingwave
aws s3 cp risingwave-"$(date '+%Y%m%d')"-${ARCH}-unknown-linux.tar.gz s3://rw-nightly-pre-built-binary
elif [[ -n "${BINARY_NAME+x}" ]]; then
tar -czvf risingwave-${BINARY_NAME}-${ARCH}-unknown-linux.tar.gz risingwave
aws s3 cp risingwave-${BINARY_NAME}-${ARCH}-unknown-linux.tar.gz s3://rw-nightly-pre-built-binary
tar -czvf risingwave-${BINARY_NAME}-${ARCH}-unknown-linux.tar.gz risingwave
aws s3 cp risingwave-${BINARY_NAME}-${ARCH}-unknown-linux.tar.gz s3://rw-nightly-pre-built-binary
fi

echo "--- Build connector node"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,17 @@ public void testEsSink(ElasticsearchContainer container, String username, String
throws IOException {
EsSink sink =
new EsSink(
new EsSinkConfig(container.getHttpHostAddress(), "test")
new EsSinkConfig(container.getHttpHostAddress())
.withIndex("test")
.withDelimiter("$")
.withUsername(username)
.withPassword(password),
getTestTableSchema());
sink.write(
Iterators.forArray(
new ArraySinkRow(Op.INSERT, "1$Alice", "{\"id\":1,\"name\":\"Alice\"}"),
new ArraySinkRow(Op.INSERT, "2$Bob", "{\"id\":2,\"name\":\"Bob\"}")));
new ArraySinkRow(
Op.INSERT, null, "1$Alice", "{\"id\":1,\"name\":\"Alice\"}"),
new ArraySinkRow(Op.INSERT, null, "2$Bob", "{\"id\":2,\"name\":\"Bob\"}")));
sink.sync();
// container is slow here, but our default flush time is 5s,
// so 3s is enough for sync test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,20 +278,32 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
}

private void processUpsert(SinkRow row) throws JsonMappingException, JsonProcessingException {
final String key = (String) row.get(0);
String doc = (String) row.get(1);

UpdateRequest updateRequest =
new UpdateRequest(config.getIndex(), "_doc", key).doc(doc, XContentType.JSON);
final String index = (String) row.get(0);
final String key = (String) row.get(1);
String doc = (String) row.get(2);

UpdateRequest updateRequest;
if (config.getIndex() != null) {
updateRequest =
new UpdateRequest(config.getIndex(), "_doc", key).doc(doc, XContentType.JSON);
} else {
updateRequest = new UpdateRequest(index, "_doc", key).doc(doc, XContentType.JSON);
}
updateRequest.docAsUpsert(true);
this.requestTracker.addWriteTask();
bulkProcessor.add(updateRequest);
}

private void processDelete(SinkRow row) throws JsonMappingException, JsonProcessingException {
final String key = (String) row.get(0);

DeleteRequest deleteRequest = new DeleteRequest(config.getIndex(), "_doc", key);
final String index = (String) row.get(0);
final String key = (String) row.get(1);

DeleteRequest deleteRequest;
if (config.getIndex() != null) {
deleteRequest = new DeleteRequest(config.getIndex(), "_doc", key);
} else {
deleteRequest = new DeleteRequest(index, "_doc", key);
}
this.requestTracker.addWriteTask();
bulkProcessor.add(deleteRequest);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ public class EsSinkConfig extends CommonSinkConfig {
/** Required */
private String url;

/** Required */
/** Optional */
@JsonProperty(value = "index")
private String index;

/** Optional, delimiter for generating id */
Expand All @@ -37,11 +38,12 @@ public class EsSinkConfig extends CommonSinkConfig {
@JsonProperty(value = "password")
private String password;

@JsonProperty(value = "index_column")
private String indexColumn;

@JsonCreator
public EsSinkConfig(
@JsonProperty(value = "url") String url, @JsonProperty(value = "index") String index) {
public EsSinkConfig(@JsonProperty(value = "url") String url) {
this.url = url;
this.index = index;
}

public String getUrl() {
Expand All @@ -52,6 +54,11 @@ public String getIndex() {
return index;
}

/** Sets the static target index (the {@code index} option) and returns this config for chaining. */
public EsSinkConfig withIndex(String index) {
    this.index = index;
    return this;
}

/** Returns the optional delimiter used when generating document ids, or null if unset. */
public String getDelimiter() {
    return delimiter;
}
Expand All @@ -78,4 +85,13 @@ public EsSinkConfig withPassword(String password) {
this.password = password;
return this;
}

/** Returns the configured {@code index_column} option, or null if not set. */
public String getIndexColumn() {
    return indexColumn;
}

/** Sets the {@code index_column} option and returns this config for chaining. */
public EsSinkConfig withIndexColumn(String indexColumn) {
    this.indexColumn = indexColumn;
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.risingwave.connector.api.sink.SinkWriter;
import com.risingwave.connector.api.sink.SinkWriterV1;
import com.risingwave.proto.Catalog;
import com.risingwave.proto.Data;
import io.grpc.Status;
import java.io.IOException;
import java.util.Map;
Expand Down Expand Up @@ -61,6 +62,32 @@ public void validate(
} catch (IllegalArgumentException e) {
throw Status.INVALID_ARGUMENT.withDescription(e.getMessage()).asRuntimeException();
}
if (config.getIndexColumn() != null) {
Data.DataType.TypeName typeName = tableSchema.getColumnType(config.getIndexColumn());
if (typeName == null) {
throw Status.INVALID_ARGUMENT
.withDescription(
"Index column " + config.getIndexColumn() + " not found in schema")
.asRuntimeException();
}
if (!typeName.equals(Data.DataType.TypeName.VARCHAR)) {
throw Status.INVALID_ARGUMENT
.withDescription(
"Index column must be of type String, but found " + typeName)
.asRuntimeException();
}
if (config.getIndex() != null) {
throw Status.INVALID_ARGUMENT
.withDescription("index and index_column cannot be set at the same time")
.asRuntimeException();
}
} else {
if (config.getIndex() == null) {
throw Status.INVALID_ARGUMENT
.withDescription("index or index_column must be set")
.asRuntimeException();
}
}

// 2. check connection
RestClientBuilder builder = RestClient.builder(host);
Expand Down
2 changes: 1 addition & 1 deletion java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
<jackson.version>2.13.5</jackson.version>
<spark_sql.version>3.3.1</spark_sql.version>
<hadoop.version>3.3.3</hadoop.version>
<elasticsearch.version>7.17.14</elasticsearch.version>
<elasticsearch.version>7.17.19</elasticsearch.version>
<datastax.version>4.15.0</datastax.version>
<flink.version>1.18.0</flink.version>
<testcontainers.version>1.17.6</testcontainers.version>
Expand Down
14 changes: 14 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,9 @@ pub struct S3ObjectStoreDeveloperConfig {
default = "default::object_store_config::s3::developer::object_store_retryable_service_error_codes"
)]
pub object_store_retryable_service_error_codes: Vec<String>,

#[serde(default = "default::object_store_config::s3::developer::use_opendal")]
pub use_opendal: bool,
}

impl SystemConfig {
Expand Down Expand Up @@ -1718,13 +1721,24 @@ pub mod default {
}

pub mod developer {
use crate::util::env_var::env_var_is_false_or;
const RW_USE_OPENDAL_FOR_S3: &str = "RW_USE_OPENDAL_FOR_S3";

/// Default for `object_store_retry_unknown_service_error`: do not retry
/// service errors that are not in the retryable error-code list.
pub fn object_store_retry_unknown_service_error() -> bool {
false
}

/// Default set of S3 service error codes that are treated as retryable
/// (throttling responses).
pub fn object_store_retryable_service_error_codes() -> Vec<String> {
["SlowDown", "TooManyRequests"]
    .iter()
    .map(|code| code.to_string())
    .collect()
}

/// Default for `use_opendal`: whether to use OpenDAL (instead of the AWS SDK)
/// as the S3 object-store backend.
pub fn use_opendal() -> bool {
// TODO: deprecate this config once the switch from the AWS SDK to OpenDAL is complete.
// We return `!env_var_is_false_or(RW_USE_OPENDAL_FOR_S3, false)` because:
// 1. It maintains compatibility: clusters that already set RW_USE_OPENDAL_FOR_S3 see no behavior change.
// 2. It changes the default to use OpenDAL for S3 when RW_USE_OPENDAL_FOR_S3 is not set.
!env_var_is_false_or(RW_USE_OPENDAL_FOR_S3, false)
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ identity_resolution_timeout_s = 5
[storage.object_store.s3.developer]
object_store_retry_unknown_service_error = false
object_store_retryable_service_error_codes = ["SlowDown", "TooManyRequests"]
use_opendal = true

[system]
barrier_interval_ms = 1000
Expand Down
Loading

0 comments on commit c603d1c

Please sign in to comment.