This repository has been archived by the owner on Jul 6, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
111 lines (93 loc) · 3.02 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
const {promisify} = require('util');
const {join} = require('path');
const {Transform, Writable} = require('stream');
const minimist = require('minimist');
const AWS = require('aws-sdk');
const Kinesis = require('aws-sdk/clients/kinesis');
const KinesisReadable = require('kinesis-readable');
// Credential provider chain handed to the Kinesis client below.
const providerChain = new AWS.CredentialProviderChain();
// Append a last-resort provider whose key id and secret are both the literal
// string 'undefined'.
// NOTE(review): presumably this lets a local/mock Kinesis endpoint be used
// without real AWS credentials — confirm.
providerChain.providers.push(() => new AWS.Credentials('undefined', 'undefined'));
/**
 * Creates one KinesisReadable per shard of a Kinesis stream.
 *
 * @param {Object} options
 * @param {string} [options.kinesisEndpoint] - Endpoint passed to the Kinesis client.
 * @param {string} [options.kinesisStream] - Name of the stream to read. When
 *   omitted (or empty) the previous hard-coded name
 *   'bricklane-central-development' is used, so existing callers keep working.
 * @returns {Promise<Array>} Resolves with one readable per shard listed in the
 *   describeStream response; each readable starts at TRIM_HORIZON.
 */
const createKinesisReadables = async ({kinesisEndpoint, kinesisStream}) => {
  // The stream name used to be hard-coded, which silently ignored the
  // `kinesisStream` option (and the --kinesis-stream CLI flag).
  const streamName = kinesisStream || 'bricklane-central-development';
  const client = new Kinesis({
    apiVersion: '2013-12-02',
    credentialProvider: providerChain,
    endpoint: kinesisEndpoint,
    region: 'eu-west-1',
    // Bound parameter: every request made through this client targets this stream.
    params: {StreamName: streamName}
  });
  // describeStream is callback-based; bind it so `this` stays the client.
  const describeStream = promisify(client.describeStream.bind(client));
  // describeStream is called once; shards not present in that single response
  // are not read.
  const {StreamDescription: {Shards}} = await describeStream({});
  return Shards.map(({ShardId: shardId}) =>
    KinesisReadable(client, {shardId, iterator: 'TRIM_HORIZON'})
  );
};
/**
 * Transform-stream callback that wraps a batch of Kinesis records in a
 * Lambda-style Kinesis event and passes it on.
 *
 * @param {Array<Object>} records - Records carrying SequenceNumber,
 *   ApproximateArrivalTimestamp, PartitionKey and Data.
 * @param {string} encoding - Unused; present to match the transform signature.
 * @param {Function} cb - Invoked as cb(null, event).
 */
const transformRecordToEvent = (records, encoding, cb) => {
  // Builds the event entry for a single record.
  const toEventRecord = record => {
    const arrivalMillis = new Date(record.ApproximateArrivalTimestamp).getTime();
    const kinesis = {
      // Milliseconds converted to whole seconds.
      approximateArrivalTimestamp: Math.round(arrivalMillis / 1000),
      partitionKey: record.PartitionKey,
      data: record.Data.toString('base64'),
      kinesisSchemaVersion: '1.0',
      sequenceNumber: record.SequenceNumber
    };
    return {
      // The shard part of the id is a fixed placeholder.
      eventID: `shardId-000:${record.SequenceNumber}`,
      eventVersion: '1.0',
      kinesis,
      invokeIdentityArn: 'arn:aws:iam::EXAMPLE',
      eventName: 'aws:kinesis:record',
      eventSourceARN: 'arn:aws:kinesis:EXAMPLE',
      eventSource: 'aws:kinesis',
      awsRegion: 'eu-west-1'
    };
  };

  const Records = records.map(record => toEventRecord(record));
  cb(null, {Records});
};
/**
 * Builds a Writable `write` implementation that forwards each incoming event
 * to the given Lambda handler.
 *
 * @param {Function} lambda - Handler called as lambda(event, context, callback).
 * @returns {Function} (event, encoding, callback) => void; the encoding is ignored.
 */
const processEvent = lambda => {
  const write = (event, encoding, callback) => {
    // A fresh, empty context object is supplied on every invocation.
    const context = {};
    lambda(event, context, callback);
  };
  return write;
};
/**
 * Wires every shard readable to the Lambda handler.
 *
 * Each readable is piped through an object-mode Transform (record batch ->
 * event, via transformRecordToEvent) into an object-mode Writable that calls
 * the handler (via processEvent).
 *
 * @param {Promise<Array>|Array} readablesP - Readables, or a promise for them.
 * @param {Function} lambda - Handler invoked once per event.
 * @returns {Promise<void>} Settles once the pipelines have been set up.
 */
const executeLambda = async (readablesP, lambda) => {
  const shardStreams = await readablesP;
  for (const shardStream of shardStreams) {
    // Separate Transform/Writable instances are created for every shard.
    const toEvent = new Transform({
      objectMode: true,
      transform: transformRecordToEvent
    });
    const invokeHandler = new Writable({
      objectMode: true,
      write: processEvent(lambda)
    });
    shardStream.pipe(toEvent).pipe(invokeHandler);
  }
};
/**
 * Loads a Lambda handler from disk and starts feeding it events read from
 * the Kinesis stream.
 *
 * @param {Object} options
 * @param {string} options.lambdaFile - Handler module path, joined onto
 *   process.cwd().
 * @param {string} options.lambdaHandler - Name of the exported handler function.
 * @param {string} [options.kinesisEndpoint] - Forwarded to createKinesisReadables.
 * @param {string} [options.kinesisStream] - Forwarded to createKinesisReadables.
 * @returns {Function} close() - returns a promise that settles after close()
 *   has been called on every readable; it rejects if the readables could not
 *   be created.
 * @throws {TypeError} If the requested export is not a function.
 */
const start = ({lambdaFile, lambdaHandler, kinesisEndpoint, kinesisStream}) => {
  const lambda = require(join(process.cwd(), lambdaFile))[lambdaHandler];
  // Fail fast: otherwise a wrong handler name only surfaces as
  // "lambda is not a function" when the first batch of records arrives.
  if (typeof lambda !== 'function') {
    throw new TypeError(
      `Handler "${lambdaHandler}" exported by "${lambdaFile}" is not a function`
    );
  }
  const readablesP = createKinesisReadables({kinesisEndpoint, kinesisStream});
  // Do not leave the promise floating: report a startup failure instead of
  // producing an unhandled rejection, and flag the process as failed. Callers
  // can still observe the same error through the promise returned by close().
  executeLambda(readablesP, lambda).catch(err => {
    console.error('Failed to start reading from Kinesis:', err);
    process.exitCode = 1;
  });
  return () => readablesP.then(readables => readables.forEach(readable => readable.close()));
};
module.exports = {
start
};
if (!process.parent) {
const argv = minimist(process.argv.slice(2));
const {
'lambda-file': lambdaFile,
'lambda-handler': lambdaHandler,
'kinesis-endpoint': kinesisEndpoint,
'kinesis-stream': kinesisStream
} = argv;
const close = start({lambdaFile, lambdaHandler, kinesisEndpoint, kinesisStream});
process.on('SIGTERM', close);
}