Skip to content

Commit

Permalink
Merge pull request #2 from incident-io/rob/add-discovery-mode-and-mod…
Browse files Browse the repository at this point in the history
…el-basics

Add the discovery flag and method, along with model package
  • Loading branch information
rliddler authored Oct 18, 2023
2 parents cbc6956 + 01e72ec commit 413c5c4
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 53 deletions.
13 changes: 9 additions & 4 deletions cmd/tap-incident/cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ var (
app = kingpin.New("tap-incident", "Extract data from incident.io for use with Singer").Version(versionStanza())

// Global flags
debug = app.Flag("debug", "Enable debug logging").Default("false").Bool()
configFile = app.Flag("config", "Configuration file").ExistingFile()
catalogFile = app.Flag("catalog", "If set, allows filtering which streams would be synced").ExistingFile()
debug = app.Flag("debug", "Enable debug logging").Default("false").Bool()
configFile = app.Flag("config", "Configuration file").ExistingFile()
catalogFile = app.Flag("catalog", "If set, allows filtering which streams would be synced").ExistingFile()
discoveryMode = app.Flag("discover", "If set, only outputs the catalog and exits").Default("false").Bool()
)

func Run(ctx context.Context) (err error) {
Expand Down Expand Up @@ -79,7 +80,11 @@ func Run(ctx context.Context) (err error) {
// can be streamed separately.
ol := tap.NewOutputLogger(os.Stdout)

err = tap.Run(ctx, logger, ol, cl)
if *discoveryMode {
err = tap.Discover(ctx, logger, ol)
} else {
err = tap.Sync(ctx, logger, ol, cl)
}
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.21
require (
github.com/alecthomas/kingpin/v2 v2.3.2
github.com/deepmap/oapi-codegen v1.12.4
github.com/fatih/structs v1.1.0
github.com/ghodss/yaml v1.0.0
github.com/go-kit/log v0.2.0
github.com/go-ozzo/ozzo-validation v3.6.0+incompatible
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ github.com/deepmap/oapi-codegen v1.12.4/go.mod h1:3lgHGMu6myQ2vqbbTXH2H1o4eXFTGn
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.12.0 h1:mRhaKNwANqRgUBGKmnI5ZxEk7QXmjQeCcuYFMX2bfcc=
github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/getkin/kin-openapi v0.107.0 h1:bxhL6QArW7BXQj8NjXfIJQy680NsMKd25nwhvpCXchg=
github.com/getkin/kin-openapi v0.107.0/go.mod h1:9Dhr+FasATJZjS4iOLvB0hkaxgYdulrNYm2e9epLWOo=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
Expand Down
90 changes: 90 additions & 0 deletions model/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package model

import (
"strings"

"github.com/fatih/structs"
)

// Schema is the top-level JSON schema describing the records of a stream.
type Schema struct {
	// Type is the type of the schema, e.g. "object". It is a list rather than
	// a single string because the Singer docs show it as an array that often
	// includes "null", e.g. `"type": ["null", "object"]`.
	Type []string `json:"type"`
	// HasAdditionalProperties indicates whether the schema allows additional
	// properties not defined in the schema. The tag has no omitempty, so the
	// "additionalProperties" key is always emitted, including when false.
	HasAdditionalProperties bool `json:"additionalProperties"`
	// Properties is a map of property names to their schema.
	Properties map[string]Property `json:"properties"`
}

// Property is the JSON schema of a single property. Properties can nest:
// a property may carry its own Properties, and an array property describes
// its elements through Items.
type Property struct {
	// Types is a list of types that this property can be, e.g. "string" or "integer".
	Types []string `json:"type"`
	// CustomFormat is a custom format for this property, e.g. "date-time".
	// It is omitted from the JSON output when empty.
	CustomFormat string `json:"format,omitempty"`
	// Properties holds the property's own properties, for nested structures.
	// It is omitted from the JSON output when empty.
	Properties map[string]Property `json:"properties,omitempty"`
	// Items defines the type of the items in the array, for array structures.
	// It is omitted from the JSON output when nil.
	Items *ArrayItem `json:"items,omitempty"`
}

// ArrayItem is the type and properties of an item in an array.
type ArrayItem struct {
	// Type is the item's type. Note that it is a single string, whereas
	// Property.Types is a list.
	Type string `json:"type"`
	// Properties holds the item's properties; it is omitted from the JSON
	// output when empty.
	Properties map[string]Property `json:"properties,omitempty"`
}

// IsBoolean reports whether "boolean" is one of the property's types.
func (p Property) IsBoolean() bool {
	const want = "boolean"
	return p.hasType(want)
}

// IsNumber reports whether "number" is one of the property's types.
func (p Property) IsNumber() bool {
	const want = "number"
	return p.hasType(want)
}

// IsInteger reports whether "integer" is one of the property's types.
func (p Property) IsInteger() bool {
	const want = "integer"
	return p.hasType(want)
}

// hasType reports whether typeName appears in the property's Types list.
// The comparison is case-insensitive (strings.EqualFold).
func (p Property) hasType(typeName string) bool {
	for i := 0; i < len(p.Types); i++ {
		if !strings.EqualFold(p.Types[i], typeName) {
			continue
		}
		return true
	}
	return false
}

// IsDateTime reports whether the property's CustomFormat is exactly
// "date-time" (a case-sensitive comparison).
func (p Property) IsDateTime() bool {
	const dateTimeFormat = "date-time"
	return p.CustomFormat == dateTimeFormat
}

// DumpToMap converts input into a map[string]any using the
// github.com/fatih/structs package, keying each entry by the field's `json`
// struct tag rather than the library's default tag.
//
// It is a shortcut for simple leaf nodes, where we can just dump everything
// (we can probably dump everything at a higher level too). If we're just going
// to dump everything, why bother with serialisers? Good question:
//
//	a) Initial thoughts were that we want some control over the fields we
//	   output - for example ignoring deprecated fields.
//	b) We also might need to be cleverer when it comes to catalog config that
//	   enables / disables optional fields in the output.
//
// Keeping this as a single callsite so it's easy to find where we're doing
// this in future.
//
// The tag name is configured on a per-call structs.Struct instead of assigning
// the package-level structs.DefaultTagName, so calling this function does not
// mutate global state shared with any other user of the structs package.
//
// NOTE(review): per the fatih/structs docs, structs.New panics when input is
// not a struct or a pointer to one - confirm callers only pass structs.
func DumpToMap(input any) map[string]any {
	s := structs.New(input)
	s.TagName = "json"
	return s.Map()
}

// AsOptional returns a copy of p that additionally allows the "null" type.
//
// The returned property gets a freshly allocated Types slice, so the result
// never shares a backing array with the caller's slice: appending to either
// one later cannot overwrite the other. If p already lists "null" it is
// returned unchanged, keeping the type list free of duplicates (JSON Schema
// requires the entries of a "type" array to be unique).
func AsOptional(p Property) Property {
	types := make([]string, 0, len(p.Types)+1)
	for _, t := range p.Types {
		if t == "null" {
			// Already nullable: nothing to add.
			return p
		}
		types = append(types, t)
	}

	p.Types = append(types, "null")
	return p
}

// AsArray returns a new property of type "array" whose items are of type
// "object" and carry p's nested Properties. Only p.Properties is read; p's
// own Types, CustomFormat and Items do not appear in the result.
func AsArray(p Property) Property {
	item := ArrayItem{
		Type:       "object",
		Properties: p.Properties,
	}

	var wrapped Property
	wrapped.Types = []string{"array"}
	wrapped.Items = &item
	return wrapped
}
43 changes: 43 additions & 0 deletions tap/catalog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package tap

import (
	"sort"

	"github.com/incident-io/singer-tap/model"
)

// CatalogEntry describes a single stream in the catalog. A catalog can
// contain several streams, or "entries".
type CatalogEntry struct {
	// Name of the stream
	Stream string `json:"stream"`

	// Unique identifier for this stream
	// Allows for multiple sources that have duplicate stream names
	TapStreamID string `json:"tap_stream_id"`

	// The given schema for this stream
	Schema model.Schema `json:"schema"`

	// Optional metadata for this stream (currently disabled)
	// Metadata *[]Metadata `json:"metadata,omitempty"`
}

// Catalog is the actual catalog that we export. It contains an array of all
// our streams, serialised under the "streams" key.
type Catalog struct {
	Streams []CatalogEntry `json:"streams"`
}

// NewCatalog builds a Catalog with one entry per stream in streams. The map
// key is used as both the entry's stream name and its tap_stream_id, and the
// entry's schema is copied from the stream's Output().
//
// Entries are emitted in ascending order of stream name. Go map iteration
// order is unspecified, so ranging over the map directly would make the
// exported catalog differ from run to run.
//
// Streams is always non-nil, so an empty catalog serialises as
// `"streams": []` rather than `"streams": null`.
//
// Each stream's Output() and its Schema are dereferenced without a nil check,
// so both must be non-nil.
func NewCatalog(streams map[string]Stream) *Catalog {
	names := make([]string, 0, len(streams))
	for name := range streams {
		names = append(names, name)
	}
	sort.Strings(names)

	entries := make([]CatalogEntry, 0, len(names))
	for _, name := range names {
		entries = append(entries, CatalogEntry{
			Stream:      name,
			TapStreamID: name,
			Schema:      *streams[name].Output().Schema,
		})
	}

	return &Catalog{
		Streams: entries,
	}
}
63 changes: 17 additions & 46 deletions tap/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import (
"encoding/json"
"fmt"
"io"
"strings"

"github.com/incident-io/singer-tap/model"
)

// OutputType is the type of Singer tap output for each message.
Expand All @@ -29,7 +30,7 @@ type Output struct {
// Stream is the name of the stream, e.g. "users"
Stream string `json:"stream,omitempty"`
// Schema is the schema of the stream, if Type == "SCHEMA", in JSON schema format.
Schema *Schema `json:"schema,omitempty"`
Schema *model.Schema `json:"schema,omitempty"`
// Record is a record from the stream, if Type == "RECORD".
Record map[string]any `json:"record,omitempty"`
// TimeExtracted is the time that this record was extracted, if Type == "RECORD".
Expand All @@ -46,50 +47,6 @@ type Output struct {
BookmarkProperties []string `json:"bookmark_properties,omitempty"`
}

// Schema is the top-level JSON schema describing the records of a stream.
type Schema struct {
	// Type is the type of the schema, e.g. "object". It is serialised as a
	// JSON array under the "type" key.
	Type []string `json:"type"`
	// HasAdditionalProperties indicates whether the schema allows additional
	// properties not defined in the schema. The tag has no omitempty, so the
	// "additionalProperties" key is always emitted, including when false.
	HasAdditionalProperties bool `json:"additionalProperties"`
	// Properties is a map of property names to their schema.
	Properties map[string]Property `json:"properties"`
}

// Property is the JSON schema of a single property.
type Property struct {
	// Types is a list of types that this property can be, e.g. "string" or "integer".
	Types []string `json:"type"`
	// CustomFormat is a custom format for this property, e.g. "date-time".
	// It is omitted from the JSON output when empty.
	CustomFormat string `json:"format,omitempty"`
}

// IsBoolean reports whether the property lists "boolean" among its types.
func (p Property) IsBoolean() bool {
	matched := p.hasType("boolean")
	return matched
}

// IsNumber reports whether the property lists "number" among its types.
func (p Property) IsNumber() bool {
	matched := p.hasType("number")
	return matched
}

// IsInteger reports whether the property lists "integer" among its types.
func (p Property) IsInteger() bool {
	matched := p.hasType("integer")
	return matched
}

// hasType reports whether any entry of the property's Types equals typeName,
// ignoring case (strings.EqualFold).
func (p Property) hasType(typeName string) bool {
	found := false
	for _, candidate := range p.Types {
		if strings.EqualFold(candidate, typeName) {
			found = true
			break
		}
	}
	return found
}

// IsDateTime reports whether the property's CustomFormat equals "date-time"
// exactly.
func (p Property) IsDateTime() bool {
	if p.CustomFormat != "date-time" {
		return false
	}
	return true
}

// OutputLogger is a logger that logs to STDOUT in the format expected by the downstream
// Singer target.
type OutputLogger struct {
Expand All @@ -113,3 +70,17 @@ func (o *OutputLogger) Log(op *Output) error {

return nil
}

// CataLog serialises catalog to JSON and writes it to the logger's writer as
// a single line terminated by a newline. It returns the marshalling error or
// the write error unchanged, and nil on success.
func (o *OutputLogger) CataLog(catalog *Catalog) error {
	payload, err := json.Marshal(catalog)
	if err != nil {
		return err
	}

	if _, err := fmt.Fprintln(o.w, string(payload)); err != nil {
		return err
	}
	return nil
}
5 changes: 3 additions & 2 deletions tap/stream_incidents.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

kitlog "github.com/go-kit/log"
"github.com/incident-io/singer-tap/client"
"github.com/incident-io/singer-tap/model"
"github.com/pkg/errors"
"github.com/samber/lo"
)
Expand All @@ -20,9 +21,9 @@ func (s *StreamIncidents) Output() *Output {
return &Output{
Type: OutputTypeSchema,
Stream: "incidents",
Schema: &Schema{
Schema: &model.Schema{
Type: []string{"object"},
Properties: map[string]Property{
Properties: map[string]model.Property{
"id": {
Types: []string{"string"},
},
Expand Down
12 changes: 11 additions & 1 deletion tap/tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/incident-io/singer-tap/client"
)

func Run(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *client.ClientWithResponses) error {
func Sync(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *client.ClientWithResponses) error {
for name, stream := range streams {
logger := kitlog.With(logger, "stream", name)

Expand Down Expand Up @@ -40,3 +40,13 @@ func Run(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *client

return nil
}

// Discover builds a catalog from the package-level streams and writes it out
// through ol.CataLog, returning whatever error that call produces. The ctx
// and logger parameters are currently unused.
func Discover(ctx context.Context, logger kitlog.Logger, ol *OutputLogger) error {
	return ol.CataLog(NewCatalog(streams))
}

0 comments on commit 413c5c4

Please sign in to comment.