Skip to content

Commit

Permalink
Removes show ingestion mappings calls (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
element-of-surprise authored Sep 27, 2020
1 parent d5bf814 commit 6b08777
Showing 1 changed file with 5 additions and 69 deletions.
74 changes: 5 additions & 69 deletions kusto/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ import (
"io"
"sync"
"sync/atomic"
"time"

"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/data/errors"
"github.com/Azure/azure-kusto-go/kusto/data/table"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/conn"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/filesystem"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/properties"
Expand Down Expand Up @@ -60,12 +58,6 @@ type Ingestion struct {

connMu sync.Mutex
streamConn *conn.Conn

// mappingMu protects mappingNames.
mappingsMu sync.Mutex
// mappings stores mappings on the server.
mappings map[string]mapEntry
lastMappingLookup time.Time
}

// New is the constructor for Ingestion.
Expand All @@ -81,12 +73,11 @@ func New(client *kusto.Client, db, table string) (*Ingestion, error) {
}

i := &Ingestion{
client: client,
mgr: mgr,
db: db,
table: table,
fs: fs,
mappings: map[string]mapEntry{},
client: client,
mgr: mgr,
db: db,
table: table,
fs: fs,
}

return i, nil
Expand Down Expand Up @@ -351,12 +342,6 @@ func (i *Ingestion) FromFile(ctx context.Context, fPath string, options ...FileO
}
}

if props.Ingestion.Additional.IngestionMappingRef != "" {
if err := i.haveMappingRef(ctx, props.Ingestion.Additional.IngestionMappingRef); err != nil {
return err
}
}

local, err := filesystem.IsLocalPath(fPath)
if err != nil {
return err
Expand Down Expand Up @@ -400,12 +385,6 @@ func (i *Ingestion) FromReader(ctx context.Context, reader io.Reader, options ..
return fmt.Errorf("cannot use DeleteLocalSource() with FromReader()")
}

if props.Ingestion.Additional.IngestionMappingRef != "" {
if err := i.haveMappingRef(ctx, props.Ingestion.Additional.IngestionMappingRef); err != nil {
return err
}
}

return i.fs.Reader(ctx, reader, props)
}

Expand All @@ -424,10 +403,6 @@ const mib = 1024 * 1024
// https://docs.microsoft.com/en-us/azure/kusto/management/create-ingestion-mapping-command
// The context object can be used with a timeout or cancel to limit the request time.
func (i *Ingestion) Stream(ctx context.Context, payload []byte, format DataFormat, mappingName string) error {
if err := i.haveMappingRef(ctx, mappingName); err != nil {
return err
}

c, err := i.getStreamConn()
if err != nil {
return err
Expand Down Expand Up @@ -479,42 +454,3 @@ func (i *Ingestion) newProp(auth string) properties.All {
},
}
}

var mapCacheDur = 5 * time.Minute

func (i *Ingestion) haveMappingRef(ctx context.Context, ref string) error {
i.mappingsMu.Lock()
defer i.mappingsMu.Unlock()

if time.Now().Sub(i.lastMappingLookup) < mapCacheDur {
if _, ok := i.mappings[ref]; ok {
return nil
}
return errors.ES(errors.OpFileIngest, errors.KClientArgs, "could not find a mapping reference for %q", ref)
}

iter, err := i.client.Mgmt(ctx, i.db, kusto.NewStmt(".show ingestion mappings"))
if err != nil {
return err
}
m := map[string]mapEntry{}
err = iter.Do(
func(row *table.Row) error {
mapping := mapEntry{}
if err := row.ToStruct(&mapping); err != nil {
return errors.ES(errors.OpFileIngest, errors.KInternal, "problem converting .show ingestion mappings to struct: %s", err)
}
m[mapping.Name] = mapping
return nil
},
)
if err != nil {
return err
}
_, ok := m[ref]
if !ok {
return errors.ES(errors.OpFileIngest, errors.KClientArgs, "could not find a mapping reference for %q", ref)
}
i.mappings = m
return nil
}

0 comments on commit 6b08777

Please sign in to comment.