Skip to content

Commit

Permalink
[Improve][SourceConnector] Unifie InfluxDB source fields to schema (#…
Browse files Browse the repository at this point in the history
…3897)

* Unifie InfluxDB source fields to schema
  • Loading branch information
wfrong authored Jan 17, 2023
1 parent b4348f6 commit 85a984a
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 32 deletions.
39 changes: 21 additions & 18 deletions docs/en/connector-v2/source/InfluxDB.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ supports query SQL and can achieve projection effect.
|--------------------|--------|----------|---------------|
| url | string | yes | - |
| sql | string | yes | - |
| fields | config | yes | - |
| schema | config | yes | - |
| database | string | yes | |
| username | string | no | - |
| password | string | no | - |
Expand All @@ -51,19 +51,20 @@ The query sql used to search data
select name,age from test
```

### fields [string]
### schema [config]

the fields of the InfluxDB when you select

the field type is SeaTunnel field type `org.apache.seatunnel.api.table.type.SqlType`
#### fields [Config]

The schema information of upstream data.
e.g.

```
fields{
name=STRING
age=INT
schema {
fields {
name = string
age = int
}
}
```

### database [string]
Expand Down Expand Up @@ -147,11 +148,12 @@ source {
lower_bound = 1
partition_num = 4
split_column = "value"
fields {
label = STRING
value = INT
rt = STRING
time = BIGINT
schema {
fields {
label = STRING
value = INT
rt = STRING
time = BIGINT
}
}
Expand All @@ -166,11 +168,12 @@ source {
url = "http://influxdb-host:8086"
sql = "select label, value, rt, time from test"
database = "test"
fields {
label = STRING
value = INT
rt = STRING
time = BIGINT
schema {
fields {
label = STRING
value = INT
rt = STRING
time = BIGINT
}
}
Expand Down
1 change: 1 addition & 0 deletions release-note.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
- [JDBC]Improve option check rule
- [JDBC]Support SAP HANA. (#3017)
- [MongoDB]Add source query capability #3697
- [InfluxDB]Unifie InfluxDB source fields to schema #3897
- [File]Fix file source connector option rule bug #3804
- [File]Add lzo compression way
- [Kafka]Fix Source failed to parse offset format #3810
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public String getPluginName() {

@Override
public void prepare(Config config) throws PrepareFailException {
CheckResult result = CheckConfigUtil.checkAllExists(config, SQL.key());
CheckResult result = CheckConfigUtil.checkAllExists(config, SQL.key(), SeaTunnelSchema.SCHEMA.key());
if (!result.isSuccess()) {
throw new InfluxdbConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format("PluginName: %s, PluginType: %s, Message: %s",
Expand All @@ -82,8 +82,8 @@ public void prepare(Config config) throws PrepareFailException {
}
try {
this.sourceConfig = SourceConfig.loadConfig(config);
SeaTunnelSchema seatunnelSchema = SeaTunnelSchema.buildWithConfig(config);
this.typeInfo = seatunnelSchema.getSeaTunnelRowType();
Config schema = config.getConfig(SeaTunnelSchema.SCHEMA.key());
this.typeInfo = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
this.columnsIndexList = initColumnsIndex(InfluxDBClient.getInfluxDB(sourceConfig));
} catch (Exception e) {
throw new InfluxdbConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;

import com.google.auto.service.AutoService;

Expand All @@ -50,7 +51,8 @@ public OptionRule optionRule() {
.required(
URL,
SQL,
DATABASES
DATABASES,
SeaTunnelSchema.SCHEMA
)
.bundled(USERNAME, PASSWORD)
.bundled(LOWER_BOUND, UPPER_BOUND, PARTITION_NUM, SPLIT_COLUMN)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,19 @@ source {
lower_bound = 0
partition_num = 4
split_column = "c_int"
fields {
label = STRING
c_string = STRING
c_double = DOUBLE
c_bigint = BIGINT
c_float = FLOAT
c_int = INT
c_smallint = SMALLINT
c_boolean = BOOLEAN
time = BIGINT
schema {
fields {
label = STRING
c_string = STRING
c_double = DOUBLE
c_bigint = BIGINT
c_float = FLOAT
c_int = INT
c_smallint = SMALLINT
c_boolean = BOOLEAN
time = BIGINT
}
}
}
}

Expand Down

0 comments on commit 85a984a

Please sign in to comment.