-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkinesify-data.js
75 lines (65 loc) · 2.11 KB
/
kinesify-data.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// Usage:
// The following command reads unencoded JSON, Avro-encodes each record with the given schema, base64-encodes the result, and wraps it in the Kinesis event format.
// node kinesify-data.js event.unencoded.sierra_bib_post_request.json event.json https://platform.nypl.org/api/v0.1/current-schemas/BibPostRequest
const args = process.argv.slice(2)
const avro = require('avsc')
const fs = require('fs')
const request = require('request')
// config: positional CLI arguments, in order:
//   infile    - path of the unencoded JSON file to read
//   outfile   - path the encoded event JSON is written to
//   schemaUrl - URL the schema is requested from
const [infile, outfile, schemaUrl] = args;
/**
 * Encode the records of the input file with the given Avro schema and write
 * them to the output file.
 *
 * Reads `infile` as JSON, passes every element of its top-level `Records`
 * array through `kinesify`, and writes `{"Records": [...]}` to `outfile` as
 * JSON indented by two spaces. The write is asynchronous; its outcome is
 * reported on the console.
 *
 * @param {Object} schema - Schema definition handed to `avro.parse`.
 * @throws {Error} If the parsed input has no top-level `Records` array.
 */
function onSchemaLoad(schema){
  // initialize avro schema
  const avroType = avro.parse(schema);

  // read unencoded data
  const unencodedData = JSON.parse(fs.readFileSync(infile, 'utf8'));

  // Fail with a descriptive message instead of an opaque
  // "cannot read property 'map' of undefined" TypeError.
  const records = unencodedData && unencodedData.Records;
  if (!Array.isArray(records)) {
    throw new Error('Expected "' + infile + '" to contain a top-level "Records" array');
  }

  // encode data and put in kinesis format
  const kinesisEncodedData = records.map(function(record){
    return kinesify(record, avroType);
  });

  // stringify and write to file
  const json = JSON.stringify({"Records": kinesisEncodedData}, null, 2);
  fs.writeFile(outfile, json, 'utf8', function(err){
    if (err) {
      console.log('Write error:', err);
      // Let the calling shell see that the run failed.
      process.exitCode = 1;
    } else {
      console.log('Successfully wrote data to file');
    }
  });
}
/**
 * Wrap one record in a Kinesis-style event envelope.
 *
 * The record is serialized with `avroType.toBuffer`, the resulting buffer is
 * base64-encoded, and that string becomes the `kinesis.data` field. Every
 * other field of the envelope is a hard-coded value.
 *
 * @param {Object} record - Unencoded record to serialize.
 * @param {Object} avroType - Object exposing `toBuffer(record)`.
 * @returns {Object} Envelope with a `kinesis` section and event metadata.
 */
function kinesify(record, avroType){
  // Avro-serialize, then base64-encode the bytes
  const base64Payload = avroType.toBuffer(record).toString('base64');

  // inner "kinesis" section carrying the encoded payload
  const kinesisSection = {
    "kinesisSchemaVersion": "1.0",
    "partitionKey": "s1",
    "sequenceNumber": "00000000000000000000000000000000000000000000000000000001",
    "data": base64Payload,
    "approximateArrivalTimestamp": 1428537600
  };

  // outer event envelope
  const envelope = {
    "kinesis": kinesisSection,
    "eventSource": "aws:kinesis",
    "eventVersion": "1.0",
    "eventID": "shardId-000000000000:00000000000000000000000000000000000000000000000000000001",
    "eventName": "aws:kinesis:record",
    "invokeIdentityArn": "arn:aws:iam::EXAMPLE",
    "awsRegion": "us-east-1",
    "eventSourceARN": "arn:aws:kinesis:EXAMPLE"
  };

  return envelope;
}
// Request the schema document from `schemaUrl`; `json: true` asks `request`
// to hand the callback an already-parsed response body.
const options = {
  uri: schemaUrl,
  json: true
};
request(options, function(error, resp, body){
  // On a failed request `body` is undefined; report the failure instead of
  // crashing on `body.data`.
  if (error) {
    console.log('Schema request error:', error);
    process.exitCode = 1;
    return;
  }

  // A response without `data.schema` used to be ignored silently, leaving
  // the script to exit 0 without having written anything.
  if (!body || !body.data || !body.data.schema) {
    console.log('No schema found in response from', schemaUrl);
    process.exitCode = 1;
    return;
  }

  console.log('Loaded schema');
  // `data.schema` is parsed as JSON text to obtain the schema object.
  const schema = JSON.parse(body.data.schema);
  onSchemaLoad(schema);
});