Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(extractor-plugins): add crawler specifications #478

Merged
merged 12 commits into from
Apr 19, 2023
Merged
39 changes: 22 additions & 17 deletions plugins/extractors/cassandra/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,39 @@ source:
password: 1234
host: localhost
port: 9042
exclude:
keyspaces: [mykeyspace]
tables: [mykeyspace_2.tableName_1]
```

## Inputs

| Key | Value | Example | Description | |
| :-- | :---- | :------ | :---------- | :- |
| `user_id` | `string` | `admin` | User ID to access the cassandra server| *required* |
| `password` | `string` | `1234` | Password for the cassandra Server | *required* |
| `host` | `string` | `127.0.0.1` | The Host address at which server is running | *required* |
| `port` | `int` | `9042` | The Port number at which server is running | *required* |
| Key | Value | Example | Description | |
| :------------------ | :--------- | :---------------------- | :--------------------------------------------- | :--------- |
| `user_id` | `string` | `admin` | User ID to access the cassandra server | _required_ |
| `password` | `string` | `1234` | Password for the cassandra Server | _required_ |
| `host` | `string` | `127.0.0.1` | The Host address at which server is running | _required_ |
| `port` | `int` | `9042` | The Port number at which server is running | _required_ |
| `exclude.keyspaces` | `[]string` | `[keyspace1,keyspace2]` | List of keyspaces to be excluded from crawling | _optional_ |
| `exclude.tables` | `[]string` | `[keyspace3.table1]` | List of tables to be excluded from crawling | _optional_ |

## Outputs

| Field | Sample Value |
| :---- | :---- |
| `resource.urn` | `my_keyspace.my_table` |
| `resource.name` | `my_table` |
| `resource.service` | `cassandra` |
| `description` | `table description` |
| `profile.total_rows` | `2100` |
| `schema` | [][Column](#column) |
| Field | Sample Value |
| :------------------- | :--------------------- |
| `resource.urn` | `my_keyspace.my_table` |
| `resource.name` | `my_table` |
| `resource.service` | `cassandra` |
| `description` | `table description` |
| `profile.total_rows` | `2100` |
| `schema` | [][column](#column) |

### Column

| Field | Sample Value |
| :---- | :---- |
| Field | Sample Value |
| :----- | :------------ |
| `name` | `total_price` |
| `type` | `text` |
| `type` | `text` |

## Contributing

Expand Down
29 changes: 23 additions & 6 deletions plugins/extractors/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/gocql/gocql"
"github.com/odpf/meteor/models"
_ "github.com/odpf/meteor/models"
v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2"
"github.com/odpf/meteor/plugins/sqlutil"

Expand All @@ -37,10 +36,16 @@ const (

// Config holds the set of configuration for the cassandra extractor
type Config struct {
UserID string `json:"user_id" yaml:"user_id" mapstructure:"user_id" validate:"required"`
Password string `json:"password" yaml:"password" mapstructure:"password" validate:"required"`
Host string `json:"host" yaml:"host" mapstructure:"host" validate:"required"`
Port int `json:"port" yaml:"port" mapstructure:"port" validate:"required"`
UserID string `json:"user_id" yaml:"user_id" mapstructure:"user_id" validate:"required"`
Password string `json:"password" yaml:"password" mapstructure:"password" validate:"required"`
Host string `json:"host" yaml:"host" mapstructure:"host" validate:"required"`
Port int `json:"port" yaml:"port" mapstructure:"port" validate:"required"`
Exclude Exclude `json:"exclude" yaml:"exclude" mapstructure:"exclude"`
}

// Exclude holds the keyspaces and tables listed under the `exclude` key of
// the cassandra extractor configuration.
type Exclude struct {
// Keyspaces lists keyspace names; Init appends them to defaultKeyspaceList
// when building the excluded-keyspace set.
Keyspaces []string `json:"keyspaces" yaml:"keyspaces" mapstructure:"keyspaces"`
// Tables lists table entries; isExcludedTable looks tables up using the
// "<keyspace>.<table>" form, so entries should be written that way.
Tables []string `json:"tables" yaml:"tables" mapstructure:"tables"`
}

var sampleConfig = `
Expand All @@ -61,6 +66,7 @@ var info = plugins.Info{
type Extractor struct {
plugins.BaseExtractor
excludedKeyspaces map[string]bool
excludeTables map[string]bool
logger log.Logger
config Config
session *gocql.Session
Expand All @@ -84,7 +90,9 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) (err error)
}

// build excluded database list
e.excludedKeyspaces = sqlutil.BuildBoolMap(defaultKeyspaceList)
excludedKeyspacesList := append(defaultKeyspaceList, e.config.Exclude.Keyspaces...)
e.excludedKeyspaces = sqlutil.BuildBoolMap(excludedKeyspacesList)
e.excludeTables = sqlutil.BuildBoolMap(e.config.Exclude.Tables)

// connect to cassandra
cluster := gocql.NewCluster(e.config.Host)
Expand Down Expand Up @@ -144,6 +152,9 @@ func (e *Extractor) extractTables(keyspace string) (err error) {
if err = scanner.Scan(&tableName); err != nil {
return errors.Wrapf(err, "failed to iterate over %s", tableName)
}
if e.isExcludedTable(keyspace, tableName) {
continue
}
if err = e.processTable(keyspace, tableName); err != nil {
return errors.Wrap(err, "failed to process table")
}
Expand Down Expand Up @@ -212,6 +223,12 @@ func (e *Extractor) isExcludedKeyspace(keyspace string) bool {
return ok
}

// isExcludedTable reports whether the given table is present in the
// extractor's excluded-tables set. The lookup key is the qualified name
// "<keyspace>.<table>".
func (e *Extractor) isExcludedTable(keyspace, table string) bool {
	qualified := fmt.Sprintf("%s.%s", keyspace, table)
	if _, excluded := e.excludeTables[qualified]; excluded {
		return true
	}
	return false
}

// init register the extractor to the catalog
func init() {
if err := registry.Extractors.Register("cassandra", func() plugins.Extractor {
Expand Down
37 changes: 21 additions & 16 deletions plugins/extractors/clickhouse/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,38 @@
source:
name: clickhouse
config:
connection_url: tcp://localhost:3306?username=admin&password=pass123&debug=true
connection_url: clickhouse://username:password@clickhouse-server:9000
exclude:
databases: [database_a, database_b]
tables: [database_c.table_a]
```

## Inputs

| Key | Value | Example | Description | |
| :-- | :---- | :------ | :---------- | :- |
| `connection_url` | `string` | `tcp://localhost:3306?username=admin&password=pass123&debug=true` | URL to access the clickhouse server | *required* |
| Key | Value | Example | Description | |
| :------------------ | :--------- | :---------------------------------------------------------------- | :---------------------------------- | :--------- |
| `connection_url` | `string` | `tcp://localhost:3306?username=admin&password=pass123&debug=true` | URL to access the clickhouse server | _required_ |
| `exclude.databases` | `[]string` | `[database_a, database_b]` | List of databases to be excluded | _optional_ |
| `exclude.tables` | `[]string` | `[database_c.table_a, database_c.table_b]` | List of tables to be excluded | _optional_ |

## Outputs

| Field | Sample Value |
| :---- | :---- |
| `resource.urn` | `my_database.my_table` |
| `resource.name` | `my_table` |
| `resource.service` | `clickhouse` |
| `description` | `table description` |
| `profile.total_rows` | `2100` |
| `schema` | [][Column](#column) |
| Field | Sample Value |
| :------------------- | :--------------------- |
| `resource.urn` | `my_database.my_table` |
| `resource.name` | `my_table` |
| `resource.service` | `clickhouse` |
| `description` | `table description` |
| `profile.total_rows` | `2100` |
| `schema` | [][column](#column) |

### Column

| Field | Sample Value |
| :---- | :---- |
| `name` | `total_price` |
| Field | Sample Value |
| :------------ | :------------------- |
| `name` | `total_price` |
| `description` | `item's total price` |
| `data_type` | `String` |
| `data_type` | `String` |

## Contributing

Expand Down
31 changes: 26 additions & 5 deletions plugins/extractors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/odpf/meteor/models"
v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2"
"github.com/odpf/meteor/plugins"
"github.com/odpf/meteor/plugins/sqlutil"
"github.com/odpf/meteor/registry"
"github.com/odpf/salt/log"
)
Expand All @@ -22,10 +23,19 @@ var summary string

// Config holds the connection URL for the extractor
type Config struct {
ConnectionURL string `json:"connection_url" yaml:"connection_url" mapstructure:"connection_url" validate:"required"`
ConnectionURL string `json:"connection_url" yaml:"connection_url" mapstructure:"connection_url" validate:"required"`
Exclude Exclude `json:"exclude" yaml:"exclude" mapstructure:"exclude"`
}

var sampleConfig = `connection_url: "tcp://localhost:3306?username=admin&password=pass123&debug=true"`
// Exclude holds the databases and tables listed under the `exclude` key of
// the clickhouse extractor configuration.
type Exclude struct {
// Databases lists database names; extractTables skips every table whose
// database name is in this list.
Databases []string `json:"databases" yaml:"databases" mapstructure:"databases"`
// Tables lists table entries; extractTables compares them against
// "<database>.<table>", so entries should be written that way.
Tables []string `json:"tables" yaml:"tables" mapstructure:"tables"`
}

// sampleConfig is an example configuration for the clickhouse extractor.
// The keys under `exclude` are nested so they map onto Config.Exclude, and the
// table entry uses the "<database>.<table>" form that extractTables compares
// against.
var sampleConfig = `connection_url: "tcp://localhost:3306?username=admin&password=pass123&debug=true"
exclude:
  databases: [database_a, database_b]
  tables: [database_c.table_a]`

var info = plugins.Info{
Description: "Column-oriented DBMS for online analytical processing.",
Expand All @@ -38,9 +48,11 @@ var info = plugins.Info{
// and logger interface for the extractor
type Extractor struct {
plugins.BaseExtractor
config Config
logger log.Logger
db *sql.DB
config Config
logger log.Logger
excludedDBs map[string]bool
excludedTbl map[string]bool
db *sql.DB
}

// New returns a pointer to an initialized Extractor Object
Expand All @@ -59,6 +71,10 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) (err error)
return err
}

// initialize excluded databases and tables
e.excludedDBs = sqlutil.BuildBoolMap(e.config.Exclude.Databases)
e.excludedTbl = sqlutil.BuildBoolMap(e.config.Exclude.Tables)

if e.db, err = sql.Open("clickhouse", e.config.ConnectionURL); err != nil {
return errors.Wrap(err, "failed to create a client")
}
Expand Down Expand Up @@ -91,6 +107,11 @@ func (e *Extractor) extractTables(emit plugins.Emit) (err error) {
return
}

// skip excluded databases and tables
if e.excludedDBs[dbName] || e.excludedTbl[fmt.Sprintf("%s.%s", dbName, tableName)] {
continue
}

var columns []*v1beta2.Column
columns, err = e.getColumnsInfo(dbName, tableName)
if err != nil {
Expand Down
30 changes: 16 additions & 14 deletions plugins/extractors/couchdb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,33 @@ source:
name: couchdb
config:
connection_url: http://admin:pass123@localhost:3306/
exclude: database_a,database_b
```

## Inputs

| Key | Value | Example | Description | |
| :-- | :---- | :------ | :---------- | :- |
| `connection_url` | `string` | `http://admin:pass123@localhost:3306/` | URL to access the couchdb server | *required* |
| Key | Value | Example | Description | |
| :--------------- | :------- | :------------------------------------- | :--------------------------------------------------------- | :--------- |
| `connection_url` | `string` | `http://admin:pass123@localhost:3306/` | URL to access the couchdb server | _required_ |
| `exclude` | `string` | `primaryDB,secondaryDB` | Comma separated database list to be excluded from crawling | _optional_ |

## Outputs

| Field | Sample Value |
| :---- | :---- |
| `resource.urn` | `database_name.docID` |
| `resource.name` | `docID` |
| `resource.service` | `couchdb` |
| `schema` | [][Column](#column) |
| Field | Sample Value |
| :----------------- | :-------------------- |
| `resource.urn` | `database_name.docID` |
| `resource.name` | `docID` |
| `resource.service` | `couchdb` |
| `schema` | [][column](#column) |

### Column

| Field | Sample Value |
| :---- | :---- |
| `name` | `field1` |
| Field | Sample Value |
| :------------ | :------------------------- |
| `name` | `field1` |
| `description` | `rev for revision history` |
| `data_type` | `float64` |
| `length` | `` |
| `data_type` | `float64` |
| `length` | `` |

## Contributing

Expand Down
26 changes: 11 additions & 15 deletions plugins/extractors/couchdb/couchdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
_ "embed" // used to print the embedded assets
"fmt"
"reflect"
"strings"

_ "github.com/go-kivik/couchdb"
"github.com/go-kivik/kivik"
"github.com/odpf/meteor/models"
v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2"
"github.com/odpf/meteor/plugins"
"github.com/odpf/meteor/plugins/sqlutil"
"github.com/odpf/meteor/registry"
"github.com/odpf/salt/log"
"google.golang.org/protobuf/types/known/anypb"
Expand All @@ -28,9 +30,12 @@ var defaultDBList = []string{
// Config holds the connection URL and the optional exclusion list for the
// couchdb extractor.
type Config struct {
// ConnectionURL is the address of the CouchDB server; validation marks it as required.
ConnectionURL string `json:"connection_url" yaml:"connection_url" mapstructure:"connection_url" validate:"required"`
// Exclude is a comma-separated list of database names. Init splits it on ","
// and adds the pieces to the default excluded databases.
// NOTE(review): no trimming is visible in Init, so spaces around the commas
// would presumably become part of the names — confirm against sqlutil.BuildBoolMap.
Exclude string `json:"exclude" yaml:"exclude" mapstructure:"exclude"`
}

var sampleConfig = `connection_url: http://admin:pass123@localhost:3306/`
var sampleConfig = `
connection_url: http://admin:pass123@localhost:3306/
exclude: database_a,database_b`

var info = plugins.Info{
Description: "Table metadata from CouchDB server,",
Expand Down Expand Up @@ -67,7 +72,8 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) (err error)
}

// build excluded database list
e.buildExcludedDBs()
excludedList := append(defaultDBList, strings.Split(e.config.Exclude, ",")...)
e.excludedDbs = sqlutil.BuildBoolMap(excludedList)

// create client
e.client, err = kivik.New("couch", e.config.ConnectionURL)
Expand All @@ -90,6 +96,9 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error)
}

for _, dbName := range res {
if e.isExcludedDB(dbName) {
continue
}
if err := e.extractTables(ctx, dbName); err != nil {
return err
}
Expand All @@ -99,10 +108,6 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error)

// Extract tables from a given database
func (e *Extractor) extractTables(ctx context.Context, dbName string) (err error) {
// skip if database is default
if e.isExcludedDB(dbName) {
return
}
e.db = e.client.DB(ctx, dbName)

// extract documents
Expand Down Expand Up @@ -176,15 +181,6 @@ func (e *Extractor) extractColumns(ctx context.Context, docID string) (columns [
return
}

// buildExcludedDBs fills e.excludedDbs with one true entry for every
// database name in defaultDBList.
func (e *Extractor) buildExcludedDBs() {
	skip := map[string]bool{}
	for i := range defaultDBList {
		skip[defaultDBList[i]] = true
	}
	e.excludedDbs = skip
}

func (e *Extractor) isExcludedDB(database string) bool {
_, ok := e.excludedDbs[database]
return ok
Expand Down
Loading