Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the discovery flag and method, along with model package #2

Merged
merged 1 commit into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions cmd/tap-incident/cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ var (
app = kingpin.New("tap-incident", "Extract data from incident.io for use with Singer").Version(versionStanza())

// Global flags
debug = app.Flag("debug", "Enable debug logging").Default("false").Bool()
configFile = app.Flag("config", "Configuration file").ExistingFile()
catalogFile = app.Flag("catalog", "If set, allows filtering which streams would be synced").ExistingFile()
debug = app.Flag("debug", "Enable debug logging").Default("false").Bool()
configFile = app.Flag("config", "Configuration file").ExistingFile()
catalogFile = app.Flag("catalog", "If set, allows filtering which streams would be synced").ExistingFile()
discoveryMode = app.Flag("discover", "If set, only outputs the catalog and exits").Default("false").Bool()
)

func Run(ctx context.Context) (err error) {
Expand Down Expand Up @@ -79,7 +80,11 @@ func Run(ctx context.Context) (err error) {
// can be streamed separately.
ol := tap.NewOutputLogger(os.Stdout)

err = tap.Run(ctx, logger, ol, cl)
if *discoveryMode {
err = tap.Discover(ctx, logger, ol)
} else {
err = tap.Sync(ctx, logger, ol, cl)
}
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.21
require (
github.com/alecthomas/kingpin/v2 v2.3.2
github.com/deepmap/oapi-codegen v1.12.4
github.com/fatih/structs v1.1.0
github.com/ghodss/yaml v1.0.0
github.com/go-kit/log v0.2.0
github.com/go-ozzo/ozzo-validation v3.6.0+incompatible
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ github.com/deepmap/oapi-codegen v1.12.4/go.mod h1:3lgHGMu6myQ2vqbbTXH2H1o4eXFTGn
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.12.0 h1:mRhaKNwANqRgUBGKmnI5ZxEk7QXmjQeCcuYFMX2bfcc=
github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/getkin/kin-openapi v0.107.0 h1:bxhL6QArW7BXQj8NjXfIJQy680NsMKd25nwhvpCXchg=
github.com/getkin/kin-openapi v0.107.0/go.mod h1:9Dhr+FasATJZjS4iOLvB0hkaxgYdulrNYm2e9epLWOo=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
Expand Down
90 changes: 90 additions & 0 deletions model/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package model

import (
"strings"

"github.com/fatih/structs"
)

// Schema is a JSON schema for a stream.
type Schema struct {
// Type is the type of the schema, e.g. "object" - for some reason singer docs
// have this as an array and often nullable eg: `"type": ["null", "object"]`
Type []string `json:"type"`
// HasAdditionalProperties indicates whether the schema allows additional properties
// not defined in the schema.
HasAdditionalProperties bool `json:"additionalProperties"`
// Properties is a map of property names to their schema.
Properties map[string]Property `json:"properties"`
}

// Property is a property in a JSON schema.
type Property struct {
// Types is a list of types that this property can be, e.g. "string" or "integer".
Types []string `json:"type"`
// CustomFormat is a custom format for this property, e.g. "date-time".
CustomFormat string `json:"format,omitempty"`
// For nested structures a property can have its own properties.
Properties map[string]Property `json:"properties,omitempty"`
// For array structures we define the type of the items in the array
Items *ArrayItem `json:"items,omitempty"`
}

// ArrayItem is the type and properties of an item in an array.
type ArrayItem struct {
Type string `json:"type"`
Properties map[string]Property `json:"properties,omitempty"`
}

func (s Property) IsBoolean() bool {
return s.hasType("boolean")
}

func (s Property) IsNumber() bool {
return s.hasType("number")
}

func (s Property) IsInteger() bool {
return s.hasType("integer")
}

func (s Property) hasType(typeName string) bool {
for _, t := range s.Types {
if strings.EqualFold(t, typeName) {
return true
}
}
return false
}

func (s Property) IsDateTime() bool {
return s.CustomFormat == "date-time"
}

// As a shortcut for simple leaf nodes we can just dump everything (we can also dump everything higher level probably too)
// If we're just going to dump everything why bother with serialisers? Good question.
// a) Initial thoughts were that we want some control on the fields we output potentially - for example
// ignoring deprecated fields.
// b) We also might need to be cleverer when it comes to catalog config that enables / disables optional fields
// in the output.
//
// Keeping this as a single callsite so it's easy to find where we're doing this in future.
func DumpToMap(input interface{}) map[string]any {
structs.DefaultTagName = "json"
return structs.Map(input)
}

func AsOptional(p Property) Property {
p.Types = append(p.Types, "null")
return p
}

func AsArray(p Property) Property {
return Property{
Types: []string{"array"},
Items: &ArrayItem{
Type: "object",
Properties: p.Properties,
},
}
}
43 changes: 43 additions & 0 deletions tap/catalog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package tap

import "github.com/incident-io/singer-tap/model"

// A catalog can contain several streams or "entries"
type CatalogEntry struct {
// Name of the stream
Stream string `json:"stream"`

// Unique identifier for this stream
// Allows for multiple sources that have duplicate stream names
TapStreamID string `json:"tap_stream_id"`

// The given schema for this stream
Schema model.Schema `json:"schema"`

// Optional metadata for this stream
// Metadata *[]Metadata `json:"metadata,omitempty"`
}

// Actual catalog that we export
// contains an array of all our streams
type Catalog struct {
Streams []CatalogEntry `json:"streams"`
}

func NewCatalog(streams map[string]Stream) *Catalog {
entries := []CatalogEntry{}

for name, stream := range streams {
catalogEntry := CatalogEntry{
Stream: name,
TapStreamID: name,
Schema: *stream.Output().Schema,
}

entries = append(entries, catalogEntry)
}

return &Catalog{
Streams: entries,
}
}
63 changes: 17 additions & 46 deletions tap/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import (
"encoding/json"
"fmt"
"io"
"strings"

"github.com/incident-io/singer-tap/model"
)

// OutputType is the type of Singer tap output for each message.
Expand All @@ -29,7 +30,7 @@ type Output struct {
// Stream is the name of the stream, e.g. "users"
Stream string `json:"stream,omitempty"`
// Schema is the schema of the stream, if Type == "SCHEMA", in JSON schema format.
Schema *Schema `json:"schema,omitempty"`
Schema *model.Schema `json:"schema,omitempty"`
// Record is a record from the stream, if Type == "RECORD".
Record map[string]any `json:"record,omitempty"`
// TimeExtracted is the time that this record was extracted, if Type == "RECORD".
Expand All @@ -46,50 +47,6 @@ type Output struct {
BookmarkProperties []string `json:"bookmark_properties,omitempty"`
}

// Schema is a JSON schema for a stream.
type Schema struct {
// Type is the type of the schema, e.g. "object"
Type []string `json:"type"`
// HasAdditionalProperties indicates whether the schema allows additional properties
// not defined in the schema.
HasAdditionalProperties bool `json:"additionalProperties"`
// Properties is a map of property names to their schema.
Properties map[string]Property `json:"properties"`
}

// Property is a property in a JSON schema.
type Property struct {
// Types is a list of types that this property can be, e.g. "string" or "integer".
Types []string `json:"type"`
// CustomFormat is a custom format for this property, e.g. "date-time".
CustomFormat string `json:"format,omitempty"`
}

func (s Property) IsBoolean() bool {
return s.hasType("boolean")
}

func (s Property) IsNumber() bool {
return s.hasType("number")
}

func (s Property) IsInteger() bool {
return s.hasType("integer")
}

func (s Property) hasType(typeName string) bool {
for _, t := range s.Types {
if strings.EqualFold(t, typeName) {
return true
}
}
return false
}

func (s Property) IsDateTime() bool {
return s.CustomFormat == "date-time"
}

// OutputLogger is a logger that logs to STDOUT in the format expected by the downstream
// Singer target.
type OutputLogger struct {
Expand All @@ -113,3 +70,17 @@ func (o *OutputLogger) Log(op *Output) error {

return nil
}

func (o *OutputLogger) CataLog(catalog *Catalog) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (o *OutputLogger) CataLog(catalog *Catalog) error {
func (o *OutputLogger) Catalog(catalog *Catalog) error {

..?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a terrible joke from me - it's a catalog log 😢

data, err := json.Marshal(catalog)
if err != nil {
return err
}

_, err = fmt.Fprintln(o.w, string(data))
if err != nil {
return err
}

return nil
}
5 changes: 3 additions & 2 deletions tap/stream_incidents.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

kitlog "github.com/go-kit/log"
"github.com/incident-io/singer-tap/client"
"github.com/incident-io/singer-tap/model"
"github.com/pkg/errors"
"github.com/samber/lo"
)
Expand All @@ -20,9 +21,9 @@ func (s *StreamIncidents) Output() *Output {
return &Output{
Type: OutputTypeSchema,
Stream: "incidents",
Schema: &Schema{
Schema: &model.Schema{
Type: []string{"object"},
Properties: map[string]Property{
Properties: map[string]model.Property{
"id": {
Types: []string{"string"},
},
Expand Down
12 changes: 11 additions & 1 deletion tap/tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/incident-io/singer-tap/client"
)

func Run(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *client.ClientWithResponses) error {
func Sync(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *client.ClientWithResponses) error {
for name, stream := range streams {
logger := kitlog.With(logger, "stream", name)

Expand Down Expand Up @@ -40,3 +40,13 @@ func Run(ctx context.Context, logger kitlog.Logger, ol *OutputLogger, cl *client

return nil
}

func Discover(ctx context.Context, logger kitlog.Logger, ol *OutputLogger) error {
catalog := NewCatalog(streams)

if err := ol.CataLog(catalog); err != nil {
return err
}

return nil
}