Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector-v2][Mongodb]Refactor mongodb connector #4620

Merged
merged 12 commits into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 204 additions & 25 deletions docs/en/connector-v2/sink/MongoDB.md
Original file line number Diff line number Diff line change
@@ -1,53 +1,232 @@
# MongoDB

> MongoDB sink connector
> MongoDB Sink Connector

## Description
Support Those Engines
---------------------

Write data to `MongoDB`
> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>

## Key features
Key Features
------------

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [cdc](../../concept/connector-v2-features.md)

Description
-----------

The MongoDB Connector provides the ability to read and write data from and to MongoDB.
This document describes how to set up the MongoDB connector to run data writers against MongoDB.

Supported DataSource Info
-------------------------

In order to use the Mongodb connector, the following dependencies are required.
They can be downloaded via install-plugin.sh or from the Maven central repository.

| Datasource | Supported Versions | Dependency |
|------------|--------------------|---------------------------------------------------------------------------------------------------------------|
| MongoDB | universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-mongodb) |

Data Type Mapping
-----------------

The following table lists the field data type mapping from MongoDB BSON type to Seatunnel data type.

| Seatunnel Data Type | MongoDB BSON Type |
|---------------------|-------------------|
| STRING | ObjectId |
| STRING | String |
| BOOLEAN | Boolean |
| BINARY | Binary |
| INTEGER | Int32 |
| TINYINT | Int32 |
| SMALLINT | Int32 |
| BIGINT | Int64 |
| DOUBLE | Double |
| FLOAT | Double |
| DECIMAL | Decimal128 |
| Date | Date |
| Timestamp | Timestamp[Date] |
| ROW | Object |
| ARRAY | Array |

**Tips**

> 1.When using SeaTunnel to write Date and Timestamp types to MongoDB, both will produce a Date data type in MongoDB, but the precision will be different. The data generated by the SeaTunnel Date type has second-level precision, while the data generated by the SeaTunnel Timestamp type has millisecond-level precision.<br/>
> 2.When using the DECIMAL type in SeaTunnel, be aware that the maximum range cannot exceed 34 digits, which means you should use decimal(34, 18).<br/>

Sink Options
------------

| Name | Type | Required | Default | Description |
|-----------------------|----------|----------|---------|----------------------------------------------------------------------------------------------------------------|
| uri | String | Yes | - | The MongoDB connection uri. |
| database | String | Yes | - | The name of MongoDB database to read or write. |
| collection | String | Yes | - | The name of MongoDB collection to read or write. |
| schema | String | Yes | - | MongoDB's BSON and seatunnel data structure mapping |
| buffer-flush.max-rows | String | No | 1000 | Specifies the maximum number of buffered rows per batch request. |
| buffer-flush.interval | String | No | 30000 | Specifies the retry time interval if writing records to database failed, the unit is seconds. |
| retry.max | String | No | 3 | Specifies the max retry times if writing records to database failed. |
| retry.interval | Duration | No | 1000 | Specifies the retry time interval if writing records to database failed, the unit is millisecond. |
| upsert-enable | Boolean | No | false | Whether to write documents via upsert mode. |
| upsert-key | List | No | - | The primary keys for upsert. Only valid in upsert mode. Keys are in `["id","name",...]` format for properties. |

**Tips**

> 1.The data flushing logic of the MongoDB Sink Connector is jointly controlled by three parameters: `buffer-flush.max-rows`, `buffer-flush.interval`, and `checkpoint.interval`.
> Data flushing will be triggered if any of these conditions are met.<br/>

How to Create a MongoDB Data Synchronization Jobs
-------------------------------------------------

The following example demonstrates how to create a data synchronization job that writes randomly generated data to a MongoDB database:

## Options
```bash
# Set the basic configuration of the task to be performed
env {
execution.parallelism = 1
job.mode = "BATCH"
checkpoint.interval = 1000
}

source {
FakeSource {
row.num = 2
bigint.min = 0
bigint.max = 10000000
split.num = 1
split.read-interval = 300
schema {
fields {
c_bigint = bigint
}
}
}
}

sink {
MongoDB{
uri = mongodb://user:[email protected]:27017
database = "test"
collection = "test"
schema = {
fields {
_id = string
c_bigint = bigint
}
}
}
}
```

Parameter Interpretation
------------------------

**MongoDB Database Connection URI Examples**

Unauthenticated single node connection:

```bash
mongodb://127.0.0.0:27017/mydb
```

Replica set connection:

```bash
mongodb://127.0.0.0:27017/mydb?replicaSet=xxx
```

Authenticated replica set connection:

| name | type | required | default value |
|----------------|--------|----------|---------------|
| uri | string | yes | - |
| database | string | yes | - |
| collection | string | yes | - |
| common-options | config | no | - |
```bash
mongodb://admin:[email protected]:27017/mydb?replicaSet=xxx&authSource=admin
```

Multi-node replica set connection:

```bash
mongodb://127.0.0..1:27017,127.0.0..2:27017,127.0.0.3:27017/mydb?replicaSet=xxx
```

Sharded cluster connection:

```bash
mongodb://127.0.0.0:27017/mydb
```

### uri [string]
Multiple mongos connections:

uri to write to mongoDB
```bash
mongodb://192.168.0.1:27017,192.168.0.2:27017,192.168.0.3:27017/mydb
```

### database [string]
Note: The username and password in the URI must be URL-encoded before being concatenated into the connection string.

database to write to mongoDB
**Buffer Flush**

### collection [string]
```bash
sink {
MongoDB {
uri = "mongodb://user:[email protected]:27017"
database = "test_db"
collection = "users"
buffer-flush.max-rows = 2000
buffer-flush.interval = 1000
schema = {
fields {
_id = string
id = bigint
status = string
}
}
}
}
```

**Why is Not Recommended to Use Transactions for Operation?**

collection to write to mongoDB
Although MongoDB has fully supported multi-document transactions since version 4.2, it doesn't mean that everyone should use them recklessly.
Transactions are equivalent to locks, node coordination, additional overhead, and performance impact.
Instead, the principle for using transactions should be: avoid using them if possible.
The necessity for using transactions can be greatly avoided by designing systems rationally.

### common options
**Idempotent Writes**

Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
By specifying a clear primary key and using the upsert method, exactly-once write semantics can be achieved.

## Example
If upsert-key is defined in the configuration, the MongoDB sink will use upsert semantics instead of regular INSERT statements. We combine the primary keys declared in upsert-key as the MongoDB reserved primary key and use upsert mode for writing to ensure idempotent writes.
In the event of a failure, Seatunnel jobs will recover from the last successful checkpoint and reprocess, which may result in duplicate message processing during recovery. It is highly recommended to use upsert mode, as it helps to avoid violating database primary key constraints and generating duplicate data if records need to be reprocessed.

```bash
mongodb {
uri = "mongodb://username:[email protected]:27017/mypost?retryWrites=true&writeConcern=majority"
database = "mydatabase"
collection = "mycollection"
sink {
MongoDB {
uri = "mongodb://user:[email protected]:27017"
database = "test_db"
collection = "users"
upsert-enable = true
upsert-key = ["name","status"]
schema = {
fields {
_id = string
name = string
status = string
}
}
}
}
```

## Changelog

### 2.2.0-beta 2022-09-26

- Add MongoDB Sink Connector
- Add MongoDB Source Connector

### Next Version

- [Feature]Refactor mongodb source connector([4620](https://github.com/apache/incubator-seatunnel/pull/4620))

Loading