Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ucan stream handles receipts #174

Merged
merged 1 commit into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion stacks/ucan-invocation-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,17 @@ export function UcanInvocationStack({ stack, app }) {
deadLetterQueue: spaceMetricsDLQ.cdk.queue,
})

// TODO: keep for historical content that we might want to process
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new KinesisStream(stack, 'ucan-stream', {
cdk: {
stream: {
retentionPeriod: Duration.days(365)
}
},
})

// create a kinesis stream
const ucanStream = new KinesisStream(stack, 'ucan-stream', {
const ucanStream = new KinesisStream(stack, 'ucan-stream-v2', {
cdk: {
stream: {
retentionPeriod: Duration.days(365)
Expand Down
5 changes: 5 additions & 0 deletions ucan-invocation/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import {
remove as uploadRemove
} from '@web3-storage/capabilities/upload'

export const CONTENT_TYPE = {
WORKFLOW: 'application/invocations+car',
RECEIPT: 'application/receipt+dag-cbor'
Comment on lines +11 to +12
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • the + suffix denotes an existing media type which are registered as vnd.ipld.car and vnd.ipld.dag-cbor
  • the prefix is a tree, and we can add things to the vnd tree without much fuss like:
application/vnd.w3.ucan+vnd.ipld.car
application/vnd.w3.ucan.receipt+vnd.ipld.dag-cbor

of if we really don't care then

application/x.ucan+vnd.ipld.car
application/x.ucan.receipt+vnd.ipld.dag-cbor

see: https://en.wikipedia.org/wiki/Media_type#Naming

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note that our notion of an invocation is currently a short lived ucan delegation that we as a service provider choose to interpret as an invocation. they are not explicitly invocation ucans.

}

// UCAN protocol
export const STORE_ADD = storeAdd.can
export const STORE_REMOVE = storeRemove.can
Expand Down
4 changes: 2 additions & 2 deletions ucan-invocation/functions/metrics-store-add-size-total.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as Sentry from '@sentry/serverless'

import { createMetricsTable } from '../tables/metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_ADD } from '../constants.js'
import { STORE_ADD, CONTENT_TYPE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -37,7 +37,7 @@ async function handler(event) {
*/
export async function updateSizeTotal (ucanInvocations, ctx) {
const invocationsWithStoreAdd = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_ADD)
inv => inv.value.att.find(a => a.can === STORE_ADD) && inv.type === CONTENT_TYPE.RECEIPT
).flatMap(inv => inv.value.att)

await ctx.metricsTable.incrementStoreAddSizeTotal(invocationsWithStoreAdd)
Expand Down
4 changes: 2 additions & 2 deletions ucan-invocation/functions/metrics-store-add-total.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as Sentry from '@sentry/serverless'

import { createMetricsTable } from '../tables/metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_ADD } from '../constants.js'
import { STORE_ADD, CONTENT_TYPE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -37,7 +37,7 @@ async function handler(event) {
*/
export async function updateStoreAddTotal (ucanInvocations, ctx) {
const invocationsWithStoreAdd = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_ADD)
inv => inv.value.att.find(a => a.can === STORE_ADD) && inv.type === CONTENT_TYPE.RECEIPT
).flatMap(inv => inv.value.att)

await ctx.metricsTable.incrementStoreAddTotal(invocationsWithStoreAdd)
Expand Down
4 changes: 2 additions & 2 deletions ucan-invocation/functions/metrics-store-remove-size-total.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import * as Sentry from '@sentry/serverless'
import { createCarStore } from '../buckets/car-store.js'
import { createMetricsTable } from '../tables/metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_REMOVE } from '../constants.js'
import { STORE_REMOVE, CONTENT_TYPE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -36,7 +36,7 @@ async function handler(event) {
*/
export async function updateRemoveSizeTotal (ucanInvocations, ctx) {
const invocationsWithStoreRemove = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_REMOVE)
inv => inv.value.att.find(a => a.can === STORE_REMOVE) && inv.type === CONTENT_TYPE.RECEIPT
).flatMap(inv => inv.value.att)

// TODO: once we have receipts for store/remove, replace this
Expand Down
4 changes: 2 additions & 2 deletions ucan-invocation/functions/metrics-store-remove-total.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as Sentry from '@sentry/serverless'

import { createMetricsTable } from '../tables/metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_REMOVE } from '../constants.js'
import { STORE_REMOVE, CONTENT_TYPE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -37,7 +37,7 @@ async function handler(event) {
*/
export async function updateStoreRemoveTotal (ucanInvocations, ctx) {
const invocationsWithStoreRemove = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_REMOVE)
inv => inv.value.att.find(a => a.can === STORE_REMOVE) && inv.type === CONTENT_TYPE.RECEIPT
).flatMap(inv => inv.value.att)

await ctx.metricsTable.incrementStoreRemoveTotal(invocationsWithStoreRemove)
Expand Down
4 changes: 2 additions & 2 deletions ucan-invocation/functions/metrics-upload-add-total.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as Sentry from '@sentry/serverless'

import { createMetricsTable } from '../tables/metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { UPLOAD_ADD } from '../constants.js'
import { UPLOAD_ADD, CONTENT_TYPE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -37,7 +37,7 @@ async function handler(event) {
*/
export async function updateUploadAddTotal (ucanInvocations, ctx) {
const invocationsWithUploadAdd = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === UPLOAD_ADD)
inv => inv.value.att.find(a => a.can === UPLOAD_ADD) && inv.type === CONTENT_TYPE.RECEIPT
).flatMap(inv => inv.value.att)

await ctx.metricsTable.incrementUploadAddTotal(invocationsWithUploadAdd)
Expand Down
4 changes: 2 additions & 2 deletions ucan-invocation/functions/metrics-upload-remove-total.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as Sentry from '@sentry/serverless'

import { createMetricsTable } from '../tables/metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { UPLOAD_REMOVE } from '../constants.js'
import { UPLOAD_REMOVE, CONTENT_TYPE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -37,7 +37,7 @@ async function handler(event) {
*/
export async function updateUploadRemoveTotal (ucanInvocations, ctx) {
const invocationsWithUploadRemove = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === UPLOAD_REMOVE)
inv => inv.value.att.find(a => a.can === UPLOAD_REMOVE) && inv.type === CONTENT_TYPE.RECEIPT
).flatMap(inv => inv.value.att)

await ctx.metricsTable.incrementUploadRemoveTotal(invocationsWithUploadRemove)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import * as Sentry from '@sentry/serverless'
import { createCarStore } from '../buckets/car-store.js'
import { createSpaceMetricsTable } from '../tables/space-metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_ADD } from '../constants.js'
import { STORE_ADD, CONTENT_TYPE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -36,7 +36,7 @@ async function handler(event) {
*/
export async function updateAddSizeTotal (ucanInvocations, ctx) {
const invocationsWithStoreAdd = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_ADD)
inv => inv.value.att.find(a => a.can === STORE_ADD) && inv.type === CONTENT_TYPE.RECEIPT
).flatMap(inv => inv.value.att)

await ctx.spaceMetricsTable.incrementStoreAddSizeTotal(invocationsWithStoreAdd)
Expand Down
4 changes: 2 additions & 2 deletions ucan-invocation/functions/space-metrics-store-add-total.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ import * as Sentry from '@sentry/serverless'

import { createSpaceMetricsTable } from '../tables/space-metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_ADD, CONTENT_TYPE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
dsn: process.env.SENTRY_DSN,
tracesSampleRate: 1.0,
})

const STORE_ADD = 'store/add'
const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

/**
Expand Down Expand Up @@ -43,7 +43,7 @@ async function handler(event) {
*/
export async function updateStoreCount (ucanInvocations, ctx) {
const invocationsWithStoreAdd = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_ADD)
inv => inv.value.att.find(a => a.can === STORE_ADD) && inv.type === CONTENT_TYPE.RECEIPT
).flatMap(inv => inv.value.att)

await ctx.spaceMetricsTable.incrementStoreAddCount(invocationsWithStoreAdd)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import * as Sentry from '@sentry/serverless'
import { createCarStore } from '../buckets/car-store.js'
import { createSpaceMetricsTable } from '../tables/space-metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_REMOVE } from '../constants.js'
import { STORE_REMOVE, CONTENT_TYPE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -36,7 +36,7 @@ async function handler(event) {
*/
export async function updateRemoveSizeTotal (ucanInvocations, ctx) {
const invocationsWithStoreRemove = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_REMOVE)
inv => inv.value.att.find(a => a.can === STORE_REMOVE) && inv.type === CONTENT_TYPE.RECEIPT
).flatMap(inv => inv.value.att)

// TODO: once we have receipts for store/remove, replace this
Expand Down
4 changes: 2 additions & 2 deletions ucan-invocation/functions/space-metrics-store-remove-total.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as Sentry from '@sentry/serverless'

import { createSpaceMetricsTable } from '../tables/space-metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { STORE_REMOVE } from '../constants.js'
import { STORE_REMOVE, CONTENT_TYPE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -43,7 +43,7 @@ async function handler(event) {
*/
export async function updateStoreCount (ucanInvocations, ctx) {
const invocationsWithStoreRemove = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === STORE_REMOVE)
inv => inv.value.att.find(a => a.can === STORE_REMOVE) && inv.type === CONTENT_TYPE.RECEIPT
).flatMap(inv => inv.value.att)

await ctx.spaceMetricsTable.incrementStoreRemoveCount(invocationsWithStoreRemove)
Expand Down
4 changes: 2 additions & 2 deletions ucan-invocation/functions/space-metrics-upload-add-total.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as Sentry from '@sentry/serverless'

import { createSpaceMetricsTable } from '../tables/space-metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { UPLOAD_ADD } from '../constants.js'
import { UPLOAD_ADD, CONTENT_TYPE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -43,7 +43,7 @@ async function handler(event) {
*/
export async function updateUploadCount (ucanInvocations, ctx) {
const invocationsWithUploadAdd = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === UPLOAD_ADD)
inv => inv.value.att.find(a => a.can === UPLOAD_ADD) && inv.type === CONTENT_TYPE.RECEIPT
).flatMap(inv => inv.value.att)

await ctx.spaceMetricsTable.incrementUploadAddCount(invocationsWithUploadAdd)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as Sentry from '@sentry/serverless'

import { createSpaceMetricsTable } from '../tables/space-metrics.js'
import { parseKinesisEvent } from '../utils/parse-kinesis-event.js'
import { UPLOAD_REMOVE } from '../constants.js'
import { UPLOAD_REMOVE, CONTENT_TYPE } from '../constants.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -37,7 +37,7 @@ async function handler(event) {
*/
export async function updateUploadCount (ucanInvocations, ctx) {
const invocationsWithUploadRemove = ucanInvocations.filter(
inv => inv.value.att.find(a => a.can === UPLOAD_REMOVE)
inv => inv.value.att.find(a => a.can === UPLOAD_REMOVE) && inv.type === CONTENT_TYPE.RECEIPT
).flatMap(inv => inv.value.att)

await ctx.spaceMetricsTable.incrementUploadRemoveCount(invocationsWithUploadRemove)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { adminMetricsTableProps } from '../../tables/index.js'

import { updateSizeTotal } from '../../functions/metrics-store-add-size-total.js'
import { createMetricsTable } from '../../tables/metrics.js'
import { METRICS_NAMES } from '../../constants.js'
import { METRICS_NAMES, CONTENT_TYPE } from '../../constants.js'

const REGION = 'us-west-2'

Expand Down Expand Up @@ -52,6 +52,7 @@ test('handles a batch of single invocation with store/add', async t => {
aud: uploadService.did(),
iss: alice.did()
},
type: CONTENT_TYPE.RECEIPT,
ts: Date.now()
}]

Expand Down Expand Up @@ -95,6 +96,7 @@ test('handles batch of single invocations with multiple store/add attributes', a
aud: uploadService.did(),
iss: alice.did()
},
type: CONTENT_TYPE.RECEIPT,
ts: Date.now()
}]

Expand All @@ -112,6 +114,91 @@ test('handles batch of single invocations with multiple store/add attributes', a
t.is(item?.value, cars.reduce((acc, c) => acc + c.size, 0))
})

test('handles a batch of single invocation without store/add', async t => {
const { tableName } = await prepareResources(t.context.dynamoClient)
const uploadService = await Signer.generate()
const alice = await Signer.generate()
const { spaceDid } = await createSpace(alice)
const car = await randomCAR(128)

const metricsTable = createMetricsTable(REGION, tableName, {
endpoint: t.context.dbEndpoint
})

const invocations = [{
carCid: car.cid.toString(),
value: {
att: [
StoreCapabilities.remove.create({
with: spaceDid,
nb: {
link: car.cid,
}
})
],
aud: uploadService.did(),
iss: alice.did()
},
type: CONTENT_TYPE.RECEIPT,
ts: Date.now()
}]

// @ts-expect-error
await updateSizeTotal(invocations, {
metricsTable
})

const item = await getItemFromTable(t.context.dynamoClient, tableName, {
name: METRICS_NAMES.STORE_ADD_SIZE_TOTAL
})
t.truthy(item)
t.is(item?.name, METRICS_NAMES.STORE_ADD_SIZE_TOTAL)
t.is(item?.value, 0)
})

test('handles a batch of single invocation without receipts', async t => {
const { tableName } = await prepareResources(t.context.dynamoClient)
const uploadService = await Signer.generate()
const alice = await Signer.generate()
const { spaceDid } = await createSpace(alice)
const car = await randomCAR(128)

const metricsTable = createMetricsTable(REGION, tableName, {
endpoint: t.context.dbEndpoint
})

const invocations = [{
carCid: car.cid.toString(),
value: {
att: [
StoreCapabilities.add.create({
with: spaceDid,
nb: {
link: car.cid,
size: car.size
}
})
],
aud: uploadService.did(),
iss: alice.did()
},
type: CONTENT_TYPE.WORKFLOW,
ts: Date.now()
}]

// @ts-expect-error
await updateSizeTotal(invocations, {
metricsTable
})

const item = await getItemFromTable(t.context.dynamoClient, tableName, {
name: METRICS_NAMES.STORE_ADD_SIZE_TOTAL
})
t.truthy(item)
t.is(item?.name, METRICS_NAMES.STORE_ADD_SIZE_TOTAL)
t.is(item?.value, 0)
})

/**
* @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} dynamoClient
*/
Expand Down
Loading