-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathpcc_to_clickhouse.js
70 lines (67 loc) · 1.92 KB
/
pcc_to_clickhouse.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
const mongodb = require('mongodb')
const moment = require('moment')
const ch = require('./clickhouse').ch
const client = mongodb.MongoClient
// DDL for the ClickHouse `pcc` table that the export below inserts into
// (INSERT ... FORMAT JSONEachRow), so the column names here must match the keys
// of the documents written by that export.
const queries = [
  `CREATE TABLE pcc (
_id String,
publish DateTime,
unit String,
unit_id String,
name String,
job_number String,
merchants Array(String)
) ENGINE=MergeTree()
PARTITION BY toYYYYMM(publish)
ORDER BY (_id, publish)
PRIMARY KEY _id`,
];

// Creating the table is opt-in: set PCC_CREATE_TABLE to a non-empty value to run
// `queries`. By default nothing is executed, so re-running the export against an
// existing table does not fail on CREATE TABLE.
// NOTE(review): these queries are fired asynchronously and are not awaited before
// the export's INSERT starts; on a fresh database the INSERT may race the CREATE.
if (process.env.PCC_CREATE_TABLE) {
  for (const query of queries) {
    const stream = ch.query(query, (err) => {
      if (err) {
        console.log(err);
      }
    });
    // Echo ClickHouse's response for the DDL statement.
    stream.pipe(process.stdout);
  }
}
// Number of Mongo documents to skip before exporting, so an interrupted import can
// be resumed. Override with the PCC_SKIP environment variable (PCC_SKIP=0 exports
// everything). The default is the offset that was previously hard-coded here,
// presumably where an earlier run stopped — confirm before relying on it.
const rawSkip = process.env.PCC_SKIP;
if (rawSkip !== undefined && !/^\d+$/.test(rawSkip)) {
  throw new Error(`PCC_SKIP must be a non-negative integer, got "${rawSkip}"`);
}
const skip = rawSkip === undefined ? 1587000 : Number(rawSkip);
// Export MongoDB `pcc.pcc` into the ClickHouse `pcc` table.
//
// Documents are read with a projection, skipping the first `skip` of them. Each one
// is reshaped into a row with the `pcc` table's columns (_id, publish, unit, unit_id,
// name, job_number, merchants) and streamed to ClickHouse as a single
// JSONEachRow INSERT.
client.connect(require('./database'), (connectErr, mongo) => {
  if (connectErr) {
    // Without a connection there is nothing to export; exit non-zero.
    console.log(connectErr);
    process.exitCode = 1;
    return;
  }

  const pcc = mongo.db('pcc').collection('pcc');
  const cursor = pcc.find({}, {
    projection: {
      _id: 1,
      name: 1,
      job_number: 1,
      unit: 1,
      unit_id: 1,
      publish: 1,
      end_date: 1,
      'award.name': 1,
      'award.unit': 1,
      'award.merchants.name': 1,
    },
  }).skip(skip);
  const clickhouseStream = ch.query('INSERT INTO pcc FORMAT JSONEachRow');

  // Counts rows written to ClickHouse. Documents dropped for lacking a name still
  // advance the cursor, so this count is not a resume offset for `skip`.
  let i = 0;
  // Set once the INSERT stream has been ended or has failed, so it is never ended
  // twice and nothing more is written to it.
  let insertClosed = false;

  const endInsert = () => {
    if (insertClosed) {
      return;
    }
    insertClosed = true;
    clickhouseStream.end();
  };

  cursor.on('data', (doc) => {
    if (insertClosed) {
      return;
    }
    // The ClickHouse _id is the job number, not Mongo's _id.
    doc._id = doc.job_number;
    doc.name = doc.name || (doc.award && doc.award.name);
    if (!doc.name) {
      return;
    }
    // Fall back to end_date when publish is missing; formatted for the DateTime column.
    // NOTE(review): if both are missing, moment(undefined) yields the current time.
    doc.publish = moment(doc.publish || doc.end_date).format('YYYY-MM-DD HH:mm:ss');
    doc.unit = doc.unit || (doc.award && doc.award.unit) || '';
    doc.merchants = (doc.award && doc.award.merchants)
      ? doc.award.merchants.map((row) => row.name).filter((row) => row)
      : [];
    // Drop source-only fields that have no column in the table.
    delete doc.award;
    delete doc.end_date;

    i++;
    if (i % 1000 === 0) {
      console.log(i);
    }

    // Honour backpressure: stop reading Mongo until the INSERT stream drains.
    // Otherwise rows pile up in memory whenever Mongo outpaces ClickHouse.
    if (!clickhouseStream.write(JSON.stringify(doc))) {
      cursor.pause();
      clickhouseStream.once('drain', () => cursor.resume());
    }
  });

  cursor.on('error', (err) => {
    console.log(err);
    process.exitCode = 1;
    // Finish the INSERT with the rows sent so far; 'finish' then closes Mongo.
    endInsert();
  });

  cursor.on('end', () => {
    console.log('mongo query end');
    endInsert();
  });

  clickhouseStream.on('error', (err) => {
    console.log(err);
    process.exitCode = 1;
    // The INSERT has failed: stop reading and release Mongo so the process can exit.
    insertClosed = true;
    cursor.pause();
    mongo.close();
  });

  clickhouseStream.on('finish', () => {
    console.log('clickhouse end');
    mongo.close();
  });
});