Skip to content

Commit 2fac64e

Browse files
authored
feat(appsync): add support for data source integrations (#34248)
### Issue # (if applicable) Closes #34264 ### Reason for this change This change is to support the release of data source integrations for AWS AppSync Events. The implementation is almost an exact replica of the existing data sources for AppSync GraphQL APIs. ### Description of changes - New `data-source-common.ts` file which contains the data source implementation. This new file was needed given the previous `data-source.ts` implementation only works for the `IGraphqlApi` Interface. The common implementation makes it to the new generic `IApi` type is used. - Changes to `eventapi.ts` to support adding data sources to the API. - Changes to `channel-namespace.ts` to support adding event handler configuration with the data sources configured for the API. - New unit tests to validate functionality for data sources. - New integration tests to validate end-to-end functionality across all data source types and the different configurations for the Lambda data source type including direct Lambda invoke and sync/async invoke type. ### Describe any new or updated permissions being added There are no new permissions being added. The data source implementation was a copy/paste from the GraphQL implementation and the grant methods used in those have not changed in this implementation. ### Description of how you validated changes - New unit tests to validate functionality for data sources. - New integration tests to validate end-to-end functionality across all data source types and the different configurations for the Lambda data source type including direct Lambda invoke and sync/async invoke type. - `integ.appsync-eventapi-dynamodb.ts` - `integ.appsync-eventapi-eventbridge.ts` - `integ.appsync-eventapi-http.ts` - `integ.appsync-eventapi-lambda-direct-async.ts` - `integ.appsync-eventapi-lambda-direct.ts` - `integ.appsync-eventapi-lambda.ts` - `integ.appsync-eventapi-opensearch.ts` - `integ.appsync-eventapi-rds.ts` ### Checklist - [X] My code adheres to the [CONTRIBUTING GUIDE](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md) and [DESIGN GUIDELINES](https://github.com/aws/aws-cdk/blob/main/docs/DESIGN_GUIDELINES.md) ---- *By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
1 parent aae5274 commit 2fac64e

File tree

189 files changed

+587785
-101759
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

189 files changed

+587785
-101759
lines changed

packages/@aws-cdk-testing/framework-integ/test/aws-appsync/test/integ-assets/eventapi-grant-assertion/index.js

+23-16
Original file line numberDiff line numberDiff line change
@@ -225,10 +225,12 @@ async function getPublishAuthHeader(authMode, event={}, authToken='') {
225225
* @param {string} authMode the authorization mode for the request
226226
* @param {string} authToken the token used for Lambda auth mode
227227
* @param {boolean} triggerPub whether to also publish in the method
228+
* @param {array} eventPayload the payload to publish
228229
* @returns {Object}
229230
*/
230-
async function subscribe(channel, authMode, authToken, triggerPub=false) {
231+
async function subscribe(channel, authMode, authToken, triggerPub=false, eventPayload=[]) {
231232
const response = {};
233+
const pubMsg = [];
232234
const authHeader = await getPublishAuthHeader(authMode, {}, authToken);
233235
const auth = getAuthProtocolForIAM(authHeader);
234236
const socket = await new Promise((resolve, reject) => {
@@ -255,7 +257,7 @@ async function subscribe(channel, authMode, authToken, triggerPub=false) {
255257
} else if (payload.type === 'data') {
256258
console.log('Data received');
257259
response.pubStatusCode = 200;
258-
response.pubMsg = JSON.parse(payload.event).message;
260+
pubMsg.push(JSON.parse(payload.event));
259261
} else if (payload.type === 'subscribe_error') {
260262
console.log(payload);
261263
if (payload.errors.some((error) => error.errorType === 'UnauthorizedException')) {
@@ -276,7 +278,7 @@ async function subscribe(channel, authMode, authToken, triggerPub=false) {
276278
socket.onerror = (event) => console.log(event)
277279
});
278280

279-
const subChannel = `/${channel}/*`;
281+
const subChannel = `${channel}/*`;
280282
socket.send(JSON.stringify({
281283
type: 'subscribe',
282284
id: crypto.randomUUID(),
@@ -285,35 +287,39 @@ async function subscribe(channel, authMode, authToken, triggerPub=false) {
285287
}));
286288

287289
if (triggerPub) {
288-
await sleep(1000);
289-
await publish(channel, authMode, authToken);
290+
await sleep(2000);
291+
console.log("Hello in here for publishing")
292+
await publish(channel, eventPayload, authMode, authToken);
290293
}
291294
await sleep(3000);
295+
296+
if (pubMsg.length > 0)
297+
response.pubMsg = pubMsg;
298+
292299
return response;
293300
}
294301

295302
/**
296303
* Publishes to a channel and returns the response
297304
*
298305
* @param {string} channel the channel to publish to
306+
* @param {array} eventPayload the payload to publish
299307
* @param {string} authMode the auth mode to use for publishing
300308
* @param {string} authToken the auth token to use for Lambda auth mode
301309
* @returns {Object}
302310
*/
303-
async function publish(channel, authMode, authToken) {
311+
async function publish(channel, eventPayload, authMode, authToken) {
304312
const event = {
305-
'channel': `/${channel}/test`,
306-
'events': [
307-
JSON.stringify({message:'Hello World!'})
308-
]
309-
}
310-
313+
'channel': `${channel}/test`,
314+
'events': eventPayload.map((_payload) => JSON.stringify(_payload))
315+
};
316+
311317
const response = await fetch(`${httpUrl}`, {
312318
method: 'POST',
313319
headers: await getPublishAuthHeader(authMode, event, authToken),
314320
body: JSON.stringify(event)
315321
});
316-
322+
317323
if (!response.ok) {
318324
return {
319325
statusCode: response.status,
@@ -323,7 +329,7 @@ async function publish(channel, authMode, authToken) {
323329
const output = await response.json();
324330
return {
325331
statusCode: 200,
326-
msg: output.successful.length == 1 ? 'publish_success' : 'publish_fail',
332+
msg: output.successful.length == eventPayload.length ? 'publish_success' : 'publish_fail',
327333
}
328334
}
329335

@@ -337,6 +343,7 @@ exports.handler = async function(event) {
337343
const channel = event.channel;
338344
const authMode = event.authMode;
339345
const authToken = event.authToken ?? '';
346+
const eventPayload = event.eventPayload ?? [{message:'Hello World!'}];
340347
const isCustomEndpoint = event.customEndpoint ?? false;
341348

342349
// If custom endpoint, wait for 60 seconds for DNS to propagate
@@ -350,13 +357,13 @@ exports.handler = async function(event) {
350357

351358
let res;
352359
if (pubSubAction === 'publish') {
353-
res = await publish(channel, authMode, authToken);
360+
res = await publish(channel, eventPayload, authMode, authToken);
354361
console.log(res);
355362
} else if (pubSubAction === 'subscribe') {
356363
res = await subscribe(channel, authMode, authToken, false);
357364
console.log(res);
358365
} else if (pubSubAction === 'pubSub') {
359-
res = await subscribe(channel, authMode, authToken, true);
366+
res = await subscribe(channel, authMode, authToken, true, eventPayload);
360367
console.log(res);
361368
}
362369

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import { util } from '@aws-appsync/utils';
2+
3+
const TABLE = 'event-messages';
4+
5+
export const onPublish = {
6+
request: (ctx) => {
7+
return {
8+
operation: 'BatchPutItem',
9+
tables: {
10+
[TABLE]: ctx.events.map((event) => util.dynamodb.toMapValues({
11+
...event.payload,
12+
ddb: true,
13+
eventId: event.id
14+
})),
15+
},
16+
};
17+
},
18+
response: (ctx) => {
19+
const { error, result } = ctx;
20+
if (error) {
21+
return util.appendError(error.message, error.type, result);
22+
}
23+
return ctx.result.data[TABLE].map((item) => ({
24+
id: item.eventId,
25+
payload: {
26+
...item
27+
}
28+
}));
29+
},
30+
}
31+
32+
export function onSubscribe(ctx) {
33+
// Reject a subscription attempt
34+
// util.unauthorized();
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { util } from '@aws-appsync/utils';
2+
3+
export const onPublish = {
4+
request: (ctx) => {
5+
const events = ctx.events.map((event) => ({
6+
source: 'appsync.eventapi',
7+
detail: {
8+
id: event.id,
9+
...event.payload,
10+
},
11+
detailType: 'AppSyncEvent',
12+
}));
13+
14+
return {
15+
operation: "PutEvents",
16+
events: events,
17+
};
18+
},
19+
response: (ctx) => {
20+
const { error, result } = ctx;
21+
if (error) {
22+
return util.appendError(error.message, error.type, result);
23+
}
24+
25+
return ctx.events.map((event, index) => ({
26+
id: event.id,
27+
payload: {
28+
...event.payload,
29+
...ctx.result.Entries[index],
30+
}
31+
}));
32+
},
33+
}
34+
35+
export function onSubscribe(ctx) {
36+
// Reject a subscription attempt
37+
// util.unauthorized();
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import { util } from '@aws-appsync/utils';
2+
3+
export const onPublish = {
4+
request: (ctx) => {
5+
return {
6+
"version": "2018-05-29",
7+
"method": "GET",
8+
"params": {
9+
"headers": {
10+
"Content-Type": "application/json"
11+
}
12+
},
13+
"resourcePath": `/prod/random`
14+
};
15+
},
16+
response: (ctx) => {
17+
const { error, result } = ctx;
18+
if (error) {
19+
return util.appendError(error.message, error.type, result);
20+
}
21+
const randomValue = result.body;
22+
return ctx.events.map((event) => ({
23+
id: event.id,
24+
payload: {
25+
...event.payload,
26+
random: randomValue
27+
}
28+
}));
29+
},
30+
}
31+
32+
export function onSubscribe(ctx) {
33+
// Reject a subscription attempt
34+
// util.unauthorized();
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import { util } from '@aws-appsync/utils';
2+
3+
export const onPublish = {
4+
request: (ctx) => {
5+
return {
6+
operation: 'Invoke',
7+
payload: {
8+
events: ctx.events
9+
},
10+
};
11+
},
12+
response: (ctx) => {
13+
const { error, result } = ctx;
14+
if (error) {
15+
return util.appendError(error.message, error.type, result);
16+
}
17+
18+
return result;
19+
},
20+
}
21+
22+
export function onSubscribe(ctx) {
23+
// Reject a subscription attempt
24+
// util.unauthorized();
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import { util } from '@aws-appsync/utils';
2+
3+
const INDEX_NAME = 'movies';
4+
5+
export const onPublish = {
6+
request: (ctx) => {
7+
const docs = ctx.events.map(event => ({
8+
_index: INDEX_NAME,
9+
_id: event.payload.id
10+
}));
11+
12+
return {
13+
operation: "GET",
14+
path: "/_mget",
15+
params: {
16+
body: { docs }
17+
}
18+
};
19+
},
20+
response: (ctx) => {
21+
const { error, result } = ctx;
22+
23+
if (error) {
24+
return util.appendError(error.message, error.type, result);
25+
}
26+
27+
return result.docs.map((item, idx) => {
28+
if (item.found) {
29+
return {
30+
id: ctx.events[idx].id,
31+
payload: {
32+
...item._source
33+
}
34+
}
35+
} else {
36+
return ctx.events[idx]
37+
}
38+
});
39+
},
40+
}
41+
42+
export function onSubscribe(ctx) {
43+
// Reject a subscription attempt
44+
// util.unauthorized();
45+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { toJsonObject, createPgStatement as pg } from '@aws-appsync/utils/rds';
2+
import { util } from '@aws-appsync/utils';
3+
4+
const TABLE_NAME = 'events';
5+
6+
export const onPublish = {
7+
request: (ctx) => {
8+
const values = ctx.events.map((event) => {
9+
return `('${event.id}', ${Object.values(event.payload).map(val => typeof val === 'string' ? `'${val}'` : val).join(',')}, 'rds')`
10+
}).join(', ');
11+
12+
const statement = `INSERT INTO ${TABLE_NAME} (event_id, message, ds_type) VALUES ${values} RETURNING *`;
13+
return pg(statement);
14+
},
15+
response: (ctx) => {
16+
const { error, result } = ctx;
17+
console.log(ctx);
18+
if (error) {
19+
return util.appendError(error.message, error.type, result);
20+
}
21+
22+
const parsedRes = toJsonObject(result)[0];
23+
return parsedRes.map((res) => ({
24+
id: res.event_id,
25+
payload: {
26+
message: res.message,
27+
ds_type: res.ds_type
28+
}
29+
}));
30+
},
31+
}
32+
33+
export function onSubscribe(ctx) {
34+
// Reject a subscription attempt
35+
// util.unauthorized();
36+
}

0 commit comments

Comments
 (0)