[Improve][connector-V2-Neo4j]Supports neo4j sink batch write and update docs #4841

Merged
merged 34 commits into from
Jun 13, 2023
de2b159
[Feature][connectors]supports neo4j sink batch write
FuYouJ May 26, 2023
7d9c5ea
[Feature][connectors]supports neo4j sink batch write
FuYouJ May 26, 2023
4fef517
[Feature][connectors]supports neo4j sink batch write
FuYouJ May 26, 2023
fc34af9
[Feature][connectors]supports neo4j sink batch write
FuYouJ May 26, 2023
0a7d847
[Docs][Feature][connector-V2][Neo4jSink]Providing a configuration exa…
FuYouJ May 26, 2023
a6f3377
[Docs][Feature][connector-V2][Neo4jSink]update change log docs
FuYouJ May 26, 2023
b0980f3
[Docs][Feature][connector-V2][Neo4jSink]update change log docs
FuYouJ May 26, 2023
cae39d3
[Improve][connector-V2][Neo4jSink]Added neo4j database exception hand…
FuYouJ May 27, 2023
5291d32
[Improve][connector-V2][Neo4jSink]Add Licensed to asf
FuYouJ May 27, 2023
ae4e1b3
[Improve][connector-V2][Neo4jSink]fix code style
FuYouJ May 27, 2023
4d127f7
[Feature][connectors]supports neo4j sink batch write
FuYouJ May 26, 2023
01f9375
[Feature][connectors]supports neo4j sink batch write
FuYouJ May 26, 2023
225eede
Merge branch 'dev' into sink_neo4j_batch_write
FuYouJ May 28, 2023
0602979
Merge branch 'apache:dev' into dev
FuYouJ May 28, 2023
4910544
Merge remote-tracking branch 'origin/dev' into sink_neo4j_batch_write
FuYouJ May 28, 2023
a670b45
Merge branch 'apache:dev' into dev
FuYouJ May 30, 2023
03f2306
Merge remote-tracking branch 'origin/dev' into sink_neo4j_batch_write
FuYouJ May 30, 2023
77d2c18
Merge branch 'apache:dev' into dev
FuYouJ May 31, 2023
37cce01
Merge remote-tracking branch 'origin/dev' into sink_neo4j_batch_write
FuYouJ May 31, 2023
89f4f5b
Merge branch 'apache:dev' into dev
FuYouJ Jun 3, 2023
34ef1b1
Merge branch 'dev' into sink_neo4j_batch_write
FuYouJ Jun 3, 2023
586d12f
[Improve][connector-V2][Neo4jSink]prepare sink by different config
FuYouJ Jun 3, 2023
3db86ca
[Improve][connector-V2][Neo4jSink]provide e2e neo4j batch write test
FuYouJ Jun 3, 2023
71366f8
Merge branch 'apache:dev' into dev
FuYouJ Jun 5, 2023
8019bcf
Merge branch 'apache:dev' into dev
FuYouJ Jun 6, 2023
4ee9be0
Merge remote-tracking branch 'origin/dev' into sink_neo4j_batch_write
FuYouJ Jun 6, 2023
61b6e33
[Improve][connector-V2][Neo4jSink]clean code and update docs,test conf
FuYouJ Jun 6, 2023
2a79797
Merge branch 'apache:dev' into dev
FuYouJ Jun 6, 2023
81b06d1
Merge remote-tracking branch 'origin/dev' into sink_neo4j_batch_write
FuYouJ Jun 6, 2023
b3435d0
[Improve][connector-V2][Neo4jSink]resolve release-note.md conflict
FuYouJ Jun 6, 2023
740042b
[Improve][connector-V2][Neo4jSink]update docs
FuYouJ Jun 6, 2023
d8a63a5
Merge branch 'apache:dev' into dev
FuYouJ Jun 9, 2023
1484dc1
Merge remote-tracking branch 'origin/dev' into sink_neo4j_batch_write
FuYouJ Jun 9, 2023
7e8a3c3
[Improve][connector-V2][Neo4jSink]removed batch_data_variable config …
FuYouJ Jun 10, 2023
69 changes: 55 additions & 14 deletions docs/en/connector-v2/sink/Neo4j.md
@@ -14,19 +14,21 @@ Write data to Neo4j.

## Options

| name | type | required | default value |
|----------------------------|--------|----------|---------------|
| uri | String | Yes | - |
| username | String | No | - |
| password | String | No | - |
| bearer_token | String | No | - |
| kerberos_ticket | String | No | - |
| database | String | Yes | - |
| query | String | Yes | - |
| queryParamPosition | Object | Yes | - |
| max_transaction_retry_time | Long | No | 30 |
| max_connection_timeout | Long | No | 30 |
| common-options | config | no | - |
| name | type | required | default value |
|----------------------------|---------|----------|---------------|
| uri | String | Yes | - |
| username | String | No | - |
| password | String | No | - |
| max_batch_size | Integer | No | 500 |
| write_mode | String | No | OneByOne |
| bearer_token | String | No | - |
| kerberos_ticket | String | No | - |
| database | String | Yes | - |
| query | String | Yes | - |
| queryParamPosition | Object | Yes | - |
| max_transaction_retry_time | Long | No | 30 |
| max_connection_timeout | Long | No | 30 |
| common-options | config | no | - |

### uri [string]

@@ -40,6 +42,20 @@ username of the Neo4j

password of the Neo4j. required if `username` is provided

### max_batch_size [Integer]

The maximum number of data entries that can be written in a single transaction when writing to the database.

### write_mode [String]

The default value is `OneByOne`. Set it to `BATCH` (as in the batch example in this document) to write in batches.

```cypher
unwind $ttt as row create (n:Label) set n.name = row.name,n.age = row.age
```

"ttt" represents a batch of data. It can be any arbitrary string as long as it matches the configured "batch_data_variable".
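
As a rough illustration of how `write_mode` and `max_batch_size` could interact, here is a minimal Java sketch. It is not the connector's writer: `BatchBufferSketch`, `write`, `flush`, and `row` are hypothetical names, and the sketch assumes that rows are buffered until `max_batch_size` is reached and are then bound as one list to the Cypher parameter that `unwind` iterates over.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the connector's writer class.
class BatchBufferSketch {
    private final int maxBatchSize;
    private final String batchVariable;
    private final List<Map<String, Object>> buffer = new ArrayList<>();
    // Each element is the parameter map of one simulated transaction.
    private final List<Map<String, Object>> flushed = new ArrayList<>();

    BatchBufferSketch(int maxBatchSize, String batchVariable) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be greater than 0");
        }
        this.maxBatchSize = maxBatchSize;
        this.batchVariable = batchVariable;
    }

    // Buffer one row and flush as soon as the batch is full.
    void write(Map<String, Object> row) {
        buffer.add(row);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Bind the buffered rows to a single parameter, e.g. {batch: [row1, row2]}.
    // A real sink would run the "unwind $batch as row ..." query here.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        Map<String, Object> params =
                Collections.<String, Object>singletonMap(batchVariable, new ArrayList<>(buffer));
        flushed.add(params);
        buffer.clear();
    }

    List<Map<String, Object>> getFlushed() {
        return flushed;
    }

    static Map<String, Object> row(String name, int age) {
        Map<String, Object> row = new HashMap<>();
        row.put("name", name);
        row.put("age", age);
        return row;
    }

    public static void main(String[] args) {
        BatchBufferSketch sink = new BatchBufferSketch(2, "batch");
        sink.write(row("a", 1));
        sink.write(row("b", 2));
        sink.write(row("c", 3));
        sink.flush(); // flush the remaining partial batch
        System.out.println("transactions: " + sink.getFlushed().size());
    }
}
```

The final explicit `flush()` matters in this sketch: without it, a trailing partial batch smaller than `max_batch_size` would never be written.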

### bearer_token [string]

base64 encoded bearer token of the Neo4j. for Auth.
@@ -76,7 +92,7 @@ The maximum amount of time to wait for a TCP connection to be established (secon

Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details

## Example
## WriteOneByOneExample

```
sink {
@@ -98,9 +114,34 @@ sink {
}
```

## WriteBatchExample
> The `unwind` keyword provided by Cypher supports batch writing, and the default variable for a batch of data is `batch`. If you write a batch write statement, declare `unwind $batch as row` in the Cypher query and then operate on each `row`.
```
sink {
Neo4j {
uri = "bolt://localhost:7687"
username = "neo4j"
password = "neo4j"
database = "neo4j"
max_batch_size = 1000
write_mode = "BATCH"

max_transaction_retry_time = 3
max_connection_timeout = 10

query = "unwind $batch as row create(n:MyLabel) set n.name = row.name,n.age = row.age"

}
}
```

## Changelog

### 2.2.0-beta 2022-09-26

- Add Neo4j Sink Connector

### issue #4835

- Sink supports batch write

1 change: 1 addition & 0 deletions release-note.md
@@ -92,6 +92,7 @@
- [Connector-V2] [Doris] Add a jobId to the doris label to distinguish between tasks (#4839) (#4853)
- [Connector-v2] [Mongodb]Refactor mongodb connector (#4620)
- [Connector-v2] [Jdbc] Populate primary key when jdbc sink is created using CatalogTable (#4755)
- [Connector-v2] [Neo4j] Supports neo4j sink batch write mode (#4835)
- [Transform-V2] Optimize SQL Transform package and Fix Spark type conversion bug of transform (#4490)

### CI
@@ -17,9 +17,31 @@

package org.apache.seatunnel.connectors.seatunnel.neo4j.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorException;

import org.neo4j.driver.AuthTokens;

import lombok.Data;

import java.io.Serializable;
import java.net.URI;

import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_BEARER_TOKEN;
import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_KERBEROS_TICKET;
import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_MAX_CONNECTION_TIMEOUT;
import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_MAX_TRANSACTION_RETRY_TIME;
import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_NEO4J_URI;
import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_QUERY;
import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_USERNAME;
import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.PLUGIN_NAME;

/**
* Because Neo4jQueryInfo is one of the Neo4jSink's member variable, So Neo4jQueryInfo need
@@ -29,4 +51,89 @@
public abstract class Neo4jQueryInfo implements Serializable {
protected DriverBuilder driverBuilder;
protected String query;

protected PluginType pluginType;

public Neo4jQueryInfo(Config config, PluginType pluginType) {
this.pluginType = pluginType;
this.driverBuilder = prepareDriver(config, pluginType);
this.query = prepareQuery(config, pluginType);
}

// This logic was identical in the prepareDriver methods of the source and sink;
// the only difference was the pluginType mentioned in the error messages,
// so the code was moved here.
protected DriverBuilder prepareDriver(Config config, PluginType pluginType) {
final CheckResult uriConfigCheck =
CheckConfigUtil.checkAllExists(config, KEY_NEO4J_URI.key(), KEY_DATABASE.key());
final CheckResult authConfigCheck =
CheckConfigUtil.checkAtLeastOneExists(
config,
KEY_USERNAME.key(),
KEY_BEARER_TOKEN.key(),
KEY_KERBEROS_TICKET.key());
final CheckResult mergedConfigCheck =
CheckConfigUtil.mergeCheckResults(uriConfigCheck, authConfigCheck);
if (!mergedConfigCheck.isSuccess()) {
throw new Neo4jConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
PLUGIN_NAME, pluginType, mergedConfigCheck.getMsg()));
}

final URI uri = URI.create(config.getString(KEY_NEO4J_URI.key()));

final DriverBuilder driverBuilder = DriverBuilder.create(uri);

if (config.hasPath(KEY_USERNAME.key())) {
final CheckResult pwParamCheck =
CheckConfigUtil.checkAllExists(config, KEY_PASSWORD.key());
if (!pwParamCheck.isSuccess()) {
throw new Neo4jConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
PLUGIN_NAME, pluginType, pwParamCheck.getMsg()));
}
final String username = config.getString(KEY_USERNAME.key());
final String password = config.getString(KEY_PASSWORD.key());

driverBuilder.setUsername(username);
driverBuilder.setPassword(password);
} else if (config.hasPath(KEY_BEARER_TOKEN.key())) {
final String bearerToken = config.getString(KEY_BEARER_TOKEN.key());
AuthTokens.bearer(bearerToken);
driverBuilder.setBearerToken(bearerToken);
} else {
final String kerberosTicket = config.getString(KEY_KERBEROS_TICKET.key());
AuthTokens.kerberos(kerberosTicket);
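// Assumes DriverBuilder exposes a kerberosTicket setter; the ticket should not be stored as a bearer token.
driverBuilder.setKerberosTicket(kerberosTicket);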
}

driverBuilder.setDatabase(config.getString(KEY_DATABASE.key()));

if (config.hasPath(KEY_MAX_CONNECTION_TIMEOUT.key())) {
driverBuilder.setMaxConnectionTimeoutSeconds(
config.getLong(KEY_MAX_CONNECTION_TIMEOUT.key()));
}
if (config.hasPath(KEY_MAX_TRANSACTION_RETRY_TIME.key())) {
driverBuilder.setMaxTransactionRetryTimeSeconds(
config.getLong(KEY_MAX_TRANSACTION_RETRY_TIME.key()));
}

return driverBuilder;
}

private String prepareQuery(Config config, PluginType pluginType) {
CheckResult queryConfigCheck = CheckConfigUtil.checkAllExists(config, KEY_QUERY.key());
if (!queryConfigCheck.isSuccess()) {
throw new Neo4jConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
PLUGIN_NAME, pluginType, queryConfigCheck.getMsg()));
}
return config.getString(KEY_QUERY.key());
}
}
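
The validation order in `prepareDriver` can be sketched in isolation as follows. This is a simplified stand-in under stated assumptions, not the connector's code: `AuthConfigSketch`, `AuthKind`, and `resolve` are hypothetical names, and a plain `Map<String, String>` replaces the real `Config` object. The option keys are the ones listed in the connector docs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the checks; a plain Map stands in for Config.
class AuthConfigSketch {
    enum AuthKind { BASIC, BEARER, KERBEROS }

    static AuthKind resolve(Map<String, String> config) {
        // uri and database must both be present.
        for (String key : new String[] {"uri", "database"}) {
            if (!config.containsKey(key)) {
                throw new IllegalArgumentException("missing required option: " + key);
            }
        }
        // username takes precedence and requires a password.
        if (config.containsKey("username")) {
            if (!config.containsKey("password")) {
                throw new IllegalArgumentException("password is required when username is set");
            }
            return AuthKind.BASIC;
        }
        if (config.containsKey("bearer_token")) {
            return AuthKind.BEARER;
        }
        if (config.containsKey("kerberos_ticket")) {
            return AuthKind.KERBEROS;
        }
        throw new IllegalArgumentException(
                "one of username, bearer_token, kerberos_ticket is required");
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("uri", "bolt://localhost:7687");
        config.put("database", "neo4j");
        config.put("username", "neo4j");
        config.put("password", "neo4j");
        System.out.println(resolve(config));
    }
}
```

Keeping the three authentication branches mutually exclusive, in a fixed order, is what lets the source and sink share one implementation that differs only in the plugin type reported in error messages.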
@@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.connectors.seatunnel.neo4j.constants.SinkWriteMode;

import java.util.Map;

@@ -29,4 +30,16 @@ public class Neo4jSinkConfig extends Neo4jCommonConfig {
.noDefaultValue()
.withDescription(
"position mapping information for query parameters. key name is parameter placeholder name. associated value is position of field in input data row.");

public static final Option<Integer> MAX_BATCH_SIZE =
Options.key("max_batch_size")
.intType()
.defaultValue(500)
.withDescription("neo4j write max batch size");
public static final Option<SinkWriteMode> WRITE_MODE =
Options.key("write_mode")
.enumType(SinkWriteMode.class)
.defaultValue(SinkWriteMode.ONE_BY_ONE)
.withDescription(
"The write mode on the sink end is oneByOne by default in order to maintain compatibility with previous code.");
}
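
To illustrate the `queryParamPosition` description above (placeholder name mapped to field position), here is a small Java sketch of how a one-by-one write might bind a row to named query parameters. `ParamPositionSketch` and `bind` are hypothetical names, and the sketch assumes positions are zero-based integers, which the diff itself does not state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; assumes zero-based integer positions.
class ParamPositionSketch {
    // For each placeholder name, pick the row field at the configured position.
    static Map<String, Object> bind(Map<String, Integer> queryParamPosition, Object[] row) {
        Map<String, Object> params = new HashMap<>();
        for (Map.Entry<String, Integer> entry : queryParamPosition.entrySet()) {
            params.put(entry.getKey(), row[entry.getValue()]);
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, Integer> positions = new HashMap<>();
        positions.put("name", 0);
        positions.put("age", 1);
        Object[] row = {"Alice", 30};
        // The resulting map would be passed along with a query such as
        // "CREATE (n:Person {name: $name, age: $age})".
        System.out.println(bind(positions, row));
    }
}
```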
@@ -17,11 +17,89 @@

package org.apache.seatunnel.connectors.seatunnel.neo4j.config;

import lombok.Data;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.neo4j.constants.SinkWriteMode;
import org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorException;

import lombok.Getter;
import lombok.Setter;

import java.util.Map;

@Data
import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.PLUGIN_NAME;
import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.MAX_BATCH_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.QUERY_PARAM_POSITION;
import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.WRITE_MODE;

@Getter
@Setter
public class Neo4jSinkQueryInfo extends Neo4jQueryInfo {

private Map<String, Object> queryParamPosition;
private Integer maxBatchSize;

private SinkWriteMode writeMode;

public boolean batchMode() {
return SinkWriteMode.BATCH.equals(writeMode);
}

public Neo4jSinkQueryInfo(Config config) {
super(config, PluginType.SINK);

this.writeMode = prepareWriteMode(config);

if (SinkWriteMode.BATCH.equals(writeMode)) {
prepareBatchWriteConfig(config);
} else {
prepareOneByOneConfig(config);
}
}

private void prepareOneByOneConfig(Config config) {

CheckResult queryConfigCheck =
CheckConfigUtil.checkAllExists(config, QUERY_PARAM_POSITION.key());

if (!queryConfigCheck.isSuccess()) {
throw new Neo4jConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
PLUGIN_NAME, PluginType.SINK, queryConfigCheck.getMsg()));
}

// set queryParamPosition
this.queryParamPosition = config.getObject(QUERY_PARAM_POSITION.key()).unwrapped();
}

private void prepareBatchWriteConfig(Config config) {

// batch size
if (config.hasPath(MAX_BATCH_SIZE.key())) {
int batchSize = config.getInt(MAX_BATCH_SIZE.key());
if (batchSize <= 0) {
throw new Neo4jConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
PLUGIN_NAME, PluginType.SINK, "maxBatchSize must be greater than 0"));
}
this.maxBatchSize = batchSize;
} else {
this.maxBatchSize = MAX_BATCH_SIZE.defaultValue();
}
}

private SinkWriteMode prepareWriteMode(Config config) {
if (config.hasPath(WRITE_MODE.key())) {
return config.getEnum(SinkWriteMode.class, WRITE_MODE.key());
}
return WRITE_MODE.defaultValue();
}
}
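
The defaulting and validation rules in `Neo4jSinkQueryInfo` can be exercised on their own with a sketch like the one below. It is a simplified approximation, not the connector's code: `SinkOptionsSketch` and its methods are hypothetical, a plain `Map<String, String>` replaces `Config`, and the mode is parsed with `Enum.valueOf`, which is case-sensitive. Whether the real config loader is equally strict is not shown in this diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; a plain Map stands in for the Config object.
class SinkOptionsSketch {
    enum WriteMode { ONE_BY_ONE, BATCH }

    static final int DEFAULT_MAX_BATCH_SIZE = 500;

    // Fall back to ONE_BY_ONE when write_mode is absent.
    static WriteMode resolveWriteMode(Map<String, String> config) {
        String raw = config.get("write_mode");
        return raw == null ? WriteMode.ONE_BY_ONE : WriteMode.valueOf(raw);
    }

    // Fall back to the default when max_batch_size is absent; reject non-positive values.
    static int resolveMaxBatchSize(Map<String, String> config) {
        String raw = config.get("max_batch_size");
        if (raw == null) {
            return DEFAULT_MAX_BATCH_SIZE;
        }
        int size = Integer.parseInt(raw);
        if (size <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be greater than 0");
        }
        return size;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("write_mode", "BATCH");
        config.put("max_batch_size", "1000");
        System.out.println(resolveWriteMode(config) + " " + resolveMaxBatchSize(config));
    }
}
```

Defaulting to `ONE_BY_ONE` mirrors the stated intent of keeping existing job configurations working unchanged.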
@@ -17,4 +17,13 @@

package org.apache.seatunnel.connectors.seatunnel.neo4j.config;

public class Neo4jSourceQueryInfo extends Neo4jQueryInfo {}
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.common.constants.PluginType;

public class Neo4jSourceQueryInfo extends Neo4jQueryInfo {

public Neo4jSourceQueryInfo(Config pluginConfig) {
super(pluginConfig, PluginType.SOURCE);
}
}
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.neo4j.constants;

public enum CypherEnum {
BATCH("batch", "a variable in cypher that represents a batch of data");
private final String value;
private final String description;

CypherEnum(String value, String description) {
this.value = value;
this.description = description;
}

public String getValue() {
return value;
}
}