Skip to content

Commit

Permalink
Add aggregationId for aggregations (#465)
Browse files Browse the repository at this point in the history
* ft: add aggregation id migration script

* refactor: update aggregation pg adapter methods

* ft: get aggregation using id pg adapter method

* refactor: aggregation model class to use updated pg adapter methods/params

* chore: aggregation db adapter method documentation

* chore: todo comments

* refactor: simply step 2 of add aggregation id migration script

* ft: add aggregationId to `LinkedAggregation` in GQL API

* fix: remove redundant part of `getAggregation` sql qeury

* fix: add `source_account_id` to primary key of aggregations table
  • Loading branch information
benwerner01 authored Apr 11, 2022
1 parent f94f332 commit e6f3ba8
Show file tree
Hide file tree
Showing 21 changed files with 300 additions and 159 deletions.
58 changes: 50 additions & 8 deletions packages/hash/api/src/db/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ export type DbLinkVersion = {
};

export type DbAggregation = {
aggregationId: string;
sourceAccountId: string;
sourceEntityId: string;
sourceEntityVersionIds: Set<string>;
Expand Down Expand Up @@ -541,39 +542,80 @@ export interface DbClient {
emailAddress: string;
}): Promise<VerificationCode>;

/**
* Create an aggregation for an entity.
*
* @param params.sourceAccountId the account id of the source entity
* @param params.sourceEntityId the entity id of the source entity
* @param params.path the aggregation path
* @param params.operation the aggregation operation
* @param params.createdByAccountId the account id of the user that created the aggregation
*/
createAggregation(params: {
createdByAccountId: string;
sourceAccountId: string;
sourceEntityId: string;
path: string;
operation: object;
createdByAccountId: string;
}): Promise<DbAggregation>;

/**
* Update the operation of an existing aggregation.
*
* @param params.aggregationId the id of the aggregation
* @param params.operation the updated aggregation operation
*/
updateAggregationOperation(params: {
sourceAccountId: string;
sourceEntityId: string;
path: string;
aggregationId: string;
operation: object;
}): Promise<DbAggregation>;

getEntityAggregation(params: {
/**
* Get an aggregation by its id.
*
* @param params.aggregationId the id of the aggregation
*/
getAggregation(params: {
aggregationId: string;
}): Promise<DbAggregation | null>;

/**
* Get an aggregation by its source entity and path.
*
* @param params.sourceAccountId the account id of the source entity
* @param params.sourceEntityId the entity id of the source entity
* @param params.sourceEntityVersionId the entityVersionId of the source entity (optional)
* @param params.path the aggregation path
*/
getEntityAggregationByPath(params: {
sourceAccountId: string;
sourceEntityId: string;
sourceEntityVersionId?: string;
path: string;
}): Promise<DbAggregation | null>;

/**
* Get all aggregations for an entity.
*
* @param params.sourceAccountId the account id of the source entity
* @param params.sourceEntityId the entity id of the source entity
* @param params.sourceEntityVersionId the entityVersionId of the source entity (optional)
*/
getEntityAggregations(params: {
sourceAccountId: string;
sourceEntityId: string;
sourceEntityVersionId?: string;
}): Promise<DbAggregation[]>;

/**
* Delete an existing aggregation.
*
* @param params.aggregationId the id of the aggregation
* @param params.deletedByAccountId the account id of the user that deleted the aggregation
*/
deleteAggregation(params: {
aggregationId: string;
deletedByAccountId: string;
sourceAccountId: string;
sourceEntityId: string;
path: string;
}): Promise<void>;

/** Get a verification code (it may be invalid!) */
Expand Down
20 changes: 5 additions & 15 deletions packages/hash/api/src/db/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,12 @@ export class DbLinkNotFoundError extends Error {
}

export class DbAggregationNotFoundError extends Error {
sourceAccountId?: string;
sourceEntityId?: string;
path: string;
aggregationId: string;

constructor(params: {
sourceAccountId?: string;
sourceEntityId?: string;
path: string;
}) {
const { sourceAccountId, sourceEntityId, path } = params;
super(
`Aggregation with path ${path} for source entity with id ${sourceEntityId} in account ${sourceAccountId} not found`,
);
this.sourceAccountId = sourceAccountId;
this.sourceEntityId = sourceEntityId;
this.path = path;
constructor(params: { aggregationId: string }) {
const { aggregationId } = params;
super(`Aggregation with aggregationId ${aggregationId} not found`);
this.aggregationId = aggregationId;
}
}

Expand Down
26 changes: 11 additions & 15 deletions packages/hash/api/src/db/postgres/aggregation/createAggregation.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Connection } from "../types";
import { DbAggregation } from "../../adapter";
import { DbAggregation, DbClient } from "../../adapter";
import {
acquireEntityLock,
getEntityLatestVersion,
Expand All @@ -8,16 +8,11 @@ import {
import { DbEntityNotFoundError } from "../..";
import { insertAggregation } from "./util";
import { requireTransaction } from "../util";
import { genId } from "../../../util";

export const createAggregation = async (
existingConnection: Connection,
params: {
createdByAccountId: string;
sourceAccountId: string;
sourceEntityId: string;
path: string;
operation: object;
},
params: Parameters<DbClient["createAggregation"]>[0],
): Promise<DbAggregation> =>
requireTransaction(existingConnection)(async (conn) => {
const { sourceAccountId, sourceEntityId, createdByAccountId } = params;
Expand Down Expand Up @@ -52,15 +47,16 @@ export const createAggregation = async (

const now = new Date();

const createdAt = now;
const aggregation: DbAggregation = {
...params,
aggregationId: genId(),
sourceEntityVersionIds,
createdAt: now,
};

await insertAggregation(conn, {
aggregation: {
...params,
sourceEntityVersionIds,
createdAt,
},
aggregation,
});

return { ...params, sourceEntityVersionIds, createdAt };
return aggregation;
});
22 changes: 14 additions & 8 deletions packages/hash/api/src/db/postgres/aggregation/deleteAggregation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,24 @@ import {
import { DbEntityNotFoundError } from "../..";
import { deleteAggregationRow } from "./util";
import { requireTransaction } from "../util";
import { getAggregation } from "./getAggregation";
import { DbClient } from "../../adapter";
import { DbAggregationNotFoundError } from "../../errors";

export const deleteAggregation = async (
existingConnection: Connection,
params: {
sourceAccountId: string;
sourceEntityId: string;
path: string;
deletedByAccountId: string;
},
params: Parameters<DbClient["deleteAggregation"]>[0],
): Promise<void> =>
requireTransaction(existingConnection)(async (conn) => {
const { sourceAccountId, sourceEntityId, deletedByAccountId } = params;
const { aggregationId, deletedByAccountId } = params;

const dbAggregation = await getAggregation(conn, { aggregationId });

if (!dbAggregation) {
throw new DbAggregationNotFoundError(params);
}

const { sourceAccountId, sourceEntityId } = dbAggregation;

await acquireEntityLock(conn, { entityId: sourceEntityId });

Expand Down Expand Up @@ -47,7 +53,7 @@ export const deleteAggregation = async (
entity: dbSourceEntity,
/** @todo: re-implement method to not require updated `properties` */
properties: dbSourceEntity.properties,
omittedAggregations: [params],
omittedAggregations: [{ aggregationId }],
});
} else {
await deleteAggregationRow(conn, params);
Expand Down
22 changes: 22 additions & 0 deletions packages/hash/api/src/db/postgres/aggregation/getAggregation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { sql } from "slonik";
import { DbAggregation, DbClient } from "../../adapter";

import { Connection } from "../types";
import {
DbAggregationRow,
aggregationsColumnNamesSQL,
mapRowToDbAggregation,
} from "./util";

export const getAggregation = async (
conn: Connection,
params: Parameters<DbClient["getAggregation"]>[0],
): Promise<DbAggregation | null> => {
const row = await conn.maybeOne<DbAggregationRow>(sql`
select ${aggregationsColumnNamesSQL}
from aggregations
where aggregation_id = ${params.aggregationId}
`);

return row ? mapRowToDbAggregation(row) : null;
};
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { sql } from "slonik";
import { DbAggregation } from "../../adapter";
import { DbAggregation, DbClient } from "../../adapter";

import { Connection } from "../types";
import {
Expand All @@ -8,14 +8,9 @@ import {
mapRowToDbAggregation,
} from "./util";

export const getEntityAggregation = async (
export const getEntityAggregationByPath = async (
conn: Connection,
params: {
sourceAccountId: string;
sourceEntityId: string;
sourceEntityVersionId?: string;
path: string;
},
params: Parameters<DbClient["getEntityAggregationByPath"]>[0],
): Promise<DbAggregation | null> => {
const row = await conn.maybeOne<DbAggregationRow>(sql`
select ${aggregationsColumnNamesSQL}
Expand All @@ -33,7 +28,7 @@ export const getEntityAggregation = async (
sql` and `,
)}
order by created_at desc
limit 1
limit 1 -- @todo: remove when aggregation versions are stored in separate table
`);

return row ? mapRowToDbAggregation(row) : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,27 @@ import {
} from "../entity";
import { DbEntityNotFoundError } from "../..";
import { insertAggregation, updateAggregationRowOperation } from "./util";
import { getEntityAggregation } from "./getEntityAggregation";
import { DbAggregation } from "../../adapter";
import { getAggregation } from "./getAggregation";
import { DbAggregation, DbClient } from "../../adapter";
import { requireTransaction } from "../util";
import { DbAggregationNotFoundError } from "../../errors";
import { genId } from "../../../util";

export const updateAggregationOperation = (
existingConnection: Connection,
params: {
sourceAccountId: string;
sourceEntityId: string;
path: string;
operation: object;
},
params: Parameters<DbClient["updateAggregationOperation"]>[0],
): Promise<DbAggregation> =>
requireTransaction(existingConnection)(async (conn) => {
const { sourceAccountId, sourceEntityId, operation } = params;

const now = new Date();

const dbAggregation = await getEntityAggregation(conn, params);
const previousDbAggregation = await getAggregation(conn, params);

if (!dbAggregation) {
if (!previousDbAggregation) {
throw new DbAggregationNotFoundError(params);
}

const {
createdByAccountId,
createdAt,
sourceEntityVersionIds: prevSourceEntityVersionIds,
} = dbAggregation;

let sourceEntityVersionIds: Set<string> = prevSourceEntityVersionIds;
const { sourceAccountId, sourceEntityId, createdByAccountId, createdAt } =
previousDbAggregation;

await acquireEntityLock(conn, { entityId: sourceEntityId });

Expand All @@ -54,6 +43,15 @@ export const updateAggregationOperation = (
return dbEntity;
});

const { operation } = params;

const updatedDbAggregation = {
...previousDbAggregation,
operation,
createdByAccountId,
createdAt,
};

if (dbSourceEntity.metadata.versioned) {
dbSourceEntity = await updateVersionedEntity(conn, {
updatedByAccountId: createdByAccountId,
Expand All @@ -63,25 +61,18 @@ export const updateAggregationOperation = (
omittedAggregations: [params],
});

sourceEntityVersionIds = new Set([dbSourceEntity.entityVersionId]);
updatedDbAggregation.aggregationId = genId();
updatedDbAggregation.sourceEntityVersionIds = new Set([
dbSourceEntity.entityVersionId,
]);
updatedDbAggregation.createdAt = now;

await insertAggregation(conn, {
aggregation: {
...params,
sourceEntityVersionIds,
operation,
createdAt: now,
createdByAccountId,
},
aggregation: updatedDbAggregation,
});
} else {
await updateAggregationRowOperation(conn, params);
await updateAggregationRowOperation(conn, updatedDbAggregation);
}

return {
...params,
sourceEntityVersionIds,
createdByAccountId,
createdAt,
};
return updatedDbAggregation;
});
Loading

0 comments on commit e6f3ba8

Please sign in to comment.