Skip to content

Commit

Permalink
Add distributed impl
Browse files Browse the repository at this point in the history
  • Loading branch information
codetheweb committed Jan 13, 2025
1 parent b3dab70 commit 2039415
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 7 deletions.
14 changes: 13 additions & 1 deletion chromadb/db/impl/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
CreateSegmentRequest,
CreateTenantRequest,
DeleteCollectionRequest,
DeleteDatabaseRequest,
DeleteSegmentRequest,
GetCollectionsRequest,
GetCollectionsResponse,
Expand Down Expand Up @@ -128,7 +129,18 @@ def get_database(self, name: str, tenant: str = DEFAULT_TENANT) -> Database:

@overrides
def delete_database(self, name: str, tenant: str = DEFAULT_TENANT) -> None:
raise NotImplementedError()
try:
request = DeleteDatabaseRequest(name=name, tenant=tenant)
self._sys_db_stub.DeleteDatabase(
request, timeout=self._request_timeout_seconds
)
except grpc.RpcError as e:
logger.info(
f"Failed to delete database {name} for tenant {tenant} due to error: {e}"
)
if e.code() == grpc.StatusCode.NOT_FOUND:
raise NotFoundError()
raise InternalError

@overrides
def list_databases(
Expand Down
12 changes: 6 additions & 6 deletions chromadb/test/api/test_delete_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@


def test_deletes_database(client_factories: ClientFactories) -> None:
if not NOT_CLUSTER_ONLY:
pytest.skip("This API is not yet supported by distributed")
client = client_factories.create_client()
client.reset()

admin_client = client_factories.create_admin_client_from_system()

Expand All @@ -30,8 +30,8 @@ def test_deletes_database(client_factories: ClientFactories) -> None:


def test_does_not_affect_other_databases(client_factories: ClientFactories) -> None:
if not NOT_CLUSTER_ONLY:
pytest.skip("This API is not yet supported by distributed")
client = client_factories.create_client()
client.reset()

admin_client = client_factories.create_admin_client_from_system()

Expand Down Expand Up @@ -73,8 +73,8 @@ def test_collection_was_removed(sqlite_persistent: System) -> None:


def test_errors_when_database_does_not_exist(client_factories: ClientFactories) -> None:
if not NOT_CLUSTER_ONLY:
pytest.skip("This API is not yet supported by distributed")
client = client_factories.create_client()
client.reset()

admin_client = client_factories.create_admin_client_from_system()

Expand Down
4 changes: 4 additions & 0 deletions go/pkg/sysdb/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ func (s *Coordinator) ListDatabases(ctx context.Context, listDatabases *model.Li
return databases, nil
}

func (s *Coordinator) DeleteDatabase(ctx context.Context, deleteDatabase *model.DeleteDatabase) error {
return s.catalog.DeleteDatabase(ctx, deleteDatabase)
}

func (s *Coordinator) CreateTenant(ctx context.Context, createTenant *model.CreateTenant) (*model.Tenant, error) {
tenant, err := s.catalog.CreateTenant(ctx, createTenant, createTenant.Ts)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions go/pkg/sysdb/coordinator/model/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,10 @@ type ListDatabases struct {
Tenant string
Ts types.Timestamp
}

type DeleteDatabase struct {
ID string
Name string
Tenant string
Ts types.Timestamp
}
17 changes: 17 additions & 0 deletions go/pkg/sysdb/coordinator/table_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,23 @@ func (tc *Catalog) ListDatabases(ctx context.Context, listDatabases *model.ListD
return result, nil
}

func (tc *Catalog) DeleteDatabase(ctx context.Context, deleteDatabase *model.DeleteDatabase) error {
return tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
databases, err := tc.metaDomain.DatabaseDb(txCtx).GetDatabases(deleteDatabase.Tenant, deleteDatabase.Name)
if err != nil {
return err
}
if len(databases) == 0 {
return common.ErrDatabaseNotFound
}
err = tc.metaDomain.DatabaseDb(txCtx).Delete(databases[0].ID)
if err != nil {
return err
}
return nil
})
}

func (tc *Catalog) GetAllDatabases(ctx context.Context, ts types.Timestamp) ([]*model.Database, error) {
databases, err := tc.metaDomain.DatabaseDb(ctx).GetAllDatabases()
if err != nil {
Expand Down
16 changes: 16 additions & 0 deletions go/pkg/sysdb/grpc/tenant_database_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,22 @@ func (s *Server) ListDatabases(ctx context.Context, req *coordinatorpb.ListDatab
return res, nil
}

func (s *Server) DeleteDatabase(ctx context.Context, req *coordinatorpb.DeleteDatabaseRequest) (*coordinatorpb.DeleteDatabaseResponse, error) {
deleteDatabase := &model.DeleteDatabase{
Name: req.GetName(),
Tenant: req.GetTenant(),
}
err := s.coordinator.DeleteDatabase(ctx, deleteDatabase)
if err != nil {
log.Error("error DeleteDatabase", zap.String("request", req.String()), zap.Error(err))
if errors.Is(err, common.ErrDatabaseNotFound) {
return nil, grpcutils.BuildNotFoundGrpcError(err.Error())
}
return nil, grpcutils.BuildInternalGrpcError(err.Error())
}
return &coordinatorpb.DeleteDatabaseResponse{}, nil
}

func (s *Server) CreateTenant(ctx context.Context, req *coordinatorpb.CreateTenantRequest) (*coordinatorpb.CreateTenantResponse, error) {
res := &coordinatorpb.CreateTenantResponse{}
createTenant := &model.CreateTenant{
Expand Down
37 changes: 37 additions & 0 deletions go/pkg/sysdb/grpc/tenant_database_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/chroma-core/chroma/go/pkg/sysdb/coordinator/model"
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dao"
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbcore"
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbmodel"
"github.com/pingcap/log"
"github.com/stretchr/testify/suite"
"google.golang.org/genproto/googleapis/rpc/code"
Expand Down Expand Up @@ -99,6 +100,42 @@ func (suite *TenantDatabaseServiceTestSuite) TestServer_TenantLastCompactionTime
suite.NoError(err)
}

func (suite *TenantDatabaseServiceTestSuite) TestServer_DeleteDatabase() {
tenantName := "TestDeleteDatabase"
databaseName := "TestDeleteDatabase"

_, err := suite.catalog.CreateTenant(context.Background(), &model.CreateTenant{
Name: tenantName,
Ts: time.Now().Unix(),
}, time.Now().Unix())
suite.NoError(err)

_, err = suite.catalog.CreateDatabase(context.Background(), &model.CreateDatabase{
Tenant: tenantName,
Name: databaseName,
}, time.Now().Unix())
suite.NoError(err)

_, _, err = suite.catalog.CreateCollection(context.Background(), &model.CreateCollection{
TenantID: tenantName,
DatabaseName: databaseName,
Name: "TestCollection",
}, time.Now().Unix())
suite.NoError(err)

err = suite.catalog.DeleteDatabase(context.Background(), &model.DeleteDatabase{
Tenant: tenantName,
Name: databaseName,
})
suite.NoError(err)

// Check that associated collection was deleted
var count int64
var collections []*dbmodel.Collection
suite.NoError(suite.db.Find(&collections).Count(&count).Error)
suite.Equal(int64(0), count)
}

func TestTenantDatabaseServiceTestSuite(t *testing.T) {
testSuite := new(TenantDatabaseServiceTestSuite)
suite.Run(t, testSuite)
Expand Down
14 changes: 14 additions & 0 deletions go/pkg/sysdb/metastore/db/dao/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,20 @@ func (s *databaseDb) Insert(database *dbmodel.Database) error {
return err
}

func (s *databaseDb) Delete(databaseID string) error {
return s.db.Transaction(func(tx *gorm.DB) error {
if err := tx.Where("id = ?", databaseID).Delete(&dbmodel.Database{}).Error; err != nil {
return err
}

if err := tx.Where("database_id = ?", databaseID).Delete(&dbmodel.Collection{}).Error; err != nil {
return err
}

return nil
})
}

func (s *databaseDb) GetDatabasesByTenantID(tenantID string) ([]*dbmodel.Database, error) {
var databases []*dbmodel.Database
query := s.db.Table("databases").
Expand Down
1 change: 1 addition & 0 deletions go/pkg/sysdb/metastore/db/dbmodel/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ type IDatabaseDb interface {
ListDatabases(limit *int32, offset *int32, tenantID string) ([]*Database, error)
Insert(in *Database) error
DeleteAll() error
Delete(databaseID string) error
}
8 changes: 8 additions & 0 deletions idl/chromadb/proto/coordinator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ message ListDatabasesResponse {
repeated Database databases = 1;
}

message DeleteDatabaseRequest {
string name = 1;
string tenant = 2;
}

message DeleteDatabaseResponse {}

message CreateTenantRequest {
string name = 2; // Names are globally unique
}
Expand Down Expand Up @@ -357,6 +364,7 @@ service SysDB {
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse) {}
rpc GetDatabase(GetDatabaseRequest) returns (GetDatabaseResponse) {}
rpc ListDatabases(ListDatabasesRequest) returns (ListDatabasesResponse) {}
rpc DeleteDatabase(DeleteDatabaseRequest) returns (DeleteDatabaseResponse) {}
rpc CreateTenant(CreateTenantRequest) returns (CreateTenantResponse) {}
rpc GetTenant(GetTenantRequest) returns (GetTenantResponse) {}
rpc CreateSegment(CreateSegmentRequest) returns (CreateSegmentResponse) {}
Expand Down

0 comments on commit 2039415

Please sign in to comment.