Skip to content

Commit

Permalink
storage: support resumable uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Nov 11, 2014
1 parent 1cc2031 commit ba2e8cf
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 17 deletions.
4 changes: 3 additions & 1 deletion lib/common/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ function ApiError(errorBody) {
this.errors = errorBody.errors;
this.code = errorBody.code;
this.message = errorBody.message;
this.response = errorBody.response;
}

util.inherits(ApiError, Error);
Expand Down Expand Up @@ -180,7 +181,8 @@ function handleResp(err, resp, body, callback) {
callback(new ApiError({
errors: [],
code: resp.statusCode,
message: body || 'Error during request.'
message: body || 'Error during request.',
response: resp
}));
return;
}
Expand Down
202 changes: 186 additions & 16 deletions lib/storage/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@

'use strict';

var bufferEqual = require('buffer-equal');
var ConfigStore = require('configstore');
var crypto = require('crypto');
var duplexify = require('duplexify');
var request = require('request');
var streamEvents = require('stream-events');
var through = require('through2');

/**
* @type module:common/util
Expand Down Expand Up @@ -264,28 +267,195 @@ File.prototype.createReadStream = function() {
*/
/**
 * Create a writable stream that uploads this file's contents using the
 * resumable-upload protocol: a POST creates a resumable session URI (cached
 * in a ConfigStore keyed by file name), then data is PUT to that URI; an
 * interrupted upload is resumed from the last byte the server acknowledges.
 *
 * NOTE(review): this span is a diff rendering with the +/- markers stripped —
 * several runs below interleave the REMOVED pre-change code with the ADDED
 * code, so the text is not valid JavaScript as shown. The remnant spans are
 * flagged inline; confirm against the real post-commit file before editing.
 *
 * @param {object=} metadata - Optional metadata to set on the file
 *     (contentType is forwarded as X-Upload-Content-Type).
 * @return {Duplex} dup - emits 'error' on failure and 'complete' with the
 *     file's metadata once the upload finishes.
 */
File.prototype.createWriteStream = function(metadata) {
var that = this;

var bufferStream = through();
// Persistent cache of { uri, firstChunk } per file name, used to resume.
var configStore = new ConfigStore('gcloud-node');
var dup = streamEvents(duplexify());
var makeAuthorizedRequest = that.bucket.storage.makeAuthorizedRequest_;
// NOTE(review): shadows the module-level `request` import — likely a diff
// remnant; confirm only one of the two exists in the real file.
var request = require('request');
var resumableUri;
// Counts 5xx retries in prepareUpload; NOTE(review): never reset after a
// successful request, so the limit is shared across the stream's lifetime.
var retries = 0;

var RETRY_LIMIT = 3;

metadata = metadata || {};

// Defer all network work until the first write arrives ('writing' is the
// stream-events hook for that).
dup.once('writing', function() {
// NOTE(review): lines from here through the `})` below are the REMOVED
// single-shot `util.makeWritableStream` upload path, interleaved by the
// stripped diff — not part of the new code.
util.makeWritableStream(dup, {
makeAuthorizedRequest: that.bucket.storage.makeAuthorizedRequest_,
metadata: metadata,
request: {
qs: {
name: that.name
},
uri: util.format('{base}/{bucket}/o', {
base: STORAGE_UPLOAD_BASE_URL,
bucket: that.bucket.name
})
// New code resumes here: reuse a cached session URI when one exists.
var config = configStore.get(that.name);

if (config) {
resumeUpload(config.uri);
} else {
startUpload();
}
});

// POST to the upload endpoint with uploadType=resumable to open a new
// upload session, cache its URI, then start sending from byte 0.
function startUpload() {
var headers = {};

if (metadata.contentType) {
headers['X-Upload-Content-Type'] = metadata.contentType;
}

makeAuthorizedRequest({
method: 'POST',
uri: util.format('{base}/{bucket}/o', {
base: STORAGE_UPLOAD_BASE_URL,
bucket: that.bucket.name
}),
qs: {
name: that.name,
uploadType: 'resumable'
},
headers: headers,
json: metadata
}, function(err, res, body) {
if (err) {
dup.emit('error', err);
dup.end();
return;
}
// NOTE(review): the `function(data) {...}` callback below is a REMOVED
// pre-change success handler interleaved by the stripped diff.
}, function(data) {
that.metadata = data;

dup.emit('complete', data);
dup.end();
// NOTE(review): the session URI comes from the Location response header;
// `body.headers.location` looks wrong — headers live on `res`, not the
// parsed body. Confirm against makeAuthorizedRequest's callback shape.
resumableUri = body.headers.location;
configStore.set(that.name, {
uri: resumableUri
});
// -1 = nothing written yet; resumeUpload will start at byte 0.
resumeUpload(resumableUri, -1);
});
});
}

// Resume an existing session: if the caller already knows the last byte the
// server has, upload immediately; otherwise ask the server first.
function resumeUpload(uri, lastByteWritten) {
if (util.is(lastByteWritten, 'number')) {
prepareUpload(lastByteWritten);
} else {
getLastByteWritten(uri, prepareUpload);
}

// Sign a PUT to the session URI, handling session expiry and 5xx errors.
function prepareUpload(lastByteWritten) {
makeAuthorizedRequest({
method: 'PUT',
uri: uri
}, {
onAuthorized: function (err, reqOpts) {
if (err) {
// 404: the resumable session no longer exists — start over.
if (err.code === 404) {
startUpload();
return;
}

// Retry transient server errors up to RETRY_LIMIT times.
if (err.code > 499 && err.code < 600 && retries <= RETRY_LIMIT) {
retries++;
prepareUpload(lastByteWritten);
return;
}

dup.emit('error', err);
dup.end();
return;
}

sendFile(reqOpts, lastByteWritten);
}
});
}
}

// Pipe the user's data to the session URI, skipping bytes the server
// already has and verifying the content matches the interrupted upload.
function sendFile(reqOpts, lastByteWritten) {
var startByte = lastByteWritten + 1;
// Open-ended range: total size unknown until the stream ends.
reqOpts.headers['Content-Range'] = 'bytes ' + startByte + '-*/*';

var bytesWritten = 0;
var limitStream = through(function(chunk, enc, next) {
// Determine if this is the same content uploaded previously.
if (bytesWritten === 0) {
// Fingerprint = first 16 bytes of the first chunk.
var cachedFirstChunk = configStore.get(that.name).firstChunk;
var firstChunk = chunk.slice(0, 16);

if (!cachedFirstChunk) {
configStore.set(that.name, {
uri: reqOpts.uri,
firstChunk: firstChunk
});
} else {
cachedFirstChunk = new Buffer(cachedFirstChunk);
firstChunk = new Buffer(firstChunk);

if (!bufferEqual(cachedFirstChunk, firstChunk)) {
// Different content. Start a new upload.
bufferStream.unshift(chunk);
bufferStream.unpipe(this);
configStore.del(that.name);
startUpload();
return;
}
}
}

var length = chunk.length;

if (util.is(chunk, 'string')) {
// FIXME(review): Buffer.byteLength expects the STRING, not its length —
// this should almost certainly be Buffer.byteLength(chunk, enc).
length = Buffer.byteLength(chunk.length, enc);
}

if (bytesWritten < lastByteWritten) {
// NOTE(review): bytesWritten - length is negative here, so slice()
// counts from the chunk's end — verify this trims the intended
// already-uploaded prefix rather than a suffix.
chunk = chunk.slice(bytesWritten - length);
}

bytesWritten += length;

// Only forward data past the server's acknowledged position.
if (bytesWritten >= lastByteWritten) {
this.push(chunk);
}

next();
});

bufferStream.pipe(limitStream).pipe(getStream(reqOpts));
dup.setWritable(bufferStream);

// Issue the PUT as a stream; surface the final response through dup.
function getStream(reqOpts) {
var stream = request(reqOpts);
// Suppress request's internal buffering callback; we listen to events.
stream.callback = util.noop;

stream.on('complete', function(res) {
util.handleResp(null, res, res.body, function(err, data) {
if (err) {
dup.emit('error', err);
dup.end();
return;
}

that.metadata = data;
dup.emit('complete', that.metadata);

// Upload finished — the cached session is no longer needed.
configStore.del(that.name);
});
});

return stream;
}
}

// If an upload to this file has previously started, this will return the last
// byte written to it.
// Sends an empty PUT with "Content-Range: bytes */*"; the server answers
// 308 (Resume Incomplete) with a Range header when bytes are stored.
function getLastByteWritten(uri, callback) {
makeAuthorizedRequest({
method: 'PUT',
uri: uri,
headers: {
'Content-Length': 0,
'Content-Range': 'bytes */*'
}
}, function(err) {
if (err && err.code === 308) {
// headers.range format: ##-## (e.g. 0-4915200)
// NOTE(review): parseInt without a radix — prefer parseInt(x, 10).
// Relies on the `response` property added to ApiError in this commit.
callback(parseInt(err.response.headers.range.split('-')[1]));
return;
}

// No bytes stored (or any other outcome): treat as a fresh upload.
callback(-1);
});
}

return dup;
};
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"google storage"
],
"dependencies": {
"buffer-equal": "0.0.1",
"duplexify": "^3.1.2",
"extend": "^1.3.0",
"google-service-account": "^1.0.0",
Expand Down

0 comments on commit ba2e8cf

Please sign in to comment.