Skip to content

Commit

Permalink
feat: ucan stream handles receipts (#174)
Browse files Browse the repository at this point in the history
This PR hooks kinesis consumers with receipts, instead of previous world
where it would hook up with workflows. For this, it simply filters out
Kinesis events that are not receipts as they should not be account for
the metrics we want to track at the moment.

As talked in today`s morning sync, setting up a new Kinesis stream so
that:
- we reset clock history of 1 year
- we still keep old stream around so that we can persist what is there
to S3 (or even re-process it and pipe to the new stream with receipts
and needed files in correct buckets!)
  • Loading branch information
vasco-santos authored Mar 23, 2023
1 parent 2f4b293 commit 817eb0f
Show file tree
Hide file tree
Showing 27 changed files with 416 additions and 39 deletions.
11 changes: 10 additions & 1 deletion stacks/ucan-invocation-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,17 @@ export function UcanInvocationStack({ stack, app }) {
deadLetterQueue: spaceMetricsDLQ.cdk.queue,
})

// TODO: keep for historical content that we might want to process
new KinesisStream(stack, 'ucan-stream', {
cdk: {
stream: {
retentionPeriod: Duration.days(365)
}
},
})

// create a kinesis stream
const ucanStream = new KinesisStream(stack, 'ucan-stream', {
const ucanStream = new KinesisStream(stack, 'ucan-stream-v2', {
cdk: {
stream: {
retentionPeriod: Duration.days(365)
Expand Down
5 changes: 5 additions & 0 deletions ucan-invocation/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import {
remove as uploadRemove
} from '@web3-storage/capabilities/upload'

export const CONTENT_TYPE = {
WORKFLOW: 'application/invocations+car',
RECEIPT: 'application/receipt+dag-cbor'
}

// UCAN protocol
export const STORE_ADD = storeAdd.can
export const STORE_REMOVE = storeRemove.can
Expand Down
4 changes: 2 additions & 2 deletions ucan-invocation/functions/metrics-store-add-size-total.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as Sentry from '@sentry/serverless'

import { createMetricsTable } from '../tables/metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_ADD } from '../constants.js'
import { STORE_ADD, CONTENT_TYPE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -37,7 +37,7 @@ async function handler(event) {
*/
export async function updateSizeTotal (ucanInvocations, ctx) {
const invocationsWithStoreAdd = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_ADD)
inv => inv.value.att.find(a => a.can === STORE_ADD) && inv.type === CONTENT_TYPE.RECEIPT
).flatMap(inv => inv.value.att)

await ctx.metricsTable.incrementStoreAddSizeTotal(invocationsWithStoreAdd)
Expand Down
4 changes: 2 additions & 2 deletions ucan-invocation/functions/metrics-store-add-total.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as Sentry from '@sentry/serverless'

import { createMetricsTable } from '../tables/metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_ADD } from '../constants.js'
import { STORE_ADD, CONTENT_TYPE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -37,7 +37,7 @@ async function handler(event) {
*/
export async function updateStoreAddTotal (ucanInvocations, ctx) {
const invocationsWithStoreAdd = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_ADD)
inv => inv.value.att.find(a => a.can === STORE_ADD) && inv.type === CONTENT_TYPE.RECEIPT
).flatMap(inv => inv.value.att)

await ctx.metricsTable.incrementStoreAddTotal(invocationsWithStoreAdd)
Expand Down
4 changes: 2 additions & 2 deletions ucan-invocation/functions/metrics-store-remove-size-total.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import * as Sentry from '@sentry/serverless'
import { createCarStore } from '../buckets/car-store.js'
import { createMetricsTable } from '../tables/metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_REMOVE } from '../constants.js'
import { STORE_REMOVE, CONTENT_TYPE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -36,7 +36,7 @@ async function handler(event) {
*/
export async function updateRemoveSizeTotal (ucanInvocations, ctx) {
const invocationsWithStoreRemove = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_REMOVE)
inv => inv.value.att.find(a => a.can === STORE_REMOVE) && inv.type === CONTENT_TYPE.RECEIPT
).flatMap(inv => inv.value.att)

// TODO: once we have receipts for store/remove, replace this
Expand Down
4 changes: 2 additions & 2 deletions ucan-invocation/functions/metrics-store-remove-total.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as Sentry from '@sentry/serverless'

import { createMetricsTable } from '../tables/metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_REMOVE } from '../constants.js'
import { STORE_REMOVE, CONTENT_TYPE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -37,7 +37,7 @@ async function handler(event) {
*/
export async function updateStoreRemoveTotal (ucanInvocations, ctx) {
const invocationsWithStoreRemove = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_REMOVE)
inv => inv.value.att.find(a => a.can === STORE_REMOVE) && inv.type === CONTENT_TYPE.RECEIPT
).flatMap(inv => inv.value.att)

await ctx.metricsTable.incrementStoreRemoveTotal(invocationsWithStoreRemove)
Expand Down
4 changes: 2 additions & 2 deletions ucan-invocation/functions/metrics-upload-add-total.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as Sentry from '@sentry/serverless'

import { createMetricsTable } from '../tables/metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { UPLOAD_ADD } from '../constants.js'
import { UPLOAD_ADD, CONTENT_TYPE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -37,7 +37,7 @@ async function handler(event) {
*/
export async function updateUploadAddTotal (ucanInvocations, ctx) {
const invocationsWithUploadAdd = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === UPLOAD_ADD)
inv => inv.value.att.find(a => a.can === UPLOAD_ADD) && inv.type === CONTENT_TYPE.RECEIPT
).flatMap(inv => inv.value.att)

await ctx.metricsTable.incrementUploadAddTotal(invocationsWithUploadAdd)
Expand Down
4 changes: 2 additions & 2 deletions ucan-invocation/functions/metrics-upload-remove-total.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as Sentry from '@sentry/serverless'

import { createMetricsTable } from '../tables/metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { UPLOAD_REMOVE } from '../constants.js'
import { UPLOAD_REMOVE, CONTENT_TYPE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -37,7 +37,7 @@ async function handler(event) {
*/
export async function updateUploadRemoveTotal (ucanInvocations, ctx) {
const invocationsWithUploadRemove = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === UPLOAD_REMOVE)
inv => inv.value.att.find(a => a.can === UPLOAD_REMOVE) && inv.type === CONTENT_TYPE.RECEIPT
).flatMap(inv => inv.value.att)

await ctx.metricsTable.incrementUploadRemoveTotal(invocationsWithUploadRemove)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import * as Sentry from '@sentry/serverless'
import { createCarStore } from '../buckets/car-store.js'
import { createSpaceMetricsTable } from '../tables/space-metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_ADD } from '../constants.js'
import { STORE_ADD, CONTENT_TYPE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -36,7 +36,7 @@ async function handler(event) {
*/
export async function updateAddSizeTotal (ucanInvocations, ctx) {
const invocationsWithStoreAdd = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_ADD)
inv => inv.value.att.find(a => a.can === STORE_ADD) && inv.type === CONTENT_TYPE.RECEIPT
).flatMap(inv => inv.value.att)

await ctx.spaceMetricsTable.incrementStoreAddSizeTotal(invocationsWithStoreAdd)
Expand Down
4 changes: 2 additions & 2 deletions ucan-invocation/functions/space-metrics-store-add-total.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ import * as Sentry from '@sentry/serverless'

import { createSpaceMetricsTable } from '../tables/space-metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_ADD, CONTENT_TYPE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const STORE_ADD = 'store/add'
const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

/**
Expand Down Expand Up @@ -43,7 +43,7 @@ async function handler(event) {
*/
export async function updateStoreCount (ucanInvocations, ctx) {
const invocationsWithStoreAdd = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_ADD)
inv => inv.value.att.find(a => a.can === STORE_ADD) && inv.type === CONTENT_TYPE.RECEIPT
).flatMap(inv => inv.value.att)

await ctx.spaceMetricsTable.incrementStoreAddCount(invocationsWithStoreAdd)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import * as Sentry from '@sentry/serverless'
import { createCarStore } from '../buckets/car-store.js'
import { createSpaceMetricsTable } from '../tables/space-metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_REMOVE } from '../constants.js'
import { STORE_REMOVE, CONTENT_TYPE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -36,7 +36,7 @@ async function handler(event) {
*/
export async function updateRemoveSizeTotal (ucanInvocations, ctx) {
const invocationsWithStoreRemove = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_REMOVE)
inv => inv.value.att.find(a => a.can === STORE_REMOVE) && inv.type === CONTENT_TYPE.RECEIPT
).flatMap(inv => inv.value.att)

// TODO: once we have receipts for store/remove, replace this
Expand Down
4 changes: 2 additions & 2 deletions ucan-invocation/functions/space-metrics-store-remove-total.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as Sentry from '@sentry/serverless'

import { createSpaceMetricsTable } from '../tables/space-metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_REMOVE } from '../constants.js'
import { STORE_REMOVE, CONTENT_TYPE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -43,7 +43,7 @@ async function handler(event) {
*/
export async function updateStoreCount (ucanInvocations, ctx) {
const invocationsWithStoreRemove = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_REMOVE)
inv => inv.value.att.find(a => a.can === STORE_REMOVE) && inv.type === CONTENT_TYPE.RECEIPT
).flatMap(inv => inv.value.att)

await ctx.spaceMetricsTable.incrementStoreRemoveCount(invocationsWithStoreRemove)
Expand Down
4 changes: 2 additions & 2 deletions ucan-invocation/functions/space-metrics-upload-add-total.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as Sentry from '@sentry/serverless'

import { createSpaceMetricsTable } from '../tables/space-metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { UPLOAD_ADD } from '../constants.js'
import { UPLOAD_ADD, CONTENT_TYPE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -43,7 +43,7 @@ async function handler(event) {
*/
export async function updateUploadCount (ucanInvocations, ctx) {
const invocationsWithUploadAdd = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === UPLOAD_ADD)
inv => inv.value.att.find(a => a.can === UPLOAD_ADD) && inv.type === CONTENT_TYPE.RECEIPT
).flatMap(inv => inv.value.att)

await ctx.spaceMetricsTable.incrementUploadAddCount(invocationsWithUploadAdd)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as Sentry from '@sentry/serverless'

import { createSpaceMetricsTable } from '../tables/space-metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { UPLOAD_REMOVE } from '../constants.js'
import { UPLOAD_REMOVE, CONTENT_TYPE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -37,7 +37,7 @@ async function handler(event) {
*/
export async function updateUploadCount (ucanInvocations, ctx) {
const invocationsWithUploadRemove = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === UPLOAD_REMOVE)
inv => inv.value.att.find(a => a.can === UPLOAD_REMOVE) && inv.type === CONTENT_TYPE.RECEIPT
).flatMap(inv => inv.value.att)

await ctx.spaceMetricsTable.incrementUploadRemoveCount(invocationsWithUploadRemove)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { adminMetricsTableProps } from '../../tables/index.js'

import { updateSizeTotal } from '../../functions/metrics-store-add-size-total.js'
import { createMetricsTable } from '../../tables/metrics.js'
import { METRICS_NAMES } from '../../constants.js'
import { METRICS_NAMES, CONTENT_TYPE } from '../../constants.js'

const REGION = 'us-west-2'

Expand Down Expand Up @@ -52,6 +52,7 @@ test('handles a batch of single invocation with store/add', async t => {
aud: uploadService.did(),
iss: alice.did()
},
type: CONTENT_TYPE.RECEIPT,
ts: Date.now()
}]

Expand Down Expand Up @@ -95,6 +96,7 @@ test('handles batch of single invocations with multiple store/add attributes', a
aud: uploadService.did(),
iss: alice.did()
},
type: CONTENT_TYPE.RECEIPT,
ts: Date.now()
}]

Expand All @@ -112,6 +114,91 @@ test('handles batch of single invocations with multiple store/add attributes', a
t.is(item?.value, cars.reduce((acc, c) => acc + c.size, 0))
})

test('handles a batch of single invocation without store/add', async t => {
const { tableName } = await prepareResources(t.context.dynamoClient)
const uploadService = await Signer.generate()
const alice = await Signer.generate()
const { spaceDid } = await createSpace(alice)
const car = await randomCAR(128)

const metricsTable = createMetricsTable(REGION, tableName, {
endpoint: t.context.dbEndpoint
})

const invocations = [{
carCid: car.cid.toString(),
value: {
att: [
StoreCapabilities.remove.create({
with: spaceDid,
nb: {
link: car.cid,
}
})
],
aud: uploadService.did(),
iss: alice.did()
},
type: CONTENT_TYPE.RECEIPT,
ts: Date.now()
}]

// @ts-expect-error
await updateSizeTotal(invocations, {
metricsTable
})

const item = await getItemFromTable(t.context.dynamoClient, tableName, {
name: METRICS_NAMES.STORE_ADD_SIZE_TOTAL
})
t.truthy(item)
t.is(item?.name, METRICS_NAMES.STORE_ADD_SIZE_TOTAL)
t.is(item?.value, 0)
})

test('handles a batch of single invocation without receipts', async t => {
const { tableName } = await prepareResources(t.context.dynamoClient)
const uploadService = await Signer.generate()
const alice = await Signer.generate()
const { spaceDid } = await createSpace(alice)
const car = await randomCAR(128)

const metricsTable = createMetricsTable(REGION, tableName, {
endpoint: t.context.dbEndpoint
})

const invocations = [{
carCid: car.cid.toString(),
value: {
att: [
StoreCapabilities.add.create({
with: spaceDid,
nb: {
link: car.cid,
size: car.size
}
})
],
aud: uploadService.did(),
iss: alice.did()
},
type: CONTENT_TYPE.WORKFLOW,
ts: Date.now()
}]

// @ts-expect-error
await updateSizeTotal(invocations, {
metricsTable
})

const item = await getItemFromTable(t.context.dynamoClient, tableName, {
name: METRICS_NAMES.STORE_ADD_SIZE_TOTAL
})
t.truthy(item)
t.is(item?.name, METRICS_NAMES.STORE_ADD_SIZE_TOTAL)
t.is(item?.value, 0)
})

/**
* @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamoClient
*/
Expand Down
Loading

0 comments on commit 817eb0f

Please sign in to comment.