cherrypick master fix #176

Merged
merged 10 commits into from
Nov 17, 2023
61 changes: 44 additions & 17 deletions README-CN.md
@@ -1,16 +1,22 @@
# Welcome to NebulaGraph Exchange
[English](https://github.com/vesoft-inc/nebula-exchange/blob/master/README.md)

NebulaGraph Exchange (hereinafter Exchange) is an Apache Spark™ application that migrates data from a cluster to NebulaGraph in bulk in a distributed environment. It supports migrating batch and streaming data in many formats, and it can also write to NebulaGraph directly through SST files.
NebulaGraph Exchange (hereinafter Exchange) is an Apache Spark™ application that migrates data from
a cluster to NebulaGraph in bulk in a distributed environment. It supports migrating batch and
streaming data in many formats, and it can also write to NebulaGraph directly through SST files.


Exchange supports Spark 2.2, 2.4, and 3.0; the corresponding toolkits are named `nebula-exchange_spark_2.2`, `nebula-exchange_spark_2.4`, and `nebula-exchange_spark_3.0`.
Exchange supports Spark 2.2, 2.4, and
3.0; the corresponding toolkits are named `nebula-exchange_spark_2.2`, `nebula-exchange_spark_2.4`,
and `nebula-exchange_spark_3.0`.

> Note:
> - Exchange 3.4.0 does not support Kafka or Pulsar. To import Kafka or Pulsar data into NebulaGraph, use version 3.0.0, 3.3.0, or 3.5.0.
> - This repository supports only NebulaGraph 2.x and 3.x. If you are using NebulaGraph v1.x, use [NebulaExchange v1.0](https://github.com/vesoft-inc/nebula-java/tree/v1.0/tools/exchange), or refer to the Exchange 1.0 documentation, the [NebulaExchange User Manual](https://docs.nebula-graph.com.cn/nebula-exchange/about-exchange/ex-ug-what-is-exchange/ "Go to the Nebula Graph website").
> - Exchange 3.4.0 does not support Kafka or Pulsar. To import Kafka or Pulsar data into NebulaGraph, use
version 3.0.0, 3.3.0, or 3.5.0.
> - This repository supports only NebulaGraph 2.x and 3.x. If you are using NebulaGraph
v1.x, use [NebulaExchange v1.0](https://github.com/vesoft-inc/nebula-java/tree/v1.0/tools/exchange),
or refer to the Exchange 1.0
documentation, the [NebulaExchange User Manual](https://docs.nebula-graph.com.cn/nebula-exchange/about-exchange/ex-ug-what-is-exchange/ "Go to the Nebula Graph website").

> Note: Exchange 3.4.0 does not support Kafka or Pulsar. To import Kafka or Pulsar data into NebulaGraph, use version 3.0.0, 3.3.0, or 3.5.0.

## How to get Exchange

@@ -24,10 +30,13 @@ Exchange supports Spark 2.2, 2.4, and 3.0; the corresponding toolkits
$ mvn clean package -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true -pl nebula-exchange_spark_3.0 -am -Pscala-2.12 -Pspark-3.0
```

After the build completes, you can find:
- nebula-exchange_spark_2.2-3.0-SNAPSHOT.jar in the nebula-exchange/nebula-exchange_spark_2.2/target/ directory;
- nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar in the nebula-exchange/nebula-exchange_spark_2.4/target/ directory;
- and nebula-exchange_spark_3.0-3.0-SNAPSHOT.jar in the nebula-exchange/nebula-exchange_spark_3.0/target/ directory.
After the build completes, you can find:
- nebula-exchange_spark_2.2-3.0-SNAPSHOT.jar in the
nebula-exchange/nebula-exchange_spark_2.2/target/ directory;
- nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar in the
nebula-exchange/nebula-exchange_spark_2.4/target/ directory;
- and nebula-exchange_spark_3.0-3.0-SNAPSHOT.jar in the
nebula-exchange/nebula-exchange_spark_3.0/target/ directory.

3. Download from the official website or GitHub

@@ -38,9 +47,17 @@ Exchange supports Spark 2.2, 2.4, and 3.0; the corresponding toolkits

**Snapshot version**

Go to the [GitHub Actions Artifacts](https://github.com/vesoft-inc/nebula-exchange/actions/workflows/snapshot.yml) page, click any workflow, and download the package you need from Artifacts.
Go to the [GitHub Actions Artifacts](https://github.com/vesoft-inc/nebula-exchange/actions/workflows/snapshot.yml)
page, click any workflow, and download the package you need from Artifacts.

## Generate a sample configuration file automatically

Run the following command and specify the data source to import to get a sample configuration file for that data source.
```bash
java -cp nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar com.vesoft.exchange.common.GenerateConfigTemplate -s {source} -p {target-path-to-save-config-file}
```


## Version compatibility

The version correspondence between Exchange and NebulaGraph is as follows:
@@ -72,21 +89,27 @@ The version correspondence between Exchange and NebulaGraph is as follows:

*2. Exchange 2.0 adds support for importing null, Date, DateTime, and Time data (DateTime uses the UTC time zone, not local time).*

*3. Exchange 2.0 supports Hive data sources other than Hive on Spark. The Hive source must be set up in the configuration file; for an example, see the Hive settings in [application.conf](https://github.com/vesoft-inc/nebula-exchange/blob/master/exchange-common/src/test/resources/application.conf).*
*3. Exchange 2.0 supports Hive data sources other than Hive on Spark. The Hive source must be set up
in the configuration file; for an example, see the Hive settings
in [application.conf](https://github.com/vesoft-inc/nebula-exchange/blob/master/exchange-common/src/test/resources/application.conf).*

*4. Exchange 2.0 writes INSERT statements that fail to import to disk, under the error/output path set in the configuration file.*

*5. Exchange 2.5.0 supports SST import, but does not support default values for properties.*

*6. For the configuration file, see [application.conf](https://github.com/vesoft-inc/nebula-exchange/blob/master/exchange-common/src/test/resources/application.conf).*
*6.
For the configuration file, see [application.conf](https://github.com/vesoft-inc/nebula-exchange/blob/master/exchange-common/src/test/resources/application.conf).*

*7. Import command for Exchange 2.0:*

```
$SPARK_HOME/bin/spark-submit --class com.vesoft.nebula.exchange.Exchange --master local nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar -c /path/to/application.conf
```

If the data source includes Hive, append `-h` to the end of the import command to enable the Hive data source.

Note: To submit Exchange in yarn-cluster mode, use the following command:

```
$SPARK_HOME/bin/spark-submit --class com.vesoft.nebula.exchange.Exchange \
--master yarn-cluster \
@@ -97,7 +120,9 @@ nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar \
-c application.conf
```

Note: Generating SST files with Nebula Exchange involves Spark shuffle operations, so add the spark.sql.shuffle.partitions setting to the submit command:
Note: Generating SST files with Nebula Exchange involves Spark shuffle operations, so add the
spark.sql.shuffle.partitions setting to the submit command:

```
$SPARK_HOME/bin/spark-submit --class com.vesoft.nebula.exchange.Exchange \
--master local \
@@ -106,12 +131,14 @@ nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar \
-c application.conf
```

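For more information about Nebula Exchange, see the Exchange 2.0 [user manual](https://docs.nebula-graph.com.cn/2.6.2/nebula-exchange/about-exchange/ex-ug-what-is-exchange/).
For more information about Nebula Exchange, see the Exchange 2.0
[user manual](https://docs.nebula-graph.com.cn/2.6.2/nebula-exchange/about-exchange/ex-ug-what-is-exchange/).

## Contributing

Nebula Exchange 2.0 is a fully open-source project. Open-source enthusiasts are welcome to take part in the following ways:

- Join Issue discussions on the [Nebula Graph forum](https://discuss.nebula-graph.com.cn/ "Go to the Nebula Graph forum"), for example by answering questions, sharing ideas, or reporting problems you cannot solve
- Join
Issue discussions on the [Nebula Graph forum](https://discuss.nebula-graph.com.cn/ "Go to the Nebula Graph forum"), for example by answering questions, sharing ideas, or reporting problems you cannot solve
- Write or improve documentation
- Submit code improvements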
103 changes: 68 additions & 35 deletions README.md
@@ -1,13 +1,20 @@
# NebulaGraph Exchange
[中文版](https://github.com/vesoft-inc/nebula-exchange/blob/master/README-CN.md)

NebulaGraph Exchange (referred to as Exchange) is an Apache Spark™ application used to migrate data in bulk from different sources to NebulaGraph in a distributed way(Spark). It supports a variety of batch or streaming data sources and allows direct writing to NebulaGraph through side-loading (SST Files).
[中文版](https://github.com/vesoft-inc/nebula-exchange/blob/master/README-CN.md)

Exchange supports Spark versions 2.2, 2.4, and 3.0 along with their respective toolkits named: `nebula-exchange_spark_2.2`, `nebula-exchange_spark_2.4`, and `nebula-exchange_spark_3.0`.
NebulaGraph Exchange (referred to as Exchange) is an Apache Spark™ application used to migrate data
in bulk from different sources to NebulaGraph in a distributed way(Spark). It supports a variety of
batch or streaming data sources and allows direct writing to NebulaGraph through side-loading (SST
Files).

Exchange supports Spark versions 2.2, 2.4, and 3.0 along with their respective toolkits
named: `nebula-exchange_spark_2.2`, `nebula-exchange_spark_2.4`, and `nebula-exchange_spark_3.0`.

> Note:
> - Exchange 3.4.0 does not support Apache Kafka or Apache Pulsar. For now, please use Exchange 3.0.0, 3.3.0, or 3.5.0 to load data from Apache Kafka or Apache Pulsar into NebulaGraph.
> - This repo covers only NebulaGraph 2.x and 3.x. For NebulaGraph v1.x, please use [NebulaGraph Exchange v1.0](https://github.com/vesoft-inc/nebula-java/tree/v1.0/tools/exchange).
> - Exchange 3.4.0 does not support Apache Kafka or Apache Pulsar. For now, please use Exchange
3.0.0, 3.3.0, or 3.5.0 to load data from Apache Kafka or Apache Pulsar into NebulaGraph.
> - This repo covers only NebulaGraph 2.x and 3.x. For NebulaGraph v1.x, please
use [NebulaGraph Exchange v1.0](https://github.com/vesoft-inc/nebula-java/tree/v1.0/tools/exchange).

## Build or Download Exchange

@@ -21,13 +28,16 @@ Exchange supports Spark versions 2.2, 2.4, and 3.0 along with their respective t
$ mvn clean package -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true -pl nebula-exchange_spark_3.0 -am -Pscala-2.12 -Pspark-3.0
```

After packaging, the newly generated JAR files can be found in the following paths:
- nebula-exchange/nebula-exchange_spark_2.2/target/ contains nebula-exchange_spark_2.2-3.0-SNAPSHOT.jar
- nebula-exchange/nebula-exchange_spark_2.4/target/ contains nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar
- nebula-exchange/nebula-exchange_spark_3.0/target/ contains nebula-exchange_spark_3.0-3.0-SNAPSHOT.jar
After packaging, the newly generated JAR files can be found in the following paths:
- nebula-exchange/nebula-exchange_spark_2.2/target/ contains
nebula-exchange_spark_2.2-3.0-SNAPSHOT.jar
- nebula-exchange/nebula-exchange_spark_2.4/target/ contains
nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar
- nebula-exchange/nebula-exchange_spark_3.0/target/ contains
nebula-exchange_spark_3.0-3.0-SNAPSHOT.jar

3. Download from the GitHub artifact

**Released Version:**

[GitHub Releases](https://github.com/vesoft-inc/nebula-exchange/releases)
@@ -63,7 +73,8 @@ nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar \
-c application.conf
```

Note: When using Exchange to generate SST files, please set `spark.sql.shuffle.partitions` in `--conf` for Spark's shuffle operation:
Note: When using Exchange to generate SST files, please set `spark.sql.shuffle.partitions`
in `--conf` for Spark's shuffle operation:

```
$SPARK_HOME/bin/spark-submit --class com.vesoft.nebula.exchange.Exchange \
@@ -73,38 +84,60 @@ nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar \
-c application.conf
```

For more details, please refer to [NebulaGraph Exchange Docs](https://docs.nebula-graph.io/master/nebula-exchange/about-exchange/ex-ug-what-is-exchange/).
For more details, please refer
to [NebulaGraph Exchange Docs](https://docs.nebula-graph.io/master/nebula-exchange/about-exchange/ex-ug-what-is-exchange/).

## How to get the config file

You can generate a template config file for your data source with the following command:

```bash
java -cp nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar com.vesoft.exchange.common.GenerateConfigTemplate -s {source} -p {target-path-to-save-config-file}
```

For example, if your data source is CSV and you want to save the template config file in /tmp, run:

```bash
java -cp nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar com.vesoft.exchange.common.GenerateConfigTemplate -s csv -p /tmp
```
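
The two options in the command above, `-s` for the data source and `-p` for the target path, could be read along the following lines. This is only a sketch under stated assumptions: `GenerateConfigTemplate` itself is not shown in this diff, so the class `TemplateArgs`, both method names, and the `<source>_application.conf` naming scheme are hypothetical. The real class may well parse its options with commons-cli, which this PR adds to the pom; plain Java is used here to keep the example self-contained.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch, not the real GenerateConfigTemplate: shows one way the
// -s and -p options could be read and combined into an output location.
class TemplateArgs {

    /** Collects "-flag value" pairs into a map; a flag without a value is rejected. */
    static Map<String, String> parse(String[] args) {
        Map<String, String> options = new HashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            if (!args[i].startsWith("-") || i + 1 >= args.length) {
                throw new IllegalArgumentException("expected '-flag value' at: " + args[i]);
            }
            options.put(args[i].substring(1), args[i + 1]);
        }
        return options;
    }

    /** Assumed naming scheme (not confirmed by the source): <dir>/<source>_application.conf. */
    static Path targetFile(Map<String, String> options) {
        String source = options.get("s");
        String dir = options.get("p");
        if (source == null || dir == null) {
            throw new IllegalArgumentException("both -s and -p are required");
        }
        return Paths.get(dir, source.toLowerCase(Locale.ROOT) + "_application.conf");
    }

    public static void main(String[] args) {
        Map<String, String> options = parse(new String[] {"-s", "csv", "-p", "/tmp"});
        System.out.println(targetFile(options));
    }
}
```

Rejecting a missing `-s` or `-p` up front means a wrong invocation fails with a readable message instead of writing a file to an unexpected place.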

## Version Compatibility Matrix

Here is the version correspondence between Exchange and NebulaGraph:

| Exchange Version | Nebula Version | Spark Version |
|:------------------------------------------:|:--------------:|:--------------:|
| nebula-exchange-2.0.0.jar | 2.0.0, 2.0.1 |2.4.*|
| nebula-exchange-2.0.1.jar | 2.0.0, 2.0.1 |2.4.*|
| nebula-exchange-2.1.0.jar | 2.0.0, 2.0.1 |2.4.*|
| nebula-exchange-2.5.0.jar | 2.5.0, 2.5.1 |2.4.*|
| nebula-exchange-2.5.1.jar | 2.5.0, 2.5.1 |2.4.*|
| nebula-exchange-2.5.2.jar | 2.5.0, 2.5.1 |2.4.*|
| nebula-exchange-2.6.0.jar | 2.6.0, 2.6.1 |2.4.*|
| nebula-exchange-2.6.1.jar | 2.6.0, 2.6.1 |2.4.*|
| nebula-exchange-2.6.2.jar | 2.6.0, 2.6.1 |2.4.*|
| nebula-exchange-2.6.3.jar | 2.6.0, 2.6.1 |2.4.*|
| nebula-exchange_spark_2.2-3.x.x.jar | 3.x.x |2.2.*|
| nebula-exchange_spark_2.4-3.x.x.jar | 3.x.x |2.4.*|
| nebula-exchange_spark_3.0-3.x.x.jar | 3.x.x |`3.0.*`,`3.1.*`,`3.2.*`,`3.3.*`|
| nebula-exchange_spark_2.2-3.0-SNAPSHOT.jar | nightly |2.2.*|
| nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar | nightly |2.4.*|
| nebula-exchange_spark_3.0-3.0-SNAPSHOT.jar | nightly |`3.0.*`,`3.1.*`,`3.2.*`,`3.3.*`|
| Exchange Version | Nebula Version | Spark Version |
|:------------------------------------------:|:--------------:|:-------------------------------:|
| nebula-exchange-2.0.0.jar | 2.0.0, 2.0.1 | 2.4.* |
| nebula-exchange-2.0.1.jar | 2.0.0, 2.0.1 | 2.4.* |
| nebula-exchange-2.1.0.jar | 2.0.0, 2.0.1 | 2.4.* |
| nebula-exchange-2.5.0.jar | 2.5.0, 2.5.1 | 2.4.* |
| nebula-exchange-2.5.1.jar | 2.5.0, 2.5.1 | 2.4.* |
| nebula-exchange-2.5.2.jar | 2.5.0, 2.5.1 | 2.4.* |
| nebula-exchange-2.6.0.jar | 2.6.0, 2.6.1 | 2.4.* |
| nebula-exchange-2.6.1.jar | 2.6.0, 2.6.1 | 2.4.* |
| nebula-exchange-2.6.2.jar | 2.6.0, 2.6.1 | 2.4.* |
| nebula-exchange-2.6.3.jar | 2.6.0, 2.6.1 | 2.4.* |
| nebula-exchange_spark_2.2-3.x.x.jar | 3.x.x | 2.2.* |
| nebula-exchange_spark_2.4-3.x.x.jar | 3.x.x | 2.4.* |
| nebula-exchange_spark_3.0-3.x.x.jar | 3.x.x | `3.0.*`,`3.1.*`,`3.2.*`,`3.3.*` |
| nebula-exchange_spark_2.2-3.0-SNAPSHOT.jar | nightly | 2.2.* |
| nebula-exchange_spark_2.4-3.0-SNAPSHOT.jar | nightly | 2.4.* |
| nebula-exchange_spark_3.0-3.0-SNAPSHOT.jar | nightly | `3.0.*`,`3.1.*`,`3.2.*`,`3.3.*` |
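
The 3.x rows of the table can be read as a lookup from the Spark version on your cluster to the toolkit to build or download. A minimal sketch of that lookup follows; the class `ToolkitPicker` and the method `toolkitFor` are hypothetical helpers for illustration and are not part of Exchange.

```java
import java.util.Optional;

// Hypothetical helper, not part of Exchange: picks the toolkit module for a
// Spark version using the 3.x rows of the compatibility table.
class ToolkitPicker {

    /** Returns the toolkit name for a Spark version such as "3.2.1", or empty if unlisted. */
    static Optional<String> toolkitFor(String sparkVersion) {
        String[] parts = sparkVersion.split("\\.");
        if (parts.length < 2) {
            return Optional.empty();
        }
        // Only major.minor matters; the table lists patch versions as "*".
        String majorMinor = parts[0] + "." + parts[1];
        switch (majorMinor) {
            case "2.2":
                return Optional.of("nebula-exchange_spark_2.2");
            case "2.4":
                return Optional.of("nebula-exchange_spark_2.4");
            case "3.0":
            case "3.1":
            case "3.2":
            case "3.3":
                return Optional.of("nebula-exchange_spark_3.0");
            default:
                return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(toolkitFor("3.2.1").orElse("unsupported")); // nebula-exchange_spark_3.0
        System.out.println(toolkitFor("2.3.0").orElse("unsupported")); // unsupported
    }
}
```

Spark versions that the table does not list, such as 2.3, return an empty result rather than falling back to the nearest toolkit.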

## Feature History

1. *Since 2.0* Exchange allows for the import of vertex data with both String and Integer type IDs.
2. *Since 2.0* Exchange also supports importing data of various types, including Null, Date, DateTime (using UTC instead of local time), and Time.
3. *Since 2.0* In addition to Hive on Spark, Exchange can import data from other Hive sources as well.
4. *Since 2.0* If there are failures during the data import process, Exchange supports recording and retrying the INSERT statement.
5. *Since 2.5* While SST import is supported by Exchange, property default values are not yet supported.
2. *Since 2.0* Exchange also supports importing data of various types, including Null, Date,
DateTime (using UTC instead of local time), and Time.
3. *Since 2.0* In addition to Hive on Spark, Exchange can import data from other Hive sources as
well.
4. *Since 2.0* If there are failures during the data import process, Exchange supports recording and
retrying the INSERT statement.
5. *Since 2.5* While SST import is supported by Exchange, property default values are not yet
supported.
6. *Since 3.0* Exchange is compatible with Spark 2.2, Spark 2.4, and Spark 3.0.

Refer to [application.conf](https://github.com/vesoft-inc/nebula-exchange/blob/master/exchange-common/src/test/resources/application.conf) as an example to edit the configuration file.
Refer
to [application.conf](https://github.com/vesoft-inc/nebula-exchange/blob/master/exchange-common/src/test/resources/application.conf)
as an example to edit the configuration file.
12 changes: 11 additions & 1 deletion exchange-common/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>exchange</artifactId>
<groupId>com.vesoft</groupId>
<version>3.6.0</version>
<version>3.6.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

@@ -65,6 +65,16 @@
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.16</version>
</dependency>
</dependencies>

<build>
@@ -0,0 +1,67 @@
/* Copyright (c) 2023 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.exchange.common;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class FileMigrate {
    //Logger log = Logger.getLogger(FileMigrate.class);

    /**
     * migrate the source file to target path
     *
     * @param sourceFile template config file
     * @param path       target path to save the config info
     */
    public void saveConfig(String sourceFile, String path) {
        InputStream inputStream =
                this.getClass().getClassLoader().getResourceAsStream(sourceFile);
        if (inputStream == null) {
            // The template is not on the classpath, so there is nothing to copy.
            System.out.println("Template conf file not found on classpath: " + sourceFile);
            System.exit(-1);
        }
        // Remove any previous copy so the template is always written fresh.
        File file = new File(path);
        if (file.exists()) {
            file.delete();
        }
        // try-with-resources closes the reader and the writer (and the streams they wrap)
        // even when opening the target file or a read/write fails part-way through.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
             BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(path))) {
            String line;
            while ((line = reader.readLine()) != null) {
                bufferedWriter.write(line);
                bufferedWriter.write("\n");
            }
        } catch (IOException e) {
            System.out.println("Failed to migrate the template conf file:" + e.getMessage());
            e.printStackTrace();
        }
    }
}
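
The line-by-line copy in `saveConfig` could also be expressed with `java.nio.file.Files.copy`, which replaces an existing target in one call. The following is a minimal sketch, not the PR's implementation: the class `TemplateCopy` and the method `copyTemplate` are hypothetical, and an in-memory stream stands in for the classpath resource. Unlike the loop above, `Files.copy` writes the bytes unchanged, so line endings are not normalized to `\n`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper, not part of the PR: copies a template stream to a target file.
class TemplateCopy {

    /**
     * Copies everything from {@code in} to {@code target}, replacing the file if it exists.
     * Returns the number of bytes written.
     */
    static long copyTemplate(InputStream in, Path target) throws IOException {
        // try-with-resources closes the stream even if the copy fails.
        try (InputStream source = in) {
            return Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for getResourceAsStream(...): an in-memory "template".
        byte[] template = "{\n  spark: {}\n}\n".getBytes(StandardCharsets.UTF_8);
        Path target = Files.createTempFile("application", ".conf");
        long written = copyTemplate(new ByteArrayInputStream(template), target);
        System.out.println("copied " + written + " bytes to " + target);
        Files.delete(target);
    }
}
```

Letting the method throw `IOException` leaves the decision about logging or exiting to the caller, which is one possible design; the original method reports the error itself.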