Skip to content

Commit

Permalink
Merge pull request #7 from lyft/partitions
Browse files Browse the repository at this point in the history
Support Dataset Partitions
  • Loading branch information
chanadian authored Oct 29, 2019
2 parents 3bcb852 + c17572c commit 4b2b3b0
Show file tree
Hide file tree
Showing 25 changed files with 721 additions and 127 deletions.
10 changes: 10 additions & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package errors

import (
"fmt"
"strings"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -44,6 +45,15 @@ func NewDataCatalogErrorf(code codes.Code, format string, a ...interface{}) erro
return NewDataCatalogError(code, fmt.Sprintf(format, a...))
}

// NewCollectedErrors folds several errors into one error created by
// NewDataCatalogError with the given gRPC code. The resulting message is the
// individual error messages joined with ", " in slice order.
//
// Nil entries in errors are skipped; calling Error() on them would panic.
func NewCollectedErrors(code codes.Code, errors []error) error {
	messages := make([]string, 0, len(errors))
	for _, err := range errors {
		if err == nil {
			continue
		}
		messages = append(messages, err.Error())
	}

	return NewDataCatalogError(code, strings.Join(messages, ", "))
}

func IsAlreadyExistsError(err error) bool {
dcErr, ok := err.(DataCatalogError)
return ok && dcErr.GRPCStatus().Code() == codes.AlreadyExists
Expand Down
28 changes: 19 additions & 9 deletions pkg/errors/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,30 @@ package errors
import (
"testing"

"fmt"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func TestAlreadyExists(t *testing.T) {
func TestErrorHelpers(t *testing.T) {
alreadyExistsErr := NewDataCatalogError(codes.AlreadyExists, "already exists")
notFoundErr := NewDataCatalogError(codes.NotFound, "not found")
assert.True(t, IsAlreadyExistsError(alreadyExistsErr))
assert.False(t, IsAlreadyExistsError(notFoundErr))
}

func TestNotFoundErr(t *testing.T) {
alreadyExistsErr := NewDataCatalogError(codes.AlreadyExists, "already exists")
notFoundErr := NewDataCatalogError(codes.NotFound, "not found")
assert.False(t, IsDoesNotExistError(alreadyExistsErr))
assert.True(t, IsDoesNotExistError(notFoundErr))
t.Run("TestAlreadyExists", func(t *testing.T) {
assert.True(t, IsAlreadyExistsError(alreadyExistsErr))
assert.False(t, IsAlreadyExistsError(notFoundErr))
})

t.Run("TestNotFoundErr", func(t *testing.T) {
assert.False(t, IsDoesNotExistError(alreadyExistsErr))
assert.True(t, IsDoesNotExistError(notFoundErr))
})

t.Run("TestCollectErrs", func(t *testing.T) {
collectedErr := NewCollectedErrors(codes.InvalidArgument, []error{alreadyExistsErr, notFoundErr})
assert.EqualValues(t, status.Code(collectedErr), codes.InvalidArgument)
assert.Equal(t, collectedErr.Error(), fmt.Sprintf("%s, %s", alreadyExistsErr.Error(), notFoundErr.Error()))
})
}
14 changes: 12 additions & 2 deletions pkg/manager/impl/artifact_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,23 @@ func (m *artifactManager) CreateArtifact(ctx context.Context, request datacatalo
datasetKey := transformers.FromDatasetID(*artifact.Dataset)

// The dataset must exist for the artifact, let's verify that first
_, err = m.repo.DatasetRepo().Get(ctx, datasetKey)
dataset, err := m.repo.DatasetRepo().Get(ctx, datasetKey)
if err != nil {
logger.Warnf(ctx, "Failed to get dataset for artifact creation %v, err: %v", datasetKey, err)
m.systemMetrics.createFailureCounter.Inc(ctx)
return nil, err
}

// TODO: when adding a tag, need to verify one tag per partition combo
// check that the artifact's partitions match the partition keys declared on the dataset
datasetPartitionKeys := transformers.FromPartitionKeyModel(dataset.PartitionKeys)
err = validators.ValidatePartitions(datasetPartitionKeys, artifact.Partitions)
if err != nil {
logger.Warnf(ctx, "Invalid artifact partitions %v, err: %+v", artifact.Partitions, err)
m.systemMetrics.createFailureCounter.Inc(ctx)
return nil, err
}

// create Artifact Data offloaded storage files
artifactDataModels := make([]models.ArtifactData, len(request.Artifact.Data))
for i, artifactData := range request.Artifact.Data {
Expand All @@ -84,7 +94,7 @@ func (m *artifactManager) CreateArtifact(ctx context.Context, request datacatalo

logger.Debugf(ctx, "Stored %v data for artifact %+v", len(artifactDataModels), artifact.Id)

artifactModel, err := transformers.CreateArtifactModel(request, artifactDataModels)
artifactModel, err := transformers.CreateArtifactModel(request, artifactDataModels, dataset)
if err != nil {
logger.Errorf(ctx, "Failed to transform artifact err: %v", err)
m.systemMetrics.transformerErrorCounter.Inc(ctx)
Expand Down
140 changes: 107 additions & 33 deletions pkg/manager/impl/artifact_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"testing"

"fmt"

"github.com/golang/protobuf/proto"
"github.com/lyft/datacatalog/pkg/errors"
"github.com/lyft/datacatalog/pkg/repositories/mocks"
Expand Down Expand Up @@ -64,6 +66,10 @@ func getTestArtifact() *datacatalog.Artifact {
Value: getTestStringLiteral(),
},
},
Partitions: []*datacatalog.Partition{
{Key: "key1", Value: "value1"},
{Key: "key2", Value: "value2"},
},
}
}

Expand All @@ -85,25 +91,35 @@ func TestCreateArtifact(t *testing.T) {
testStoragePrefix, err := datastore.ConstructReference(ctx, datastore.GetBaseContainerFQN(ctx), "test")
assert.NoError(t, err)

// Mock dataset to return for artifact lookups
expectedDataset := getTestDataset()
mockDatasetModel := models.Dataset{
DatasetKey: models.DatasetKey{
Project: expectedDataset.Id.Project,
Domain: expectedDataset.Id.Domain,
Name: expectedDataset.Id.Name,
Version: expectedDataset.Id.Version,
UUID: expectedDataset.Id.UUID,
},
PartitionKeys: []models.PartitionKey{
{Name: expectedDataset.PartitionKeys[0]},
{Name: expectedDataset.PartitionKeys[1]},
},
}

t.Run("HappyPath", func(t *testing.T) {
datastore := createInmemoryDataStore(t, mockScope.NewTestScope())
expectedDataset := getTestDataset()

ctx := context.Background()
dcRepo := newMockDataCatalogRepo()
dcRepo.MockDatasetRepo.On("Get", mock.Anything,
mock.MatchedBy(func(dataset models.DatasetKey) bool {
expectedDataset := getTestDataset()
return dataset.Project == expectedDataset.Id.Project &&
dataset.Domain == expectedDataset.Id.Domain &&
dataset.Name == expectedDataset.Id.Name &&
dataset.Version == expectedDataset.Id.Version
})).Return(models.Dataset{
DatasetKey: models.DatasetKey{
Project: getTestDataset().Id.Project,
Domain: getTestDataset().Id.Domain,
Name: getTestDataset().Id.Name,
Version: getTestDataset().Id.Version,
},
}, nil)
})).Return(mockDatasetModel, nil)

dcRepo.MockArtifactRepo.On("Create",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
Expand All @@ -115,7 +131,13 @@ func TestCreateArtifact(t *testing.T) {
artifact.ArtifactKey.DatasetProject == expectedArtifact.Dataset.Project &&
artifact.ArtifactKey.DatasetDomain == expectedArtifact.Dataset.Domain &&
artifact.ArtifactKey.DatasetName == expectedArtifact.Dataset.Name &&
artifact.ArtifactKey.DatasetVersion == expectedArtifact.Dataset.Version
artifact.ArtifactKey.DatasetVersion == expectedArtifact.Dataset.Version &&
artifact.Partitions[0].Key == expectedArtifact.Partitions[0].Key &&
artifact.Partitions[0].Value == expectedArtifact.Partitions[0].Value &&
artifact.Partitions[0].DatasetUUID == expectedDataset.Id.UUID &&
artifact.Partitions[1].Key == expectedArtifact.Partitions[1].Key &&
artifact.Partitions[1].Value == expectedArtifact.Partitions[1].Value &&
artifact.Partitions[1].DatasetUUID == expectedDataset.Id.UUID
})).Return(nil)

request := datacatalog.CreateArtifactRequest{Artifact: getTestArtifact()}
Expand Down Expand Up @@ -178,25 +200,9 @@ func TestCreateArtifact(t *testing.T) {
})

t.Run("Already exists", func(t *testing.T) {
dcRepo := &mocks.DataCatalogRepo{
MockDatasetRepo: &mocks.DatasetRepo{},
MockArtifactRepo: &mocks.ArtifactRepo{},
}
dcRepo.MockDatasetRepo.On("Get", mock.Anything,
mock.MatchedBy(func(dataset models.DatasetKey) bool {
expectedDataset := getTestDataset()
return dataset.Project == expectedDataset.Id.Project &&
dataset.Domain == expectedDataset.Id.Domain &&
dataset.Name == expectedDataset.Id.Name &&
dataset.Version == expectedDataset.Id.Version
})).Return(models.Dataset{
DatasetKey: models.DatasetKey{
Project: getTestDataset().Id.Project,
Domain: getTestDataset().Id.Domain,
Name: getTestDataset().Id.Name,
Version: getTestDataset().Id.Version,
},
}, nil)
dcRepo := newMockDataCatalogRepo()

dcRepo.MockDatasetRepo.On("Get", mock.Anything, mock.Anything).Return(mockDatasetModel, nil)

dcRepo.MockArtifactRepo.On("Create",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
Expand All @@ -220,6 +226,70 @@ func TestCreateArtifact(t *testing.T) {
responseCode := status.Code(err)
assert.Equal(t, codes.AlreadyExists, responseCode)
})

t.Run("Missing Partitions", func(t *testing.T) {
dcRepo := newMockDataCatalogRepo()
dcRepo.MockDatasetRepo.On("Get", mock.Anything, mock.Anything).Return(mockDatasetModel, nil)
artifact := getTestArtifact()
artifact.Partitions = nil
dcRepo.MockArtifactRepo.On("Create",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
mock.MatchedBy(func(artifact models.Artifact) bool {
return false
})).Return(fmt.Errorf("Validation should happen before this happens"))

request := datacatalog.CreateArtifactRequest{Artifact: artifact}
artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope())
artifactResponse, err := artifactManager.CreateArtifact(ctx, request)
assert.Error(t, err)
assert.Nil(t, artifactResponse)

responseCode := status.Code(err)
assert.Equal(t, codes.InvalidArgument, responseCode)
})

t.Run("No Partitions", func(t *testing.T) {
dcRepo := newMockDataCatalogRepo()
mockDatasetModel := models.Dataset{
DatasetKey: models.DatasetKey{
Project: expectedDataset.Id.Project,
Domain: expectedDataset.Id.Domain,
Name: expectedDataset.Id.Name,
Version: expectedDataset.Id.Version,
},
}
dcRepo.MockDatasetRepo.On("Get", mock.Anything, mock.Anything).Return(mockDatasetModel, nil)
artifact := getTestArtifact()
artifact.Partitions = []*datacatalog.Partition{}
dcRepo.MockArtifactRepo.On("Create", mock.Anything, mock.Anything).Return(nil)

request := datacatalog.CreateArtifactRequest{Artifact: artifact}
artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope())
_, err := artifactManager.CreateArtifact(ctx, request)
assert.NoError(t, err)
})

t.Run("Invalid Partition", func(t *testing.T) {
dcRepo := newMockDataCatalogRepo()
dcRepo.MockDatasetRepo.On("Get", mock.Anything, mock.Anything).Return(mockDatasetModel, nil)
artifact := getTestArtifact()
artifact.Partitions = append(artifact.Partitions, &datacatalog.Partition{Key: "invalidKey", Value: "invalid"})
dcRepo.MockArtifactRepo.On("Create",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
mock.MatchedBy(func(artifact models.Artifact) bool {
return false
})).Return(fmt.Errorf("Validation should happen before this happens"))

request := datacatalog.CreateArtifactRequest{Artifact: artifact}
artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope())
artifactResponse, err := artifactManager.CreateArtifact(ctx, request)
assert.Error(t, err)
assert.Nil(t, artifactResponse)

responseCode := status.Code(err)
assert.Equal(t, codes.InvalidArgument, responseCode)
})

}

func TestGetArtifact(t *testing.T) {
Expand Down Expand Up @@ -252,7 +322,7 @@ func TestGetArtifact(t *testing.T) {
Version: expectedDataset.Version,
Name: expectedDataset.Name,
}
testArtifactModel := models.Artifact{
mockArtifactModel := models.Artifact{
ArtifactKey: models.ArtifactKey{
DatasetProject: expectedDataset.Project,
DatasetDomain: expectedDataset.Domain,
Expand All @@ -268,6 +338,10 @@ func TestGetArtifact(t *testing.T) {
SerializedMetadata: serializedMetadata,
},
SerializedMetadata: serializedMetadata,
Partitions: []models.Partition{
{Key: "key1", Value: "value1"},
{Key: "key2", Value: "value2"},
},
}

t.Run("Get by Id", func(t *testing.T) {
Expand All @@ -279,7 +353,7 @@ func TestGetArtifact(t *testing.T) {
artifactKey.DatasetDomain == expectedArtifact.Dataset.Domain &&
artifactKey.DatasetVersion == expectedArtifact.Dataset.Version &&
artifactKey.DatasetName == expectedArtifact.Dataset.Name
})).Return(testArtifactModel, nil)
})).Return(mockArtifactModel, nil)

artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope())
artifactResponse, err := artifactManager.GetArtifact(ctx, datacatalog.GetArtifactRequest{
Expand Down Expand Up @@ -309,8 +383,8 @@ func TestGetArtifact(t *testing.T) {
DatasetVersion: expectedTag.DatasetVersion,
TagName: expectedTag.TagName,
},
Artifact: testArtifactModel,
ArtifactID: testArtifactModel.ArtifactID,
Artifact: mockArtifactModel,
ArtifactID: mockArtifactModel.ArtifactID,
}, nil)

artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope())
Expand Down
24 changes: 21 additions & 3 deletions pkg/manager/impl/dataset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/lyft/flytestdlib/promutils"
"github.com/lyft/flytestdlib/promutils/labeled"
"github.com/lyft/flytestdlib/storage"
"google.golang.org/grpc/codes"
)

type datasetMetrics struct {
Expand All @@ -37,15 +38,32 @@ type datasetManager struct {
systemMetrics datasetMetrics
}

// validateCreateRequest runs the create-dataset validators against the request
// and reports every failure at once. It calls validators.ValidateDatasetID on
// the dataset ID and validators.ValidateUniquePartitionKeys on the partition
// keys; any errors they return are combined into a single InvalidArgument
// error via errors.NewCollectedErrors. It returns nil when both pass.
func (dm *datasetManager) validateCreateRequest(request datacatalog.CreateDatasetRequest) error {
	// A request without a dataset cannot be validated field by field;
	// dereferencing it below would panic, so reject it up front.
	if request.Dataset == nil {
		return errors.NewDataCatalogError(codes.InvalidArgument, "missing dataset")
	}

	var errorSet []error

	if err := validators.ValidateDatasetID(request.Dataset.Id); err != nil {
		errorSet = append(errorSet, err)
	}

	if err := validators.ValidateUniquePartitionKeys(request.Dataset.PartitionKeys); err != nil {
		errorSet = append(errorSet, err)
	}

	// Keep collecting instead of returning on the first failure so the caller
	// sees all validation problems in one response.
	if len(errorSet) > 0 {
		return errors.NewCollectedErrors(codes.InvalidArgument, errorSet)
	}

	return nil
}

// Create a Dataset with optional metadata. If one already exists a grpc AlreadyExists err will be returned
func (dm *datasetManager) CreateDataset(ctx context.Context, request datacatalog.CreateDatasetRequest) (*datacatalog.CreateDatasetResponse, error) {
timer := dm.systemMetrics.createResponseTime.Start(ctx)
defer timer.Stop()

err := validators.ValidateDatasetID(request.Dataset.Id)
err := dm.validateCreateRequest(request)
if err != nil {
logger.Warnf(ctx, "Invalid create dataset request %+v err: %v", request, err)
dm.systemMetrics.validationErrorCounter.Inc(ctx)
return nil, err
}

Expand Down
Loading

0 comments on commit 4b2b3b0

Please sign in to comment.