Fix code style and Fix e2e bug and Optimize program logic
MonsterChenzhuo committed May 6, 2023
1 parent ff55078 commit b972922
Showing 28 changed files with 400 additions and 679 deletions.
39 changes: 8 additions & 31 deletions docs/en/connector-v2/sink/MongoDB.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Tips:
Connector Options
-----------------

| Option | Required | Default | Type | Description |
| Option | Required | Default | Type | Description |
|-----------------------|----------|---------|----------|----------------------------------------------------------------------------------------------------------------|
| uri | required | (none) | String | The MongoDB connection uri. |
| database | required | (none) | String | The name of MongoDB database to read or write. |
Expand All @@ -76,7 +76,6 @@ Connector Options
| retry.interval | optional | 1000 | Duration | Specifies the retry time interval if writing records to database failed, the unit is millisecond. |
| upsert-enable | optional | false | Boolean | Whether to write documents via upsert mode. |
| upsert-key | optional | (none) | List | The primary keys for upsert. Only valid in upsert mode. Keys are in `["id","name",...]` format for properties. |
| transaction-enable | optional | false | Boolean | Turn on the Mongodb transaction to ensure the atomicity of the write and update operation through 2pc. |
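
A minimal sketch, not connector code, of what the `upsert-enable` and `upsert-key` options in the table above buy when the same records are written more than once, for example after a job restarts and reprocesses a batch. It assumes a hypothetical in-memory map standing in for a MongoDB collection; `UpsertReplaySketch`, `keyOf`, `upsert`, `insert`, and `doc` are illustrative names only, and joining key fields with `|` is a simplification.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a MongoDB collection; not the connector's code.
final class UpsertReplaySketch {

    // Builds a lookup key from the fields named in upsert-key, e.g. ["name","status"].
    static String keyOf(Map<String, String> doc, List<String> upsertKey) {
        List<String> parts = new ArrayList<>();
        for (String field : upsertKey) {
            parts.add(String.valueOf(doc.get(field)));
        }
        return String.join("|", parts);
    }

    // Upsert: replace the document stored under the same key, or insert it if absent.
    static void upsert(Map<String, Map<String, String>> collection,
                       Map<String, String> doc,
                       List<String> upsertKey) {
        collection.put(keyOf(doc, upsertKey), doc);
    }

    // Plain insert: every call appends a document, so a replay creates duplicates.
    static void insert(List<Map<String, String>> collection, Map<String, String> doc) {
        collection.add(doc);
    }

    // Small helper that builds a two-field document.
    static Map<String, String> doc(String name, String status) {
        Map<String, String> d = new LinkedHashMap<>();
        d.put("name", name);
        d.put("status", status);
        return d;
    }

    public static void main(String[] args) {
        List<String> upsertKey = List.of("name", "status");
        List<Map<String, String>> batch =
                List.of(doc("alice", "active"), doc("bob", "idle"));

        Map<String, Map<String, String>> upserted = new LinkedHashMap<>();
        List<Map<String, String>> inserted = new ArrayList<>();

        // Write the same batch twice to mimic reprocessing after a restart.
        for (int attempt = 0; attempt < 2; attempt++) {
            for (Map<String, String> d : batch) {
                upsert(upserted, d, upsertKey);
                insert(inserted, d);
            }
        }

        System.out.println("upsert count: " + upserted.size()); // prints "upsert count: 2"
        System.out.println("insert count: " + inserted.size()); // prints "insert count: 4"
    }
}
```

In the sketch the keyed collection ends up with one document per key regardless of how many times the batch is replayed, while the plain-insert list grows with every replay.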

How to create a MongoDB Data synchronization jobs
-------------------------------------------------
Expand Down Expand Up @@ -185,40 +184,19 @@ sink {
}
```

**Idempotent Writes**

By specifying a clear primary key and using the upsert method, exactly-once write semantics can be achieved.

If upsert-key is defined in the configuration, the MongoDB sink will use upsert semantics instead of regular INSERT statements. We combine the primary keys declared in upsert-key as the MongoDB reserved primary key and use upsert mode for writing to ensure idempotent writes.
In the event of a failure, Seatunnel jobs will recover from the last successful checkpoint and reprocess, which may result in duplicate message processing during recovery. It is highly recommended to use upsert mode, as it helps to avoid violating database primary key constraints and generating duplicate data if records need to be reprocessed.

```bash
sink {
MongoDB {
uri = "mongodb://user:[email protected]:27017"
database = "test_db"
collection = "users"
upsert-enable = true
upsert-key = ["name","status"]
schema = {
fields {
_id = string
name = string
status = string
}
}
}
}
```

**Why is it not recommended to use transactions for operation?**

Although MongoDB has fully supported multi-document transactions since version 4.2, it doesn't mean that everyone should use them recklessly.
Transactions are equivalent to locks, node coordination, additional overhead, and performance impact.
Instead, the principle for using transactions should be: avoid using them if possible.
The necessity for using transactions can be greatly avoided by designing systems rationally.

**Use transactions for insertion and update operations**
**Idempotent Writes**

By specifying a clear primary key and using the upsert method, exactly-once write semantics can be achieved.

If upsert-key is defined in the configuration, the MongoDB sink will use upsert semantics instead of regular INSERT statements. We combine the primary keys declared in upsert-key as the MongoDB reserved primary key and use upsert mode for writing to ensure idempotent writes.
In the event of a failure, Seatunnel jobs will recover from the last successful checkpoint and reprocess, which may result in duplicate message processing during recovery. It is highly recommended to use upsert mode, as it helps to avoid violating database primary key constraints and generating duplicate data if records need to be reprocessed.

```bash
sink {
Expand All @@ -227,8 +205,7 @@ sink {
database = "test_db"
collection = "users"
upsert-enable = true
transaction-enable = true
upsert-key = ["name"]
upsert-key = ["name","status"]
schema = {
fields {
_id = string
Expand Down
2 changes: 1 addition & 1 deletion docs/en/connector-v2/source/MongoDB.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ tips:
Connector Options
-----------------

| Option | Required | Default | Type | Description |
| Option | Required | Default | Type | Description |
|----------------------|----------|------------------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| uri | required | (none) | String | The MongoDB connection uri. |
| database | required | (none) | String | The name of MongoDB database to read or write. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,4 @@ public class MongodbConfig {
.noDefaultValue()
.withDescription(
"The primary keys for upsert. Only valid in upsert mode. Keys are in csv format for properties.");

public static final Option<Boolean> TRANSACTION_ENABLE =
Options.key("transaction-enable")
.booleanType()
.defaultValue(false)
.withDescription("Turn on the transaction submission of mongodb.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.bson.BsonDocument;
import org.bson.BsonType;
import org.bson.BsonValue;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;
import org.bson.types.Decimal128;

import java.io.Serializable;
Expand Down Expand Up @@ -384,7 +386,9 @@ private static String convertToString(BsonValue bsonValue) {
return bsonValue.asObjectId().getValue().toHexString();
}
if (bsonValue.isDocument()) {
return bsonValue.asDocument().toJson(DEFAULT_JSON_WRITER_SETTINGS);
return bsonValue
.asDocument()
.toJson(JsonWriterSettings.builder().outputMode(JsonMode.RELAXED).build());
}
return new BsonDocument(ENCODE_VALUE_FIELD, bsonValue).toJson(DEFAULT_JSON_WRITER_SETTINGS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@

public interface DocumentSerializer<T> extends Serializable {

WriteModel<BsonDocument> serialize(T object);
WriteModel<BsonDocument> serializeToWriteModel(T object);
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public RowDataDocumentSerializer(
}

@Override
public WriteModel<BsonDocument> serialize(SeaTunnelRow row) {
public WriteModel<BsonDocument> serializeToWriteModel(SeaTunnelRow row) {
final BsonDocument bsonDocument = rowDataToBsonConverter.convert(row);
if (isUpsertEnable) {
Bson filter = generateFilter(filterConditions.apply(bsonDocument));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,36 +20,27 @@
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataDocumentSerializer;
import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataToBsonConverters;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit.MongodbSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.config.MongodbWriterOptions;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbSinkState;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.writer.MongodbBulkWriter;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.writer.MongodbWriter;

import com.google.auto.service.AutoService;

import java.io.IOException;
import java.util.List;
import java.util.Optional;

import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;

@AutoService(SeaTunnelSink.class)
public class MongodbSink
implements SeaTunnelSink<
SeaTunnelRow, MongodbSinkState, MongodbCommitInfo, MongodbAggregatedCommitInfo> {
public class MongodbSink extends AbstractSimpleSink<SeaTunnelRow, Void> {

private MongodbWriterOptions options;

Expand Down Expand Up @@ -97,12 +88,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
pluginConfig.hasPath(MongodbConfig.RETRY_INTERVAL.key())
? pluginConfig.getLong(
MongodbConfig.RETRY_INTERVAL.key())
: MongodbConfig.RETRY_INTERVAL.defaultValue())
.withTransactionEnable(
pluginConfig.hasPath(MongodbConfig.TRANSACTION_ENABLE.key())
? pluginConfig.getBoolean(
MongodbConfig.TRANSACTION_ENABLE.key())
: MongodbConfig.TRANSACTION_ENABLE.defaultValue());
: MongodbConfig.RETRY_INTERVAL.defaultValue());
this.options = builder.build();
}
}
Expand All @@ -123,46 +109,13 @@ public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
}

@Override
public SinkWriter<SeaTunnelRow, MongodbCommitInfo, MongodbSinkState> createWriter(
SinkWriter.Context context) throws IOException {
return new MongodbBulkWriter(
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
throws IOException {
return new MongodbWriter(
new RowDataDocumentSerializer(
RowDataToBsonConverters.createConverter(seaTunnelRowType),
options,
new MongoKeyExtractor(options)),
options);
}

@Override
public SinkWriter<SeaTunnelRow, MongodbCommitInfo, MongodbSinkState> restoreWriter(
SinkWriter.Context context, List<MongodbSinkState> states) throws IOException {
return new MongodbBulkWriter(
new RowDataDocumentSerializer(
RowDataToBsonConverters.createConverter(seaTunnelRowType),
options,
new MongoKeyExtractor(options)),
options,
states);
}

@Override
public Optional<Serializer<MongodbSinkState>> getWriterStateSerializer() {
return Optional.of(new DefaultSerializer<>());
}

@Override
public Optional<SinkAggregatedCommitter<MongodbCommitInfo, MongodbAggregatedCommitInfo>>
createAggregatedCommitter() throws IOException {
return Optional.of(new MongodbSinkAggregatedCommitter(options));
}

@Override
public Optional<Serializer<MongodbAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() {
return Optional.of(new DefaultSerializer<>());
}

@Override
public Optional<Serializer<MongodbCommitInfo>> getCommitInfoSerializer() {
return Optional.of(new DefaultSerializer<>());
}
}

This file was deleted.

This file was deleted.
