diff --git a/kusto/ingest/ingest.go b/kusto/ingest/ingest.go index 34a4e548..4872828d 100644 --- a/kusto/ingest/ingest.go +++ b/kusto/ingest/ingest.go @@ -9,11 +9,9 @@ import ( "io" "sync" "sync/atomic" - "time" "github.com/Azure/azure-kusto-go/kusto" "github.com/Azure/azure-kusto-go/kusto/data/errors" - "github.com/Azure/azure-kusto-go/kusto/data/table" "github.com/Azure/azure-kusto-go/kusto/ingest/internal/conn" "github.com/Azure/azure-kusto-go/kusto/ingest/internal/filesystem" "github.com/Azure/azure-kusto-go/kusto/ingest/internal/properties" @@ -60,12 +58,6 @@ type Ingestion struct { connMu sync.Mutex streamConn *conn.Conn - - // mappingMu protects mappingNames. - mappingsMu sync.Mutex - // mappings stores mappings on the server. - mappings map[string]mapEntry - lastMappingLookup time.Time } // New is the constructor for Ingestion. @@ -81,12 +73,11 @@ func New(client *kusto.Client, db, table string) (*Ingestion, error) { } i := &Ingestion{ - client: client, - mgr: mgr, - db: db, - table: table, - fs: fs, - mappings: map[string]mapEntry{}, + client: client, + mgr: mgr, + db: db, + table: table, + fs: fs, } return i, nil @@ -351,12 +342,6 @@ func (i *Ingestion) FromFile(ctx context.Context, fPath string, options ...FileO } } - if props.Ingestion.Additional.IngestionMappingRef != "" { - if err := i.haveMappingRef(ctx, props.Ingestion.Additional.IngestionMappingRef); err != nil { - return err - } - } - local, err := filesystem.IsLocalPath(fPath) if err != nil { return err @@ -400,12 +385,6 @@ func (i *Ingestion) FromReader(ctx context.Context, reader io.Reader, options .. return fmt.Errorf("cannot use DeleteLocalSource() with FromReader()") } - if props.Ingestion.Additional.IngestionMappingRef != "" { - if err := i.haveMappingRef(ctx, props.Ingestion.Additional.IngestionMappingRef); err != nil { - return err - } - } - return i.fs.Reader(ctx, reader, props) } @@ -424,10 +403,6 @@ const mib = 1024 * 1024 // https://docs.microsoft.com/en-us/azure/kusto/management/create-ingestion-mapping-command // The context object can be used with a timeout or cancel to limit the request time. func (i *Ingestion) Stream(ctx context.Context, payload []byte, format DataFormat, mappingName string) error { - if err := i.haveMappingRef(ctx, mappingName); err != nil { - return err - } - c, err := i.getStreamConn() if err != nil { return err @@ -479,42 +454,3 @@ func (i *Ingestion) newProp(auth string) properties.All { }, } } - -var mapCacheDur = 5 * time.Minute - -func (i *Ingestion) haveMappingRef(ctx context.Context, ref string) error { - i.mappingsMu.Lock() - defer i.mappingsMu.Unlock() - - if time.Now().Sub(i.lastMappingLookup) < mapCacheDur { - if _, ok := i.mappings[ref]; ok { - return nil - } - return errors.ES(errors.OpFileIngest, errors.KClientArgs, "could not find a mapping reference for %q", ref) - } - - iter, err := i.client.Mgmt(ctx, i.db, kusto.NewStmt(".show ingestion mappings")) - if err != nil { - return err - } - m := map[string]mapEntry{} - err = iter.Do( - func(row *table.Row) error { - mapping := mapEntry{} - if err := row.ToStruct(&mapping); err != nil { - return errors.ES(errors.OpFileIngest, errors.KInternal, "problem converting .show ingestion mappings to struct: %s", err) - } - m[mapping.Name] = mapping - return nil - }, - ) - if err != nil { - return err - } - _, ok := m[ref] - if !ok { - return errors.ES(errors.OpFileIngest, errors.KClientArgs, "could not find a mapping reference for %q", ref) - } - i.mappings = m - return nil -}