[Improve][connector-V2-Neo4j] Supports neo4j sink batch write and update docs (apache#4841)

FuYouJ authored and EricJoy2048 committed Jul 3, 2023
1 parent 2b62bbc commit 31e0d9b
Showing 15 changed files with 666 additions and 206 deletions.
69 changes: 55 additions & 14 deletions docs/en/connector-v2/sink/Neo4j.md
@@ -14,19 +14,21 @@ Write data to Neo4j.

## Options

| name | type | required | default value |
|----------------------------|---------|----------|---------------|
| uri | String | Yes | - |
| username | String | No | - |
| password | String | No | - |
| max_batch_size             | Integer | No       | 500           |
| write_mode | String | No | OneByOne |
| bearer_token | String | No | - |
| kerberos_ticket | String | No | - |
| database | String | Yes | - |
| query | String | Yes | - |
| queryParamPosition | Object | Yes | - |
| max_transaction_retry_time | Long | No | 30 |
| max_connection_timeout | Long | No | 30 |
| common-options             | config  | No       | -             |

### uri [string]

@@ -40,6 +42,20 @@ username of the Neo4j

password of the Neo4j. Required if `username` is provided.

### max_batch_size [Integer]

max_batch_size refers to the maximum number of rows that can be written to the database in a single transaction when batch writing is enabled.

### write_mode [string]

The default value is `OneByOne`. Set it to `Batch` if you want the ability to write in batches, for example:

```cypher
unwind $ttt as row create (n:Label) set n.name = row.name, n.age = row.age
```

"ttt" represents a batch of data.,"ttt" can be any arbitrary string as long as it matches the configured "batch_data_variable".

### bearer_token [string]

base64-encoded bearer token of the Neo4j database, used for authentication.
@@ -76,7 +92,7 @@ The maximum amount of time to wait for a TCP connection to be established (seconds).

Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details

## WriteOneByOneExample

```
sink {
@@ -98,9 +114,34 @@
}
```

## WriteBatchExample
> The `unwind` keyword provided by Cypher supports batch writing, and the default variable for a batch of data is `batch`. If you write a batch write statement, you should declare `unwind $batch as row` in your Cypher query.

```
sink {
Neo4j {
uri = "bolt://localhost:7687"
username = "neo4j"
password = "neo4j"
database = "neo4j"
max_batch_size = 1000
write_mode = "BATCH"
max_transaction_retry_time = 3
max_connection_timeout = 10
query = "unwind $batch as row create(n:MyLabel) set n.name = row.name,n.age = row.age"
}
}
```

## Changelog

### 2.2.0-beta 2022-09-26

- Add Neo4j Sink Connector

### issue #4835

- Sink supports batch write

67 changes: 66 additions & 1 deletion release-note.md
@@ -13,12 +13,27 @@
- [Connector-V2] [Jdbc] Field aliases are not supported in the query of jdbc source. (#4210)
- [Connector-V2] [Jdbc] Fix connection failure caused by connection timeout. (#4322)
- [Connector-V2] [Jdbc] Set default value to false of JdbcOption: generate_sink_sql (#4471)
- [Connector-V2] [JDBC] Fix TiDBCatalog without open (#4718)
- [Connector-V2] [Jdbc] Fix XA DataSource crash(Oracle/Dameng/SqlServer) (#4866)
- [Connector-V2] [Pulsar] Fix the bug that can't consume messages all the time. (#4125)
- [Connector-V2] [Elasticsearch] Document description error (#4390)
- [Connector-V2] [Elasticsearch] Source deserializer error and inappropriate (#4233)
- [Connector-V2] [Kafka] Fix KafkaProducer resources have never been released. (#4302)
- [Connector-V2] [Kafka] Fix the permission problem caused by client.id. (#4246)
- [Connector-V2] [Kafka] Fix KafkaConsumerThread exit caused by commit offset error. (#4379)
- [Connector-V2] [kafka] Fix the problem that the partition information can not be obtained when kafka is restored (#4764)
- [Connector-V2] [SFTP] Fix incorrect exception handling logic (#4720)
- [Connector-V2] [File] Fix read temp file (#4876)
- [Connector-V2] [CDC Base] Solving the ConcurrentModificationException caused by snapshotState being modified concurrently. (#4877)
- [Connector-V2] [Doris] update last checkpoint id when doing snapshot (#4881)
- [Connector-v2] [kafka] Fix the short interval of pull data settings and revise the format (#4875)
- [Connector-v2] [RabbitMQ] Fix reduplicate ack msg bug and code style (#4842)
- [Connector-V2] [Jdbc] Fix the error of extracting primary key column in sink (#4815)
- [Connector-V2] [Jdbc] Fix reconnect throw close statement exception (#4801)
- [Connector-V2] [Jdbc] Fix sqlserver system table case sensitivity (#4806)
- [Connector-v2] [File] Fix configuration file format and error comments (#4762)
- [Connector-v2] [Jdbc] Fix oracle sql table identifier (#4754)
- [Connector-v2] [Clickhouse] fix get clickhouse local table name with closing bracket from distributed table engineFull (#4710)

### Zeta(ST-Engine)

@@ -31,11 +46,26 @@
- [Zeta] Fix the bug of conf (#4488)
- [Zeta] Fix Connector load logic from zeta (#4510)
- [Zeta] Fix conflict dependency of hadoop-hdfs (#4509)
- [Zeta] Fix TaskExecutionService synchronized lock will not release (#4886)
- [Zeta] Fix TaskExecutionService will return not active ExecutionContext (#4869)
- [Zeta] Fix deploy operation timeout but task already finished bug (#4867)
- [Zeta] Fix restoreComplete Future can't be completed when cancel task (#4863)
- [Zeta] Fix IMap operation timeout bug (#4859)
- [Zeta] Fix incorrect pipeline state bug (#4823)
- [Zeta] Fix the incorrect setting of transform parallelism (#4814)
- [Zeta] Fix master active bug (#4855)
- [Zeta] Fix completePendingCheckpoint concurrent action (#4854)
- [Zeta] Fix engine runtime error (#4850)
- [Zeta] Fix TaskGroupContext always hold classloader so classloader can't recycle (#4849)
- [Zeta] Fix task `notifyTaskStatusToMaster` failed when job not running or failed before run (#4847)
- [Zeta] Fix cpu load problem (#4828)
- [Zeta] Fix the deadlock issue with JDBC driver loading (#4878)

### E2E

- [E2E] [Kafka] Fix kafka e2e testcase (#4520)
- [Container Version] Fix risk of unreproducible test cases #4591
- [E2e] [Mysql-cdc] Removing the excess MySqlIncrementalSourceIT e2e reduces the CI time (#4738)

## Improve

@@ -44,19 +74,31 @@
- [Core] [Spark] Push transform operation from Spark Driver to Executors (#4503)
- [Core] [Starter] Optimize code structure & remove redundant code (#4525)
- [Core] [Translation] [Flink] Optimize code structure & remove redundant code (#4527)
- [Core] [Starter] Add check of sink and source config to avoid null pointer exception. (#4734)

### Connector-V2

- [Connector-V2] [CDC] Improve startup.mode/stop.mode options (#4360)
- [Connector-V2] [CDC] Optimize jdbc fetch-size options (#4352)
- [Connector-V2] [CDC] Fix chunk start/end parameter type error (#4777)
- [Connector-V2] [SQLServer] Fix sqlserver catalog (#4441)
- [Connector-V2] [StarRocks] Improve StarRocks Serialize Error Message (#4458)
- [Connector-V2] [Jdbc] add the log for sql and update some style (#4475)
- [Connector-V2] [Jdbc] Fix the table name is not automatically obtained when multiple tables (#4514)
- [Connector-V2] [S3 & Kafka] Delete unavailable S3 & Kafka Catalogs (#4477)
- [Connector-V2] [Pulsar] Support Canal Format
- [Connector-V2] [CDC base] Implement Sample-based Sharding Strategy with Configurable Sampling Rate (#4856)
- [Connector-V2] [SelectDB] Add a jobId to the selectDB label to distinguish between tasks (#4864)
- [Connector-V2] [Doris] Add a jobId to the doris label to distinguish between tasks (#4839) (#4853)
- [Connector-v2] [Mongodb] Refactor mongodb connector (#4620)
- [Connector-v2] [Jdbc] Populate primary key when jdbc sink is created using CatalogTable (#4755)
- [Connector-v2] [Neo4j] Supports neo4j sink batch write mode (#4835)
- [Transform-V2] Optimize SQL Transform package and Fix Spark type conversion bug of transform (#4490)

### CI

- [CI] Fix error repository name in ci config files (#4795)

### Zeta(ST-Engine)

- [Zeta] Support run the server through daemon mode (#4161)
@@ -69,6 +111,13 @@
- [Zeta] Remove redundant code (#4489)
- [Zeta] Remove redundancy code in validateSQL (#4506)
- [Zeta] Improve JobMetrics fetch performance (#4467)
- [Zeta] Reduce the operation count of imap_running_job_metrics (#4861)
- [Zeta] Speed up listAllJob function (#4852)
- [Zeta] Async execute checkpoint trigger and other blocking methods (#4846)
- [Zeta] Reduce the number of IMAPs used by checkpointIdCounter (#4832)
- [Zeta] Cancel pipeline add retry to avoid cancel failed. (#4792)
- [Zeta] Improve Zeta operation max count and ignore NPE (#4787)
- [Zeta] Remove serialize(deserialize) cost when use shuffle action (#4722)

## Feature

@@ -85,19 +134,30 @@
- [Connector-V2] [Kafka] Kafka source supports data deserialization failure skipping (#4364)
- [Connector-V2] [Jdbc] [TiDB] Add TiDB catalog (#4438)
- [Connector-V2] [File] Add file excel sink and source (#4164)
- [Connector-v2] [Snowflake] Add Snowflake Source&Sink connector (#4470)
- [Connector-V2] [Pulsar] Support read format for pulsar (#4111)
- [Connector-V2] [Paimon] Introduce paimon connector (#4178)
- [Connector V2] [Cassandra] Expose configurable options in Cassandra (#3681)
- [Connector V2] [Jdbc] Supports GEOMETRY data type for PostgreSQL (#4673)
- [Transform-V2] Add UDF SPI and an example implement for SQL Transform plugin (#4392)
- [Transform-V2] Support copy field list (#4404)
- [Transform-V2] Add support CatalogTable for FieldMapperTransform (#4423)
- [Transform-V2] Add CatalogTable support for ReplaceTransform (#4411)
- [Transform-V2] Add Catalog support for FilterRowKindTransform (#4420)
- [Transform-V2] Add support CatalogTable for FilterFieldTransform (#4422)
- [Transform-V2] Add catalog support for SQL Transform plugin (#4819)

### Zeta(ST-Engine)

- [Zeta] Support for mixing Factory and Plugin SPI (#4359)
- [Zeta] Add get running job info by jobId rest api (#4140)
- [Zeta] Add REST API To Get System Monitoring Information (#4315)
- [Transform V2 & Zeta] Make SplitTransform Support CatalogTable And CatalogTable Evolution (#4396)
- [Zeta] Move driver into lib directory and change operation count (#4845)
- [Zeta] Add Metaspace size default value to config file (#4848)
- [Zeta] Reduce the frequency of fetching data from imap (#4851)
- [Zeta] Add OSS support for Imap storage to cluster-mode type (#4683)
- [Zeta] Improve local mode startup request ports (#4660)

## Docs

@@ -107,4 +167,9 @@
- [Docs] Fix max_retries default value is 0. (#4383)
- [Docs] Fix markdown syntax (#4426)
- [Docs] Fix Kafka Doc Error Config Key "kafka." (#4427)
- [Docs] Add Transform to Quick Start v2 (#4436)
- [Docs] Fix Mysql sink format doc (#4800)
- [Docs] Add the generate sink sql parameter for the jdbc sink document (#4797)
- [Docs] Add the generate sink sql parameter And example (#4769)
- [Docs] Redshift add defaultRowFetchSize (#4616)
- [Docs] Refactor connector-v2 docs using unified format Mysql (#4590)
@@ -17,9 +17,31 @@

package org.apache.seatunnel.connectors.seatunnel.neo4j.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorException;

import org.neo4j.driver.AuthTokens;

import lombok.Data;

import java.io.Serializable;
import java.net.URI;

import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_BEARER_TOKEN;
import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_KERBEROS_TICKET;
import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_MAX_CONNECTION_TIMEOUT;
import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_MAX_TRANSACTION_RETRY_TIME;
import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_NEO4J_URI;
import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_QUERY;
import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.KEY_USERNAME;
import static org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jCommonConfig.PLUGIN_NAME;

/**
 * Because Neo4jQueryInfo is one of Neo4jSink's member variables, Neo4jQueryInfo needs
@@ -29,4 +51,89 @@
public abstract class Neo4jQueryInfo implements Serializable {
protected DriverBuilder driverBuilder;
protected String query;

protected PluginType pluginType;

public Neo4jQueryInfo(Config config, PluginType pluginType) {
this.pluginType = pluginType;
this.driverBuilder = prepareDriver(config, pluginType);
this.query = prepareQuery(config, pluginType);
}

// This method is identical to the prepareDriver methods of the source and sink;
// the only difference is the pluginType mentioned in the error messages,
// so the shared code lives here.
protected DriverBuilder prepareDriver(Config config, PluginType pluginType) {
final CheckResult uriConfigCheck =
CheckConfigUtil.checkAllExists(config, KEY_NEO4J_URI.key(), KEY_DATABASE.key());
final CheckResult authConfigCheck =
CheckConfigUtil.checkAtLeastOneExists(
config,
KEY_USERNAME.key(),
KEY_BEARER_TOKEN.key(),
KEY_KERBEROS_TICKET.key());
final CheckResult mergedConfigCheck =
CheckConfigUtil.mergeCheckResults(uriConfigCheck, authConfigCheck);
if (!mergedConfigCheck.isSuccess()) {
throw new Neo4jConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
PLUGIN_NAME, pluginType, mergedConfigCheck.getMsg()));
}

final URI uri = URI.create(config.getString(KEY_NEO4J_URI.key()));

final DriverBuilder driverBuilder = DriverBuilder.create(uri);

if (config.hasPath(KEY_USERNAME.key())) {
final CheckResult pwParamCheck =
CheckConfigUtil.checkAllExists(config, KEY_PASSWORD.key());
if (!pwParamCheck.isSuccess()) {
throw new Neo4jConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
PLUGIN_NAME, pluginType, pwParamCheck.getMsg()));
}
final String username = config.getString(KEY_USERNAME.key());
final String password = config.getString(KEY_PASSWORD.key());

driverBuilder.setUsername(username);
driverBuilder.setPassword(password);
} else if (config.hasPath(KEY_BEARER_TOKEN.key())) {
final String bearerToken = config.getString(KEY_BEARER_TOKEN.key());
AuthTokens.bearer(bearerToken);
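// note: the token object created by AuthTokens.bearer(...) above is discarded; the builder stores the raw string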
driverBuilder.setBearerToken(bearerToken);
} else {
final String kerberosTicket = config.getString(KEY_KERBEROS_TICKET.key());
AuthTokens.kerberos(kerberosTicket);
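// note: likewise the AuthTokens.kerberos(...) result is discarded, and the ticket is stored via setBearerToken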
driverBuilder.setBearerToken(kerberosTicket);
}

driverBuilder.setDatabase(config.getString(KEY_DATABASE.key()));

if (config.hasPath(KEY_MAX_CONNECTION_TIMEOUT.key())) {
driverBuilder.setMaxConnectionTimeoutSeconds(
config.getLong(KEY_MAX_CONNECTION_TIMEOUT.key()));
}
if (config.hasPath(KEY_MAX_TRANSACTION_RETRY_TIME.key())) {
driverBuilder.setMaxTransactionRetryTimeSeconds(
config.getLong(KEY_MAX_TRANSACTION_RETRY_TIME.key()));
}

return driverBuilder;
}

private String prepareQuery(Config config, PluginType pluginType) {
CheckResult queryConfigCheck = CheckConfigUtil.checkAllExists(config, KEY_QUERY.key());
if (!queryConfigCheck.isSuccess()) {
throw new Neo4jConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
PLUGIN_NAME, pluginType, queryConfigCheck.getMsg()));
}
return config.getString(KEY_QUERY.key());
}
}
@@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.connectors.seatunnel.neo4j.constants.SinkWriteMode;

import java.util.Map;

@@ -29,4 +30,16 @@ public class Neo4jSinkConfig extends Neo4jCommonConfig {
.noDefaultValue()
.withDescription(
"position mapping information for query parameters. key name is parameter placeholder name. associated value is position of field in input data row.");

public static final Option<Integer> MAX_BATCH_SIZE =
Options.key("max_batch_size")
.intType()
.defaultValue(500)
.withDescription("neo4j write max batch size");
public static final Option<SinkWriteMode> WRITE_MODE =
Options.key("write_mode")
.enumType(SinkWriteMode.class)
.defaultValue(SinkWriteMode.ONE_BY_ONE)
.withDescription(
"The write mode on the sink end is oneByOne by default in order to maintain compatibility with previous code.");
}
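
How these two options interact can be pictured with a small buffer: rows accumulate until `max_batch_size` is reached, then the whole buffer is handed off (for example, to an `unwind $batch ...` query) in one transaction. The sketch below is a hypothetical illustration under those assumptions — the `BatchBufferSketch` class and its `flusher` callback are not the connector's actual writer:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical illustration only: buffers rows and flushes once max_batch_size is reached.
public class BatchBufferSketch {
    private final int maxBatchSize; // from MAX_BATCH_SIZE (default 500)
    private final List<Map<String, Object>> buffer = new ArrayList<>();
    private final Consumer<List<Map<String, Object>>> flusher; // e.g. runs the unwind query

    public BatchBufferSketch(int maxBatchSize, Consumer<List<Map<String, Object>>> flusher) {
        this.maxBatchSize = maxBatchSize;
        this.flusher = flusher;
    }

    public void write(Map<String, Object> row) {
        buffer.add(row);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Also called on checkpoint/close so a partial batch is not lost.
    public void flush() {
        if (!buffer.isEmpty()) {
            flusher.accept(new ArrayList<>(buffer)); // hand off a copy as one transaction
            buffer.clear();
        }
    }
}
```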
